added sqlite database to save button configs

This commit is contained in:
Patrick762
2023-05-20 14:28:49 +02:00
parent f808deec3d
commit 85272e6c8e
4 changed files with 235 additions and 55 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
**/__pycache__/
build
*.egg-info
*.db
*.db-journal

View File

@@ -10,6 +10,9 @@ Only compatible with separate [Stream Deck Plugin](https://github.com/Patrick762
## Server
This library also contains a server to use the streamdeck with linux or without the official Stream Deck Software.
This library also contains a server to use the streamdeck with Linux or without the official Stream Deck Software.
For this to work, the LibUSB HIDAPI is required. [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html)
For this to work, the following software is required:
- LibUSB HIDAPI [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html)
- cairo [Installation instructions for Windows](https://stackoverflow.com/a/73913080)

View File

@@ -30,6 +30,7 @@ setup(
"jsonpickle==3.0.1",
"streamdeck==0.9.3",
"pillow>=9.4.0,<10.0.0",
"svglib==1.5.1",
],
keywords=[],
entry_points={

View File

@@ -1,31 +1,41 @@
"""Stream Deck API Server."""
import re
import aiohttp
import io
import asyncio
import platform
import sqlite3
import base64
import aiohttp
import human_readable_ids as hri
from jsonpickle import encode
from aiohttp import web
from StreamDeck.DeviceManager import DeviceManager
from StreamDeck.Devices.StreamDeck import StreamDeck
from StreamDeck.ImageHelpers import PILHelper
from svglib.svglib import svg2rlg
from PIL import Image
from streamdeckapi.const import PLUGIN_ICON, PLUGIN_INFO, PLUGIN_PORT
from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice
# TODO: MDI Icons not showing
# TODO: Text too small, positioning off
# TODO: Websocket broadcast not working yet
DEFAULT_ICON = re.sub(
"\r\n|\n|\r",
"",
"""
<svg xmlns="http://www.w3.org/2000/svg" height="144" width="144">
<rect width="144" height="144" fill="black" />
<circle cx="32" cy="72" r="10" fill="white" />
<circle cx="72" cy="72" r="10" fill="white" />
<circle cx="112" cy="72" r="10" fill="white" />
<text x="10" y="120" font-size="28px" fill="white">Configure</text>
</svg>
""",
<svg xmlns="http://www.w3.org/2000/svg" height="144" width="144">
<rect width="144" height="144" fill="black" />
<circle cx="32" cy="72" r="10" fill="white" />
<circle cx="72" cy="72" r="10" fill="white" />
<circle cx="112" cy="72" r="10" fill="white" />
<text x="10" y="120" font-size="28px" fill="white">Configure</text>
</svg>
""",
)
@@ -39,81 +49,211 @@ application: SDApplication = SDApplication(
}
)
devices: list[SDDevice] = []
buttons: dict[str, SDButton] = {}
#
# Database
#
database = sqlite3.connect("streamdeckapi.db")
table_cursor = database.cursor()
table_cursor.execute("""
CREATE TABLE IF NOT EXISTS buttons(
key integer PRIMARY KEY,
uuid text NOT NULL,
device text,
x integer,
y integer,
svg text
)""")
table_cursor.close()
async def api_info_handler(
request: web.Request,
): # FIXME: unparseable json (x -> x_pos, y -> y_pos, platformVersion -> platform_version)
def save_button(key: int, button: SDButton):
"""Save button to database."""
cursor = database.cursor()
svg_bytes = button.svg.encode()
base64_bytes = base64.b64encode(svg_bytes)
base64_string = base64_bytes.decode()
# Check if exists
result = cursor.execute(f"SELECT uuid FROM buttons WHERE key={key}")
matching_buttons = result.fetchall()
if len(matching_buttons) > 0:
# Perform update
cursor.execute(
f"UPDATE buttons SET svg=\"{base64_string}\" WHERE key={key}")
else:
# Create new row
cursor.execute(
f"INSERT INTO buttons VALUES ({key}, \"{button.uuid}\", \"{button.device}\", {button.position.x_pos}, {button.position.y_pos}, \"{base64_string}\")")
database.commit()
print(f"Saved button {button.uuid} with key {key} to database")
cursor.close()
def get_button(key: int) -> SDButton | None:
"""Get a button from the database."""
cursor = database.cursor()
result = cursor.execute(
f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}")
matching_buttons = result.fetchall()
if len(matching_buttons) == 0:
return None
row = matching_buttons[0]
base64_bytes = row[5].encode()
svg_bytes = base64.b64decode(base64_bytes)
svg_string = svg_bytes.decode()
button = SDButton({
"uuid": row[1],
"device": row[2],
"position": {"x": row[3], "y": row[4]},
"svg": svg_string,
})
cursor.close()
return button
def get_button_by_uuid(uuid: str) -> SDButton | None:
"""Get a button from the database."""
cursor = database.cursor()
result = cursor.execute(
f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid=\"{uuid}\"")
matching_buttons = result.fetchall()
if len(matching_buttons) == 0:
return None
row = matching_buttons[0]
base64_bytes = row[5].encode()
svg_bytes = base64.b64decode(base64_bytes)
svg_string = svg_bytes.decode()
button = SDButton({
"uuid": row[1],
"device": row[2],
"position": {"x": row[3], "y": row[4]},
"svg": svg_string,
})
cursor.close()
return button
def get_button_key(uuid: str) -> int:
"""Get a button key from the database."""
cursor = database.cursor()
result = cursor.execute(f"SELECT key FROM buttons WHERE uuid=\"{uuid}\"")
matching_buttons = result.fetchall()
if len(matching_buttons) == 0:
return -1
row = matching_buttons[0]
key = row[0]
cursor.close()
return key
def get_buttons() -> dict[str, SDButton]:
"""Load all buttons from the database."""
result: dict[str, SDButton] = {}
cursor = database.cursor()
for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"):
base64_bytes = row[5].encode()
svg_bytes = base64.b64decode(base64_bytes)
svg_string = svg_bytes.decode()
result[row[0]] = SDButton({
"uuid": row[1],
"device": row[2],
"position": {"x": row[3], "y": row[4]},
"svg": svg_string,
})
cursor.close()
print(f"Loaded {len(result)} buttons from DB")
return result
#
# API
#
async def api_info_handler(_: web.Request):
"""Handle info requests."""
json_data = encode(
{"devices": devices, "application": application, "buttons": buttons},
{"devices": devices, "application": application, "buttons": get_buttons()},
unpicklable=False,
)
if not isinstance(json_data, str):
return web.Response(status=500, text="jsonpickle error")
json_data = json_data.replace('"x_pos"', '"x"').replace('"y_pos"', '"y"').replace(
'"platform_version"', '"platformVersion"'
json_data = (
json_data.replace('"x_pos"', '"x"')
.replace('"y_pos"', '"y"')
.replace('"platform_version"', '"platformVersion"')
)
return web.Response(text=json_data, content_type="application/json")
async def api_icon_get_handler(request: web.Request):
btnId = request.match_info["btnId"]
for _, btn in buttons.items():
if btn.uuid != btnId:
continue
return web.Response(text=btn.svg, content_type="image/svg+xml")
return web.Response(status=404, text="Button not found")
"""Handle icon get requests."""
uuid = request.match_info["uuid"]
button = get_button_by_uuid(uuid)
if button is None:
return web.Response(status=404, text="Button not found")
return web.Response(text=button.svg, content_type="image/svg+xml")
async def api_icon_set_handler(request: web.Request):
btnId = request.match_info["btnId"]
"""Handle icon set requests."""
uuid = request.match_info["uuid"]
if not request.has_body:
return web.Response(status=422, text="No data in request")
body = await request.text()
print(body)
if not body.startswith("<svg"):
return web.Response(status=422, text="Only svgs are supported")
btnKey = None
for key, btn in buttons.items():
if btn.uuid == btnId:
btnKey = key
if btnKey is None:
button = get_button_by_uuid(uuid)
if button is None:
return web.Response(status=404, text="Button not found")
buttons[btnKey].svg = body
# Update icon
update_button_icon(uuid, body)
print("Icon for button", uuid, "changed")
return web.Response(text="Icon changed")
async def websocket_handler(request: web.Request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
"""Handle websocket."""
web_socket = web.WebSocketResponse()
await web_socket.prepare(request)
async for msg in web_socket:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
await web_socket.close()
else:
await ws.send_str("some websocket message payload")
await web_socket.send_str("some websocket message payload")
elif msg.type == aiohttp.WSMsgType.ERROR:
print("ws connection closed with exception %s" % ws.exception())
return ws
print(
f"Websocket connection closed with exception {web_socket.exception()}")
return web_socket
#
# Functions
#
def create_runner():
"""Create background runner"""
app = web.Application()
app.add_routes(
[
web.get("/", websocket_handler),
web.get(PLUGIN_INFO, api_info_handler),
web.get(PLUGIN_ICON + "/{btnId}", api_icon_get_handler),
web.post(PLUGIN_ICON + "/{btnId}", api_icon_set_handler),
web.get(PLUGIN_ICON + "/{uuid}", api_icon_get_handler),
web.post(PLUGIN_ICON + "/{uuid}", api_icon_set_handler),
]
)
return web.AppRunner(app)
async def start_server(host="0.0.0.0", port=PLUGIN_PORT):
"""Start API server."""
runner = create_runner()
await runner.setup()
site = web.TCPSite(runner, host, port)
@@ -132,12 +272,40 @@ def on_key_change(deck: StreamDeck, key: int, state: bool):
print(f"Key at {position.x_pos}|{position.y_pos} is state {state}")
def update_button_icon(uuid: str, svg: str):
"""Update a button icon."""
streamdecks: list[StreamDeck] = DeviceManager().enumerate()
for deck in streamdecks:
if not deck.is_visual():
continue
deck.open()
button = get_button_by_uuid(uuid)
button_key = get_button_key(uuid)
if button is not None and button_key >= 0:
set_icon(deck, button_key, svg)
button.svg = svg
save_button(button_key, button)
def set_icon(deck: StreamDeck, key: int, svg: str):
"""Draw an icon to the button."""
svg_io = io.StringIO(svg)
drawing = svg2rlg(svg_io)
png_string = drawing.asString("png")
png_bytes = io.BytesIO(png_string)
icon = Image.open(png_bytes)
image = PILHelper.create_scaled_image(deck, icon, margins=[0, 0, 20, 0])
deck.set_key_image(key, PILHelper.to_native_format(deck, image))
def init_all():
"""Init Stream Deck devices."""
# TODO: Load buttons from storage and save asap
streamdecks: list[StreamDeck] = DeviceManager().enumerate()
print("Found {} Stream Deck(s).\n".format(len(streamdecks)))
print(f"Found {len(streamdecks)} Stream Deck(s).")
for deck in streamdecks:
if not deck.is_visual():
@@ -159,24 +327,30 @@ def init_all():
)
for key in range(deck.key_count()):
# FIXME: only add if not already in dict
position = get_position(deck, key)
buttons[key] = SDButton(
{
"uuid": hri.get_new_id().lower().replace(" ", "-"),
"device": serial,
"position": {"x": position.x_pos, "y": position.y_pos},
"svg": DEFAULT_ICON,
}
)
# Only add if not already in dict
button = get_button(key)
if button is None:
position = get_position(deck, key)
new_button = SDButton(
{
"uuid": hri.get_new_id().lower().replace(" ", "-"),
"device": serial,
"position": {"x": position.x_pos, "y": position.y_pos},
"svg": DEFAULT_ICON,
}
)
save_button(key, new_button)
deck.reset()
# TODO: write svg to buttons
# Write svg to buttons
for key, button in get_buttons().items():
set_icon(deck, key, button.svg)
deck.set_key_callback(on_key_change)
def start():
"""Entrypoint."""
init_all()
loop = asyncio.get_event_loop()