Add Time module to SmartCamera devices (#1182)

This commit is contained in:
Steven B. 2024-10-24 19:11:21 +01:00 committed by GitHub
parent 28361c1727
commit e3610cf37e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 140 additions and 9 deletions

View File

@ -3,9 +3,11 @@
from .camera import Camera from .camera import Camera
from .childdevice import ChildDevice from .childdevice import ChildDevice
from .device import DeviceModule from .device import DeviceModule
from .time import Time
__all__ = [ __all__ = [
"Camera", "Camera",
"ChildDevice", "ChildDevice",
"DeviceModule", "DeviceModule",
"Time",
] ]

View File

@ -0,0 +1,91 @@
"""Implementation of time module."""
from __future__ import annotations
from datetime import datetime, timezone, tzinfo
from typing import cast
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from ...cachedzoneinfo import CachedZoneInfo
from ...feature import Feature
from ...interfaces import Time as TimeInterface
from ..smartcameramodule import SmartCameraModule
class Time(SmartCameraModule, TimeInterface):
"""Implementation of device_local_time."""
QUERY_GETTER_NAME = "getTimezone"
QUERY_MODULE_NAME = "system"
QUERY_SECTION_NAMES = "basic"
_timezone: tzinfo = timezone.utc
_time: datetime
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
device=self._device,
id="device_time",
name="Device time",
attribute_getter="time",
container=self,
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
q = super().query()
q["getClockStatus"] = {self.QUERY_MODULE_NAME: {"name": "clock_status"}}
return q
async def _post_update_hook(self) -> None:
"""Perform actions after a device update."""
time_data = self.data["getClockStatus"]["system"]["clock_status"]
timezone_data = self.data["getTimezone"]["system"]["basic"]
zone_id = timezone_data["zone_id"]
timestamp = time_data["seconds_from_1970"]
try:
# Zoneinfo will return a DST aware object
tz: tzinfo = await CachedZoneInfo.get_cached_zone_info(zone_id)
except ZoneInfoNotFoundError:
# timezone string like: UTC+10:00
timezone_str = timezone_data["timezone"]
tz = cast(tzinfo, datetime.strptime(timezone_str[-6:], "%z").tzinfo)
self._timezone = tz
self._time = datetime.fromtimestamp(
cast(float, timestamp),
tz=tz,
)
@property
def timezone(self) -> tzinfo:
"""Return current timezone."""
return self._timezone
@property
def time(self) -> datetime:
"""Return device's current datetime."""
return self._time
async def set_time(self, dt: datetime) -> dict:
"""Set device time."""
if not dt.tzinfo:
timestamp = dt.replace(tzinfo=self.timezone).timestamp()
else:
timestamp = dt.timestamp()
lt = datetime.fromtimestamp(timestamp).isoformat().replace("T", " ")
params = {"seconds_from_1970": int(timestamp), "local_time": lt}
# Doesn't seem to update the time, perhaps because timing_mode is ntp
res = await self.call("setTimezone", {"system": {"clock_status": params}})
if (zinfo := dt.tzinfo) and isinstance(zinfo, ZoneInfo):
tz_params = {"zone_id": zinfo.key}
res = await self.call("setTimezone", {"system": {"basic": tz_params}})
return res

View File

@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Any, cast
from ..exceptions import DeviceError, KasaException, SmartErrorCode from ..exceptions import DeviceError, KasaException, SmartErrorCode
from ..smart.smartmodule import SmartModule from ..smart.smartmodule import SmartModule
@ -54,7 +54,11 @@ class SmartCameraModule(SmartModule):
if method[:3] == "get": if method[:3] == "get":
return await self._device._query_getter_helper(method, module, section) return await self._device._query_getter_helper(method, module, section)
return await self._device._query_setter_helper(method, module, section, params) if TYPE_CHECKING:
params = cast(dict[str, dict[str, Any]], params)
return await self._device._query_setter_helper(
method, module, section, params[module][section]
)
@property @property
def data(self) -> dict: def data(self) -> dict:

View File

@ -162,6 +162,24 @@ class FakeSmartCameraTransport(BaseTransport):
"lens_mask_info", "lens_mask_info",
"enabled", "enabled",
], ],
("system", "clock_status", "seconds_from_1970"): [
"getClockStatus",
"system",
"clock_status",
"seconds_from_1970",
],
("system", "clock_status", "local_time"): [
"getClockStatus",
"system",
"clock_status",
"local_time",
],
("system", "basic", "zone_id"): [
"getTimezone",
"system",
"basic",
"zone_id",
],
} }
async def _send_request(self, request_dict: dict): async def _send_request(self, request_dict: dict):
@ -188,12 +206,14 @@ class FakeSmartCameraTransport(BaseTransport):
for skey, sval in skey_val.items(): for skey, sval in skey_val.items():
section_key = skey section_key = skey
section_value = sval section_value = sval
if setter_keys := self.SETTERS.get(
(module, section, section_key)
):
self._get_param_set_value(info, setter_keys, section_value)
else:
return {"error_code": -1}
break break
if setter_keys := self.SETTERS.get((module, section, section_key)): return {"error_code": 0}
self._get_param_set_value(info, setter_keys, section_value)
return {"error_code": 0}
else:
return {"error_code": -1}
elif method[:3] == "get": elif method[:3] == "get":
params = request_dict.get("params") params = request_dict.get("params")
if method in info: if method in info:

View File

@ -2,9 +2,12 @@
from __future__ import annotations from __future__ import annotations
import pytest from datetime import datetime, timezone
from kasa import Device, DeviceType import pytest
from freezegun.api import FrozenDateTimeFactory
from kasa import Device, DeviceType, Module
from ..conftest import device_smartcamera, hub_smartcamera from ..conftest import device_smartcamera, hub_smartcamera
@ -45,3 +48,14 @@ async def test_hub(dev):
await child.update() await child.update()
assert "Time" not in child.modules assert "Time" not in child.modules
assert child.time assert child.time
@device_smartcamera
async def test_device_time(dev: Device, freezer: FrozenDateTimeFactory):
"""Test a child device gets the time from it's parent module."""
fallback_time = datetime.now(timezone.utc).astimezone().replace(microsecond=0)
assert dev.time != fallback_time
module = dev.modules[Module.Time]
await module.set_time(fallback_time)
await dev.update()
assert dev.time == fallback_time