From aad95602de71a70b590616af1dd1d1a1286dd12d Mon Sep 17 00:00:00 2001 From: Patrick762 Date: Sun, 21 May 2023 15:01:06 +0200 Subject: [PATCH] added button state management --- README.md | 2 + streamdeckapi/const.py | 3 + streamdeckapi/server.py | 130 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 126 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index a125ff5..2e3d885 100644 --- a/README.md +++ b/README.md @@ -16,3 +16,5 @@ For this to work, the following software is required: - LibUSB HIDAPI [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html) - cairo [Installation instructions for Windows](https://stackoverflow.com/a/73913080) + +The event `doubleTap` is not working with this server software. diff --git a/streamdeckapi/const.py b/streamdeckapi/const.py index 3791e68..ceda11c 100644 --- a/streamdeckapi/const.py +++ b/streamdeckapi/const.py @@ -6,4 +6,7 @@ PLUGIN_PORT = 6153 PLUGIN_INFO = "/sd/info" PLUGIN_ICON = "/sd/icon" +DB_FILE = "streamdeckapi.db" SD_SSDP = "urn:home-assistant.io:device:stream-deck" +DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" +LONG_PRESS_SECONDS = 2 diff --git a/streamdeckapi/server.py b/streamdeckapi/server.py index 26860e6..aac23bb 100644 --- a/streamdeckapi/server.py +++ b/streamdeckapi/server.py @@ -6,6 +6,7 @@ import asyncio import platform import sqlite3 import base64 +from datetime import datetime import aiohttp import human_readable_ids as hri from jsonpickle import encode @@ -17,11 +18,17 @@ import cairosvg from PIL import Image from ssdpy import SSDPServer -from streamdeckapi.const import PLUGIN_ICON, PLUGIN_INFO, PLUGIN_PORT, SD_SSDP +from streamdeckapi.const import ( + DATETIME_FORMAT, + DB_FILE, + LONG_PRESS_SECONDS, + PLUGIN_ICON, + PLUGIN_INFO, + PLUGIN_PORT, + SD_SSDP +) from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice -# TODO: Websocket broadcast not working yet - DEFAULT_ICON = re.sub( "\r\n|\n|\r", @@ -53,8 +60,8 @@ devices: list[SDDevice] = [] # Database # -database = sqlite3.connect("streamdeckapi.db") -table_cursor = database.cursor() +database_first = sqlite3.connect(DB_FILE) +table_cursor = database_first.cursor() table_cursor.execute(""" CREATE TABLE IF NOT EXISTS buttons( key integer PRIMARY KEY, @@ -64,11 +71,19 @@ table_cursor.execute(""" y integer, svg text )""") +table_cursor.execute(""" + CREATE TABLE IF NOT EXISTS button_states( + key integer PRIMARY KEY, + state integer, + state_update text + )""") table_cursor.close() +database_first.close() def save_button(key: int, button: SDButton): """Save button to database.""" + database = sqlite3.connect(DB_FILE) cursor = database.cursor() svg_bytes = button.svg.encode() base64_bytes = base64.b64encode(svg_bytes) @@ -88,10 +103,12 @@ def save_button(key: int, button: SDButton): database.commit() print(f"Saved button {button.uuid} with key {key} to database") cursor.close() + database.close() def get_button(key: int) -> SDButton | None: """Get a button from the database.""" + database = sqlite3.connect(DB_FILE) cursor = database.cursor() result = cursor.execute( f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}") @@ -109,11 +126,13 @@ def get_button(key: int) -> SDButton | None: "svg": svg_string, }) cursor.close() + database.close() return button def get_button_by_uuid(uuid: str) -> SDButton | None: """Get a button from the database.""" + database = sqlite3.connect(DB_FILE) cursor = database.cursor() result = cursor.execute( f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid=\"{uuid}\"") @@ -131,11 +150,13 @@ def get_button_by_uuid(uuid: str) -> SDButton | None: "svg": svg_string, }) cursor.close() + database.close() return button def get_button_key(uuid: str) -> int: """Get a button key from the database.""" + database = sqlite3.connect(DB_FILE) cursor = database.cursor() result = cursor.execute(f"SELECT key FROM buttons WHERE uuid=\"{uuid}\"") matching_buttons = result.fetchall() @@ -144,12 +165,14 @@ def get_button_key(uuid: str) -> int: row = matching_buttons[0] key = row[0] cursor.close() + database.close() return key def get_buttons() -> dict[str, SDButton]: """Load all buttons from the database.""" result: dict[str, SDButton] = {} + database = sqlite3.connect(DB_FILE) cursor = database.cursor() for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"): base64_bytes = row[5].encode() @@ -162,10 +185,57 @@ def get_buttons() -> dict[str, SDButton]: "svg": svg_string, }) cursor.close() + database.close() print(f"Loaded {len(result)} buttons from DB") return result +def write_button_state(key: int, state: bool, update: str): + """Write button state to database.""" + state_int = 0 + if state is True: + state_int = 1 + + database = sqlite3.connect(DB_FILE) + cursor = database.cursor() + + # Check if exists + result = cursor.execute(f"SELECT state FROM button_states WHERE key={key}") + matching_states = result.fetchall() + if len(matching_states) > 0: + # Perform update + cursor.execute( + f"UPDATE button_states SET state={state_int}, state_update=\"{update}\" WHERE key={key}") + else: + # Create new row + cursor.execute( + f"INSERT INTO button_states VALUES ({key}, {state_int}, \"{update}\")") + database.commit() + print(f"Saved button_state with key {key} to database") + cursor.close() + database.close() + + +def get_button_state(key: int) -> tuple | None: + """Load button_state from database.""" + result = () + database = sqlite3.connect(DB_FILE) + cursor = database.cursor() + result = cursor.execute( + f"SELECT key,state,state_update FROM button_states WHERE key={key}") + matching_states = result.fetchall() + if len(matching_states) == 0: + return None + row = matching_states[0] + state = False + if row[1] == 1: + state = True + result = (state, row[2]) + cursor.close() + database.close() + return result + + # # API # @@ -232,6 +302,12 @@ async def websocket_handler(request: web.Request): return web_socket +# TODO: Websocket broadcast not working yet +def websocket_broadcast(message: str): + """Send a message to each websocket client.""" + print(f"BROADCAST: {message}") + + # # Functions # @@ -265,10 +341,46 @@ def get_position(deck: StreamDeck, key: int) -> SDButtonPosition: return SDButtonPosition({"x": int(key / deck.KEY_COLS), "y": key % deck.KEY_COLS}) -def on_key_change(deck: StreamDeck, key: int, state: bool): +def on_key_change(_: StreamDeck, key: int, state: bool): """Handle key change callbacks.""" - position = get_position(deck, key) - print(f"Key at {position.x_pos}|{position.y_pos} is state {state}") + button = get_button(key) + if button is None: + return + + if state is True: + websocket_broadcast(encode( + {"event": "keyDown", "args": button.uuid})) + else: + websocket_broadcast(encode( + {"event": "keyUp", "args": button.uuid})) + + now = datetime.now() + + db_button_state = get_button_state(key) + + if db_button_state is None: + write_button_state(key, state, now.strftime(DATETIME_FORMAT)) + return + + last_state: bool = db_button_state[0] + last_update: str = db_button_state[1] + last_update_datetime = datetime.strptime(last_update, DATETIME_FORMAT) + diff = now - last_update_datetime + + if last_state is True and state is False and diff.seconds < LONG_PRESS_SECONDS: + websocket_broadcast( + encode({"event": "singleTap", "args": button.uuid})) + write_button_state(key, state, now.strftime(DATETIME_FORMAT)) + return + + # TODO: Work with timer instead + if last_state is True and state is False and diff.seconds >= LONG_PRESS_SECONDS: + websocket_broadcast( + encode({"event": "longPress", "args": button.uuid})) + write_button_state(key, state, now.strftime(DATETIME_FORMAT)) + return + + write_button_state(key, state, now.strftime(DATETIME_FORMAT)) def update_button_icon(uuid: str, svg: str): @@ -292,7 +404,7 @@ def set_icon(deck: StreamDeck, key: int, svg: str): """Draw an icon to the button.""" png_bytes = io.BytesIO() cairosvg.svg2png(svg.encode("utf-8"), write_to=png_bytes) - + # Debug cairosvg.svg2png(svg.encode("utf-8"), write_to=f"icon_{key}.png")