Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion roborock/devices/traits/b01/q10/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,41 @@ class MyStatus(RoborockBase):
"""

import dataclasses
import logging
from collections.abc import Callable
from typing import Any

from roborock.callbacks import CallbackList
from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.containers import RoborockBase


class TraitUpdateListener:
"""Trait update listener.

This is a base class for traits to support notifying listeners when they
have been updated. Clients may register callbacks to be notified when the
trait has been updated. When the listener callback is invoked, the client
should read the trait's properties to get the updated values.
"""

def __init__(self, logger: logging.Logger) -> None:
"""Initialize the trait update listener."""
self._update_callbacks: CallbackList[None] = CallbackList(logger=logger)

def add_update_listener(self, callback: Callable[[], None]) -> Callable[[], None]:
"""Register a callback when the trait has been updated.

Returns a callable to remove the listener.
"""
# We wrap the callback to ignore the value passed to it.
return self._update_callbacks.add_callback(lambda _: callback())

def _notify_update(self) -> None:
"""Notify all update listeners."""
self._update_callbacks(None)


class DpsDataConverter:
"""Utility to handle the transformation and merging of DPS data into models.

Expand All @@ -66,7 +95,7 @@ def from_dataclass(cls, dataclass_type: type[RoborockBase]):
dps_field_map[dps_id] = field_obj.name
return cls(dps_type_map, dps_field_map)

def update_from_dps(self, target: RoborockBase, decoded_dps: dict[B01_Q10_DP, Any]) -> None:
def update_from_dps(self, target: RoborockBase, decoded_dps: dict[B01_Q10_DP, Any]) -> bool:
"""Convert and merge raw DPS data into the target object.

Uses the pre-calculated type mapping to ensure values are converted to the
Expand All @@ -75,8 +104,12 @@ def update_from_dps(self, target: RoborockBase, decoded_dps: dict[B01_Q10_DP, An
Args:
target: The target object to update.
decoded_dps: The decoded DPS data to convert.

Returns:
True if any values were updated, False otherwise.
"""
conversions = RoborockBase.convert_dict(self._dps_type_map, decoded_dps)
for dps_id, value in conversions.items():
field_name = self._dps_field_map[dps_id]
setattr(target, field_name, value)
return bool(conversions)
19 changes: 5 additions & 14 deletions roborock/devices/traits/b01/q10/status.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
"""Status trait for Q10 B01 devices."""

import logging
from collections.abc import Callable
from typing import Any

from roborock.callbacks import CallbackList
from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.b01_q10.b01_q10_containers import Q10Status

from .common import DpsDataConverter
from .common import DpsDataConverter, TraitUpdateListener

_LOGGER = logging.getLogger(__name__)

_CONVERTER = DpsDataConverter.from_dataclass(Q10Status)


class StatusTrait(Q10Status):
class StatusTrait(Q10Status, TraitUpdateListener):
"""Trait for managing the status of Q10 Roborock devices.

This is a thin wrapper around Q10Status that provides the Trait interface.
Expand All @@ -26,16 +24,9 @@ class StatusTrait(Q10Status):
def __init__(self) -> None:
"""Initialize the status trait."""
super().__init__()
self._update_callbacks: CallbackList[dict[B01_Q10_DP, Any]] = CallbackList(logger=_LOGGER)

def add_update_listener(self, callback: Callable[[dict[B01_Q10_DP, Any]], None]) -> Callable[[], None]:
"""Register a callback for decoded DPS updates.

Returns a callable to remove the listener.
"""
return self._update_callbacks.add_callback(callback)
TraitUpdateListener.__init__(self, logger=_LOGGER)

def update_from_dps(self, decoded_dps: dict[B01_Q10_DP, Any]) -> None:
"""Update the trait from raw DPS data."""
_CONVERTER.update_from_dps(self, decoded_dps)
self._update_callbacks(decoded_dps)
if _CONVERTER.update_from_dps(self, decoded_dps):
self._notify_update()
23 changes: 19 additions & 4 deletions tests/devices/traits/b01/q10/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,33 @@ async def test_status_trait_refresh(

def test_status_trait_update_listener(q10_api: Q10PropertiesApi) -> None:
"""Test that status listeners receive updates and can unsubscribe."""
updates: list[dict[B01_Q10_DP, Any]] = []
event = asyncio.Event()

unsubscribe = q10_api.status.add_update_listener(updates.append)
unsubscribe = q10_api.status.add_update_listener(event.set)

first_update = {B01_Q10_DP.BATTERY: 88}
q10_api.status.update_from_dps(first_update)

assert updates == [first_update]
assert event.is_set()
event.clear()

unsubscribe()

second_update = {B01_Q10_DP.BATTERY: 87}
q10_api.status.update_from_dps(second_update)

assert updates == [first_update]
assert not event.is_set()


def test_status_trait_update_listener_ignores_value(q10_api: Q10PropertiesApi) -> None:
"""Test that status listeners are not notified for unrelated updates."""
event = asyncio.Event()

unsubscribe = q10_api.status.add_update_listener(event.set)

first_update = {B01_Q10_DP.HEARTBEAT: 1} # Not a value in `Status` dataclass
q10_api.status.update_from_dps(first_update)

assert not event.is_set()

unsubscribe()
Loading