Support smart child modules queries (#967)

Required for the P300 firmware update with `auto_off` module on child
devices. Will query child modules for parent devices that are not hubs.

Coverage will be fixed when the P300 fixture is added
https://github.com/python-kasa/python-kasa/pull/915
This commit is contained in:
Steven B 2024-06-10 15:47:00 +01:00 committed by GitHub
parent 927fe648ac
commit db6276d3fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 64 additions and 17 deletions

View File

@ -99,3 +99,11 @@ class AutoOff(SmartModule):
sysinfo = self._device.sys_info
return self._device.time + timedelta(seconds=sysinfo["auto_off_remain_time"])
async def _check_supported(self):
"""Additional check to see if the module is supported by the device.
Parent devices that report components of children such as P300 will not have
the auto_off_status is sysinfo.
"""
return "auto_off_status" in self._device.sys_info

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import logging
from typing import Any
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
@ -34,7 +35,17 @@ class SmartChildDevice(SmartDevice):
self.protocol = _ChildProtocolWrapper(self._id, parent.protocol)
async def update(self, update_children: bool = True):
"""Noop update. The parent updates our internals."""
"""Update child module info.
The parent updates our internal info so just update modules with
their own queries.
"""
req: dict[str, Any] = {}
for module in self.modules.values():
if mod_query := module.query():
req.update(mod_query)
if req:
self._last_update = await self.protocol.query(req)
@classmethod
async def create(cls, parent: SmartDevice, child_info, child_components):

View File

@ -149,7 +149,7 @@ class SmartDevice(Device):
if "child_device" in self._components and not self.children:
await self._initialize_children()
async def update(self, update_children: bool = True):
async def update(self, update_children: bool = False):
"""Update the device."""
if self.credentials is None and self.credentials_hash is None:
raise AuthenticationError("Tapo plug requires authentication.")
@ -167,9 +167,14 @@ class SmartDevice(Device):
self._last_update = resp = await self.protocol.query(req)
self._info = self._try_get_response(resp, "get_device_info")
# Call child update which will only update module calls, info is updated
# from get_child_device_list. update_children only affects hub devices, other
# devices will always update children to prevent errors on module access.
if update_children or self.device_type != DeviceType.Hub:
for child in self._children.values():
await child.update()
if child_info := self._try_get_response(resp, "get_child_device_list", {}):
# TODO: we don't currently perform queries on children based on modules,
# but just update the information that is returned in the main query.
for info in child_info["child_device_list"]:
self._children[info["device_id"]]._update_internal_state(info)
@ -352,8 +357,7 @@ class SmartDevice(Device):
@property
def time(self) -> datetime:
"""Return the time."""
# TODO: Default to parent's time module for child devices
if self._parent and Module.Time in self.modules:
if self._parent and Module.Time in self._parent.modules:
_timemod = self._parent.modules[Module.Time]
else:
_timemod = self.modules[Module.Time]

View File

@ -149,6 +149,11 @@ class FakeSmartTransport(BaseTransport):
if child["device_id"] == device_id:
info = child
break
# Create the child_devices fixture section for fixtures generated before it was added
if "child_devices" not in self.info:
self.info["child_devices"] = {}
# Get the method calls made directly on the child devices
child_device_calls = self.info["child_devices"].setdefault(device_id, {})
# We only support get & set device info for now.
if child_method == "get_device_info":
@ -159,14 +164,27 @@ class FakeSmartTransport(BaseTransport):
return {"error_code": 0}
elif child_method == "set_preset_rules":
return self._set_child_preset_rules(info, child_params)
elif child_method in child_device_calls:
result = copy.deepcopy(child_device_calls[child_method])
return {"result": result, "error_code": 0}
elif (
# FIXTURE_MISSING is for service calls not in place when
# SMART fixtures started to be generated
missing_result := self.FIXTURE_MISSING_MAP.get(child_method)
) and missing_result[0] in self.components:
result = copy.deepcopy(missing_result[1])
# Copy to info so it will work with update methods
child_device_calls[child_method] = copy.deepcopy(missing_result[1])
result = copy.deepcopy(info[child_method])
retval = {"result": result, "error_code": 0}
return retval
elif child_method[:4] == "set_":
target_method = f"get_{child_method[4:]}"
if target_method not in child_device_calls:
raise RuntimeError(
f"No {target_method} in child info, calling set before get not supported."
)
child_device_calls[target_method].update(child_params)
return {"error_code": 0}
else:
# PARAMS error returned for KS240 when get_device_usage called
# on parent device. Could be any error code though.

View File

@ -9,7 +9,7 @@ from pytest_mock import MockerFixture
from kasa import Module
from kasa.smart import SmartDevice
from kasa.tests.device_fixtures import parametrize
from kasa.tests.device_fixtures import get_parent_and_child_modules, parametrize
autooff = parametrize(
"has autooff", component_filter="auto_off", protocol_filter={"SMART"}
@ -33,13 +33,13 @@ async def test_autooff_features(
dev: SmartDevice, feature: str, prop_name: str, type: type
):
"""Test that features are registered and work as expected."""
autooff = dev.modules.get(Module.AutoOff)
autooff = next(get_parent_and_child_modules(dev, Module.AutoOff))
assert autooff is not None
prop = getattr(autooff, prop_name)
assert isinstance(prop, type)
feat = dev.features[feature]
feat = autooff._device.features[feature]
assert feat.value == prop
assert isinstance(feat.value, type)
@ -47,13 +47,13 @@ async def test_autooff_features(
@autooff
async def test_settings(dev: SmartDevice, mocker: MockerFixture):
"""Test autooff settings."""
autooff = dev.modules.get(Module.AutoOff)
autooff = next(get_parent_and_child_modules(dev, Module.AutoOff))
assert autooff
enabled = dev.features["auto_off_enabled"]
enabled = autooff._device.features["auto_off_enabled"]
assert autooff.enabled == enabled.value
delay = dev.features["auto_off_minutes"]
delay = autooff._device.features["auto_off_minutes"]
assert autooff.delay == delay.value
call = mocker.spy(autooff, "call")
@ -86,10 +86,10 @@ async def test_auto_off_at(
dev: SmartDevice, mocker: MockerFixture, is_timer_active: bool
):
"""Test auto-off at sensor."""
autooff = dev.modules.get(Module.AutoOff)
autooff = next(get_parent_and_child_modules(dev, Module.AutoOff))
assert autooff
autooff_at = dev.features["auto_off_at"]
autooff_at = autooff._device.features["auto_off_at"]
mocker.patch.object(
type(autooff),

View File

@ -9,7 +9,7 @@ from unittest.mock import patch
import pytest
from pytest_mock import MockerFixture
from kasa import KasaException, Module
from kasa import Device, KasaException, Module
from kasa.exceptions import SmartErrorCode
from kasa.smart import SmartDevice
@ -112,6 +112,11 @@ async def test_update_module_queries(dev: SmartDevice, mocker: MockerFixture):
device_queries: dict[SmartDevice, dict[str, Any]] = {}
for mod in dev._modules.values():
device_queries.setdefault(mod._device, {}).update(mod.query())
# Hubs do not query child modules by default.
if dev.device_type != Device.Type.Hub:
for child in dev.children:
for mod in child.modules.values():
device_queries.setdefault(mod._device, {}).update(mod.query())
spies = {}
for device in device_queries:
@ -120,7 +125,8 @@ async def test_update_module_queries(dev: SmartDevice, mocker: MockerFixture):
await dev.update()
for device in device_queries:
if device_queries[device]:
spies[device].assert_called_with(device_queries[device])
# Need assert any here because the child device updates use the parent's protocol
spies[device].assert_any_call(device_queries[device])
else:
spies[device].assert_not_called()