diff --git a/SUPPORTED.md b/SUPPORTED.md index 2da28b44..ca207a03 100644 --- a/SUPPORTED.md +++ b/SUPPORTED.md @@ -179,9 +179,9 @@ All Tapo devices require authentication.
Hub-Connected Devices may work acros ### Plugs - **P100** - - Hardware: 1.0.0 / Firmware: 1.1.3 - - Hardware: 1.0.0 / Firmware: 1.3.7 - - Hardware: 1.0.0 / Firmware: 1.4.0 + - Hardware: 1.0.0 (US) / Firmware: 1.1.3 + - Hardware: 1.0.0 (US) / Firmware: 1.3.7 + - Hardware: 1.0.0 (US) / Firmware: 1.4.0 - **P110** - Hardware: 1.0 (EU) / Firmware: 1.0.7 - Hardware: 1.0 (EU) / Firmware: 1.2.3 diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index 36ddaaee..2650882e 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -21,6 +21,7 @@ import traceback from collections import defaultdict, namedtuple from pathlib import Path from pprint import pprint +from typing import Any import asyncclick as click @@ -40,12 +41,13 @@ from kasa.device_factory import get_protocol from kasa.deviceconfig import DeviceEncryptionType, DeviceFamily from kasa.discover import DiscoveryResult from kasa.exceptions import SmartErrorCode +from kasa.protocols import IotProtocol from kasa.protocols.smartcameraprotocol import ( SmartCameraProtocol, _ChildCameraProtocolWrapper, ) from kasa.protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper -from kasa.smart import SmartChildDevice +from kasa.smart import SmartChildDevice, SmartDevice from kasa.smartcamera import SmartCamera Call = namedtuple("Call", "module method") @@ -389,7 +391,9 @@ async def cli( ) -async def get_legacy_fixture(protocol, *, discovery_info): +async def get_legacy_fixture( + protocol: IotProtocol, *, discovery_info: dict[str, Any] | None +) -> FixtureResult: """Get fixture for legacy IOT style protocol.""" items = [ Call(module="system", method="get_sysinfo"), @@ -422,8 +426,8 @@ async def get_legacy_fixture(protocol, *, discovery_info): finally: await protocol.close() - final_query = defaultdict(defaultdict) - final = defaultdict(defaultdict) + final_query: dict = defaultdict(defaultdict) + final: dict = defaultdict(defaultdict) for succ, resp in successes: final_query[succ.module][succ.method] = {} final[succ.module][succ.method] = resp @@ -433,16 +437,14 @@ async def get_legacy_fixture(protocol, *, discovery_info): try: final = await protocol.query(final_query) except Exception as ex: - _echo_error(f"Unable to query all successes at once: {ex}", bold=True, fg="red") + _echo_error(f"Unable to query all successes at once: {ex}") finally: await protocol.close() if discovery_info and not discovery_info.get("system"): # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. - dr = DiscoveryResult.from_dict(protocol._discovery_info) - final["discovery_result"] = dr.dict( - by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True - ) + dr = DiscoveryResult.from_dict(discovery_info) + final["discovery_result"] = dr.to_dict() click.echo("Got %s successes" % len(successes)) click.echo(click.style("## device info file ##", bold=True)) @@ -817,23 +819,21 @@ async def get_smart_test_calls(protocol: SmartProtocol): def get_smart_child_fixture(response): """Get a seperate fixture for the child device.""" - info = response["get_device_info"] - hw_version = info["hw_ver"] - sw_version = info["fw_ver"] - sw_version = sw_version.split(" ", maxsplit=1)[0] - model = info["model"] - if region := info.get("specs"): - model += f"({region})" - - save_filename = f"{model}_{hw_version}_{sw_version}.json" + model_info = SmartDevice._get_device_info(response, None) + hw_version = model_info.hardware_version + fw_version = model_info.firmware_version + model = model_info.long_name + if model_info.region is not None: + model = f"{model}({model_info.region})" + save_filename = f"{model}_{hw_version}_{fw_version}.json" return FixtureResult( filename=save_filename, folder=SMART_CHILD_FOLDER, data=response ) async def get_smart_fixtures( - protocol: SmartProtocol, *, discovery_info=None, batch_size: int -): + protocol: SmartProtocol, *, discovery_info: dict[str, Any] | None, batch_size: int +) -> list[FixtureResult]: """Get fixture for new TAPO style protocol.""" if isinstance(protocol, SmartCameraProtocol): test_calls, successes = await get_smart_camera_test_calls(protocol) @@ -964,23 +964,17 @@ async def get_smart_fixtures( if "get_device_info" in final: # smart protocol - hw_version = final["get_device_info"]["hw_ver"] - sw_version = final["get_device_info"]["fw_ver"] - if discovery_info: - model = discovery_info["device_model"] - else: - model = final["get_device_info"]["model"] + "(XX)" - sw_version = sw_version.split(" ", maxsplit=1)[0] + model_info = SmartDevice._get_device_info(final, discovery_info) copy_folder = SMART_FOLDER else: # smart camera protocol model_info = SmartCamera._get_device_info(final, discovery_info) - model = model_info.long_name - hw_version = model_info.hardware_version - sw_version = model_info.firmare_version - if model_info.region is not None: - model = f"{model}({model_info.region})" copy_folder = SMARTCAMERA_FOLDER + hw_version = model_info.hardware_version + sw_version = model_info.firmware_version + model = model_info.long_name + if model_info.region is not None: + model = f"{model}({model_info.region})" save_filename = f"{model}_{hw_version}_{sw_version}.json" diff --git a/devtools/generate_supported.py b/devtools/generate_supported.py index e7ae732d..499f073c 100755 --- a/devtools/generate_supported.py +++ b/devtools/generate_supported.py @@ -1,15 +1,17 @@ #!/usr/bin/env python """Script that checks supported devices and updates README.md and SUPPORTED.md.""" +from __future__ import annotations + import json import os import sys from pathlib import Path from string import Template -from typing import NamedTuple +from typing import Any, NamedTuple -from kasa.device_factory import _get_device_type_from_sys_info from kasa.device_type import DeviceType +from kasa.iot import IotDevice from kasa.smart import SmartDevice from kasa.smartcamera import SmartCamera @@ -17,7 +19,7 @@ from kasa.smartcamera import SmartCamera class SupportedVersion(NamedTuple): """Supported version.""" - region: str + region: str | None hw: str fw: str auth: bool @@ -45,6 +47,7 @@ README_FILENAME = "README.md" IOT_FOLDER = "tests/fixtures/" SMART_FOLDER = "tests/fixtures/smart/" +SMART_CHILD_FOLDER = "tests/fixtures/smart/child" SMARTCAMERA_FOLDER = "tests/fixtures/smartcamera/" @@ -59,9 +62,10 @@ def generate_supported(args): supported = {"kasa": {}, "tapo": {}} - _get_iot_supported(supported) - _get_smart_supported(supported) - _get_smartcamera_supported(supported) + _get_supported_devices(supported, IOT_FOLDER, IotDevice) + _get_supported_devices(supported, SMART_FOLDER, SmartDevice) + _get_supported_devices(supported, SMART_CHILD_FOLDER, SmartDevice) + _get_supported_devices(supported, SMARTCAMERA_FOLDER, SmartCamera) readme_updated = _update_supported_file( README_FILENAME, _supported_summary(supported), print_diffs @@ -201,49 +205,16 @@ def _supported_text( return brands -def _get_smart_supported(supported): - for file in Path(SMART_FOLDER).glob("**/*.json"): +def _get_supported_devices( + supported: dict[str, Any], + fixture_location: str, + device_cls: type[IotDevice | SmartDevice | SmartCamera], +): + for file in Path(fixture_location).glob("*.json"): with file.open() as f: fixture_data = json.load(f) - if "discovery_result" in fixture_data: - model, _, region = fixture_data["discovery_result"][ - "device_model" - ].partition("(") - device_type = fixture_data["discovery_result"]["device_type"] - else: # child devices of hubs do not have discovery result - model = fixture_data["get_device_info"]["model"] - region = fixture_data["get_device_info"].get("specs") - device_type = fixture_data["get_device_info"]["type"] - # P100 doesn't have region HW - region = region.replace(")", "") if region else "" - - _protocol, devicetype = device_type.split(".") - brand = devicetype[:4].lower() - components = [ - component["id"] - for component in fixture_data["component_nego"]["component_list"] - ] - dt = SmartDevice._get_device_type_from_components(components, device_type) - supported_type = DEVICE_TYPE_TO_PRODUCT_GROUP[dt] - - hw_version = fixture_data["get_device_info"]["hw_ver"] - fw_version = fixture_data["get_device_info"]["fw_ver"] - fw_version = fw_version.split(" ", maxsplit=1)[0] - - stype = supported[brand].setdefault(supported_type, {}) - smodel = stype.setdefault(model, []) - smodel.append( - SupportedVersion(region=region, hw=hw_version, fw=fw_version, auth=True) - ) - - -def _get_smartcamera_supported(supported): - for file in Path(SMARTCAMERA_FOLDER).glob("**/*.json"): - with file.open() as f: - fixture_data = json.load(f) - - model_info = SmartCamera._get_device_info( + model_info = device_cls._get_device_info( fixture_data, fixture_data.get("discovery_result") ) @@ -255,30 +226,12 @@ def _get_smartcamera_supported(supported): SupportedVersion( region=model_info.region, hw=model_info.hardware_version, - fw=model_info.firmare_version, + fw=model_info.firmware_version, auth=model_info.requires_auth, ) ) -def _get_iot_supported(supported): - for file in Path(IOT_FOLDER).glob("*.json"): - with file.open() as f: - fixture_data = json.load(f) - sysinfo = fixture_data["system"]["get_sysinfo"] - dt = _get_device_type_from_sys_info(fixture_data) - supported_type = DEVICE_TYPE_TO_PRODUCT_GROUP[dt] - - model, _, region = sysinfo["model"][:-1].partition("(") - auth = "discovery_result" in fixture_data - stype = supported["kasa"].setdefault(supported_type, {}) - smodel = stype.setdefault(model, []) - fw = sysinfo["sw_ver"].split(" ", maxsplit=1)[0] - smodel.append( - SupportedVersion(region=region, hw=sysinfo["hw_ver"], fw=fw, auth=auth) - ) - - def main(): """Entry point to module.""" generate_supported(sys.argv[1:]) diff --git a/kasa/device.py b/kasa/device.py index ecd3b052..755c89ef 100644 --- a/kasa/device.py +++ b/kasa/device.py @@ -162,7 +162,7 @@ class _DeviceInfo: device_family: str device_type: DeviceType hardware_version: str - firmare_version: str + firmware_version: str firmware_build: str requires_auth: bool region: str | None diff --git a/kasa/device_factory.py b/kasa/device_factory.py index d32f73a0..dab86799 100755 --- a/kasa/device_factory.py +++ b/kasa/device_factory.py @@ -128,34 +128,6 @@ async def _connect(config: DeviceConfig, protocol: BaseProtocol) -> Device: ) -def _get_device_type_from_sys_info(info: dict[str, Any]) -> DeviceType: - """Find SmartDevice subclass for device described by passed data.""" - if "system" not in info or "get_sysinfo" not in info["system"]: - raise KasaException("No 'system' or 'get_sysinfo' in response") - - sysinfo: dict[str, Any] = info["system"]["get_sysinfo"] - type_: str | None = sysinfo.get("type", sysinfo.get("mic_type")) - if type_ is None: - raise KasaException("Unable to find the device type field!") - - if "dev_name" in sysinfo and "Dimmer" in sysinfo["dev_name"]: - return DeviceType.Dimmer - - if "smartplug" in type_.lower(): - if "children" in sysinfo: - return DeviceType.Strip - if (dev_name := sysinfo.get("dev_name")) and "light" in dev_name.lower(): - return DeviceType.WallSwitch - return DeviceType.Plug - - if "smartbulb" in type_.lower(): - if "length" in sysinfo: # strips have length - return DeviceType.LightStrip - - return DeviceType.Bulb - raise UnsupportedDeviceError(f"Unknown device type: {type_}") - - def get_device_class_from_sys_info(sysinfo: dict[str, Any]) -> type[IotDevice]: """Find SmartDevice subclass for device described by passed data.""" TYPE_TO_CLASS = { @@ -166,7 +138,7 @@ def get_device_class_from_sys_info(sysinfo: dict[str, Any]) -> type[IotDevice]: DeviceType.WallSwitch: IotWallSwitch, DeviceType.LightStrip: IotLightStrip, } - return TYPE_TO_CLASS[_get_device_type_from_sys_info(sysinfo)] + return TYPE_TO_CLASS[IotDevice._get_device_type_from_sys_info(sysinfo)] def get_device_class_from_family( diff --git a/kasa/iot/iotdevice.py b/kasa/iot/iotdevice.py index 37f00ae6..95a774df 100755 --- a/kasa/iot/iotdevice.py +++ b/kasa/iot/iotdevice.py @@ -22,7 +22,8 @@ from datetime import datetime, timedelta, tzinfo from typing import TYPE_CHECKING, Any, Callable, cast from warnings import warn -from ..device import Device, WifiNetwork +from ..device import Device, WifiNetwork, _DeviceInfo +from ..device_type import DeviceType from ..deviceconfig import DeviceConfig from ..exceptions import KasaException from ..feature import Feature @@ -692,3 +693,66 @@ class IotDevice(Device): This should only be used for debugging purposes. """ return self._last_update or self._discovery_info + + @staticmethod + def _get_device_type_from_sys_info(info: dict[str, Any]) -> DeviceType: + """Find SmartDevice subclass for device described by passed data.""" + if "system" not in info or "get_sysinfo" not in info["system"]: + raise KasaException("No 'system' or 'get_sysinfo' in response") + + sysinfo: dict[str, Any] = info["system"]["get_sysinfo"] + type_: str | None = sysinfo.get("type", sysinfo.get("mic_type")) + if type_ is None: + raise KasaException("Unable to find the device type field!") + + if "dev_name" in sysinfo and "Dimmer" in sysinfo["dev_name"]: + return DeviceType.Dimmer + + if "smartplug" in type_.lower(): + if "children" in sysinfo: + return DeviceType.Strip + if (dev_name := sysinfo.get("dev_name")) and "light" in dev_name.lower(): + return DeviceType.WallSwitch + return DeviceType.Plug + + if "smartbulb" in type_.lower(): + if "length" in sysinfo: # strips have length + return DeviceType.LightStrip + + return DeviceType.Bulb + _LOGGER.warning("Unknown device type %s, falling back to plug", type_) + return DeviceType.Plug + + @staticmethod + def _get_device_info( + info: dict[str, Any], discovery_info: dict[str, Any] | None + ) -> _DeviceInfo: + """Get model information for a device.""" + sys_info = info["system"]["get_sysinfo"] + + # Get model and region info + region = None + device_model = sys_info["model"] + long_name, _, region = device_model.partition("(") + if region: # All iot devices have region but just in case + region = region.replace(")", "") + + # Get other info + device_family = sys_info.get("type", sys_info.get("mic_type")) + device_type = IotDevice._get_device_type_from_sys_info(info) + fw_version_full = sys_info["sw_ver"] + firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1) + auth = bool(discovery_info and ("mgt_encrypt_schm" in discovery_info)) + + return _DeviceInfo( + short_name=long_name, + long_name=long_name, + brand="kasa", + device_family=device_family, + device_type=device_type, + hardware_version=sys_info["hw_ver"], + firmware_version=firmware_version, + firmware_build=firmware_build, + requires_auth=auth, + region=region, + ) diff --git a/kasa/smart/smartdevice.py b/kasa/smart/smartdevice.py index 270b2959..64f6aa7c 100644 --- a/kasa/smart/smartdevice.py +++ b/kasa/smart/smartdevice.py @@ -9,7 +9,7 @@ from collections.abc import Mapping, Sequence from datetime import datetime, timedelta, timezone, tzinfo from typing import TYPE_CHECKING, Any, cast -from ..device import Device, WifiNetwork +from ..device import Device, WifiNetwork, _DeviceInfo from ..device_type import DeviceType from ..deviceconfig import DeviceConfig from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode @@ -794,3 +794,48 @@ class SmartDevice(Device): return DeviceType.Thermostat _LOGGER.warning("Unknown device type, falling back to plug") return DeviceType.Plug + + @staticmethod + def _get_device_info( + info: dict[str, Any], discovery_info: dict[str, Any] | None + ) -> _DeviceInfo: + """Get model information for a device.""" + di = info["get_device_info"] + components = [comp["id"] for comp in info["component_nego"]["component_list"]] + + # Get model/region info + short_name = di["model"] + region = None + if discovery_info: + device_model = discovery_info["device_model"] + long_name, _, region = device_model.partition("(") + if region: # P100 doesn't have region + region = region.replace(")", "") + else: + long_name = short_name + if not region: # some devices have region in specs + region = di.get("specs") + + # Get other info + device_family = di["type"] + device_type = SmartDevice._get_device_type_from_components( + components, device_family + ) + fw_version_full = di["fw_ver"] + firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1) + _protocol, devicetype = device_family.split(".") + # Brand inferred from SMART.KASAPLUG/SMART.TAPOPLUG etc. + brand = devicetype[:4].lower() + + return _DeviceInfo( + short_name=short_name, + long_name=long_name, + brand=brand, + device_family=device_family, + device_type=device_type, + hardware_version=di["hw_ver"], + firmware_version=firmware_version, + firmware_build=firmware_build, + requires_auth=True, + region=region, + ) diff --git a/kasa/smartcamera/smartcamera.py b/kasa/smartcamera/smartcamera.py index b99945b3..2c09e4dd 100644 --- a/kasa/smartcamera/smartcamera.py +++ b/kasa/smartcamera/smartcamera.py @@ -43,7 +43,7 @@ class SmartCamera(SmartDevice): long_name = discovery_info["device_model"] if discovery_info else short_name device_type = SmartCamera._get_device_type_from_sysinfo(basic_info) fw_version_full = basic_info["sw_version"] - firmare_version, firmware_build = fw_version_full.split(" ", maxsplit=1) + firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1) return _DeviceInfo( short_name=basic_info["device_model"], long_name=long_name, @@ -51,7 +51,7 @@ class SmartCamera(SmartDevice): device_family=basic_info["device_type"], device_type=device_type, hardware_version=basic_info["hw_version"], - firmare_version=firmare_version, + firmware_version=firmware_version, firmware_build=firmware_build, requires_auth=True, region=basic_info.get("region"), diff --git a/tests/conftest.py b/tests/conftest.py index c56cba0f..3ff11096 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,7 @@ from kasa.transports.basetransport import BaseTransport from .device_fixtures import * # noqa: F403 from .discovery_fixtures import * # noqa: F403 +from .fixtureinfo import fixture_info # noqa: F401 # Parametrize tests to run with device both on and off turn_on = pytest.mark.parametrize("turn_on", [True, False]) diff --git a/tests/device_fixtures.py b/tests/device_fixtures.py index 359d7164..faaec64f 100644 --- a/tests/device_fixtures.py +++ b/tests/device_fixtures.py @@ -188,11 +188,12 @@ def parametrize( data_root_filter=None, device_type_filter=None, ids=None, + fixture_name="dev", ): if ids is None: ids = idgenerator return pytest.mark.parametrize( - "dev", + fixture_name, filter_fixtures( desc, model_filter=model_filter, @@ -407,22 +408,28 @@ async def _discover_update_and_close(ip, username, password) -> Device: return await _update_and_close(d) -async def get_device_for_fixture(fixture_data: FixtureInfo) -> Device: +async def get_device_for_fixture( + fixture_data: FixtureInfo, *, verbatim=False, update_after_init=True +) -> Device: # if the wanted file is not an absolute path, prepend the fixtures directory d = device_for_fixture_name(fixture_data.name, fixture_data.protocol)( host="127.0.0.123" ) if fixture_data.protocol in {"SMART", "SMART.CHILD"}: - d.protocol = FakeSmartProtocol(fixture_data.data, fixture_data.name) + d.protocol = FakeSmartProtocol( + fixture_data.data, fixture_data.name, verbatim=verbatim + ) elif fixture_data.protocol == "SMARTCAMERA": - d.protocol = FakeSmartCameraProtocol(fixture_data.data, fixture_data.name) + d.protocol = FakeSmartCameraProtocol( + fixture_data.data, fixture_data.name, verbatim=verbatim + ) else: - d.protocol = FakeIotProtocol(fixture_data.data) + d.protocol = FakeIotProtocol(fixture_data.data, verbatim=verbatim) discovery_data = None if "discovery_result" in fixture_data.data: - discovery_data = {"result": fixture_data.data["discovery_result"]} + discovery_data = fixture_data.data["discovery_result"] elif "system" in fixture_data.data: discovery_data = { "system": {"get_sysinfo": fixture_data.data["system"]["get_sysinfo"]} @@ -431,7 +438,8 @@ async def get_device_for_fixture(fixture_data: FixtureInfo) -> Device: if discovery_data: # Child devices do not have discovery info d.update_from_discover_info(discovery_data) - await _update_and_close(d) + if update_after_init: + await _update_and_close(d) return d diff --git a/tests/fakeprotocol_iot.py b/tests/fakeprotocol_iot.py index 2e3d2810..b03564d1 100644 --- a/tests/fakeprotocol_iot.py +++ b/tests/fakeprotocol_iot.py @@ -177,9 +177,9 @@ MOTION_MODULE = { class FakeIotProtocol(IotProtocol): - def __init__(self, info, fixture_name=None): + def __init__(self, info, fixture_name=None, *, verbatim=False): super().__init__( - transport=FakeIotTransport(info, fixture_name), + transport=FakeIotTransport(info, fixture_name, verbatim=verbatim), ) async def query(self, request, retry_count: int = 3): @@ -189,21 +189,33 @@ class FakeIotProtocol(IotProtocol): class FakeIotTransport(BaseTransport): - def __init__(self, info, fixture_name=None): + def __init__(self, info, fixture_name=None, *, verbatim=False): super().__init__(config=DeviceConfig("127.0.0.123")) info = copy.deepcopy(info) self.discovery_data = info self.fixture_name = fixture_name self.writer = None self.reader = None + self.verbatim = verbatim + + # When True verbatim will bypass any extra processing of missing + # methods and is used to test the fixture creation itself. + if verbatim: + self.proto = copy.deepcopy(info) + else: + self.proto = self._build_fake_proto(info) + + @staticmethod + def _build_fake_proto(info): + """Create an internal protocol with extra data not in the fixture.""" proto = copy.deepcopy(FakeIotTransport.baseproto) for target in info: - # print("target %s" % target) if target != "discovery_result": for cmd in info[target]: # print("initializing tgt %s cmd %s" % (target, cmd)) proto[target][cmd] = info[target][cmd] + # if we have emeter support, we need to add the missing pieces for module in ["emeter", "smartlife.iot.common.emeter"]: if ( @@ -223,10 +235,7 @@ class FakeIotTransport(BaseTransport): dummy_data = emeter_commands[module][etype] # print("got %s %s from dummy: %s" % (module, etype, dummy_data)) proto[module][etype] = dummy_data - - # print("initialized: %s" % proto[module]) - - self.proto = proto + return proto @property def default_port(self) -> int: @@ -421,8 +430,20 @@ class FakeIotTransport(BaseTransport): } async def send(self, request, port=9999): - proto = self.proto + if not self.verbatim: + return await self._send(request, port) + # Simply return whatever is in the fixture + response = {} + for target in request: + if target in self.proto: + response.update({target: self.proto[target]}) + else: + response.update({"err_msg": "module not support"}) + return copy.deepcopy(response) + + async def _send(self, request, port=9999): + proto = self.proto # collect child ids from context try: child_ids = request["context"]["child_ids"] diff --git a/tests/fakeprotocol_smart.py b/tests/fakeprotocol_smart.py index bde90885..99b75a1a 100644 --- a/tests/fakeprotocol_smart.py +++ b/tests/fakeprotocol_smart.py @@ -11,9 +11,11 @@ from kasa.transports.basetransport import BaseTransport class FakeSmartProtocol(SmartProtocol): - def __init__(self, info, fixture_name, *, is_child=False): + def __init__(self, info, fixture_name, *, is_child=False, verbatim=False): super().__init__( - transport=FakeSmartTransport(info, fixture_name, is_child=is_child), + transport=FakeSmartTransport( + info, fixture_name, is_child=is_child, verbatim=verbatim + ), ) async def query(self, request, retry_count: int = 3): @@ -34,6 +36,7 @@ class FakeSmartTransport(BaseTransport): fix_incomplete_fixture_lists=True, is_child=False, get_child_fixtures=True, + verbatim=False, ): super().__init__( config=DeviceConfig( @@ -64,6 +67,13 @@ class FakeSmartTransport(BaseTransport): self.warn_fixture_missing_methods = warn_fixture_missing_methods self.fix_incomplete_fixture_lists = fix_incomplete_fixture_lists + # When True verbatim will bypass any extra processing of missing + # methods and is used to test the fixture creation itself. + self.verbatim = verbatim + if verbatim: + self.warn_fixture_missing_methods = False + self.fix_incomplete_fixture_lists = False + @property def default_port(self): """Default port for the transport.""" @@ -444,10 +454,10 @@ class FakeSmartTransport(BaseTransport): return await self._handle_control_child(request_dict["params"]) params = request_dict.get("params", {}) - if method == "component_nego" or method[:4] == "get_": + if method in {"component_nego", "qs_component_nego"} or method[:4] == "get_": if method in info: result = copy.deepcopy(info[method]) - if "start_index" in result and "sum" in result: + if result and "start_index" in result and "sum" in result: list_key = next( iter([key for key in result if isinstance(result[key], list)]) ) @@ -473,6 +483,12 @@ class FakeSmartTransport(BaseTransport): ] return {"result": result, "error_code": 0} + if self.verbatim: + return { + "error_code": SmartErrorCode.PARAMS_ERROR.value, + "method": method, + } + if ( # FIXTURE_MISSING is for service calls not in place when # SMART fixtures started to be generated diff --git a/tests/fakeprotocol_smartcamera.py b/tests/fakeprotocol_smartcamera.py index e84b5bf9..4059fbfb 100644 --- a/tests/fakeprotocol_smartcamera.py +++ b/tests/fakeprotocol_smartcamera.py @@ -2,6 +2,7 @@ from __future__ import annotations import copy from json import loads as json_loads +from typing import Any from kasa import Credentials, DeviceConfig, SmartProtocol from kasa.protocols.smartcameraprotocol import SmartCameraProtocol @@ -11,9 +12,11 @@ from .fakeprotocol_smart import FakeSmartTransport class FakeSmartCameraProtocol(SmartCameraProtocol): - def __init__(self, info, fixture_name, *, is_child=False): + def __init__(self, info, fixture_name, *, is_child=False, verbatim=False): super().__init__( - transport=FakeSmartCameraTransport(info, fixture_name, is_child=is_child), + transport=FakeSmartCameraTransport( + info, fixture_name, is_child=is_child, verbatim=verbatim + ), ) async def query(self, request, retry_count: int = 3): @@ -30,6 +33,7 @@ class FakeSmartCameraTransport(BaseTransport): *, list_return_size=10, is_child=False, + verbatim=False, ): super().__init__( config=DeviceConfig( @@ -41,6 +45,9 @@ class FakeSmartCameraTransport(BaseTransport): ), ) self.fixture_name = fixture_name + # When True verbatim will bypass any extra processing of missing + # methods and is used to test the fixture creation itself. + self.verbatim = verbatim if not is_child: self.info = copy.deepcopy(info) self.child_protocols = FakeSmartTransport._get_child_protocols( @@ -70,11 +77,11 @@ class FakeSmartCameraTransport(BaseTransport): responses = [] for request in params["requests"]: response = await self._send_request(request) # type: ignore[arg-type] + response["method"] = request["method"] # type: ignore[index] + responses.append(response) # Devices do not continue after error if response["error_code"] != 0: break - response["method"] = request["method"] # type: ignore[index] - responses.append(response) return {"result": {"responses": responses}, "error_code": 0} else: return await self._send_request(request_dict) @@ -129,6 +136,15 @@ class FakeSmartCameraTransport(BaseTransport): ], } + @staticmethod + def _get_second_key(request_dict: dict[str, Any]) -> str: + assert ( + len(request_dict) == 2 + ), f"Unexpected dict {request_dict}, should be length 2" + it = iter(request_dict) + next(it, None) + return next(it) + async def _send_request(self, request_dict: dict): method = request_dict["method"] @@ -175,6 +191,14 @@ class FakeSmartCameraTransport(BaseTransport): return {"error_code": -1} break return {"error_code": 0} + elif method == "get": + module = self._get_second_key(request_dict) + get_method = f"get_{module}" + if get_method in info: + result = copy.deepcopy(info[get_method]["get"]) + return {**result, "error_code": 0} + else: + return {"error_code": -1} elif method[:3] == "get": params = request_dict.get("params") if method in info: diff --git a/tests/fixtureinfo.py b/tests/fixtureinfo.py index 7d7607ef..644a3810 100644 --- a/tests/fixtureinfo.py +++ b/tests/fixtureinfo.py @@ -1,13 +1,16 @@ from __future__ import annotations +import copy import glob import json import os from pathlib import Path from typing import Iterable, NamedTuple -from kasa.device_factory import _get_device_type_from_sys_info +import pytest + from kasa.device_type import DeviceType +from kasa.iot import IotDevice from kasa.smart.smartdevice import SmartDevice from kasa.smartcamera.smartcamera import SmartCamera @@ -171,7 +174,10 @@ def filter_fixtures( in device_type ) elif fixture_data.protocol == "IOT": - return _get_device_type_from_sys_info(fixture_data.data) in device_type + return ( + IotDevice._get_device_type_from_sys_info(fixture_data.data) + in device_type + ) elif fixture_data.protocol == "SMARTCAMERA": info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"] return SmartCamera._get_device_type_from_sysinfo(info) in device_type @@ -206,3 +212,14 @@ def filter_fixtures( print(f"\t{value.name}") filtered.sort() return filtered + + +@pytest.fixture( + params=filter_fixtures("all fixture infos"), + ids=idgenerator, +) +def fixture_info(request, mocker): + """Return raw discovery file contents as JSON. Used for discovery tests.""" + fixture_info = request.param + fixture_data = copy.deepcopy(fixture_info.data) + return FixtureInfo(fixture_info.name, fixture_info.protocol, fixture_data) diff --git a/tests/fixtures/smart/P100_1.0.0_1.1.3.json b/tests/fixtures/smart/P100(US)_1.0.0_1.1.3.json similarity index 100% rename from tests/fixtures/smart/P100_1.0.0_1.1.3.json rename to tests/fixtures/smart/P100(US)_1.0.0_1.1.3.json diff --git a/tests/fixtures/smart/P100_1.0.0_1.3.7.json b/tests/fixtures/smart/P100(US)_1.0.0_1.3.7.json similarity index 100% rename from tests/fixtures/smart/P100_1.0.0_1.3.7.json rename to tests/fixtures/smart/P100(US)_1.0.0_1.3.7.json diff --git a/tests/fixtures/smart/P100_1.0.0_1.4.0.json b/tests/fixtures/smart/P100(US)_1.0.0_1.4.0.json similarity index 100% rename from tests/fixtures/smart/P100_1.0.0_1.4.0.json rename to tests/fixtures/smart/P100(US)_1.0.0_1.4.0.json diff --git a/tests/test_device_factory.py b/tests/test_device_factory.py index 4f71888b..9102a528 100644 --- a/tests/test_device_factory.py +++ b/tests/test_device_factory.py @@ -19,9 +19,9 @@ from kasa import ( ) from kasa.device_factory import ( Device, + IotDevice, SmartCamera, SmartDevice, - _get_device_type_from_sys_info, connect, get_device_class_from_family, get_protocol, @@ -182,12 +182,12 @@ async def test_device_types(dev: Device): res = SmartCamera._get_device_type_from_sysinfo(dev.sys_info) elif isinstance(dev, SmartDevice): assert dev._discovery_info - device_type = cast(str, dev._discovery_info["result"]["device_type"]) + device_type = cast(str, dev._discovery_info["device_type"]) res = SmartDevice._get_device_type_from_components( list(dev._components.keys()), device_type ) else: - res = _get_device_type_from_sys_info(dev._last_update) + res = IotDevice._get_device_type_from_sys_info(dev._last_update) assert dev.device_type == res diff --git a/tests/test_devtools.py b/tests/test_devtools.py new file mode 100644 index 00000000..fa60acd5 --- /dev/null +++ b/tests/test_devtools.py @@ -0,0 +1,103 @@ +"""Module for dump_devinfo tests.""" + +import pytest + +from devtools.dump_devinfo import get_legacy_fixture, get_smart_fixtures +from kasa.iot import IotDevice +from kasa.protocols import IotProtocol +from kasa.smart import SmartDevice +from kasa.smartcamera import SmartCamera + +from .conftest import ( + FixtureInfo, + get_device_for_fixture, + parametrize, +) + +smart_fixtures = parametrize( + "smart fixtures", protocol_filter={"SMART"}, fixture_name="fixture_info" +) +smartcamera_fixtures = parametrize( + "smartcamera fixtures", protocol_filter={"SMARTCAMERA"}, fixture_name="fixture_info" +) +iot_fixtures = parametrize( + "iot fixtures", protocol_filter={"IOT"}, fixture_name="fixture_info" +) + + +async def test_fixture_names(fixture_info: FixtureInfo): + """Test that device info gets the right fixture names.""" + if fixture_info.protocol in {"SMARTCAMERA"}: + device_info = SmartCamera._get_device_info( + fixture_info.data, fixture_info.data.get("discovery_result") + ) + elif fixture_info.protocol in {"SMART"}: + device_info = SmartDevice._get_device_info( + fixture_info.data, fixture_info.data.get("discovery_result") + ) + elif fixture_info.protocol in {"SMART.CHILD"}: + device_info = SmartDevice._get_device_info(fixture_info.data, None) + else: + device_info = IotDevice._get_device_info(fixture_info.data, None) + + region = f"({device_info.region})" if device_info.region else "" + expected = f"{device_info.long_name}{region}_{device_info.hardware_version}_{device_info.firmware_version}.json" + assert fixture_info.name == expected + + +@smart_fixtures +async def test_smart_fixtures(fixture_info: FixtureInfo): + """Test that smart fixtures are created the same.""" + dev = await get_device_for_fixture(fixture_info, verbatim=True) + assert isinstance(dev, SmartDevice) + if dev.children: + pytest.skip("Test not currently implemented for devices with children.") + fixtures = await get_smart_fixtures( + dev.protocol, + discovery_info=fixture_info.data.get("discovery_result"), + batch_size=5, + ) + fixture_result = fixtures[0] + + assert fixture_info.data == fixture_result.data + + +@smartcamera_fixtures +async def test_smartcamera_fixtures(fixture_info: FixtureInfo): + """Test that smartcamera fixtures are created the same.""" + dev = await get_device_for_fixture(fixture_info, verbatim=True) + assert isinstance(dev, SmartCamera) + if dev.children: + pytest.skip("Test not currently implemented for devices with children.") + fixtures = await get_smart_fixtures( + dev.protocol, + discovery_info=fixture_info.data.get("discovery_result"), + batch_size=5, + ) + fixture_result = fixtures[0] + + assert fixture_info.data == fixture_result.data + + +@iot_fixtures +async def test_iot_fixtures(fixture_info: FixtureInfo): + """Test that iot fixtures are created the same.""" + # Iot fixtures often do not have enough data to perform a device update() + # without missing info being added to suppress the update + dev = await get_device_for_fixture( + fixture_info, verbatim=True, update_after_init=False + ) + assert isinstance(dev.protocol, IotProtocol) + + fixture = await get_legacy_fixture( + dev.protocol, discovery_info=fixture_info.data.get("discovery_result") + ) + fixture_result = fixture + + created_fixture = { + key: val for key, val in fixture_result.data.items() if "err_code" not in val + } + saved_fixture = { + key: val for key, val in fixture_info.data.items() if "err_code" not in val + } + assert saved_fixture == created_fixture diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 8d4582b0..cfc85cd1 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -39,7 +39,7 @@ from kasa.discover import ( json_dumps, ) from kasa.exceptions import AuthenticationError, UnsupportedDeviceError -from kasa.iot import IotDevice +from kasa.iot import IotDevice, IotPlug from kasa.transports.aestransport import AesEncyptionSession from kasa.transports.xortransport import XorEncryption, XorTransport @@ -119,10 +119,11 @@ async def test_type_detection_lightstrip(dev: Device): assert d.device_type == DeviceType.LightStrip -async def test_type_unknown(): +async def test_type_unknown(caplog): invalid_info = {"system": {"get_sysinfo": {"type": "nosuchtype"}}} - with pytest.raises(UnsupportedDeviceError): - Discover._get_device_class(invalid_info) + assert Discover._get_device_class(invalid_info) is IotPlug + msg = "Unknown device type nosuchtype, falling back to plug" + assert msg in caplog.text @pytest.mark.parametrize("custom_port", [123, None]) @@ -266,7 +267,6 @@ INVALIDS = [ "Unable to find the device type field", {"system": {"get_sysinfo": {"missing_type": 1}}}, ), - ("Unknown device type: foo", {"system": {"get_sysinfo": {"type": "foo"}}}), ]