diff --git a/.gitignore b/.gitignore
index 4f60f87..7aa4d20 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
**/__pycache__/
build
*.egg-info
+*.db
+*.db-journal
diff --git a/README.md b/README.md
index c051468..a125ff5 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,9 @@ Only compatible with separate [Stream Deck Plugin](https://github.com/Patrick762
## Server
-This library also contains a server to use the streamdeck with linux or without the official Stream Deck Software.
+This library also contains a server to use the streamdeck with Linux or without the official Stream Deck Software.
-For this to work, the LibUSB HIDAPI is required. [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html)
+For this to work, the following software is required:
+
+- LibUSB HIDAPI [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html)
+- cairo [Installation instructions for Windows](https://stackoverflow.com/a/73913080)
diff --git a/setup.py b/setup.py
index 707ba3e..7affb80 100644
--- a/setup.py
+++ b/setup.py
@@ -30,6 +30,7 @@ setup(
"jsonpickle==3.0.1",
"streamdeck==0.9.3",
"pillow>=9.4.0,<10.0.0",
+ "svglib==1.5.1",
],
keywords=[],
entry_points={
diff --git a/streamdeckapi/server.py b/streamdeckapi/server.py
index b32907c..6a9791b 100644
--- a/streamdeckapi/server.py
+++ b/streamdeckapi/server.py
@@ -1,31 +1,41 @@
"""Stream Deck API Server."""
import re
-import aiohttp
+import io
import asyncio
import platform
+import sqlite3
+import base64
+import aiohttp
import human_readable_ids as hri
from jsonpickle import encode
from aiohttp import web
from StreamDeck.DeviceManager import DeviceManager
from StreamDeck.Devices.StreamDeck import StreamDeck
+from StreamDeck.ImageHelpers import PILHelper
+from svglib.svglib import svg2rlg
+from PIL import Image
from streamdeckapi.const import PLUGIN_ICON, PLUGIN_INFO, PLUGIN_PORT
from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice
+# TODO: MDI Icons not showing
+# TODO: Text too small, positioning off
+# TODO: Websocket broadcast not working yet
+
DEFAULT_ICON = re.sub(
"\r\n|\n|\r",
"",
"""
-
-""",
+
+ """,
)
@@ -39,81 +49,211 @@ application: SDApplication = SDApplication(
}
)
devices: list[SDDevice] = []
-buttons: dict[str, SDButton] = {}
+
+#
+# Database
+#
+
+database = sqlite3.connect("streamdeckapi.db")
+table_cursor = database.cursor()
+table_cursor.execute("""
+ CREATE TABLE IF NOT EXISTS buttons(
+ key integer PRIMARY KEY,
+ uuid text NOT NULL,
+ device text,
+ x integer,
+ y integer,
+ svg text
+ )""")
+table_cursor.close()
-async def api_info_handler(
- request: web.Request,
-): # FIXME: unparseable json (x -> x_pos, y -> y_pos, platformVersion -> platform_version)
+def save_button(key: int, button: SDButton):
+ """Save button to database."""
+ cursor = database.cursor()
+ svg_bytes = button.svg.encode()
+ base64_bytes = base64.b64encode(svg_bytes)
+ base64_string = base64_bytes.decode()
+
+ # Check if exists
+ result = cursor.execute(f"SELECT uuid FROM buttons WHERE key={key}")
+ matching_buttons = result.fetchall()
+ if len(matching_buttons) > 0:
+ # Perform update
+ cursor.execute(
+ f"UPDATE buttons SET svg=\"{base64_string}\" WHERE key={key}")
+ else:
+ # Create new row
+ cursor.execute(
+ f"INSERT INTO buttons VALUES ({key}, \"{button.uuid}\", \"{button.device}\", {button.position.x_pos}, {button.position.y_pos}, \"{base64_string}\")")
+ database.commit()
+ print(f"Saved button {button.uuid} with key {key} to database")
+ cursor.close()
+
+
+def get_button(key: int) -> SDButton | None:
+ """Get a button from the database."""
+ cursor = database.cursor()
+ result = cursor.execute(
+ f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}")
+ matching_buttons = result.fetchall()
+ if len(matching_buttons) == 0:
+ return None
+ row = matching_buttons[0]
+ base64_bytes = row[5].encode()
+ svg_bytes = base64.b64decode(base64_bytes)
+ svg_string = svg_bytes.decode()
+ button = SDButton({
+ "uuid": row[1],
+ "device": row[2],
+ "position": {"x": row[3], "y": row[4]},
+ "svg": svg_string,
+ })
+ cursor.close()
+ return button
+
+
+def get_button_by_uuid(uuid: str) -> SDButton | None:
+ """Get a button from the database."""
+ cursor = database.cursor()
+ result = cursor.execute(
+ f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid=\"{uuid}\"")
+ matching_buttons = result.fetchall()
+ if len(matching_buttons) == 0:
+ return None
+ row = matching_buttons[0]
+ base64_bytes = row[5].encode()
+ svg_bytes = base64.b64decode(base64_bytes)
+ svg_string = svg_bytes.decode()
+ button = SDButton({
+ "uuid": row[1],
+ "device": row[2],
+ "position": {"x": row[3], "y": row[4]},
+ "svg": svg_string,
+ })
+ cursor.close()
+ return button
+
+
+def get_button_key(uuid: str) -> int:
+ """Get a button key from the database."""
+ cursor = database.cursor()
+ result = cursor.execute(f"SELECT key FROM buttons WHERE uuid=\"{uuid}\"")
+ matching_buttons = result.fetchall()
+ if len(matching_buttons) == 0:
+ return -1
+ row = matching_buttons[0]
+ key = row[0]
+ cursor.close()
+ return key
+
+
+def get_buttons() -> dict[str, SDButton]:
+ """Load all buttons from the database."""
+ result: dict[str, SDButton] = {}
+ cursor = database.cursor()
+ for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"):
+ base64_bytes = row[5].encode()
+ svg_bytes = base64.b64decode(base64_bytes)
+ svg_string = svg_bytes.decode()
+ result[row[0]] = SDButton({
+ "uuid": row[1],
+ "device": row[2],
+ "position": {"x": row[3], "y": row[4]},
+ "svg": svg_string,
+ })
+ cursor.close()
+ print(f"Loaded {len(result)} buttons from DB")
+ return result
+
+
+#
+# API
+#
+
+
+async def api_info_handler(_: web.Request):
+ """Handle info requests."""
json_data = encode(
- {"devices": devices, "application": application, "buttons": buttons},
+ {"devices": devices, "application": application, "buttons": get_buttons()},
unpicklable=False,
)
if not isinstance(json_data, str):
return web.Response(status=500, text="jsonpickle error")
- json_data = json_data.replace('"x_pos"', '"x"').replace('"y_pos"', '"y"').replace(
- '"platform_version"', '"platformVersion"'
+ json_data = (
+ json_data.replace('"x_pos"', '"x"')
+ .replace('"y_pos"', '"y"')
+ .replace('"platform_version"', '"platformVersion"')
)
return web.Response(text=json_data, content_type="application/json")
async def api_icon_get_handler(request: web.Request):
- btnId = request.match_info["btnId"]
- for _, btn in buttons.items():
- if btn.uuid != btnId:
- continue
- return web.Response(text=btn.svg, content_type="image/svg+xml")
- return web.Response(status=404, text="Button not found")
+ """Handle icon get requests."""
+ uuid = request.match_info["uuid"]
+ button = get_button_by_uuid(uuid)
+ if button is None:
+ return web.Response(status=404, text="Button not found")
+ return web.Response(text=button.svg, content_type="image/svg+xml")
async def api_icon_set_handler(request: web.Request):
- btnId = request.match_info["btnId"]
+ """Handle icon set requests."""
+ uuid = request.match_info["uuid"]
if not request.has_body:
return web.Response(status=422, text="No data in request")
body = await request.text()
- print(body)
if not body.startswith("