diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..c969b0b --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,15 @@ +{ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + }, + "python.formatting.provider": "none", + "python.testing.unittestArgs": [ + "-v", + "-s", + "./tests", + "-p", + "*test.py" + ], + "python.testing.pytestEnabled": false, + "python.testing.unittestEnabled": true +} \ No newline at end of file diff --git a/setup.py b/setup.py index aaee6d5..1d1f8b4 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,8 @@ -from setuptools import setup, find_packages -import codecs +"""Setup for pypi package""" + import os +import codecs +from setuptools import setup, find_packages here = os.path.abspath(os.path.dirname(__file__)) diff --git a/streamdeckapi/api.py b/streamdeckapi/api.py index 9165457..ee1d04a 100644 --- a/streamdeckapi/api.py +++ b/streamdeckapi/api.py @@ -1,7 +1,7 @@ """Stream Deck API.""" import asyncio -from collections.abc import Callable +from typing import Callable import json import logging @@ -22,13 +22,44 @@ class StreamDeckApi: def __init__( self, host: str, - on_button_press: Callable[[str], None] | None = None, - on_button_release: Callable[[str], None] | None = None, - on_status_update: Callable[[SDInfo], None] | None = None, - on_ws_message: Callable[[SDWebsocketMessage], None] | None = None, - on_ws_connect: Callable[[], None] | None = None, + on_button_press: any = None, + on_button_release: any = None, + on_status_update: any = None, + on_ws_message: any = None, + on_ws_connect: any = None, ) -> None: - """Init Stream Deck API object.""" + """Init Stream Deck API object. + + Args: + on_button_press (Callable[[str], None] or None): Callback if button pressed + on_button_release (Callable[[str], None] or None): Callback if button released + on_status_update (Callable[[SDInfo], None] or None): Callback if status update received + on_ws_message (Callable[[SDWebsocketMessage], None] or None): Callback if websocket message received + on_ws_connect (Callable[[], None] or None): Callback on websocket connected + """ + + # Type checks + if on_button_press is not None and not isinstance( + on_button_press, Callable[[str], None] + ): + raise TypeError() + if on_button_release is not None and not isinstance( + on_button_release, Callable[[str], None] + ): + raise TypeError() + if on_status_update is not None and not isinstance( + on_status_update, Callable[[SDInfo], None] + ): + raise TypeError() + if on_ws_message is not None and not isinstance( + on_ws_message, Callable[[SDWebsocketMessage], None] + ): + raise TypeError() + if on_ws_connect is not None and not isinstance( + on_ws_connect, Callable[[], None] + ): + raise TypeError() + self._host = host self._on_button_press = on_button_press self._on_button_release = on_button_release @@ -37,7 +68,7 @@ class StreamDeckApi: self._on_ws_connect = on_ws_connect self._loop = asyncio.get_event_loop() self._running = False - self._task: asyncio.Task | None = None + self._task: any = None # # Properties @@ -68,8 +99,13 @@ class StreamDeckApi: # @staticmethod - def _get_request(url: str) -> None | requests.Response: - """Handle GET requests.""" + def _get_request(url: str) -> any: + """Handle GET requests. + + Returns: + requests.Response or None + """ + try: res = requests.get(url, timeout=5) except requests.RequestException: @@ -85,13 +121,17 @@ class StreamDeckApi: return res @staticmethod - def _post_request(url: str, data: str, headers) -> None | requests.Response: - """Handle POST requests.""" + def _post_request(url: str, data: str, headers) -> any: + """Handle POST requests. + + Returns: + requests.Response or None + """ + try: res = requests.post(url, data, headers=headers, timeout=5) except requests.RequestException: - _LOGGER.debug( - "Error sending data to Stream Deck Plugin (exception)") + _LOGGER.debug("Error sending data to Stream Deck Plugin (exception)") return None if res.status_code != 200: _LOGGER.debug( @@ -101,9 +141,14 @@ class StreamDeckApi: return None return res - async def get_info(self, in_executor: bool = True) -> None | SDInfo: - """Get info about Stream Deck.""" - res: requests.Response | None = None + async def get_info(self, in_executor: bool = True) -> any: + """Get info about Stream Deck. + + Returns: + SDInfo or None + """ + + res: any = None if in_executor: res = await self._loop.run_in_executor( None, self._get_request, self._info_url @@ -120,13 +165,17 @@ class StreamDeckApi: try: info = SDInfo(rjson) except KeyError: - _LOGGER.debug( - "Error parsing response from %s to SDInfo", self._info_url) + _LOGGER.debug("Error parsing response from %s to SDInfo", self._info_url) return None return info - async def get_icon(self, btn: str) -> None | str: - """Get svg icon from Stream Deck button.""" + async def get_icon(self, btn: str) -> any: + """Get svg icon from Stream Deck button. + + Returns: + str or None + """ + url = f"{self._icon_url}{btn}" res = await self._loop.run_in_executor(None, self._get_request, url) if res is None or res.status_code != 200: @@ -152,8 +201,14 @@ class StreamDeckApi: # Websocket Methods # - def _on_button_change(self, uuid: str | dict, state: bool): - """Handle button down event.""" + def _on_button_change(self, uuid: any, state: bool): + """Handle button down event. + + Args: + uuid (str or dict): UUID of the button + state (bool): State of the button + """ + if not isinstance(uuid, str): _LOGGER.debug("Method _on_button_change: uuid is not str") return @@ -162,8 +217,13 @@ class StreamDeckApi: elif state is False and self._on_button_release is not None: self._on_button_release(uuid) - def _on_ws_status_update(self, info: SDInfo | str | dict): - """Handle Stream Deck status update event.""" + def _on_ws_status_update(self, info: any): + """Handle Stream Deck status update event. + + Args: + info (SDInfo or str or dict): Stream Deck Info + """ + if not isinstance(info, SDInfo): _LOGGER.debug("Method _on_ws_status_update: info is not SDInfo") return @@ -180,8 +240,7 @@ class StreamDeckApi: try: datajson = json.loads(msg) except json.JSONDecodeError: - _LOGGER.debug( - "Method _on_message: Websocket message couldn't get parsed") + _LOGGER.debug("Method _on_message: Websocket message couldn't get parsed") return try: data = SDWebsocketMessage(datajson) @@ -226,8 +285,7 @@ class StreamDeckApi: ) self._on_message(data) await websocket.close() - _LOGGER.debug( - "Method _websocket_loop: Websocket closed") + _LOGGER.debug("Method _websocket_loop: Websocket closed") except WebSocketException: _LOGGER.debug( "Method _websocket_loop: Websocket client crashed. Restarting it" diff --git a/streamdeckapi/server.py b/streamdeckapi/server.py index 570e353..877519d 100644 --- a/streamdeckapi/server.py +++ b/streamdeckapi/server.py @@ -7,9 +7,9 @@ import platform import sqlite3 import base64 import socket -from concurrent.futures import ProcessPoolExecutor from uuid import uuid4 from datetime import datetime +from typing import List, Dict import aiohttp import human_readable_ids as hri from jsonpickle import encode @@ -28,7 +28,7 @@ from streamdeckapi.const import ( PLUGIN_ICON, PLUGIN_INFO, PLUGIN_PORT, - SD_SSDP + SD_SSDP, ) from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice @@ -57,10 +57,10 @@ application: SDApplication = SDApplication( "version": "0.0.1", } ) -devices: list[SDDevice] = [] -websocket_connections: list[web.WebSocketResponse] = [] +devices: List[SDDevice] = [] +websocket_connections: List[web.WebSocketResponse] = [] -streamdecks: list[StreamDeck] = DeviceManager().enumerate() +streamdecks: List[StreamDeck] = DeviceManager().enumerate() # # Database @@ -68,7 +68,8 @@ streamdecks: list[StreamDeck] = DeviceManager().enumerate() database_first = sqlite3.connect(DB_FILE) table_cursor = database_first.cursor() -table_cursor.execute(""" +table_cursor.execute( + """ CREATE TABLE IF NOT EXISTS buttons( key integer PRIMARY KEY, uuid text NOT NULL, @@ -76,13 +77,16 @@ table_cursor.execute(""" x integer, y integer, svg text - );""") -table_cursor.execute(""" + );""" +) +table_cursor.execute( + """ CREATE TABLE IF NOT EXISTS button_states( key integer PRIMARY KEY, state integer, state_update text - );""") + );""" +) table_cursor.execute("DELETE FROM button_states;") database_first.commit() table_cursor.close() @@ -102,12 +106,12 @@ def save_button(key: int, button: SDButton): matching_buttons = result.fetchall() if len(matching_buttons) > 0: # Perform update - cursor.execute( - f"UPDATE buttons SET svg=\"{base64_string}\" WHERE key={key}") + cursor.execute(f'UPDATE buttons SET svg="{base64_string}" WHERE key={key}') else: # Create new row cursor.execute( - f"INSERT INTO buttons VALUES ({key}, \"{button.uuid}\", \"{button.device}\", {button.position.x_pos}, {button.position.y_pos}, \"{base64_string}\")") + f'INSERT INTO buttons VALUES ({key}, "{button.uuid}", "{button.device}", {button.position.x_pos}, {button.position.y_pos}, "{base64_string}")' + ) database.commit() print(f"Saved button {button.uuid} with key {key} to database") cursor.close() @@ -119,7 +123,8 @@ def get_button(key: int) -> any: database = sqlite3.connect(DB_FILE) cursor = database.cursor() result = cursor.execute( - f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}") + f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}" + ) matching_buttons = result.fetchall() if len(matching_buttons) == 0: return None @@ -127,12 +132,14 @@ def get_button(key: int) -> any: base64_bytes = row[5].encode() svg_bytes = base64.b64decode(base64_bytes) svg_string = svg_bytes.decode() - button = SDButton({ - "uuid": row[1], - "device": row[2], - "position": {"x": row[3], "y": row[4]}, - "svg": svg_string, - }) + button = SDButton( + { + "uuid": row[1], + "device": row[2], + "position": {"x": row[3], "y": row[4]}, + "svg": svg_string, + } + ) cursor.close() database.close() return button @@ -143,7 +150,8 @@ def get_button_by_uuid(uuid: str) -> any: database = sqlite3.connect(DB_FILE) cursor = database.cursor() result = cursor.execute( - f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid=\"{uuid}\"") + f'SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid="{uuid}"' + ) matching_buttons = result.fetchall() if len(matching_buttons) == 0: return None @@ -151,12 +159,14 @@ def get_button_by_uuid(uuid: str) -> any: base64_bytes = row[5].encode() svg_bytes = base64.b64decode(base64_bytes) svg_string = svg_bytes.decode() - button = SDButton({ - "uuid": row[1], - "device": row[2], - "position": {"x": row[3], "y": row[4]}, - "svg": svg_string, - }) + button = SDButton( + { + "uuid": row[1], + "device": row[2], + "position": {"x": row[3], "y": row[4]}, + "svg": svg_string, + } + ) cursor.close() database.close() return button @@ -166,7 +176,7 @@ def get_button_key(uuid: str) -> int: """Get a button key from the database.""" database = sqlite3.connect(DB_FILE) cursor = database.cursor() - result = cursor.execute(f"SELECT key FROM buttons WHERE uuid=\"{uuid}\"") + result = cursor.execute(f'SELECT key FROM buttons WHERE uuid="{uuid}"') matching_buttons = result.fetchall() if len(matching_buttons) == 0: return -1 @@ -177,21 +187,23 @@ def get_button_key(uuid: str) -> int: return key -def get_buttons() -> dict[str, SDButton]: +def get_buttons() -> Dict[str, SDButton]: """Load all buttons from the database.""" - result: dict[str, SDButton] = {} + result: Dict[str, SDButton] = {} database = sqlite3.connect(DB_FILE) cursor = database.cursor() for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"): base64_bytes = row[5].encode() svg_bytes = base64.b64decode(base64_bytes) svg_string = svg_bytes.decode() - result[row[0]] = SDButton({ - "uuid": row[1], - "device": row[2], - "position": {"x": row[3], "y": row[4]}, - "svg": svg_string, - }) + result[row[0]] = SDButton( + { + "uuid": row[1], + "device": row[2], + "position": {"x": row[3], "y": row[4]}, + "svg": svg_string, + } + ) cursor.close() database.close() print(f"Loaded {len(result)} buttons from DB") @@ -213,11 +225,13 @@ def write_button_state(key: int, state: bool, update: str): if len(matching_states) > 0: # Perform update cursor.execute( - f"UPDATE button_states SET state={state_int}, state_update=\"{update}\" WHERE key={key}") + f'UPDATE button_states SET state={state_int}, state_update="{update}" WHERE key={key}' + ) else: # Create new row cursor.execute( - f"INSERT INTO button_states VALUES ({key}, {state_int}, \"{update}\")") + f'INSERT INTO button_states VALUES ({key}, {state_int}, "{update}")' + ) database.commit() print(f"Saved button_state with key {key} to database") cursor.close() @@ -230,7 +244,8 @@ def get_button_state(key: int) -> any: database = sqlite3.connect(DB_FILE) cursor = database.cursor() result = cursor.execute( - f"SELECT key,state,state_update FROM button_states WHERE key={key}") + f"SELECT key,state,state_update FROM button_states WHERE key={key}" + ) matching_states = result.fetchall() if len(matching_states) == 0: return None @@ -310,7 +325,8 @@ async def websocket_handler(request: web.Request): await web_socket.close() elif msg.type == aiohttp.WSMsgType.ERROR: print( - f"Websocket connection closed with exception {web_socket.exception()}") + f"Websocket connection closed with exception {web_socket.exception()}" + ) websocket_connections.remove(web_socket) return web_socket @@ -332,8 +348,8 @@ async def broadcast_status(): "args": { "devices": devices, "application": application, - "buttons": get_buttons() - } + "buttons": get_buttons(), + }, } data_str = encode(data, unpicklable=False) @@ -412,14 +428,12 @@ async def on_key_change(_: StreamDeck, key: int, state: bool): return if state is True: - await websocket_broadcast(encode( - {"event": "keyDown", "args": button.uuid})) + await websocket_broadcast(encode({"event": "keyDown", "args": button.uuid})) print("Waiting for button release") # Start timer Timer(LONG_PRESS_SECONDS, lambda: long_press_callback(key), False) else: - await websocket_broadcast(encode( - {"event": "keyUp", "args": button.uuid})) + await websocket_broadcast(encode({"event": "keyUp", "args": button.uuid})) now = datetime.now() @@ -519,10 +533,10 @@ def get_local_ip(): """Get local ip address.""" connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: - connection.connect(('192.255.255.255', 1)) + connection.connect(("192.255.255.255", 1)) address = connection.getsockname()[0] except socket.error: - address = '127.0.0.1' + address = "127.0.0.1" finally: connection.close() return address @@ -533,8 +547,12 @@ class StreamDeckApiSsdpProtocol(ssdp.SimpleServiceDiscoveryProtocol): def response_received(self, response: ssdp.SSDPResponse, addr: tuple): """Handle an incoming response.""" - print("received response: %s %s %s", response.status_code, - response.reason, response.version) + print( + "received response: %s %s %s", + response.status_code, + response.reason, + response.version, + ) for header in response.headers: print("header: %s", header) @@ -544,9 +562,7 @@ class StreamDeckApiSsdpProtocol(ssdp.SimpleServiceDiscoveryProtocol): def request_received(self, request: ssdp.SSDPRequest, addr: tuple): """Handle an incoming request and respond to it.""" print( - "received request: %s %s %s", - request.method, request.uri, request.version - + "received request: %s %s %s", request.method, request.uri, request.version ) for header in request.headers: @@ -607,7 +623,6 @@ def start(): """Entrypoint.""" init_all() - executor = ProcessPoolExecutor(2) loop = asyncio.get_event_loop() # SSDP server @@ -619,7 +634,7 @@ def start(): family=socket.AF_INET, local_addr=(StreamDeckApiSsdpProtocol.MULTICAST_ADDRESS, 1900), ) - transport, protocol = loop.run_until_complete(connect) + transport, _ = loop.run_until_complete(connect) StreamDeckApiSsdpProtocol.transport = transport diff --git a/streamdeckapi/types.py b/streamdeckapi/types.py index b361ddc..9838e84 100644 --- a/streamdeckapi/types.py +++ b/streamdeckapi/types.py @@ -1,4 +1,5 @@ """Stream Deck API types.""" +from typing import List, Dict class SDApplication: @@ -82,8 +83,8 @@ class SDInfo(dict): def __init__(self, obj: dict) -> None: """Init Stream Deck Info object.""" - self.devices: list[SDDevice] = [] - self.buttons: dict[str, SDButton] = {} + self.devices: List[SDDevice] = [] + self.buttons: Dict[str, SDButton] = {} dict.__init__(self, obj) self.application = SDApplication(obj["application"]) diff --git a/tests/api_test.py b/tests/api_test.py new file mode 100644 index 0000000..aee452b --- /dev/null +++ b/tests/api_test.py @@ -0,0 +1,35 @@ +"""Unittests for API client.""" + +import unittest +import streamdeckapi + + +class TestApi(unittest.TestCase): + """Api Test Case.""" + + def test_constructor(self): + """Constructor test.""" + host = "host.local" + + # Test valid types + api = streamdeckapi.StreamDeckApi(host) + self.assertEqual(api.host, host) + + streamdeckapi.StreamDeckApi(host, on_button_press=None) + streamdeckapi.StreamDeckApi(host, on_button_release=None) + streamdeckapi.StreamDeckApi(host, on_status_update=None) + streamdeckapi.StreamDeckApi(host, on_ws_connect=None) + streamdeckapi.StreamDeckApi(host, on_ws_message=None) + + # Test some invalid types + for i_type in ["string", 2345, [321, 6457], {"key": "value"}]: + with self.assertRaises(TypeError): + _ = streamdeckapi.StreamDeckApi(host, on_button_press=i_type) + with self.assertRaises(TypeError): + _ = streamdeckapi.StreamDeckApi(host, on_button_release=i_type) + with self.assertRaises(TypeError): + _ = streamdeckapi.StreamDeckApi(host, on_status_update=i_type) + with self.assertRaises(TypeError): + _ = streamdeckapi.StreamDeckApi(host, on_ws_connect=i_type) + with self.assertRaises(TypeError): + _ = streamdeckapi.StreamDeckApi(host, on_ws_message=i_type)