diff --git a/streamdeckapi/server.py b/streamdeckapi/server.py index b85e248..c5feb3b 100644 --- a/streamdeckapi/server.py +++ b/streamdeckapi/server.py @@ -382,6 +382,28 @@ def get_position(deck: StreamDeck, key: int) -> SDButtonPosition: return SDButtonPosition({"x": int(key / deck.KEY_COLS), "y": key % deck.KEY_COLS}) +async def long_press_callback(key: int): + """Handle callback after long press seconds.""" + print("Long press detected") + + # Check state of button + for deck in streamdecks: + if not deck.is_visual(): + continue + + if not deck.is_open(): + deck.open() + + states = deck.key_states() + + button = get_button(key) + if not isinstance(button, SDButton): + return + + if states[key] is True: + await websocket_broadcast(encode({"event": "longPress", "args": button.uuid})) + + async def on_key_change(_: StreamDeck, key: int, state: bool): """Handle key change callbacks.""" button = get_button(key) @@ -391,6 +413,9 @@ async def on_key_change(_: StreamDeck, key: int, state: bool): if state is True: await websocket_broadcast(encode( {"event": "keyDown", "args": button.uuid})) + print("Waiting for button release") + # Start timer + Timer(LONG_PRESS_SECONDS, lambda: long_press_callback(key), False) else: await websocket_broadcast(encode( {"event": "keyUp", "args": button.uuid})) @@ -414,13 +439,6 @@ async def on_key_change(_: StreamDeck, key: int, state: bool): write_button_state(key, state, now.strftime(DATETIME_FORMAT)) return - # TODO: Work with timer instead - if last_state is True and state is False and diff.seconds >= LONG_PRESS_SECONDS: - await websocket_broadcast( - encode({"event": "longPress", "args": button.uuid})) - write_button_state(key, state, now.strftime(DATETIME_FORMAT)) - return - write_button_state(key, state, now.strftime(DATETIME_FORMAT)) @@ -540,16 +558,18 @@ def start_ssdp_server(): class Timer: """Timer class.""" - def __init__(self, interval, callback): + def __init__(self, interval, callback, repeating=True): """Init timer.""" self._interval = interval self._callback = callback + self._repeating = repeating self._task = asyncio.ensure_future(self._job()) async def _job(self): await asyncio.sleep(self._interval) await self._callback() - self._task = asyncio.ensure_future(self._job()) + if self._repeating: + self._task = asyncio.ensure_future(self._job()) def cancel(self): """Cancel timer."""