From 589d15091a051b29688f207e146dff8096536ccc Mon Sep 17 00:00:00 2001 From: "Steven B." <51370195+sdb9696@users.noreply.github.com> Date: Tue, 14 Jan 2025 08:38:04 +0000 Subject: [PATCH] Add smartcam child device support for smartcam hubs (#1413) --- devtools/dump_devinfo.py | 185 +++++++++++++++++++------- devtools/generate_supported.py | 4 +- kasa/smartcam/__init__.py | 3 +- kasa/smartcam/smartcamchild.py | 115 ++++++++++++++++ kasa/smartcam/smartcamdevice.py | 50 ++++++- tests/device_fixtures.py | 6 +- tests/fakeprotocol_smart.py | 47 +++++-- tests/fakeprotocol_smartcam.py | 17 +++ tests/fixtureinfo.py | 18 +-- tests/smartcam/modules/test_camera.py | 12 +- tests/smartcam/test_smartcamdevice.py | 8 +- tests/test_device.py | 37 +++++- tests/test_devtools.py | 21 ++- 13 files changed, 431 insertions(+), 92 deletions(-) create mode 100644 kasa/smartcam/smartcamchild.py diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index e985ab40..cee7a7bf 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -54,7 +54,8 @@ from kasa.protocols.smartcamprotocol import ( from kasa.protocols.smartprotocol import REDACTORS as SMART_REDACTORS from kasa.protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper from kasa.smart import SmartChildDevice, SmartDevice -from kasa.smartcam import SmartCamDevice +from kasa.smartcam import SmartCamChild, SmartCamDevice +from kasa.smartcam.smartcamchild import CHILD_INFO_FROM_PARENT Call = namedtuple("Call", "module method") FixtureResult = namedtuple("FixtureResult", "filename, folder, data, protocol_suffix") @@ -62,11 +63,13 @@ FixtureResult = namedtuple("FixtureResult", "filename, folder, data, protocol_su SMART_FOLDER = "tests/fixtures/smart/" SMARTCAM_FOLDER = "tests/fixtures/smartcam/" SMART_CHILD_FOLDER = "tests/fixtures/smart/child/" +SMARTCAM_CHILD_FOLDER = "tests/fixtures/smartcam/child/" IOT_FOLDER = "tests/fixtures/iot/" SMART_PROTOCOL_SUFFIX = "SMART" SMARTCAM_SUFFIX = "SMARTCAM" SMART_CHILD_SUFFIX = "SMART.CHILD" +SMARTCAM_CHILD_SUFFIX = "SMARTCAM.CHILD" IOT_SUFFIX = "IOT" NO_GIT_FIXTURE_FOLDER = "kasa-fixtures" @@ -844,9 +847,8 @@ async def get_smart_test_calls(protocol: SmartProtocol): return test_calls, successes -def get_smart_child_fixture(response): +def get_smart_child_fixture(response, model_info, folder, suffix): """Get a seperate fixture for the child device.""" - model_info = SmartDevice._get_device_info(response, None) hw_version = model_info.hardware_version fw_version = model_info.firmware_version model = model_info.long_name @@ -855,12 +857,68 @@ def get_smart_child_fixture(response): save_filename = f"{model}_{hw_version}_{fw_version}" return FixtureResult( filename=save_filename, - folder=SMART_CHILD_FOLDER, + folder=folder, data=response, - protocol_suffix=SMART_CHILD_SUFFIX, + protocol_suffix=suffix, ) +def scrub_child_device_ids( + main_response: dict, child_responses: dict +) -> dict[str, str]: + """Scrub all the child device ids in the responses.""" + # Make the scrubbed id map + scrubbed_child_id_map = { + device_id: f"SCRUBBED_CHILD_DEVICE_ID_{index + 1}" + for index, device_id in enumerate(child_responses.keys()) + if device_id != "" + } + + for child_id, response in child_responses.items(): + scrubbed_child_id = scrubbed_child_id_map[child_id] + # scrub the device id in the child's get info response + # The checks for the device_id will ensure we can get a fixture + # even if the data is unexpectedly not available although it should + # always be there + if "get_device_info" in response and "device_id" in response["get_device_info"]: + response["get_device_info"]["device_id"] = scrubbed_child_id + elif ( + basic_info := response.get("getDeviceInfo", {}) + .get("device_info", {}) + .get("basic_info") + ) and "dev_id" in basic_info: + basic_info["dev_id"] = scrubbed_child_id + else: + _LOGGER.error( + "Cannot find device id in child get device info: %s", child_id + ) + + # Scrub the device ids in the parent for smart protocol + if gc := main_response.get("get_child_device_component_list"): + for child in gc["child_component_list"]: + device_id = child["device_id"] + child["device_id"] = scrubbed_child_id_map[device_id] + for child in main_response["get_child_device_list"]["child_device_list"]: + device_id = child["device_id"] + child["device_id"] = scrubbed_child_id_map[device_id] + + # Scrub the device ids in the parent for the smart camera protocol + if gc := main_response.get("getChildDeviceComponentList"): + for child in gc["child_component_list"]: + device_id = child["device_id"] + child["device_id"] = scrubbed_child_id_map[device_id] + for child in main_response["getChildDeviceList"]["child_device_list"]: + if device_id := child.get("device_id"): + child["device_id"] = scrubbed_child_id_map[device_id] + continue + elif dev_id := child.get("dev_id"): + child["dev_id"] = scrubbed_child_id_map[dev_id] + continue + _LOGGER.error("Could not find a device id for the child device: %s", child) + + return scrubbed_child_id_map + + async def get_smart_fixtures( protocol: SmartProtocol, *, @@ -917,21 +975,19 @@ async def get_smart_fixtures( finally: await protocol.close() + # Put all the successes into a dict[child_device_id or "", successes[]] device_requests: dict[str, list[SmartCall]] = {} for success in successes: device_request = device_requests.setdefault(success.child_device_id, []) device_request.append(success) - scrubbed_device_ids = { - device_id: f"SCRUBBED_CHILD_DEVICE_ID_{index}" - for index, device_id in enumerate(device_requests.keys()) - if device_id != "" - } - final = await _make_final_calls( protocol, device_requests[""], "All successes", batch_size, child_device_id="" ) fixture_results = [] + + # Make the final child calls + child_responses = {} for child_device_id, requests in device_requests.items(): if child_device_id == "": continue @@ -942,55 +998,82 @@ async def get_smart_fixtures( batch_size, child_device_id=child_device_id, ) + child_responses[child_device_id] = response - scrubbed = scrubbed_device_ids[child_device_id] - if "get_device_info" in response and "device_id" in response["get_device_info"]: - response["get_device_info"]["device_id"] = scrubbed - # If the child is a different model to the parent create a seperate fixture - if "get_device_info" in final: - parent_model = final["get_device_info"]["model"] - elif "getDeviceInfo" in final: - parent_model = final["getDeviceInfo"]["device_info"]["basic_info"][ - "device_model" - ] + # scrub the child ids + scrubbed_child_id_map = scrub_child_device_ids(final, child_responses) + + # Redact data from the main device response. _wrap_redactors ensure we do + # not redact the scrubbed child device ids and replaces REDACTED_partial_id + # with zeros + final = redact_data(final, _wrap_redactors(SMART_REDACTORS)) + + # smart cam child devices provide more information in getChildDeviceList on the + # parent than they return when queried directly for getDeviceInfo so we will store + # it in the child fixture. + if smart_cam_child_list := final.get("getChildDeviceList"): + child_infos_on_parent = { + info["device_id"]: info + for info in smart_cam_child_list["child_device_list"] + } + + for child_id, response in child_responses.items(): + scrubbed_child_id = scrubbed_child_id_map[child_id] + + # Get the parent model for checking whether to create a seperate child fixture + if model := final.get("get_device_info", {}).get("model"): + parent_model = model + elif ( + device_model := final.get("getDeviceInfo", {}) + .get("device_info", {}) + .get("basic_info", {}) + .get("device_model") + ): + parent_model = device_model else: - raise KasaException("Cannot determine parent device model.") + parent_model = None + _LOGGER.error("Cannot determine parent device model.") + + # different model smart child device if ( - "component_nego" in response - and "get_device_info" in response - and (child_model := response["get_device_info"].get("model")) + (child_model := response.get("get_device_info", {}).get("model")) + and parent_model and child_model != parent_model ): response = redact_data(response, _wrap_redactors(SMART_REDACTORS)) - fixture_results.append(get_smart_child_fixture(response)) + model_info = SmartDevice._get_device_info(response, None) + fixture_results.append( + get_smart_child_fixture( + response, model_info, SMART_CHILD_FOLDER, SMART_CHILD_SUFFIX + ) + ) + # different model smartcam child device + elif ( + ( + child_model := response.get("getDeviceInfo", {}) + .get("device_info", {}) + .get("basic_info", {}) + .get("device_model") + ) + and parent_model + and child_model != parent_model + ): + response = redact_data(response, _wrap_redactors(SMART_REDACTORS)) + # There is more info in the childDeviceList on the parent + # particularly the region is needed here. + child_info_from_parent = child_infos_on_parent[scrubbed_child_id] + response[CHILD_INFO_FROM_PARENT] = child_info_from_parent + model_info = SmartCamChild._get_device_info(response, None) + fixture_results.append( + get_smart_child_fixture( + response, model_info, SMARTCAM_CHILD_FOLDER, SMARTCAM_CHILD_SUFFIX + ) + ) + # same model child device else: cd = final.setdefault("child_devices", {}) - cd[scrubbed] = response + cd[scrubbed_child_id] = response - # Scrub the device ids in the parent for smart protocol - if gc := final.get("get_child_device_component_list"): - for child in gc["child_component_list"]: - device_id = child["device_id"] - child["device_id"] = scrubbed_device_ids[device_id] - for child in final["get_child_device_list"]["child_device_list"]: - device_id = child["device_id"] - child["device_id"] = scrubbed_device_ids[device_id] - - # Scrub the device ids in the parent for the smart camera protocol - if gc := final.get("getChildDeviceComponentList"): - for child in gc["child_component_list"]: - device_id = child["device_id"] - child["device_id"] = scrubbed_device_ids[device_id] - for child in final["getChildDeviceList"]["child_device_list"]: - if device_id := child.get("device_id"): - child["device_id"] = scrubbed_device_ids[device_id] - continue - elif dev_id := child.get("dev_id"): - child["dev_id"] = scrubbed_device_ids[dev_id] - continue - _LOGGER.error("Could not find a device for the child device: %s", child) - - final = redact_data(final, _wrap_redactors(SMART_REDACTORS)) discovery_result = None if discovery_info: final["discovery_result"] = redact_data( diff --git a/devtools/generate_supported.py b/devtools/generate_supported.py index 7e946e1a..f97c01c1 100755 --- a/devtools/generate_supported.py +++ b/devtools/generate_supported.py @@ -13,7 +13,7 @@ from typing import Any, NamedTuple from kasa.device_type import DeviceType from kasa.iot import IotDevice from kasa.smart import SmartDevice -from kasa.smartcam import SmartCamDevice +from kasa.smartcam import SmartCamChild, SmartCamDevice class SupportedVersion(NamedTuple): @@ -49,6 +49,7 @@ IOT_FOLDER = "tests/fixtures/iot/" SMART_FOLDER = "tests/fixtures/smart/" SMART_CHILD_FOLDER = "tests/fixtures/smart/child" SMARTCAM_FOLDER = "tests/fixtures/smartcam/" +SMARTCAM_CHILD_FOLDER = "tests/fixtures/smartcam/child" def generate_supported(args): @@ -66,6 +67,7 @@ def generate_supported(args): _get_supported_devices(supported, SMART_FOLDER, SmartDevice) _get_supported_devices(supported, SMART_CHILD_FOLDER, SmartDevice) _get_supported_devices(supported, SMARTCAM_FOLDER, SmartCamDevice) + _get_supported_devices(supported, SMARTCAM_CHILD_FOLDER, SmartCamChild) readme_updated = _update_supported_file( README_FILENAME, _supported_summary(supported), print_diffs diff --git a/kasa/smartcam/__init__.py b/kasa/smartcam/__init__.py index 574459f4..21cbeb50 100644 --- a/kasa/smartcam/__init__.py +++ b/kasa/smartcam/__init__.py @@ -1,5 +1,6 @@ """Package for supporting tapo-branded cameras.""" +from .smartcamchild import SmartCamChild from .smartcamdevice import SmartCamDevice -__all__ = ["SmartCamDevice"] +__all__ = ["SmartCamDevice", "SmartCamChild"] diff --git a/kasa/smartcam/smartcamchild.py b/kasa/smartcam/smartcamchild.py new file mode 100644 index 00000000..f02f21c9 --- /dev/null +++ b/kasa/smartcam/smartcamchild.py @@ -0,0 +1,115 @@ +"""Child device implementation.""" + +from __future__ import annotations + +import logging +from typing import Any + +from ..device import DeviceInfo +from ..device_type import DeviceType +from ..deviceconfig import DeviceConfig +from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper +from ..protocols.smartprotocol import SmartProtocol +from ..smart.smartchilddevice import SmartChildDevice +from ..smart.smartdevice import ComponentsRaw, SmartDevice +from .smartcamdevice import SmartCamDevice + +_LOGGER = logging.getLogger(__name__) + +# SmartCamChild devices have a different info format from getChildDeviceInfo +# than when querying getDeviceInfo directly on the child. +# As _get_device_info is also called by dump_devtools and generate_supported +# this key will be expected by _get_device_info +CHILD_INFO_FROM_PARENT = "child_info_from_parent" + + +class SmartCamChild(SmartChildDevice, SmartCamDevice): + """Presentation of a child device. + + This wraps the protocol communications and sets internal data for the child. + """ + + CHILD_DEVICE_TYPE_MAP = { + "camera": DeviceType.Camera, + } + + def __init__( + self, + parent: SmartDevice, + info: dict, + component_info_raw: ComponentsRaw, + *, + config: DeviceConfig | None = None, + protocol: SmartProtocol | None = None, + ) -> None: + _protocol = protocol or _ChildCameraProtocolWrapper( + info["device_id"], parent.protocol + ) + super().__init__(parent, info, component_info_raw, protocol=_protocol) + self._child_info_from_parent: dict = {} + + @property + def device_info(self) -> DeviceInfo: + """Return device info. + + Child device does not have it info and components in _last_update so + this overrides the base implementation to call _get_device_info with + info and components combined as they would be in _last_update. + """ + return self._get_device_info( + { + CHILD_INFO_FROM_PARENT: self._child_info_from_parent, + }, + None, + ) + + def _map_child_info_from_parent(self, device_info: dict) -> dict: + return { + "model": device_info["device_model"], + "device_type": device_info["device_type"], + "alias": device_info["alias"], + "fw_ver": device_info["sw_ver"], + "hw_ver": device_info["hw_ver"], + "mac": device_info["mac"], + "hwId": device_info.get("hw_id"), + "oem_id": device_info["oem_id"], + "device_id": device_info["device_id"], + } + + def _update_internal_state(self, info: dict[str, Any]) -> None: + """Update the internal info state. + + This is used by the parent to push updates to its children. + """ + # smartcam children have info with different keys to their own + # getDeviceInfo queries + self._child_info_from_parent = info + + # self._info will have the values normalized across smart and smartcam + # devices + self._info = self._map_child_info_from_parent(info) + + @staticmethod + def _get_device_info( + info: dict[str, Any], discovery_info: dict[str, Any] | None + ) -> DeviceInfo: + """Get model information for a device.""" + if not (cifp := info.get(CHILD_INFO_FROM_PARENT)): + return SmartCamDevice._get_device_info(info, discovery_info) + + model = cifp["device_model"] + device_type = SmartCamDevice._get_device_type_from_sysinfo(cifp) + fw_version_full = cifp["sw_ver"] + firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1) + return DeviceInfo( + short_name=model, + long_name=model, + brand="tapo", + device_family=cifp["device_type"], + device_type=device_type, + hardware_version=cifp["hw_ver"], + firmware_version=firmware_version, + firmware_build=firmware_build, + requires_auth=True, + region=cifp.get("region"), + ) diff --git a/kasa/smartcam/smartcamdevice.py b/kasa/smartcam/smartcamdevice.py index fdae3140..06629678 100644 --- a/kasa/smartcam/smartcamdevice.py +++ b/kasa/smartcam/smartcamdevice.py @@ -63,6 +63,13 @@ class SmartCamDevice(SmartDevice): info = self._try_get_response(info_resp, "getDeviceInfo") self._info = self._map_info(info["device_info"]) + def _update_internal_state(self, info: dict[str, Any]) -> None: + """Update the internal info state. + + This is used by the parent to push updates to its children. + """ + self._info = self._map_info(info) + def _update_children_info(self) -> None: """Update the internal child device info from the parent info.""" if child_info := self._try_get_response( @@ -99,6 +106,27 @@ class SmartCamDevice(SmartDevice): last_update=initial_response, ) + async def _initialize_smartcam_child( + self, info: dict, child_components_raw: ComponentsRaw + ) -> SmartDevice: + """Initialize a smart child device attached to a smartcam device.""" + child_id = info["device_id"] + child_protocol = _ChildCameraProtocolWrapper(child_id, self.protocol) + + last_update = {"getDeviceInfo": {"device_info": {"basic_info": info}}} + app_component_list = { + "app_component_list": child_components_raw["component_list"] + } + from .smartcamchild import SmartCamChild + + return await SmartCamChild.create( + parent=self, + child_info=info, + child_components_raw=app_component_list, + protocol=child_protocol, + last_update=last_update, + ) + async def _initialize_children(self) -> None: """Initialize children for hubs.""" child_info_query = { @@ -113,18 +141,28 @@ class SmartCamDevice(SmartDevice): for child in resp["getChildDeviceComponentList"]["child_component_list"] } children = {} + from .smartcamchild import SmartCamChild + for info in resp["getChildDeviceList"]["child_device_list"]: if ( (category := info.get("category")) - and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP and (child_id := info.get("device_id")) and (child_components := smart_children_components.get(child_id)) ): - children[child_id] = await self._initialize_smart_child( - info, child_components - ) - else: - _LOGGER.debug("Child device type not supported: %s", info) + # Smart + if category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP: + children[child_id] = await self._initialize_smart_child( + info, child_components + ) + continue + # Smartcam + if category in SmartCamChild.CHILD_DEVICE_TYPE_MAP: + children[child_id] = await self._initialize_smartcam_child( + info, child_components + ) + continue + + _LOGGER.debug("Child device type not supported: %s", info) self._children = children diff --git a/tests/device_fixtures.py b/tests/device_fixtures.py index af9b52cc..295e66ab 100644 --- a/tests/device_fixtures.py +++ b/tests/device_fixtures.py @@ -335,7 +335,7 @@ device_smartcam = parametrize("devices smartcam", protocol_filter={"SMARTCAM"}) camera_smartcam = parametrize( "camera smartcam", device_type_filter=[DeviceType.Camera], - protocol_filter={"SMARTCAM"}, + protocol_filter={"SMARTCAM", "SMARTCAM.CHILD"}, ) hub_smartcam = parametrize( "hub smartcam", @@ -377,7 +377,7 @@ check_categories() def device_for_fixture_name(model, protocol): if protocol in {"SMART", "SMART.CHILD"}: return SmartDevice - elif protocol == "SMARTCAM": + elif protocol in {"SMARTCAM", "SMARTCAM.CHILD"}: return SmartCamDevice else: for d in STRIPS_IOT: @@ -434,7 +434,7 @@ async def get_device_for_fixture( d.protocol = FakeSmartProtocol( fixture_data.data, fixture_data.name, verbatim=verbatim ) - elif fixture_data.protocol == "SMARTCAM": + elif fixture_data.protocol in {"SMARTCAM", "SMARTCAM.CHILD"}: d.protocol = FakeSmartCamProtocol( fixture_data.data, fixture_data.name, verbatim=verbatim ) diff --git a/tests/fakeprotocol_smart.py b/tests/fakeprotocol_smart.py index a2fc3926..7e4774b6 100644 --- a/tests/fakeprotocol_smart.py +++ b/tests/fakeprotocol_smart.py @@ -7,6 +7,8 @@ import pytest from kasa import Credentials, DeviceConfig, SmartProtocol from kasa.exceptions import SmartErrorCode from kasa.smart import SmartChildDevice +from kasa.smartcam import SmartCamChild +from kasa.smartcam.smartcamchild import CHILD_INFO_FROM_PARENT from kasa.transports.basetransport import BaseTransport @@ -227,16 +229,20 @@ class FakeSmartTransport(BaseTransport): # imported here to avoid circular import from .conftest import filter_fixtures - def try_get_child_fixture_info(child_dev_info): + def try_get_child_fixture_info(child_dev_info, protocol): hw_version = child_dev_info["hw_ver"] - sw_version = child_dev_info["fw_ver"] + sw_version = child_dev_info.get("sw_ver", child_dev_info.get("fw_ver")) sw_version = sw_version.split(" ")[0] - model = child_dev_info["model"] - region = child_dev_info.get("specs", "XX") - child_fixture_name = f"{model}({region})_{hw_version}_{sw_version}" + model = child_dev_info.get("device_model", child_dev_info.get("model")) + assert sw_version + assert model + + region = child_dev_info.get("specs", child_dev_info.get("region")) + region = f"({region})" if region else "" + child_fixture_name = f"{model}{region}_{hw_version}_{sw_version}" child_fixtures = filter_fixtures( "Child fixture", - protocol_filter={"SMART.CHILD"}, + protocol_filter={protocol}, model_filter={child_fixture_name}, ) if child_fixtures: @@ -249,7 +255,9 @@ class FakeSmartTransport(BaseTransport): and (category := child_info.get("category")) and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP ): - if fixture_info_tuple := try_get_child_fixture_info(child_info): + if fixture_info_tuple := try_get_child_fixture_info( + child_info, "SMART.CHILD" + ): child_fixture = copy.deepcopy(fixture_info_tuple.data) child_fixture["get_device_info"]["device_id"] = device_id found_child_fixture_infos.append(child_fixture["get_device_info"]) @@ -270,9 +278,32 @@ class FakeSmartTransport(BaseTransport): pytest.fixtures_missing_methods.setdefault( # type: ignore[attr-defined] parent_fixture_name, set() ).add("child_devices") + elif ( + (device_id := child_info.get("device_id")) + and (category := child_info.get("category")) + and category in SmartCamChild.CHILD_DEVICE_TYPE_MAP + and ( + fixture_info_tuple := try_get_child_fixture_info( + child_info, "SMARTCAM.CHILD" + ) + ) + ): + from .fakeprotocol_smartcam import FakeSmartCamProtocol + + child_fixture = copy.deepcopy(fixture_info_tuple.data) + child_fixture["getDeviceInfo"]["device_info"]["basic_info"][ + "dev_id" + ] = device_id + child_fixture[CHILD_INFO_FROM_PARENT]["device_id"] = device_id + # We copy the child device info to the parent getChildDeviceInfo + # list for smartcam children in order for updates to work. + found_child_fixture_infos.append(child_fixture[CHILD_INFO_FROM_PARENT]) + child_protocols[device_id] = FakeSmartCamProtocol( + child_fixture, fixture_info_tuple.name, is_child=True + ) else: warn( - f"Child is a cameraprotocol which needs to be implemented {child_info}", + f"Child is a protocol which needs to be implemented {child_info}", stacklevel=2, ) # Replace parent child infos with the infos from the child fixtures so diff --git a/tests/fakeprotocol_smartcam.py b/tests/fakeprotocol_smartcam.py index 17b14979..431a761d 100644 --- a/tests/fakeprotocol_smartcam.py +++ b/tests/fakeprotocol_smartcam.py @@ -6,6 +6,7 @@ from typing import Any from kasa import Credentials, DeviceConfig, SmartProtocol from kasa.protocols.smartcamprotocol import SmartCamProtocol +from kasa.smartcam.smartcamchild import CHILD_INFO_FROM_PARENT from kasa.transports.basetransport import BaseTransport from .fakeprotocol_smart import FakeSmartTransport @@ -125,10 +126,26 @@ class FakeSmartCamTransport(BaseTransport): @staticmethod def _get_param_set_value(info: dict, set_keys: list[str], value): + cifp = info.get(CHILD_INFO_FROM_PARENT) + for key in set_keys[:-1]: info = info[key] info[set_keys[-1]] = value + if ( + cifp + and set_keys[0] == "getDeviceInfo" + and ( + child_info_parent_key + := FakeSmartCamTransport.CHILD_INFO_SETTER_MAP.get(set_keys[-1]) + ) + ): + cifp[child_info_parent_key] = value + + CHILD_INFO_SETTER_MAP = { + "device_alias": "alias", + } + FIXTURE_MISSING_MAP = { "getMatterSetupInfo": ( "matter", diff --git a/tests/fixtureinfo.py b/tests/fixtureinfo.py index 8988be1d..fbfe6ff8 100644 --- a/tests/fixtureinfo.py +++ b/tests/fixtureinfo.py @@ -60,11 +60,19 @@ SUPPORTED_SMARTCAM_DEVICES = [ ) ] +SUPPORTED_SMARTCAM_CHILD_DEVICES = [ + (device, "SMARTCAM.CHILD") + for device in glob.glob( + os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smartcam/child/*.json" + ) +] + SUPPORTED_DEVICES = ( SUPPORTED_IOT_DEVICES + SUPPORTED_SMART_DEVICES + SUPPORTED_SMART_CHILD_DEVICES + SUPPORTED_SMARTCAM_DEVICES + + SUPPORTED_SMARTCAM_CHILD_DEVICES ) @@ -82,14 +90,8 @@ def get_fixture_infos() -> list[FixtureInfo]: fixture_data = [] for file, protocol in SUPPORTED_DEVICES: p = Path(file) - folder = Path(__file__).parent / "fixtures" - if protocol == "SMART": - folder = folder / "smart" - if protocol == "SMART.CHILD": - folder = folder / "smart/child" - p = folder / file - with open(p) as f: + with open(file) as f: data = json.load(f) fixture_name = p.name @@ -188,7 +190,7 @@ def filter_fixtures( IotDevice._get_device_type_from_sys_info(fixture_data.data) in device_type ) - elif fixture_data.protocol == "SMARTCAM": + elif fixture_data.protocol in {"SMARTCAM", "SMARTCAM.CHILD"}: info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"] return SmartCamDevice._get_device_type_from_sysinfo(info) in device_type return False diff --git a/tests/smartcam/modules/test_camera.py b/tests/smartcam/modules/test_camera.py index ebc08101..d668f9f4 100644 --- a/tests/smartcam/modules/test_camera.py +++ b/tests/smartcam/modules/test_camera.py @@ -10,7 +10,13 @@ import pytest from kasa import Credentials, Device, DeviceType, Module, StreamResolution -from ...conftest import camera_smartcam, device_smartcam +from ...conftest import device_smartcam, parametrize + +not_child_camera_smartcam = parametrize( + "not child camera smartcam", + device_type_filter=[DeviceType.Camera], + protocol_filter={"SMARTCAM"}, +) @device_smartcam @@ -24,7 +30,7 @@ async def test_state(dev: Device): assert dev.is_on is not state -@camera_smartcam +@not_child_camera_smartcam async def test_stream_rtsp_url(dev: Device): camera_module = dev.modules.get(Module.Camera) assert camera_module @@ -84,7 +90,7 @@ async def test_stream_rtsp_url(dev: Device): assert url is None -@camera_smartcam +@not_child_camera_smartcam async def test_onvif_url(dev: Device): """Test the onvif url.""" camera_module = dev.modules.get(Module.Camera) diff --git a/tests/smartcam/test_smartcamdevice.py b/tests/smartcam/test_smartcamdevice.py index 3355d2f0..8675b693 100644 --- a/tests/smartcam/test_smartcamdevice.py +++ b/tests/smartcam/test_smartcamdevice.py @@ -52,12 +52,12 @@ async def test_alias(dev): async def test_hub(dev): assert dev.children for child in dev.children: - assert "Cloud" in child.modules - assert child.modules["Cloud"].data + assert child.modules + assert child.device_info + assert child.alias await child.update() - assert "Time" not in child.modules - assert child.time + assert child.device_id @device_smartcam diff --git a/tests/test_device.py b/tests/test_device.py index 20e5bef8..4f74e89c 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -31,7 +31,7 @@ from kasa.iot.iottimezone import ( ) from kasa.iot.modules import IotLightPreset from kasa.smart import SmartChildDevice, SmartDevice -from kasa.smartcam import SmartCamDevice +from kasa.smartcam import SmartCamChild, SmartCamDevice def _get_subclasses(of_class): @@ -84,13 +84,24 @@ async def test_device_class_ctors(device_class_name_obj): credentials = Credentials("foo", "bar") config = DeviceConfig(host, port_override=port, credentials=credentials) klass = device_class_name_obj[1] - if issubclass(klass, SmartChildDevice): + if issubclass(klass, SmartChildDevice | SmartCamChild): parent = SmartDevice(host, config=config) + smartcam_required = { + "device_model": "foo", + "device_type": "SMART.TAPODOORBELL", + "alias": "Foo", + "sw_ver": "1.1", + "hw_ver": "1.0", + "mac": "1.2.3.4", + "hwId": "hw_id", + "oem_id": "oem_id", + } dev = klass( parent, - {"dummy": "info", "device_id": "dummy"}, + {"dummy": "info", "device_id": "dummy", **smartcam_required}, { "component_list": [{"id": "device", "ver_code": 1}], + "app_component_list": [{"name": "device", "version": 1}], }, ) else: @@ -108,13 +119,24 @@ async def test_device_class_repr(device_class_name_obj): credentials = Credentials("foo", "bar") config = DeviceConfig(host, port_override=port, credentials=credentials) klass = device_class_name_obj[1] - if issubclass(klass, SmartChildDevice): + if issubclass(klass, SmartChildDevice | SmartCamChild): parent = SmartDevice(host, config=config) + smartcam_required = { + "device_model": "foo", + "device_type": "SMART.TAPODOORBELL", + "alias": "Foo", + "sw_ver": "1.1", + "hw_ver": "1.0", + "mac": "1.2.3.4", + "hwId": "hw_id", + "oem_id": "oem_id", + } dev = klass( parent, - {"dummy": "info", "device_id": "dummy"}, + {"dummy": "info", "device_id": "dummy", **smartcam_required}, { "component_list": [{"id": "device", "ver_code": 1}], + "app_component_list": [{"name": "device", "version": 1}], }, ) else: @@ -132,11 +154,14 @@ async def test_device_class_repr(device_class_name_obj): SmartChildDevice: DeviceType.Unknown, SmartDevice: DeviceType.Unknown, SmartCamDevice: DeviceType.Camera, + SmartCamChild: DeviceType.Camera, } type_ = CLASS_TO_DEFAULT_TYPE[klass] child_repr = ">" not_child_repr = f"<{type_} at 127.0.0.2 - update() needed>" - expected_repr = child_repr if klass is SmartChildDevice else not_child_repr + expected_repr = ( + child_repr if klass in {SmartChildDevice, SmartCamChild} else not_child_repr + ) assert repr(dev) == expected_repr diff --git a/tests/test_devtools.py b/tests/test_devtools.py index 3af20035..b49268d3 100644 --- a/tests/test_devtools.py +++ b/tests/test_devtools.py @@ -4,11 +4,18 @@ import copy import pytest -from devtools.dump_devinfo import get_legacy_fixture, get_smart_fixtures +from devtools.dump_devinfo import ( + _wrap_redactors, + get_legacy_fixture, + get_smart_fixtures, +) from kasa.iot import IotDevice from kasa.protocols import IotProtocol +from kasa.protocols.protocol import redact_data +from kasa.protocols.smartprotocol import REDACTORS as SMART_REDACTORS from kasa.smart import SmartDevice from kasa.smartcam import SmartCamDevice +from kasa.smartcam.smartcamchild import CHILD_INFO_FROM_PARENT from .conftest import ( FixtureInfo, @@ -113,6 +120,18 @@ async def test_smartcam_fixtures(fixture_info: FixtureInfo): saved_fixture_data = { key: val for key, val in saved_fixture_data.items() if val != -1001 } + + # Remove the child info from parent from the comparison because the + # child may have been created by a different parent fixture + saved_fixture_data.pop(CHILD_INFO_FROM_PARENT, None) + created_cifp = created_child_fixture.data.pop(CHILD_INFO_FROM_PARENT, None) + + # Still check that the created child info from parent was redacted. + # only smartcam children generate child_info_from_parent + if created_cifp: + redacted_cifp = redact_data(created_cifp, _wrap_redactors(SMART_REDACTORS)) + assert created_cifp == redacted_cifp + assert saved_fixture_data == created_child_fixture.data