diff --git a/kasa/experimental/smartcamera.py b/kasa/experimental/smartcamera.py index b70ef5df..3224c003 100644 --- a/kasa/experimental/smartcamera.py +++ b/kasa/experimental/smartcamera.py @@ -2,6 +2,8 @@ from __future__ import annotations +from typing import Any + from ..device_type import DeviceType from ..exceptions import SmartErrorCode from ..smart import SmartDevice @@ -10,6 +12,14 @@ from ..smart import SmartDevice class SmartCamera(SmartDevice): """Class for smart cameras.""" + @staticmethod + def _get_device_type_from_sysinfo(sysinfo: dict[str, Any]) -> DeviceType: + """Find type to be displayed as a supported device category.""" + device_type = sysinfo["device_type"] + if device_type.endswith("HUB"): + return DeviceType.Hub + return DeviceType.Camera + async def update(self, update_children: bool = False): """Update the device.""" initial_query = { @@ -26,7 +36,7 @@ class SmartCamera(SmartDevice): basic_info = device_info["basic_info"] return { "model": basic_info["device_model"], - "type": basic_info["device_type"], + "device_type": basic_info["device_type"], "alias": basic_info["device_alias"], "fw_ver": basic_info["sw_version"], "hw_ver": basic_info["hw_version"], @@ -61,7 +71,9 @@ class SmartCamera(SmartDevice): @property def device_type(self) -> DeviceType: """Return the device type.""" - return DeviceType.Camera + if self._device_type == DeviceType.Unknown: + self._device_type = self._get_device_type_from_sysinfo(self._info) + return self._device_type @property def alias(self) -> str | None: diff --git a/kasa/tests/device_fixtures.py b/kasa/tests/device_fixtures.py index fca5960a..1608be94 100644 --- a/kasa/tests/device_fixtures.py +++ b/kasa/tests/device_fixtures.py @@ -10,11 +10,13 @@ from kasa import ( DeviceType, Discover, ) +from kasa.experimental.smartcamera import SmartCamera from kasa.iot import IotBulb, IotDimmer, IotLightStrip, IotPlug, IotStrip, IotWallSwitch from kasa.smart import SmartDevice from .fakeprotocol_iot import FakeIotProtocol from .fakeprotocol_smart import FakeSmartProtocol +from .fakeprotocol_smartcamera import FakeSmartCameraProtocol from .fixtureinfo import ( FIXTURE_DATA, ComponentFilter, @@ -313,6 +315,17 @@ device_smart = parametrize( device_iot = parametrize( "devices iot", model_filter=ALL_DEVICES_IOT, protocol_filter={"IOT"} ) +device_smartcamera = parametrize("devices smartcamera", protocol_filter={"SMARTCAMERA"}) +camera_smartcamera = parametrize( + "camera smartcamera", + device_type_filter=[DeviceType.Camera], + protocol_filter={"SMARTCAMERA"}, +) +hub_smartcamera = parametrize( + "hub smartcamera", + device_type_filter=[DeviceType.Hub], + protocol_filter={"SMARTCAMERA"}, +) def check_categories(): @@ -329,6 +342,8 @@ def check_categories(): + hubs_smart.args[1] + sensors_smart.args[1] + thermostats_smart.args[1] + + camera_smartcamera.args[1] + + hub_smartcamera.args[1] ) diffs: set[FixtureInfo] = set(FIXTURE_DATA) - set(categorized_fixtures) if diffs: @@ -344,8 +359,10 @@ check_categories() def device_for_fixture_name(model, protocol): - if "SMART" in protocol: + if protocol in {"SMART", "SMART.CHILD"}: return SmartDevice + elif protocol == "SMARTCAMERA": + return SmartCamera else: for d in STRIPS_IOT: if d in model: @@ -395,8 +412,10 @@ async def get_device_for_fixture(fixture_data: FixtureInfo) -> Device: d = device_for_fixture_name(fixture_data.name, fixture_data.protocol)( host="127.0.0.123" ) - if "SMART" in fixture_data.protocol: + if fixture_data.protocol in {"SMART", "SMART.CHILD"}: d.protocol = FakeSmartProtocol(fixture_data.data, fixture_data.name) + elif fixture_data.protocol == "SMARTCAMERA": + d.protocol = FakeSmartCameraProtocol(fixture_data.data, fixture_data.name) else: d.protocol = FakeIotProtocol(fixture_data.data) diff --git a/kasa/tests/fakeprotocol_smartcamera.py b/kasa/tests/fakeprotocol_smartcamera.py new file mode 100644 index 00000000..e2a849db --- /dev/null +++ b/kasa/tests/fakeprotocol_smartcamera.py @@ -0,0 +1,217 @@ +from __future__ import annotations + +import copy +from json import loads as json_loads +from warnings import warn + +from kasa import Credentials, DeviceConfig, SmartProtocol +from kasa.experimental.smartcameraprotocol import SmartCameraProtocol +from kasa.protocol import BaseTransport +from kasa.smart import SmartChildDevice + +from .fakeprotocol_smart import FakeSmartProtocol + + +class FakeSmartCameraProtocol(SmartCameraProtocol): + def __init__(self, info, fixture_name): + super().__init__( + transport=FakeSmartCameraTransport(info, fixture_name), + ) + + async def query(self, request, retry_count: int = 3): + """Implement query here so can still patch SmartProtocol.query.""" + resp_dict = await self._query(request, retry_count) + return resp_dict + + +class FakeSmartCameraTransport(BaseTransport): + def __init__( + self, + info, + fixture_name, + *, + list_return_size=10, + ): + super().__init__( + config=DeviceConfig( + "127.0.0.123", + credentials=Credentials( + username="dummy_user", + password="dummy_password", # noqa: S106 + ), + ), + ) + self.fixture_name = fixture_name + self.info = copy.deepcopy(info) + self.child_protocols = self._get_child_protocols() + self.list_return_size = list_return_size + + @property + def default_port(self): + """Default port for the transport.""" + return 443 + + @property + def credentials_hash(self): + """The hashed credentials used by the transport.""" + return self._credentials.username + self._credentials.password + "camerahash" + + async def send(self, request: str): + request_dict = json_loads(request) + method = request_dict["method"] + + if method == "multipleRequest": + params = request_dict["params"] + responses = [] + for request in params["requests"]: + response = await self._send_request(request) # type: ignore[arg-type] + # Devices do not continue after error + if response["error_code"] != 0: + break + response["method"] = request["method"] # type: ignore[index] + responses.append(response) + return {"result": {"responses": responses}, "error_code": 0} + else: + return await self._send_request(request_dict) + + def _get_child_protocols(self): + child_infos = self.info.get("getChildDeviceList", {}).get( + "child_device_list", [] + ) + found_child_fixture_infos = [] + child_protocols = {} + # imported here to avoid circular import + from .conftest import filter_fixtures + + for child_info in child_infos: + if ( + (device_id := child_info.get("device_id")) + and (category := child_info.get("category")) + and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP + ): + hw_version = child_info["hw_ver"] + sw_version = child_info["fw_ver"] + sw_version = sw_version.split(" ")[0] + model = child_info["model"] + region = child_info["specs"] + child_fixture_name = f"{model}({region})_{hw_version}_{sw_version}" + child_fixtures = filter_fixtures( + "Child fixture", + protocol_filter={"SMART.CHILD"}, + model_filter=child_fixture_name, + ) + if child_fixtures: + fixture_info = next(iter(child_fixtures)) + found_child_fixture_infos.append(child_info) + child_protocols[device_id] = FakeSmartProtocol( + fixture_info.data, fixture_info.name + ) + else: + warn( + f"Could not find child fixture {child_fixture_name}", + stacklevel=1, + ) + else: + warn( + f"Child is a cameraprotocol which needs to be implemented {child_info}", + stacklevel=1, + ) + # Replace child infos with the infos that found child fixtures + if child_infos: + self.info["getChildDeviceList"]["child_device_list"] = ( + found_child_fixture_infos + ) + return child_protocols + + async def _handle_control_child(self, params: dict): + """Handle control_child command.""" + device_id = params.get("device_id") + assert device_id in self.child_protocols, "Fixture does not have child info" + + child_protocol: SmartProtocol = self.child_protocols[device_id] + + request_data = params.get("request_data", {}) + + child_method = request_data.get("method") + child_params = request_data.get("params") # noqa: F841 + + resp = await child_protocol.query({child_method: child_params}) + resp["error_code"] = 0 + for val in resp.values(): + return { + "result": {"response_data": {"result": val, "error_code": 0}}, + "error_code": 0, + } + + @staticmethod + def _get_param_set_value(info: dict, set_keys: list[str], value): + for key in set_keys[:-1]: + info = info[key] + info[set_keys[-1]] = value + + SETTERS = { + ("system", "sys", "dev_alias"): [ + "getDeviceInfo", + "device_info", + "basic_info", + "device_alias", + ], + ("lens_mask", "lens_mask_info", "enabled"): [ + "getLensMaskConfig", + "lens_mask", + "lens_mask_info", + "enabled", + ], + } + + async def _send_request(self, request_dict: dict): + method = request_dict["method"] + + info = self.info + if method == "controlChild": + return await self._handle_control_child( + request_dict["params"]["childControl"] + ) + + if method == "set": + for key, val in request_dict.items(): + if key != "method": + module = key + section = next(iter(val)) + skey_val = val[section] + for skey, sval in skey_val.items(): + section_key = skey + section_value = sval + break + if setter_keys := self.SETTERS.get((module, section, section_key)): + self._get_param_set_value(info, setter_keys, section_value) + return {"error_code": 0} + else: + return {"error_code": -1} + elif method[:3] == "get": + params = request_dict.get("params") + if method in info: + result = copy.deepcopy(info[method]) + if "start_index" in result and "sum" in result: + list_key = next( + iter([key for key in result if isinstance(result[key], list)]) + ) + start_index = ( + start_index + if (params and (start_index := params.get("start_index"))) + else 0 + ) + + result[list_key] = result[list_key][ + start_index : start_index + self.list_return_size + ] + return {"result": result, "error_code": 0} + else: + return {"error_code": -1} + return {"error_code": -1} + + async def close(self) -> None: + pass + + async def reset(self) -> None: + pass diff --git a/kasa/tests/fixtureinfo.py b/kasa/tests/fixtureinfo.py index 9abf0f06..8db96024 100644 --- a/kasa/tests/fixtureinfo.py +++ b/kasa/tests/fixtureinfo.py @@ -4,10 +4,11 @@ import glob import json import os from pathlib import Path -from typing import NamedTuple +from typing import Iterable, NamedTuple from kasa.device_factory import _get_device_type_from_sys_info from kasa.device_type import DeviceType +from kasa.experimental.smartcamera import SmartCamera from kasa.smart.smartdevice import SmartDevice @@ -48,9 +49,18 @@ SUPPORTED_SMART_CHILD_DEVICES = [ ) ] +SUPPORTED_SMARTCAMERA_DEVICES = [ + (device, "SMARTCAMERA") + for device in glob.glob( + os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smartcamera/*.json" + ) +] SUPPORTED_DEVICES = ( - SUPPORTED_IOT_DEVICES + SUPPORTED_SMART_DEVICES + SUPPORTED_SMART_CHILD_DEVICES + SUPPORTED_IOT_DEVICES + + SUPPORTED_SMART_DEVICES + + SUPPORTED_SMART_CHILD_DEVICES + + SUPPORTED_SMARTCAMERA_DEVICES ) @@ -95,7 +105,7 @@ def filter_fixtures( protocol_filter: set[str] | None = None, model_filter: set[str] | None = None, component_filter: str | ComponentFilter | None = None, - device_type_filter: list[DeviceType] | None = None, + device_type_filter: Iterable[DeviceType] | None = None, ): """Filter the fixtures based on supplied parameters. @@ -107,7 +117,11 @@ def filter_fixtures( component in component_nego details. """ - def _model_match(fixture_data: FixtureInfo, model_filter): + def _model_match(fixture_data: FixtureInfo, model_filter: set[str]): + model_filter_list = [mf for mf in model_filter] + if len(model_filter_list) == 1 and model_filter_list[0].split("_") == 3: + # return exact match + return fixture_data.name == model_filter_list[0] file_model_region = fixture_data.name.split("_")[0] file_model = file_model_region.split("(")[0] return file_model in model_filter @@ -134,16 +148,21 @@ def filter_fixtures( ) def _device_type_match(fixture_data: FixtureInfo, device_type): - if (component_nego := fixture_data.data.get("component_nego")) is None: - return _get_device_type_from_sys_info(fixture_data.data) in device_type - components = [component["id"] for component in component_nego["component_list"]] - if (info := fixture_data.data.get("get_device_info")) and ( - type_ := info.get("type") - ): + if fixture_data.protocol in {"SMART", "SMART.CHILD"}: + info = fixture_data.data["get_device_info"] + component_nego = fixture_data.data["component_nego"] + components = [ + component["id"] for component in component_nego["component_list"] + ] return ( - SmartDevice._get_device_type_from_components(components, type_) + SmartDevice._get_device_type_from_components(components, info["type"]) in device_type ) + elif fixture_data.protocol == "IOT": + return _get_device_type_from_sys_info(fixture_data.data) in device_type + elif fixture_data.protocol == "SMARTCAMERA": + info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"] + return SmartCamera._get_device_type_from_sysinfo(info) in device_type return False filtered = [] diff --git a/kasa/tests/smartcamera/__init__.py b/kasa/tests/smartcamera/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kasa/tests/smartcamera/test_smartcamera.py b/kasa/tests/smartcamera/test_smartcamera.py new file mode 100644 index 00000000..9c8893c0 --- /dev/null +++ b/kasa/tests/smartcamera/test_smartcamera.py @@ -0,0 +1,20 @@ +"""Tests for smart camera devices.""" + +from __future__ import annotations + +import pytest + +from kasa import Device, DeviceType + +from ..conftest import device_smartcamera + + +@device_smartcamera +async def test_state(dev: Device): + if dev.device_type is DeviceType.Hub: + pytest.skip("Hubs cannot be switched on and off") + + state = dev.is_on + await dev.set_state(not state) + await dev.update() + assert dev.is_on is not state