Zeroconf update

This commit is contained in:
Patrick762
2023-06-16 18:00:34 +02:00
committed by GitHub

View File

@@ -19,8 +19,7 @@ from StreamDeck.Devices.StreamDeck import StreamDeck
from StreamDeck.ImageHelpers import PILHelper
import cairosvg
from PIL import Image
from zeroconf import IPVersion, ServiceInfo, Zeroconf
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
from zeroconf import ServiceInfo, Zeroconf
from streamdeckapi.const import (
DATETIME_FORMAT,
@@ -49,6 +48,25 @@ DEFAULT_ICON = re.sub(
)
# Copy of MDI Icon "alert"
NO_CONN_ICON = re.sub(
"\r\n|\n|\r",
"",
"""
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 72 72">
<rect width="72" height="72" fill="#000" />
<g transform="translate(1, 1) scale(1)">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24">
<path fill="yellow" d="M13 14H11V9H13M13 18H11V16H13M1 21H23L12 2L1 21Z" />
</svg>
</g>
</svg>
""",
)
application: SDApplication = SDApplication(
{
"font": "Segoe UI",
@@ -383,6 +401,21 @@ def create_runner():
return web.AppRunner(app)
async def check_websocket():
"""Check if a websocket client is connected."""
if len(websocket_connections) == 0:
print("No connection")
for deck in streamdecks:
if not deck.is_visual():
continue
if not deck.is_open():
deck.open()
for key in range(deck.key_count()):
set_icon(deck, key, NO_CONN_ICON)
async def start_server_async(host: str = "0.0.0.0", port: int = PLUGIN_PORT):
"""Start API server."""
runner = create_runner()
@@ -392,7 +425,7 @@ async def start_server_async(host: str = "0.0.0.0", port: int = PLUGIN_PORT):
print("Started Stream Deck API server on port", PLUGIN_PORT)
Timer(10, broadcast_status)
# TODO add check if websocket is used, otherwise display warning on streamdeck
Timer(3, check_websocket)
def get_position(deck: StreamDeck, key: int) -> SDButtonPosition:
@@ -552,14 +585,33 @@ class Timer:
self._task.cancel()
def get_local_ip():
"""Get local ip address."""
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
connection.connect(("192.255.255.255", 1))
address = connection.getsockname()[0]
except socket.error:
address = "127.0.0.1"
finally:
connection.close()
return address
def start_zeroconf():
"""Start Zeroconf server."""
host = get_local_ip()
print("Using host", host, "for Zeroconf")
info = ServiceInfo(
SD_ZEROCONF,
f"Stream Deck API Server.{SD_ZEROCONF}",
addresses=[socket.inet_aton("127.0.0.1")],
port=80,
f"Stream Deck API Server at {host}.{SD_ZEROCONF}",
addresses=[socket.inet_aton(host)],
port=PLUGIN_PORT,
properties={"path": "/sd/info"},
server="pythonserver.local.",
)
zeroconf = Zeroconf()