Merge pull request #4 from Patrick762/python3-8-compatibility

Compatibility for Python 3.8
This commit is contained in:
Patrick762
2023-06-15 18:28:45 +02:00
committed by GitHub
8 changed files with 238 additions and 164 deletions

15
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,15 @@
{
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
"python.formatting.provider": "none",
"python.testing.unittestArgs": [
"-v",
"-s",
"./tests",
"-p",
"*test.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}

View File

@@ -13,10 +13,17 @@ For this to work, the following software is required:
- LibUSB HIDAPI [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html) or [Installation instructions](https://github.com/jamesridgway/devdeck/wiki/Installation) - LibUSB HIDAPI [Installation instructions](https://python-elgato-streamdeck.readthedocs.io/en/stable/pages/backend_libusb_hidapi.html) or [Installation instructions](https://github.com/jamesridgway/devdeck/wiki/Installation)
- cairo [Installation instructions for Windows](https://stackoverflow.com/a/73913080) - cairo [Installation instructions for Windows](https://stackoverflow.com/a/73913080)
Cairo Installation for Windows:
```bash
pip install pipwin
pipwin install cairocffi
```
The event `doubleTap` is not working with this server software. The event `doubleTap` is not working with this server software.
### Limitations ### Limitations
Discovery over SSDP might not work. Discovery might not work.
### Installation on Linux / Raspberry Pi ### Installation on Linux / Raspberry Pi

View File

@@ -1,13 +1,15 @@
from setuptools import setup, find_packages """Setup for pypi package"""
import codecs
import os import os
import codecs
from setuptools import setup, find_packages
here = os.path.abspath(os.path.dirname(__file__)) here = os.path.abspath(os.path.dirname(__file__))
with codecs.open(os.path.join(here, "README.md"), encoding="utf-8") as fh: with codecs.open(os.path.join(here, "README.md"), encoding="utf-8") as fh:
long_description = "\n" + fh.read() long_description = "\n" + fh.read()
VERSION = "0.0.7" VERSION = "0.0.8"
DESCRIPTION = "Stream Deck API Library" DESCRIPTION = "Stream Deck API Library"
# Setting up # Setting up
@@ -30,7 +32,7 @@ setup(
"streamdeck==0.9.3", "streamdeck==0.9.3",
"pillow>=9.4.0,<10.0.0", "pillow>=9.4.0,<10.0.0",
"cairosvg==2.7.0", "cairosvg==2.7.0",
"ssdp", "zeroconf",
], ],
keywords=[], keywords=[],
entry_points={ entry_points={

View File

@@ -1,7 +1,7 @@
"""Stream Deck API.""" """Stream Deck API."""
import asyncio import asyncio
from collections.abc import Callable from typing import Callable
import json import json
import logging import logging
@@ -22,13 +22,44 @@ class StreamDeckApi:
def __init__( def __init__(
self, self,
host: str, host: str,
on_button_press: Callable[[str], None] | None = None, on_button_press: any = None,
on_button_release: Callable[[str], None] | None = None, on_button_release: any = None,
on_status_update: Callable[[SDInfo], None] | None = None, on_status_update: any = None,
on_ws_message: Callable[[SDWebsocketMessage], None] | None = None, on_ws_message: any = None,
on_ws_connect: Callable[[], None] | None = None, on_ws_connect: any = None,
) -> None: ) -> None:
"""Init Stream Deck API object.""" """Init Stream Deck API object.
Args:
on_button_press (Callable[[str], None] or None): Callback if button pressed
on_button_release (Callable[[str], None] or None): Callback if button released
on_status_update (Callable[[SDInfo], None] or None): Callback if status update received
on_ws_message (Callable[[SDWebsocketMessage], None] or None): Callback if websocket message received
on_ws_connect (Callable[[], None] or None): Callback on websocket connected
"""
# Type checks
if on_button_press is not None and not isinstance(
on_button_press, Callable[[str], None]
):
raise TypeError()
if on_button_release is not None and not isinstance(
on_button_release, Callable[[str], None]
):
raise TypeError()
if on_status_update is not None and not isinstance(
on_status_update, Callable[[SDInfo], None]
):
raise TypeError()
if on_ws_message is not None and not isinstance(
on_ws_message, Callable[[SDWebsocketMessage], None]
):
raise TypeError()
if on_ws_connect is not None and not isinstance(
on_ws_connect, Callable[[], None]
):
raise TypeError()
self._host = host self._host = host
self._on_button_press = on_button_press self._on_button_press = on_button_press
self._on_button_release = on_button_release self._on_button_release = on_button_release
@@ -37,7 +68,7 @@ class StreamDeckApi:
self._on_ws_connect = on_ws_connect self._on_ws_connect = on_ws_connect
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()
self._running = False self._running = False
self._task: asyncio.Task | None = None self._task: any = None
# #
# Properties # Properties
@@ -68,8 +99,13 @@ class StreamDeckApi:
# #
@staticmethod @staticmethod
def _get_request(url: str) -> None | requests.Response: def _get_request(url: str) -> any:
"""Handle GET requests.""" """Handle GET requests.
Returns:
requests.Response or None
"""
try: try:
res = requests.get(url, timeout=5) res = requests.get(url, timeout=5)
except requests.RequestException: except requests.RequestException:
@@ -85,13 +121,17 @@ class StreamDeckApi:
return res return res
@staticmethod @staticmethod
def _post_request(url: str, data: str, headers) -> None | requests.Response: def _post_request(url: str, data: str, headers) -> any:
"""Handle POST requests.""" """Handle POST requests.
Returns:
requests.Response or None
"""
try: try:
res = requests.post(url, data, headers=headers, timeout=5) res = requests.post(url, data, headers=headers, timeout=5)
except requests.RequestException: except requests.RequestException:
_LOGGER.debug( _LOGGER.debug("Error sending data to Stream Deck Plugin (exception)")
"Error sending data to Stream Deck Plugin (exception)")
return None return None
if res.status_code != 200: if res.status_code != 200:
_LOGGER.debug( _LOGGER.debug(
@@ -101,9 +141,14 @@ class StreamDeckApi:
return None return None
return res return res
async def get_info(self, in_executor: bool = True) -> None | SDInfo: async def get_info(self, in_executor: bool = True) -> any:
"""Get info about Stream Deck.""" """Get info about Stream Deck.
res: requests.Response | None = None
Returns:
SDInfo or None
"""
res: any = None
if in_executor: if in_executor:
res = await self._loop.run_in_executor( res = await self._loop.run_in_executor(
None, self._get_request, self._info_url None, self._get_request, self._info_url
@@ -120,13 +165,17 @@ class StreamDeckApi:
try: try:
info = SDInfo(rjson) info = SDInfo(rjson)
except KeyError: except KeyError:
_LOGGER.debug( _LOGGER.debug("Error parsing response from %s to SDInfo", self._info_url)
"Error parsing response from %s to SDInfo", self._info_url)
return None return None
return info return info
async def get_icon(self, btn: str) -> None | str: async def get_icon(self, btn: str) -> any:
"""Get svg icon from Stream Deck button.""" """Get svg icon from Stream Deck button.
Returns:
str or None
"""
url = f"{self._icon_url}{btn}" url = f"{self._icon_url}{btn}"
res = await self._loop.run_in_executor(None, self._get_request, url) res = await self._loop.run_in_executor(None, self._get_request, url)
if res is None or res.status_code != 200: if res is None or res.status_code != 200:
@@ -152,8 +201,14 @@ class StreamDeckApi:
# Websocket Methods # Websocket Methods
# #
def _on_button_change(self, uuid: str | dict, state: bool): def _on_button_change(self, uuid: any, state: bool):
"""Handle button down event.""" """Handle button down event.
Args:
uuid (str or dict): UUID of the button
state (bool): State of the button
"""
if not isinstance(uuid, str): if not isinstance(uuid, str):
_LOGGER.debug("Method _on_button_change: uuid is not str") _LOGGER.debug("Method _on_button_change: uuid is not str")
return return
@@ -162,8 +217,13 @@ class StreamDeckApi:
elif state is False and self._on_button_release is not None: elif state is False and self._on_button_release is not None:
self._on_button_release(uuid) self._on_button_release(uuid)
def _on_ws_status_update(self, info: SDInfo | str | dict): def _on_ws_status_update(self, info: any):
"""Handle Stream Deck status update event.""" """Handle Stream Deck status update event.
Args:
info (SDInfo or str or dict): Stream Deck Info
"""
if not isinstance(info, SDInfo): if not isinstance(info, SDInfo):
_LOGGER.debug("Method _on_ws_status_update: info is not SDInfo") _LOGGER.debug("Method _on_ws_status_update: info is not SDInfo")
return return
@@ -180,8 +240,7 @@ class StreamDeckApi:
try: try:
datajson = json.loads(msg) datajson = json.loads(msg)
except json.JSONDecodeError: except json.JSONDecodeError:
_LOGGER.debug( _LOGGER.debug("Method _on_message: Websocket message couldn't get parsed")
"Method _on_message: Websocket message couldn't get parsed")
return return
try: try:
data = SDWebsocketMessage(datajson) data = SDWebsocketMessage(datajson)
@@ -226,8 +285,7 @@ class StreamDeckApi:
) )
self._on_message(data) self._on_message(data)
await websocket.close() await websocket.close()
_LOGGER.debug( _LOGGER.debug("Method _websocket_loop: Websocket closed")
"Method _websocket_loop: Websocket closed")
except WebSocketException: except WebSocketException:
_LOGGER.debug( _LOGGER.debug(
"Method _websocket_loop: Websocket client crashed. Restarting it" "Method _websocket_loop: Websocket client crashed. Restarting it"

View File

@@ -6,5 +6,6 @@ PLUGIN_ICON = "/sd/icon"
DB_FILE = "streamdeckapi.db" DB_FILE = "streamdeckapi.db"
SD_SSDP = "urn:home-assistant-device:stream-deck" SD_SSDP = "urn:home-assistant-device:stream-deck"
SD_ZEROCONF = "_stream-deck-api._tcp.local."
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
LONG_PRESS_SECONDS = 2 LONG_PRESS_SECONDS = 2

View File

@@ -1,5 +1,6 @@
"""Stream Deck API Server.""" """Stream Deck API Server."""
from concurrent.futures import ProcessPoolExecutor
import re import re
import io import io
import asyncio import asyncio
@@ -7,9 +8,8 @@ import platform
import sqlite3 import sqlite3
import base64 import base64
import socket import socket
from concurrent.futures import ProcessPoolExecutor
from uuid import uuid4
from datetime import datetime from datetime import datetime
from typing import List, Dict
import aiohttp import aiohttp
import human_readable_ids as hri import human_readable_ids as hri
from jsonpickle import encode from jsonpickle import encode
@@ -19,7 +19,8 @@ from StreamDeck.Devices.StreamDeck import StreamDeck
from StreamDeck.ImageHelpers import PILHelper from StreamDeck.ImageHelpers import PILHelper
import cairosvg import cairosvg
from PIL import Image from PIL import Image
import ssdp from zeroconf import IPVersion, ServiceInfo, Zeroconf
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
from streamdeckapi.const import ( from streamdeckapi.const import (
DATETIME_FORMAT, DATETIME_FORMAT,
@@ -28,7 +29,7 @@ from streamdeckapi.const import (
PLUGIN_ICON, PLUGIN_ICON,
PLUGIN_INFO, PLUGIN_INFO,
PLUGIN_PORT, PLUGIN_PORT,
SD_SSDP SD_ZEROCONF,
) )
from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice from streamdeckapi.types import SDApplication, SDButton, SDButtonPosition, SDDevice
@@ -57,10 +58,10 @@ application: SDApplication = SDApplication(
"version": "0.0.1", "version": "0.0.1",
} }
) )
devices: list[SDDevice] = [] devices: List[SDDevice] = []
websocket_connections: list[web.WebSocketResponse] = [] websocket_connections: List[web.WebSocketResponse] = []
streamdecks: list[StreamDeck] = DeviceManager().enumerate() streamdecks: List[StreamDeck] = DeviceManager().enumerate()
# #
# Database # Database
@@ -68,7 +69,8 @@ streamdecks: list[StreamDeck] = DeviceManager().enumerate()
database_first = sqlite3.connect(DB_FILE) database_first = sqlite3.connect(DB_FILE)
table_cursor = database_first.cursor() table_cursor = database_first.cursor()
table_cursor.execute(""" table_cursor.execute(
"""
CREATE TABLE IF NOT EXISTS buttons( CREATE TABLE IF NOT EXISTS buttons(
key integer PRIMARY KEY, key integer PRIMARY KEY,
uuid text NOT NULL, uuid text NOT NULL,
@@ -76,13 +78,16 @@ table_cursor.execute("""
x integer, x integer,
y integer, y integer,
svg text svg text
);""") );"""
table_cursor.execute(""" )
table_cursor.execute(
"""
CREATE TABLE IF NOT EXISTS button_states( CREATE TABLE IF NOT EXISTS button_states(
key integer PRIMARY KEY, key integer PRIMARY KEY,
state integer, state integer,
state_update text state_update text
);""") );"""
)
table_cursor.execute("DELETE FROM button_states;") table_cursor.execute("DELETE FROM button_states;")
database_first.commit() database_first.commit()
table_cursor.close() table_cursor.close()
@@ -102,12 +107,12 @@ def save_button(key: int, button: SDButton):
matching_buttons = result.fetchall() matching_buttons = result.fetchall()
if len(matching_buttons) > 0: if len(matching_buttons) > 0:
# Perform update # Perform update
cursor.execute( cursor.execute(f'UPDATE buttons SET svg="{base64_string}" WHERE key={key}')
f"UPDATE buttons SET svg=\"{base64_string}\" WHERE key={key}")
else: else:
# Create new row # Create new row
cursor.execute( cursor.execute(
f"INSERT INTO buttons VALUES ({key}, \"{button.uuid}\", \"{button.device}\", {button.position.x_pos}, {button.position.y_pos}, \"{base64_string}\")") f'INSERT INTO buttons VALUES ({key}, "{button.uuid}", "{button.device}", {button.position.x_pos}, {button.position.y_pos}, "{base64_string}")'
)
database.commit() database.commit()
print(f"Saved button {button.uuid} with key {key} to database") print(f"Saved button {button.uuid} with key {key} to database")
cursor.close() cursor.close()
@@ -119,7 +124,8 @@ def get_button(key: int) -> any:
database = sqlite3.connect(DB_FILE) database = sqlite3.connect(DB_FILE)
cursor = database.cursor() cursor = database.cursor()
result = cursor.execute( result = cursor.execute(
f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}") f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE key={key}"
)
matching_buttons = result.fetchall() matching_buttons = result.fetchall()
if len(matching_buttons) == 0: if len(matching_buttons) == 0:
return None return None
@@ -127,12 +133,14 @@ def get_button(key: int) -> any:
base64_bytes = row[5].encode() base64_bytes = row[5].encode()
svg_bytes = base64.b64decode(base64_bytes) svg_bytes = base64.b64decode(base64_bytes)
svg_string = svg_bytes.decode() svg_string = svg_bytes.decode()
button = SDButton({ button = SDButton(
{
"uuid": row[1], "uuid": row[1],
"device": row[2], "device": row[2],
"position": {"x": row[3], "y": row[4]}, "position": {"x": row[3], "y": row[4]},
"svg": svg_string, "svg": svg_string,
}) }
)
cursor.close() cursor.close()
database.close() database.close()
return button return button
@@ -143,7 +151,8 @@ def get_button_by_uuid(uuid: str) -> any:
database = sqlite3.connect(DB_FILE) database = sqlite3.connect(DB_FILE)
cursor = database.cursor() cursor = database.cursor()
result = cursor.execute( result = cursor.execute(
f"SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid=\"{uuid}\"") f'SELECT key,uuid,device,x,y,svg FROM buttons WHERE uuid="{uuid}"'
)
matching_buttons = result.fetchall() matching_buttons = result.fetchall()
if len(matching_buttons) == 0: if len(matching_buttons) == 0:
return None return None
@@ -151,12 +160,14 @@ def get_button_by_uuid(uuid: str) -> any:
base64_bytes = row[5].encode() base64_bytes = row[5].encode()
svg_bytes = base64.b64decode(base64_bytes) svg_bytes = base64.b64decode(base64_bytes)
svg_string = svg_bytes.decode() svg_string = svg_bytes.decode()
button = SDButton({ button = SDButton(
{
"uuid": row[1], "uuid": row[1],
"device": row[2], "device": row[2],
"position": {"x": row[3], "y": row[4]}, "position": {"x": row[3], "y": row[4]},
"svg": svg_string, "svg": svg_string,
}) }
)
cursor.close() cursor.close()
database.close() database.close()
return button return button
@@ -166,7 +177,7 @@ def get_button_key(uuid: str) -> int:
"""Get a button key from the database.""" """Get a button key from the database."""
database = sqlite3.connect(DB_FILE) database = sqlite3.connect(DB_FILE)
cursor = database.cursor() cursor = database.cursor()
result = cursor.execute(f"SELECT key FROM buttons WHERE uuid=\"{uuid}\"") result = cursor.execute(f'SELECT key FROM buttons WHERE uuid="{uuid}"')
matching_buttons = result.fetchall() matching_buttons = result.fetchall()
if len(matching_buttons) == 0: if len(matching_buttons) == 0:
return -1 return -1
@@ -177,21 +188,23 @@ def get_button_key(uuid: str) -> int:
return key return key
def get_buttons() -> dict[str, SDButton]: def get_buttons() -> Dict[str, SDButton]:
"""Load all buttons from the database.""" """Load all buttons from the database."""
result: dict[str, SDButton] = {} result: Dict[str, SDButton] = {}
database = sqlite3.connect(DB_FILE) database = sqlite3.connect(DB_FILE)
cursor = database.cursor() cursor = database.cursor()
for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"): for row in cursor.execute("SELECT key,uuid,device,x,y,svg FROM buttons"):
base64_bytes = row[5].encode() base64_bytes = row[5].encode()
svg_bytes = base64.b64decode(base64_bytes) svg_bytes = base64.b64decode(base64_bytes)
svg_string = svg_bytes.decode() svg_string = svg_bytes.decode()
result[row[0]] = SDButton({ result[row[0]] = SDButton(
{
"uuid": row[1], "uuid": row[1],
"device": row[2], "device": row[2],
"position": {"x": row[3], "y": row[4]}, "position": {"x": row[3], "y": row[4]},
"svg": svg_string, "svg": svg_string,
}) }
)
cursor.close() cursor.close()
database.close() database.close()
print(f"Loaded {len(result)} buttons from DB") print(f"Loaded {len(result)} buttons from DB")
@@ -213,11 +226,13 @@ def write_button_state(key: int, state: bool, update: str):
if len(matching_states) > 0: if len(matching_states) > 0:
# Perform update # Perform update
cursor.execute( cursor.execute(
f"UPDATE button_states SET state={state_int}, state_update=\"{update}\" WHERE key={key}") f'UPDATE button_states SET state={state_int}, state_update="{update}" WHERE key={key}'
)
else: else:
# Create new row # Create new row
cursor.execute( cursor.execute(
f"INSERT INTO button_states VALUES ({key}, {state_int}, \"{update}\")") f'INSERT INTO button_states VALUES ({key}, {state_int}, "{update}")'
)
database.commit() database.commit()
print(f"Saved button_state with key {key} to database") print(f"Saved button_state with key {key} to database")
cursor.close() cursor.close()
@@ -230,7 +245,8 @@ def get_button_state(key: int) -> any:
database = sqlite3.connect(DB_FILE) database = sqlite3.connect(DB_FILE)
cursor = database.cursor() cursor = database.cursor()
result = cursor.execute( result = cursor.execute(
f"SELECT key,state,state_update FROM button_states WHERE key={key}") f"SELECT key,state,state_update FROM button_states WHERE key={key}"
)
matching_states = result.fetchall() matching_states = result.fetchall()
if len(matching_states) == 0: if len(matching_states) == 0:
return None return None
@@ -310,7 +326,8 @@ async def websocket_handler(request: web.Request):
await web_socket.close() await web_socket.close()
elif msg.type == aiohttp.WSMsgType.ERROR: elif msg.type == aiohttp.WSMsgType.ERROR:
print( print(
f"Websocket connection closed with exception {web_socket.exception()}") f"Websocket connection closed with exception {web_socket.exception()}"
)
websocket_connections.remove(web_socket) websocket_connections.remove(web_socket)
return web_socket return web_socket
@@ -332,8 +349,8 @@ async def broadcast_status():
"args": { "args": {
"devices": devices, "devices": devices,
"application": application, "application": application,
"buttons": get_buttons() "buttons": get_buttons(),
} },
} }
data_str = encode(data, unpicklable=False) data_str = encode(data, unpicklable=False)
@@ -375,6 +392,7 @@ async def start_server_async(host: str = "0.0.0.0", port: int = PLUGIN_PORT):
print("Started Stream Deck API server on port", PLUGIN_PORT) print("Started Stream Deck API server on port", PLUGIN_PORT)
Timer(10, broadcast_status) Timer(10, broadcast_status)
# TODO add check if websocket is used, otherwise display warning on streamdeck
def get_position(deck: StreamDeck, key: int) -> SDButtonPosition: def get_position(deck: StreamDeck, key: int) -> SDButtonPosition:
@@ -412,14 +430,12 @@ async def on_key_change(_: StreamDeck, key: int, state: bool):
return return
if state is True: if state is True:
await websocket_broadcast(encode( await websocket_broadcast(encode({"event": "keyDown", "args": button.uuid}))
{"event": "keyDown", "args": button.uuid}))
print("Waiting for button release") print("Waiting for button release")
# Start timer # Start timer
Timer(LONG_PRESS_SECONDS, lambda: long_press_callback(key), False) Timer(LONG_PRESS_SECONDS, lambda: long_press_callback(key), False)
else: else:
await websocket_broadcast(encode( await websocket_broadcast(encode({"event": "keyUp", "args": button.uuid}))
{"event": "keyUp", "args": button.uuid}))
now = datetime.now() now = datetime.now()
@@ -515,73 +531,6 @@ def init_all():
deck.set_key_callback_async(on_key_change) deck.set_key_callback_async(on_key_change)
def get_local_ip():
"""Get local ip address."""
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
connection.connect(('192.255.255.255', 1))
address = connection.getsockname()[0]
except socket.error:
address = '127.0.0.1'
finally:
connection.close()
return address
class StreamDeckApiSsdpProtocol(ssdp.SimpleServiceDiscoveryProtocol):
"""Protocol to handle responses and requests."""
def response_received(self, response: ssdp.SSDPResponse, addr: tuple):
"""Handle an incoming response."""
print("received response: %s %s %s", response.status_code,
response.reason, response.version)
for header in response.headers:
print("header: %s", header)
print()
def request_received(self, request: ssdp.SSDPRequest, addr: tuple):
"""Handle an incoming request and respond to it."""
print(
"received request: %s %s %s",
request.method, request.uri, request.version
)
for header in request.headers:
print("header: %s", header)
print()
# Build response and send it.
print("Sending a response back to %s:%s", *addr)
address = get_local_ip()
location = f"http://example.net:{PLUGIN_PORT}/device.xml"
usn = f"uuid:{str(uuid4())}::{SD_SSDP}"
server = "python/3 UPnP/1.1 ssdpy/0.4.1"
print(f"IP Address for SSDP: {address}")
print(f"SSDP location: {location}")
ssdp_response = ssdp.SSDPResponse(
200,
"OK",
headers={
"Cache-Control": "max-age=30",
"Location": location,
"Server": server,
"ST": SD_SSDP,
"USN": usn,
"EXT": "",
},
)
msg = bytes(ssdp_response) + b"\r\n" + b"\r\n"
self.transport.sendto(msg, addr)
class Timer: class Timer:
"""Timer class.""" """Timer class."""
@@ -603,25 +552,32 @@ class Timer:
self._task.cancel() self._task.cancel()
def start_zeroconf():
"""Start Zeroconf server."""
info = ServiceInfo(
SD_ZEROCONF,
f"Stream Deck API Server.{SD_ZEROCONF}",
addresses=[socket.inet_aton("127.0.0.1")],
port=80,
)
zeroconf = Zeroconf()
print("Zeroconf starting")
zeroconf.register_service(info)
def start(): def start():
"""Entrypoint.""" """Entrypoint."""
init_all() init_all()
executor = ProcessPoolExecutor(2)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(2)
# SSDP server # Zeroconf server
if platform.system() == "Windows": loop.run_in_executor(executor, start_zeroconf)
print("SSDP not working on windows. Skipping ...")
else:
connect = loop.create_datagram_endpoint(
StreamDeckApiSsdpProtocol,
family=socket.AF_INET,
local_addr=(StreamDeckApiSsdpProtocol.MULTICAST_ADDRESS, 1900),
)
transport, protocol = loop.run_until_complete(connect)
StreamDeckApiSsdpProtocol.transport = transport
# API server # API server
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@@ -632,5 +588,4 @@ def start():
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
transport.close()
loop.close() loop.close()

View File

@@ -1,4 +1,5 @@
"""Stream Deck API types.""" """Stream Deck API types."""
from typing import List, Dict
class SDApplication: class SDApplication:
@@ -82,8 +83,8 @@ class SDInfo(dict):
def __init__(self, obj: dict) -> None: def __init__(self, obj: dict) -> None:
"""Init Stream Deck Info object.""" """Init Stream Deck Info object."""
self.devices: list[SDDevice] = [] self.devices: List[SDDevice] = []
self.buttons: dict[str, SDButton] = {} self.buttons: Dict[str, SDButton] = {}
dict.__init__(self, obj) dict.__init__(self, obj)
self.application = SDApplication(obj["application"]) self.application = SDApplication(obj["application"])

35
tests/api_test.py Normal file
View File

@@ -0,0 +1,35 @@
"""Unittests for API client."""
import unittest
import streamdeckapi
class TestApi(unittest.TestCase):
"""Api Test Case."""
def test_constructor(self):
"""Constructor test."""
host = "host.local"
# Test valid types
api = streamdeckapi.StreamDeckApi(host)
self.assertEqual(api.host, host)
streamdeckapi.StreamDeckApi(host, on_button_press=None)
streamdeckapi.StreamDeckApi(host, on_button_release=None)
streamdeckapi.StreamDeckApi(host, on_status_update=None)
streamdeckapi.StreamDeckApi(host, on_ws_connect=None)
streamdeckapi.StreamDeckApi(host, on_ws_message=None)
# Test some invalid types
for i_type in ["string", 2345, [321, 6457], {"key": "value"}]:
with self.assertRaises(TypeError):
_ = streamdeckapi.StreamDeckApi(host, on_button_press=i_type)
with self.assertRaises(TypeError):
_ = streamdeckapi.StreamDeckApi(host, on_button_release=i_type)
with self.assertRaises(TypeError):
_ = streamdeckapi.StreamDeckApi(host, on_status_update=i_type)
with self.assertRaises(TypeError):
_ = streamdeckapi.StreamDeckApi(host, on_ws_connect=i_type)
with self.assertRaises(TypeError):
_ = streamdeckapi.StreamDeckApi(host, on_ws_message=i_type)