Server update

This commit is contained in:
Patrick762
2023-06-05 16:46:41 +02:00
committed by GitHub
2 changed files with 71 additions and 16 deletions

View File

@@ -5,9 +5,6 @@ Stream Deck API Library for Home Assistant Stream Deck Integration
Only compatible with separate [Stream Deck Plugin](https://github.com/Patrick762/streamdeckapi-plugin) or the bundled server. Only compatible with separate [Stream Deck Plugin](https://github.com/Patrick762/streamdeckapi-plugin) or the bundled server.
## Dependencies
- [websockets](https://pypi.org/project/websockets/) 11.0.2
## Server ## Server
This library also contains a server to use the streamdeck with Linux or without the official Stream Deck Software. This library also contains a server to use the streamdeck with Linux or without the official Stream Deck Software.
@@ -18,6 +15,9 @@ For this to work, the following software is required:
The event `doubleTap` is not working with this server software. The event `doubleTap` is not working with this server software.
### Limitations
Discovery over SSDP might not work.
### Installation on Linux / Raspberry Pi ### Installation on Linux / Raspberry Pi
Install requirements: Install requirements:

View File

@@ -6,6 +6,8 @@ import asyncio
import platform import platform
import sqlite3 import sqlite3
import base64 import base64
import socket
from uuid import uuid4
from datetime import datetime from datetime import datetime
from multiprocessing import Process from multiprocessing import Process
import aiohttp import aiohttp
@@ -48,8 +50,8 @@ DEFAULT_ICON = re.sub(
application: SDApplication = SDApplication( application: SDApplication = SDApplication(
{ {
"font": "", "font": "Segoe UI",
"language": "", "language": "en",
"platform": platform.system(), "platform": platform.system(),
"platformVersion": platform.version(), "platformVersion": platform.version(),
"version": "0.0.1", "version": "0.0.1",
@@ -380,6 +382,28 @@ def get_position(deck: StreamDeck, key: int) -> SDButtonPosition:
return SDButtonPosition({"x": int(key / deck.KEY_COLS), "y": key % deck.KEY_COLS}) return SDButtonPosition({"x": int(key / deck.KEY_COLS), "y": key % deck.KEY_COLS})
async def long_press_callback(key: int):
"""Handle callback after long press seconds."""
print("Long press detected")
# Check state of button
for deck in streamdecks:
if not deck.is_visual():
continue
if not deck.is_open():
deck.open()
states = deck.key_states()
button = get_button(key)
if not isinstance(button, SDButton):
return
if states[key] is True:
await websocket_broadcast(encode({"event": "longPress", "args": button.uuid}))
async def on_key_change(_: StreamDeck, key: int, state: bool): async def on_key_change(_: StreamDeck, key: int, state: bool):
"""Handle key change callbacks.""" """Handle key change callbacks."""
button = get_button(key) button = get_button(key)
@@ -389,6 +413,9 @@ async def on_key_change(_: StreamDeck, key: int, state: bool):
if state is True: if state is True:
await websocket_broadcast(encode( await websocket_broadcast(encode(
{"event": "keyDown", "args": button.uuid})) {"event": "keyDown", "args": button.uuid}))
print("Waiting for button release")
# Start timer
Timer(LONG_PRESS_SECONDS, lambda: long_press_callback(key), False)
else: else:
await websocket_broadcast(encode( await websocket_broadcast(encode(
{"event": "keyUp", "args": button.uuid})) {"event": "keyUp", "args": button.uuid}))
@@ -412,13 +439,6 @@ async def on_key_change(_: StreamDeck, key: int, state: bool):
write_button_state(key, state, now.strftime(DATETIME_FORMAT)) write_button_state(key, state, now.strftime(DATETIME_FORMAT))
return return
# TODO: Work with timer instead
if last_state is True and state is False and diff.seconds >= LONG_PRESS_SECONDS:
await websocket_broadcast(
encode({"event": "longPress", "args": button.uuid}))
write_button_state(key, state, now.strftime(DATETIME_FORMAT))
return
write_button_state(key, state, now.strftime(DATETIME_FORMAT)) write_button_state(key, state, now.strftime(DATETIME_FORMAT))
@@ -482,7 +502,7 @@ def init_all():
{ {
"uuid": hri.get_new_id().lower().replace(" ", "-"), "uuid": hri.get_new_id().lower().replace(" ", "-"),
"device": serial, "device": serial,
"position": {"x": position.x_pos, "y": position.y_pos}, "position": {"x": position.y_pos, "y": position.x_pos},
"svg": DEFAULT_ICON, "svg": DEFAULT_ICON,
} }
) )
@@ -496,25 +516,60 @@ def init_all():
deck.set_key_callback_async(on_key_change) deck.set_key_callback_async(on_key_change)
def get_local_ip():
"""Get local ip address."""
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
connection.connect(('192.255.255.255', 1))
address = connection.getsockname()[0]
except socket.error:
address = '127.0.0.1'
finally:
connection.close()
return address
def start_ssdp_server(): def start_ssdp_server():
"""Start SSDP server.""" """Start SSDP server."""
print("Starting SSDP server ...") print("Starting SSDP server ...")
server = SSDPServer(SD_SSDP)
address = get_local_ip()
broadcast = "239.255.255.250"
location = f"http://{address}:{PLUGIN_PORT}/device.xml"
usn = f"uuid:{str(uuid4())}::{SD_SSDP}"
server = "python/3 UPnP/1.1 ssdpy/0.4.1"
print(f"IP Address for SSDP: {address}")
print(f"SSDP broadcast ip: {broadcast}")
print(f"SSDP location: {location}")
server = SSDPServer(usn, # FIXME socket.setsockopt(): no such device No such device
address=broadcast,
location=location,
max_age=1800,
extra_fields={
"st": SD_SSDP,
"server": server,
"deviceType": SD_SSDP,
})
server.serve_forever() server.serve_forever()
class Timer: class Timer:
"""Timer class.""" """Timer class."""
def __init__(self, interval, callback):
def __init__(self, interval, callback, repeating=True):
"""Init timer.""" """Init timer."""
self._interval = interval self._interval = interval
self._callback = callback self._callback = callback
self._repeating = repeating
self._task = asyncio.ensure_future(self._job()) self._task = asyncio.ensure_future(self._job())
async def _job(self): async def _job(self):
await asyncio.sleep(self._interval) await asyncio.sleep(self._interval)
await self._callback() await self._callback()
self._task = asyncio.ensure_future(self._job()) if self._repeating:
self._task = asyncio.ensure_future(self._job())
def cancel(self): def cancel(self):
"""Cancel timer.""" """Cancel timer."""