diff --git a/kasa/experimental/modules/__init__.py b/kasa/experimental/modules/__init__.py index 9f168384..48c4c2ac 100644 --- a/kasa/experimental/modules/__init__.py +++ b/kasa/experimental/modules/__init__.py @@ -3,9 +3,11 @@ from .camera import Camera from .childdevice import ChildDevice from .device import DeviceModule +from .time import Time __all__ = [ "Camera", "ChildDevice", "DeviceModule", + "Time", ] diff --git a/kasa/experimental/modules/time.py b/kasa/experimental/modules/time.py new file mode 100644 index 00000000..33070892 --- /dev/null +++ b/kasa/experimental/modules/time.py @@ -0,0 +1,91 @@ +"""Implementation of time module.""" + +from __future__ import annotations + +from datetime import datetime, timezone, tzinfo +from typing import cast + +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + +from ...cachedzoneinfo import CachedZoneInfo +from ...feature import Feature +from ...interfaces import Time as TimeInterface +from ..smartcameramodule import SmartCameraModule + + +class Time(SmartCameraModule, TimeInterface): + """Implementation of device_local_time.""" + + QUERY_GETTER_NAME = "getTimezone" + QUERY_MODULE_NAME = "system" + QUERY_SECTION_NAMES = "basic" + + _timezone: tzinfo = timezone.utc + _time: datetime + + def _initialize_features(self) -> None: + """Initialize features after the initial update.""" + self._add_feature( + Feature( + device=self._device, + id="device_time", + name="Device time", + attribute_getter="time", + container=self, + category=Feature.Category.Debug, + type=Feature.Type.Sensor, + ) + ) + + def query(self) -> dict: + """Query to execute during the update cycle.""" + q = super().query() + q["getClockStatus"] = {self.QUERY_MODULE_NAME: {"name": "clock_status"}} + + return q + + async def _post_update_hook(self) -> None: + """Perform actions after a device update.""" + time_data = self.data["getClockStatus"]["system"]["clock_status"] + timezone_data = self.data["getTimezone"]["system"]["basic"] + zone_id = timezone_data["zone_id"] + timestamp = time_data["seconds_from_1970"] + try: + # Zoneinfo will return a DST aware object + tz: tzinfo = await CachedZoneInfo.get_cached_zone_info(zone_id) + except ZoneInfoNotFoundError: + # timezone string like: UTC+10:00 + timezone_str = timezone_data["timezone"] + tz = cast(tzinfo, datetime.strptime(timezone_str[-6:], "%z").tzinfo) + + self._timezone = tz + self._time = datetime.fromtimestamp( + cast(float, timestamp), + tz=tz, + ) + + @property + def timezone(self) -> tzinfo: + """Return current timezone.""" + return self._timezone + + @property + def time(self) -> datetime: + """Return device's current datetime.""" + return self._time + + async def set_time(self, dt: datetime) -> dict: + """Set device time.""" + if not dt.tzinfo: + timestamp = dt.replace(tzinfo=self.timezone).timestamp() + else: + timestamp = dt.timestamp() + + lt = datetime.fromtimestamp(timestamp).isoformat().replace("T", " ") + params = {"seconds_from_1970": int(timestamp), "local_time": lt} + # Doesn't seem to update the time, perhaps because timing_mode is ntp + res = await self.call("setTimezone", {"system": {"clock_status": params}}) + if (zinfo := dt.tzinfo) and isinstance(zinfo, ZoneInfo): + tz_params = {"zone_id": zinfo.key} + res = await self.call("setTimezone", {"system": {"basic": tz_params}}) + return res diff --git a/kasa/experimental/smartcameramodule.py b/kasa/experimental/smartcameramodule.py index fed97cb3..bfb42fc0 100644 --- a/kasa/experimental/smartcameramodule.py +++ b/kasa/experimental/smartcameramodule.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, cast from ..exceptions import DeviceError, KasaException, SmartErrorCode from ..smart.smartmodule import SmartModule @@ -54,7 +54,11 @@ class SmartCameraModule(SmartModule): if method[:3] == "get": return await self._device._query_getter_helper(method, module, section) - return await self._device._query_setter_helper(method, module, section, params) + if TYPE_CHECKING: + params = cast(dict[str, dict[str, Any]], params) + return await self._device._query_setter_helper( + method, module, section, params[module][section] + ) @property def data(self) -> dict: diff --git a/kasa/tests/fakeprotocol_smartcamera.py b/kasa/tests/fakeprotocol_smartcamera.py index 50d34e93..a8c49bd4 100644 --- a/kasa/tests/fakeprotocol_smartcamera.py +++ b/kasa/tests/fakeprotocol_smartcamera.py @@ -162,6 +162,24 @@ class FakeSmartCameraTransport(BaseTransport): "lens_mask_info", "enabled", ], + ("system", "clock_status", "seconds_from_1970"): [ + "getClockStatus", + "system", + "clock_status", + "seconds_from_1970", + ], + ("system", "clock_status", "local_time"): [ + "getClockStatus", + "system", + "clock_status", + "local_time", + ], + ("system", "basic", "zone_id"): [ + "getTimezone", + "system", + "basic", + "zone_id", + ], } async def _send_request(self, request_dict: dict): @@ -188,12 +206,14 @@ class FakeSmartCameraTransport(BaseTransport): for skey, sval in skey_val.items(): section_key = skey section_value = sval + if setter_keys := self.SETTERS.get( + (module, section, section_key) + ): + self._get_param_set_value(info, setter_keys, section_value) + else: + return {"error_code": -1} break - if setter_keys := self.SETTERS.get((module, section, section_key)): - self._get_param_set_value(info, setter_keys, section_value) - return {"error_code": 0} - else: - return {"error_code": -1} + return {"error_code": 0} elif method[:3] == "get": params = request_dict.get("params") if method in info: diff --git a/kasa/tests/smartcamera/test_smartcamera.py b/kasa/tests/smartcamera/test_smartcamera.py index 50a1a136..3e12dcfb 100644 --- a/kasa/tests/smartcamera/test_smartcamera.py +++ b/kasa/tests/smartcamera/test_smartcamera.py @@ -2,9 +2,12 @@ from __future__ import annotations -import pytest +from datetime import datetime, timezone -from kasa import Device, DeviceType +import pytest +from freezegun.api import FrozenDateTimeFactory + +from kasa import Device, DeviceType, Module from ..conftest import device_smartcamera, hub_smartcamera @@ -45,3 +48,14 @@ async def test_hub(dev): await child.update() assert "Time" not in child.modules assert child.time + + +@device_smartcamera +async def test_device_time(dev: Device, freezer: FrozenDateTimeFactory): + """Test a child device gets the time from it's parent module.""" + fallback_time = datetime.now(timezone.utc).astimezone().replace(microsecond=0) + assert dev.time != fallback_time + module = dev.modules[Module.Time] + await module.set_time(fallback_time) + await dev.update() + assert dev.time == fallback_time