From 85272e6c8e12e3c3b34095c1b533a77b0b1c368c Mon Sep 17 00:00:00 2001 From: Patrick762 Date: Sat, 20 May 2023 14:28:49 +0200 Subject: [PATCH] added sqlite database to save button configs --- .gitignore | 2 + README.md | 7 +- setup.py | 1 + streamdeckapi/server.py | 280 ++++++++++++++++++++++++++++++++-------- 4 files changed, 235 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 4f60f87..7aa4d20 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ **/__pycache__/ build *.egg-info +*.db +*.db-journal diff --git a/README.md b/README.md index c051468..a125ff5 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,9 @@ Only compatible with separate [Stream Deck Plugin](https://github.com/Patrick762 ## Server -This library also contains a server to use the streamdeck with linux or without the official Stream Deck Software. +This library also contains a server to use the streamdeck with Linux or without the official Stream Deck Software. -For this to work, the LibUSB HIDAPI is required. [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html) +For this to work, the following software is required: + +- LibUSB HIDAPI [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html) +- cairo [Installation instructions for Windows](https://stackoverflow.com/a/73913080) diff --git a/setup.py b/setup.py index 707ba3e..7affb80 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ setup( "jsonpickle==3.0.1", "streamdeck==0.9.3", "pillow>=9.4.0,<10.0.0", + "svglib==1.5.1", ], keywords=[], entry_points={ diff --git a/streamdeckapi/server.py b/streamdeckapi/server.py index b32907c..6a9791b 100644 --- a/streamdeckapi/server.py +++ b/streamdeckapi/server.py @@ -1,31 +1,41 @@ """Stream Deck API Server.""" import re -import aiohttp +import io import asyncio import platform +import sqlite3 +import base64 +import aiohttp import human_readable_ids as hri from jsonpickle import encode from aiohttp import web from StreamDeck.DeviceManager import DeviceManager from StreamDeck.Devices.StreamDeck import StreamDeck +from StreamDeck.ImageHelpers import PILHelper +from svglib.svglib import svg2rlg +from PIL import Image from streamdeckapi.const import PLUGIN_ICON, PLUGIN_INFO, PLUGIN_PORT from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice +# TODO: MDI Icons not showing +# TODO: Text too small, positioning off +# TODO: Websocket broadcast not working yet + DEFAULT_ICON = re.sub( "\r\n|\n|\r", "", """ - - - - - -Configure - -""", + + + + + + Configure + + """, ) @@ -39,81 +49,211 @@ application: SDApplication = SDApplication( } ) devices: list[SDDevice] = [] -buttons: dict[str, SDButton] = {} + +# +# Database +# + +database = sqlite3.connect("streamdeckapi.db") +table_cursor = database.cursor() +table_cursor.execute(""" + CREATE TABLE IF NOT EXISTS buttons( + key integer PRIMARY KEY, + uuid text NOT NULL, + device text, + x integer, + y integer, + svg text + )""") +table_cursor.close() -async def api_info_handler( - request: web.Request, -): # FIXME: unparseable json (x -> x_pos, y -> y_pos, platformVersion -> platform_version) +def save_button(key: int, button: SDButton): + """Save button to database.""" + cursor = database.cursor() + svg_bytes = button.svg.encode() + base64_bytes = base64.b64encode(svg_bytes) + base64_string = base64_bytes.decode() + + # Check if exists + result = cursor.execute(f"SELECT uuid FROM buttons WHERE key={key}") + matching_buttons = result.fetchall() + if len(matching_buttons) > 0: + # Perform update + cursor.execute( + f"UPDATE buttons SET svg=\"{base64_string}\" WHERE key={key}") + else: + # Create new row + cursor.execute( + f"INSERT INTO buttons VALUES ({key}, \"{button.uuid}\", \"{button.device}\", {button.position.x_pos}, {button.position.y_pos}, \"{base64_string}\")") + database.commit() + print(f"Saved button {button.uuid} with key {key} to database") + cursor.close() + + +def get_button(key: int) -> SDButton | None: + """Get a button from the database.""" + cursor = database.cursor() + result = cursor.execute( + f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}") + matching_buttons = result.fetchall() + if len(matching_buttons) == 0: + return None + row = matching_buttons[0] + base64_bytes = row[5].encode() + svg_bytes = base64.b64decode(base64_bytes) + svg_string = svg_bytes.decode() + button = SDButton({ + "uuid": row[1], + "device": row[2], + "position": {"x": row[3], "y": row[4]}, + "svg": svg_string, + }) + cursor.close() + return button + + +def get_button_by_uuid(uuid: str) -> SDButton | None: + """Get a button from the database.""" + cursor = database.cursor() + result = cursor.execute( + f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid=\"{uuid}\"") + matching_buttons = result.fetchall() + if len(matching_buttons) == 0: + return None + row = matching_buttons[0] + base64_bytes = row[5].encode() + svg_bytes = base64.b64decode(base64_bytes) + svg_string = svg_bytes.decode() + button = SDButton({ + "uuid": row[1], + "device": row[2], + "position": {"x": row[3], "y": row[4]}, + "svg": svg_string, + }) + cursor.close() + return button + + +def get_button_key(uuid: str) -> int: + """Get a button key from the database.""" + cursor = database.cursor() + result = cursor.execute(f"SELECT key FROM buttons WHERE uuid=\"{uuid}\"") + matching_buttons = result.fetchall() + if len(matching_buttons) == 0: + return -1 + row = matching_buttons[0] + key = row[0] + cursor.close() + return key + + +def get_buttons() -> dict[str, SDButton]: + """Load all buttons from the database.""" + result: dict[str, SDButton] = {} + cursor = database.cursor() + for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"): + base64_bytes = row[5].encode() + svg_bytes = base64.b64decode(base64_bytes) + svg_string = svg_bytes.decode() + result[row[0]] = SDButton({ + "uuid": row[1], + "device": row[2], + "position": {"x": row[3], "y": row[4]}, + "svg": svg_string, + }) + cursor.close() + print(f"Loaded {len(result)} buttons from DB") + return result + + +# +# API +# + + +async def api_info_handler(_: web.Request): + """Handle info requests.""" json_data = encode( - {"devices": devices, "application": application, "buttons": buttons}, + {"devices": devices, "application": application, "buttons": get_buttons()}, unpicklable=False, ) if not isinstance(json_data, str): return web.Response(status=500, text="jsonpickle error") - json_data = json_data.replace('"x_pos"', '"x"').replace('"y_pos"', '"y"').replace( - '"platform_version"', '"platformVersion"' + json_data = ( + json_data.replace('"x_pos"', '"x"') + .replace('"y_pos"', '"y"') + .replace('"platform_version"', '"platformVersion"') ) return web.Response(text=json_data, content_type="application/json") async def api_icon_get_handler(request: web.Request): - btnId = request.match_info["btnId"] - for _, btn in buttons.items(): - if btn.uuid != btnId: - continue - return web.Response(text=btn.svg, content_type="image/svg+xml") - return web.Response(status=404, text="Button not found") + """Handle icon get requests.""" + uuid = request.match_info["uuid"] + button = get_button_by_uuid(uuid) + if button is None: + return web.Response(status=404, text="Button not found") + return web.Response(text=button.svg, content_type="image/svg+xml") async def api_icon_set_handler(request: web.Request): - btnId = request.match_info["btnId"] + """Handle icon set requests.""" + uuid = request.match_info["uuid"] if not request.has_body: return web.Response(status=422, text="No data in request") body = await request.text() - print(body) if not body.startswith("= 0: + set_icon(deck, button_key, svg) + button.svg = svg + save_button(button_key, button) + + +def set_icon(deck: StreamDeck, key: int, svg: str): + """Draw an icon to the button.""" + svg_io = io.StringIO(svg) + drawing = svg2rlg(svg_io) + png_string = drawing.asString("png") + png_bytes = io.BytesIO(png_string) + + icon = Image.open(png_bytes) + image = PILHelper.create_scaled_image(deck, icon, margins=[0, 0, 20, 0]) + + deck.set_key_image(key, PILHelper.to_native_format(deck, image)) + + def init_all(): """Init Stream Deck devices.""" - # TODO: Load buttons from storage and save asap - streamdecks: list[StreamDeck] = DeviceManager().enumerate() - print("Found {} Stream Deck(s).\n".format(len(streamdecks))) + print(f"Found {len(streamdecks)} Stream Deck(s).") for deck in streamdecks: if not deck.is_visual(): @@ -159,24 +327,30 @@ def init_all(): ) for key in range(deck.key_count()): - # FIXME: only add if not already in dict - position = get_position(deck, key) - buttons[key] = SDButton( - { - "uuid": hri.get_new_id().lower().replace(" ", "-"), - "device": serial, - "position": {"x": position.x_pos, "y": position.y_pos}, - "svg": DEFAULT_ICON, - } - ) + # Only add if not already in dict + button = get_button(key) + if button is None: + position = get_position(deck, key) + new_button = SDButton( + { + "uuid": hri.get_new_id().lower().replace(" ", "-"), + "device": serial, + "position": {"x": position.x_pos, "y": position.y_pos}, + "svg": DEFAULT_ICON, + } + ) + save_button(key, new_button) deck.reset() - # TODO: write svg to buttons + # Write svg to buttons + for key, button in get_buttons().items(): + set_icon(deck, key, button.svg) deck.set_key_callback(on_key_change) def start(): + """Entrypoint.""" init_all() loop = asyncio.get_event_loop()