Use _get_device_info methods for smart and iot devs in devtools (#1265)

This commit is contained in:
Steven B. 2024-11-18 14:53:11 +00:00 committed by GitHub
parent 9d46996e9b
commit e209d40a6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 386 additions and 168 deletions

View File

@ -179,9 +179,9 @@ All Tapo devices require authentication.<br>Hub-Connected Devices may work acros
### Plugs
- **P100**
- Hardware: 1.0.0 / Firmware: 1.1.3
- Hardware: 1.0.0 / Firmware: 1.3.7
- Hardware: 1.0.0 / Firmware: 1.4.0
- Hardware: 1.0.0 (US) / Firmware: 1.1.3
- Hardware: 1.0.0 (US) / Firmware: 1.3.7
- Hardware: 1.0.0 (US) / Firmware: 1.4.0
- **P110**
- Hardware: 1.0 (EU) / Firmware: 1.0.7
- Hardware: 1.0 (EU) / Firmware: 1.2.3

View File

@ -21,6 +21,7 @@ import traceback
from collections import defaultdict, namedtuple
from pathlib import Path
from pprint import pprint
from typing import Any
import asyncclick as click
@ -40,12 +41,13 @@ from kasa.device_factory import get_protocol
from kasa.deviceconfig import DeviceEncryptionType, DeviceFamily
from kasa.discover import DiscoveryResult
from kasa.exceptions import SmartErrorCode
from kasa.protocols import IotProtocol
from kasa.protocols.smartcameraprotocol import (
SmartCameraProtocol,
_ChildCameraProtocolWrapper,
)
from kasa.protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper
from kasa.smart import SmartChildDevice
from kasa.smart import SmartChildDevice, SmartDevice
from kasa.smartcamera import SmartCamera
Call = namedtuple("Call", "module method")
@ -389,7 +391,9 @@ async def cli(
)
async def get_legacy_fixture(protocol, *, discovery_info):
async def get_legacy_fixture(
protocol: IotProtocol, *, discovery_info: dict[str, Any] | None
) -> FixtureResult:
"""Get fixture for legacy IOT style protocol."""
items = [
Call(module="system", method="get_sysinfo"),
@ -422,8 +426,8 @@ async def get_legacy_fixture(protocol, *, discovery_info):
finally:
await protocol.close()
final_query = defaultdict(defaultdict)
final = defaultdict(defaultdict)
final_query: dict = defaultdict(defaultdict)
final: dict = defaultdict(defaultdict)
for succ, resp in successes:
final_query[succ.module][succ.method] = {}
final[succ.module][succ.method] = resp
@ -433,16 +437,14 @@ async def get_legacy_fixture(protocol, *, discovery_info):
try:
final = await protocol.query(final_query)
except Exception as ex:
_echo_error(f"Unable to query all successes at once: {ex}", bold=True, fg="red")
_echo_error(f"Unable to query all successes at once: {ex}")
finally:
await protocol.close()
if discovery_info and not discovery_info.get("system"):
# Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device.
dr = DiscoveryResult.from_dict(protocol._discovery_info)
final["discovery_result"] = dr.dict(
by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True
)
dr = DiscoveryResult.from_dict(discovery_info)
final["discovery_result"] = dr.to_dict()
click.echo("Got %s successes" % len(successes))
click.echo(click.style("## device info file ##", bold=True))
@ -817,23 +819,21 @@ async def get_smart_test_calls(protocol: SmartProtocol):
def get_smart_child_fixture(response):
"""Get a seperate fixture for the child device."""
info = response["get_device_info"]
hw_version = info["hw_ver"]
sw_version = info["fw_ver"]
sw_version = sw_version.split(" ", maxsplit=1)[0]
model = info["model"]
if region := info.get("specs"):
model += f"({region})"
save_filename = f"{model}_{hw_version}_{sw_version}.json"
model_info = SmartDevice._get_device_info(response, None)
hw_version = model_info.hardware_version
fw_version = model_info.firmware_version
model = model_info.long_name
if model_info.region is not None:
model = f"{model}({model_info.region})"
save_filename = f"{model}_{hw_version}_{fw_version}.json"
return FixtureResult(
filename=save_filename, folder=SMART_CHILD_FOLDER, data=response
)
async def get_smart_fixtures(
protocol: SmartProtocol, *, discovery_info=None, batch_size: int
):
protocol: SmartProtocol, *, discovery_info: dict[str, Any] | None, batch_size: int
) -> list[FixtureResult]:
"""Get fixture for new TAPO style protocol."""
if isinstance(protocol, SmartCameraProtocol):
test_calls, successes = await get_smart_camera_test_calls(protocol)
@ -964,23 +964,17 @@ async def get_smart_fixtures(
if "get_device_info" in final:
# smart protocol
hw_version = final["get_device_info"]["hw_ver"]
sw_version = final["get_device_info"]["fw_ver"]
if discovery_info:
model = discovery_info["device_model"]
else:
model = final["get_device_info"]["model"] + "(XX)"
sw_version = sw_version.split(" ", maxsplit=1)[0]
model_info = SmartDevice._get_device_info(final, discovery_info)
copy_folder = SMART_FOLDER
else:
# smart camera protocol
model_info = SmartCamera._get_device_info(final, discovery_info)
model = model_info.long_name
hw_version = model_info.hardware_version
sw_version = model_info.firmare_version
if model_info.region is not None:
model = f"{model}({model_info.region})"
copy_folder = SMARTCAMERA_FOLDER
hw_version = model_info.hardware_version
sw_version = model_info.firmware_version
model = model_info.long_name
if model_info.region is not None:
model = f"{model}({model_info.region})"
save_filename = f"{model}_{hw_version}_{sw_version}.json"

View File

@ -1,15 +1,17 @@
#!/usr/bin/env python
"""Script that checks supported devices and updates README.md and SUPPORTED.md."""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from string import Template
from typing import NamedTuple
from typing import Any, NamedTuple
from kasa.device_factory import _get_device_type_from_sys_info
from kasa.device_type import DeviceType
from kasa.iot import IotDevice
from kasa.smart import SmartDevice
from kasa.smartcamera import SmartCamera
@ -17,7 +19,7 @@ from kasa.smartcamera import SmartCamera
class SupportedVersion(NamedTuple):
"""Supported version."""
region: str
region: str | None
hw: str
fw: str
auth: bool
@ -45,6 +47,7 @@ README_FILENAME = "README.md"
IOT_FOLDER = "tests/fixtures/"
SMART_FOLDER = "tests/fixtures/smart/"
SMART_CHILD_FOLDER = "tests/fixtures/smart/child"
SMARTCAMERA_FOLDER = "tests/fixtures/smartcamera/"
@ -59,9 +62,10 @@ def generate_supported(args):
supported = {"kasa": {}, "tapo": {}}
_get_iot_supported(supported)
_get_smart_supported(supported)
_get_smartcamera_supported(supported)
_get_supported_devices(supported, IOT_FOLDER, IotDevice)
_get_supported_devices(supported, SMART_FOLDER, SmartDevice)
_get_supported_devices(supported, SMART_CHILD_FOLDER, SmartDevice)
_get_supported_devices(supported, SMARTCAMERA_FOLDER, SmartCamera)
readme_updated = _update_supported_file(
README_FILENAME, _supported_summary(supported), print_diffs
@ -201,49 +205,16 @@ def _supported_text(
return brands
def _get_smart_supported(supported):
for file in Path(SMART_FOLDER).glob("**/*.json"):
def _get_supported_devices(
supported: dict[str, Any],
fixture_location: str,
device_cls: type[IotDevice | SmartDevice | SmartCamera],
):
for file in Path(fixture_location).glob("*.json"):
with file.open() as f:
fixture_data = json.load(f)
if "discovery_result" in fixture_data:
model, _, region = fixture_data["discovery_result"][
"device_model"
].partition("(")
device_type = fixture_data["discovery_result"]["device_type"]
else: # child devices of hubs do not have discovery result
model = fixture_data["get_device_info"]["model"]
region = fixture_data["get_device_info"].get("specs")
device_type = fixture_data["get_device_info"]["type"]
# P100 doesn't have region HW
region = region.replace(")", "") if region else ""
_protocol, devicetype = device_type.split(".")
brand = devicetype[:4].lower()
components = [
component["id"]
for component in fixture_data["component_nego"]["component_list"]
]
dt = SmartDevice._get_device_type_from_components(components, device_type)
supported_type = DEVICE_TYPE_TO_PRODUCT_GROUP[dt]
hw_version = fixture_data["get_device_info"]["hw_ver"]
fw_version = fixture_data["get_device_info"]["fw_ver"]
fw_version = fw_version.split(" ", maxsplit=1)[0]
stype = supported[brand].setdefault(supported_type, {})
smodel = stype.setdefault(model, [])
smodel.append(
SupportedVersion(region=region, hw=hw_version, fw=fw_version, auth=True)
)
def _get_smartcamera_supported(supported):
for file in Path(SMARTCAMERA_FOLDER).glob("**/*.json"):
with file.open() as f:
fixture_data = json.load(f)
model_info = SmartCamera._get_device_info(
model_info = device_cls._get_device_info(
fixture_data, fixture_data.get("discovery_result")
)
@ -255,30 +226,12 @@ def _get_smartcamera_supported(supported):
SupportedVersion(
region=model_info.region,
hw=model_info.hardware_version,
fw=model_info.firmare_version,
fw=model_info.firmware_version,
auth=model_info.requires_auth,
)
)
def _get_iot_supported(supported):
for file in Path(IOT_FOLDER).glob("*.json"):
with file.open() as f:
fixture_data = json.load(f)
sysinfo = fixture_data["system"]["get_sysinfo"]
dt = _get_device_type_from_sys_info(fixture_data)
supported_type = DEVICE_TYPE_TO_PRODUCT_GROUP[dt]
model, _, region = sysinfo["model"][:-1].partition("(")
auth = "discovery_result" in fixture_data
stype = supported["kasa"].setdefault(supported_type, {})
smodel = stype.setdefault(model, [])
fw = sysinfo["sw_ver"].split(" ", maxsplit=1)[0]
smodel.append(
SupportedVersion(region=region, hw=sysinfo["hw_ver"], fw=fw, auth=auth)
)
def main():
"""Entry point to module."""
generate_supported(sys.argv[1:])

View File

@ -162,7 +162,7 @@ class _DeviceInfo:
device_family: str
device_type: DeviceType
hardware_version: str
firmare_version: str
firmware_version: str
firmware_build: str
requires_auth: bool
region: str | None

View File

@ -128,34 +128,6 @@ async def _connect(config: DeviceConfig, protocol: BaseProtocol) -> Device:
)
def _get_device_type_from_sys_info(info: dict[str, Any]) -> DeviceType:
"""Find SmartDevice subclass for device described by passed data."""
if "system" not in info or "get_sysinfo" not in info["system"]:
raise KasaException("No 'system' or 'get_sysinfo' in response")
sysinfo: dict[str, Any] = info["system"]["get_sysinfo"]
type_: str | None = sysinfo.get("type", sysinfo.get("mic_type"))
if type_ is None:
raise KasaException("Unable to find the device type field!")
if "dev_name" in sysinfo and "Dimmer" in sysinfo["dev_name"]:
return DeviceType.Dimmer
if "smartplug" in type_.lower():
if "children" in sysinfo:
return DeviceType.Strip
if (dev_name := sysinfo.get("dev_name")) and "light" in dev_name.lower():
return DeviceType.WallSwitch
return DeviceType.Plug
if "smartbulb" in type_.lower():
if "length" in sysinfo: # strips have length
return DeviceType.LightStrip
return DeviceType.Bulb
raise UnsupportedDeviceError(f"Unknown device type: {type_}")
def get_device_class_from_sys_info(sysinfo: dict[str, Any]) -> type[IotDevice]:
"""Find SmartDevice subclass for device described by passed data."""
TYPE_TO_CLASS = {
@ -166,7 +138,7 @@ def get_device_class_from_sys_info(sysinfo: dict[str, Any]) -> type[IotDevice]:
DeviceType.WallSwitch: IotWallSwitch,
DeviceType.LightStrip: IotLightStrip,
}
return TYPE_TO_CLASS[_get_device_type_from_sys_info(sysinfo)]
return TYPE_TO_CLASS[IotDevice._get_device_type_from_sys_info(sysinfo)]
def get_device_class_from_family(

View File

@ -22,7 +22,8 @@ from datetime import datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, Callable, cast
from warnings import warn
from ..device import Device, WifiNetwork
from ..device import Device, WifiNetwork, _DeviceInfo
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..exceptions import KasaException
from ..feature import Feature
@ -692,3 +693,66 @@ class IotDevice(Device):
This should only be used for debugging purposes.
"""
return self._last_update or self._discovery_info
@staticmethod
def _get_device_type_from_sys_info(info: dict[str, Any]) -> DeviceType:
"""Find SmartDevice subclass for device described by passed data."""
if "system" not in info or "get_sysinfo" not in info["system"]:
raise KasaException("No 'system' or 'get_sysinfo' in response")
sysinfo: dict[str, Any] = info["system"]["get_sysinfo"]
type_: str | None = sysinfo.get("type", sysinfo.get("mic_type"))
if type_ is None:
raise KasaException("Unable to find the device type field!")
if "dev_name" in sysinfo and "Dimmer" in sysinfo["dev_name"]:
return DeviceType.Dimmer
if "smartplug" in type_.lower():
if "children" in sysinfo:
return DeviceType.Strip
if (dev_name := sysinfo.get("dev_name")) and "light" in dev_name.lower():
return DeviceType.WallSwitch
return DeviceType.Plug
if "smartbulb" in type_.lower():
if "length" in sysinfo: # strips have length
return DeviceType.LightStrip
return DeviceType.Bulb
_LOGGER.warning("Unknown device type %s, falling back to plug", type_)
return DeviceType.Plug
@staticmethod
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
"""Get model information for a device."""
sys_info = info["system"]["get_sysinfo"]
# Get model and region info
region = None
device_model = sys_info["model"]
long_name, _, region = device_model.partition("(")
if region: # All iot devices have region but just in case
region = region.replace(")", "")
# Get other info
device_family = sys_info.get("type", sys_info.get("mic_type"))
device_type = IotDevice._get_device_type_from_sys_info(info)
fw_version_full = sys_info["sw_ver"]
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
auth = bool(discovery_info and ("mgt_encrypt_schm" in discovery_info))
return _DeviceInfo(
short_name=long_name,
long_name=long_name,
brand="kasa",
device_family=device_family,
device_type=device_type,
hardware_version=sys_info["hw_ver"],
firmware_version=firmware_version,
firmware_build=firmware_build,
requires_auth=auth,
region=region,
)

View File

@ -9,7 +9,7 @@ from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta, timezone, tzinfo
from typing import TYPE_CHECKING, Any, cast
from ..device import Device, WifiNetwork
from ..device import Device, WifiNetwork, _DeviceInfo
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
@ -794,3 +794,48 @@ class SmartDevice(Device):
return DeviceType.Thermostat
_LOGGER.warning("Unknown device type, falling back to plug")
return DeviceType.Plug
@staticmethod
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
"""Get model information for a device."""
di = info["get_device_info"]
components = [comp["id"] for comp in info["component_nego"]["component_list"]]
# Get model/region info
short_name = di["model"]
region = None
if discovery_info:
device_model = discovery_info["device_model"]
long_name, _, region = device_model.partition("(")
if region: # P100 doesn't have region
region = region.replace(")", "")
else:
long_name = short_name
if not region: # some devices have region in specs
region = di.get("specs")
# Get other info
device_family = di["type"]
device_type = SmartDevice._get_device_type_from_components(
components, device_family
)
fw_version_full = di["fw_ver"]
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
_protocol, devicetype = device_family.split(".")
# Brand inferred from SMART.KASAPLUG/SMART.TAPOPLUG etc.
brand = devicetype[:4].lower()
return _DeviceInfo(
short_name=short_name,
long_name=long_name,
brand=brand,
device_family=device_family,
device_type=device_type,
hardware_version=di["hw_ver"],
firmware_version=firmware_version,
firmware_build=firmware_build,
requires_auth=True,
region=region,
)

View File

@ -43,7 +43,7 @@ class SmartCamera(SmartDevice):
long_name = discovery_info["device_model"] if discovery_info else short_name
device_type = SmartCamera._get_device_type_from_sysinfo(basic_info)
fw_version_full = basic_info["sw_version"]
firmare_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
return _DeviceInfo(
short_name=basic_info["device_model"],
long_name=long_name,
@ -51,7 +51,7 @@ class SmartCamera(SmartDevice):
device_family=basic_info["device_type"],
device_type=device_type,
hardware_version=basic_info["hw_version"],
firmare_version=firmare_version,
firmware_version=firmware_version,
firmware_build=firmware_build,
requires_auth=True,
region=basic_info.get("region"),

View File

@ -15,6 +15,7 @@ from kasa.transports.basetransport import BaseTransport
from .device_fixtures import * # noqa: F403
from .discovery_fixtures import * # noqa: F403
from .fixtureinfo import fixture_info # noqa: F401
# Parametrize tests to run with device both on and off
turn_on = pytest.mark.parametrize("turn_on", [True, False])

View File

@ -188,11 +188,12 @@ def parametrize(
data_root_filter=None,
device_type_filter=None,
ids=None,
fixture_name="dev",
):
if ids is None:
ids = idgenerator
return pytest.mark.parametrize(
"dev",
fixture_name,
filter_fixtures(
desc,
model_filter=model_filter,
@ -407,22 +408,28 @@ async def _discover_update_and_close(ip, username, password) -> Device:
return await _update_and_close(d)
async def get_device_for_fixture(fixture_data: FixtureInfo) -> Device:
async def get_device_for_fixture(
fixture_data: FixtureInfo, *, verbatim=False, update_after_init=True
) -> Device:
# if the wanted file is not an absolute path, prepend the fixtures directory
d = device_for_fixture_name(fixture_data.name, fixture_data.protocol)(
host="127.0.0.123"
)
if fixture_data.protocol in {"SMART", "SMART.CHILD"}:
d.protocol = FakeSmartProtocol(fixture_data.data, fixture_data.name)
d.protocol = FakeSmartProtocol(
fixture_data.data, fixture_data.name, verbatim=verbatim
)
elif fixture_data.protocol == "SMARTCAMERA":
d.protocol = FakeSmartCameraProtocol(fixture_data.data, fixture_data.name)
d.protocol = FakeSmartCameraProtocol(
fixture_data.data, fixture_data.name, verbatim=verbatim
)
else:
d.protocol = FakeIotProtocol(fixture_data.data)
d.protocol = FakeIotProtocol(fixture_data.data, verbatim=verbatim)
discovery_data = None
if "discovery_result" in fixture_data.data:
discovery_data = {"result": fixture_data.data["discovery_result"]}
discovery_data = fixture_data.data["discovery_result"]
elif "system" in fixture_data.data:
discovery_data = {
"system": {"get_sysinfo": fixture_data.data["system"]["get_sysinfo"]}
@ -431,7 +438,8 @@ async def get_device_for_fixture(fixture_data: FixtureInfo) -> Device:
if discovery_data: # Child devices do not have discovery info
d.update_from_discover_info(discovery_data)
await _update_and_close(d)
if update_after_init:
await _update_and_close(d)
return d

View File

@ -177,9 +177,9 @@ MOTION_MODULE = {
class FakeIotProtocol(IotProtocol):
def __init__(self, info, fixture_name=None):
def __init__(self, info, fixture_name=None, *, verbatim=False):
super().__init__(
transport=FakeIotTransport(info, fixture_name),
transport=FakeIotTransport(info, fixture_name, verbatim=verbatim),
)
async def query(self, request, retry_count: int = 3):
@ -189,21 +189,33 @@ class FakeIotProtocol(IotProtocol):
class FakeIotTransport(BaseTransport):
def __init__(self, info, fixture_name=None):
def __init__(self, info, fixture_name=None, *, verbatim=False):
super().__init__(config=DeviceConfig("127.0.0.123"))
info = copy.deepcopy(info)
self.discovery_data = info
self.fixture_name = fixture_name
self.writer = None
self.reader = None
self.verbatim = verbatim
# When True verbatim will bypass any extra processing of missing
# methods and is used to test the fixture creation itself.
if verbatim:
self.proto = copy.deepcopy(info)
else:
self.proto = self._build_fake_proto(info)
@staticmethod
def _build_fake_proto(info):
"""Create an internal protocol with extra data not in the fixture."""
proto = copy.deepcopy(FakeIotTransport.baseproto)
for target in info:
# print("target %s" % target)
if target != "discovery_result":
for cmd in info[target]:
# print("initializing tgt %s cmd %s" % (target, cmd))
proto[target][cmd] = info[target][cmd]
# if we have emeter support, we need to add the missing pieces
for module in ["emeter", "smartlife.iot.common.emeter"]:
if (
@ -223,10 +235,7 @@ class FakeIotTransport(BaseTransport):
dummy_data = emeter_commands[module][etype]
# print("got %s %s from dummy: %s" % (module, etype, dummy_data))
proto[module][etype] = dummy_data
# print("initialized: %s" % proto[module])
self.proto = proto
return proto
@property
def default_port(self) -> int:
@ -421,8 +430,20 @@ class FakeIotTransport(BaseTransport):
}
async def send(self, request, port=9999):
proto = self.proto
if not self.verbatim:
return await self._send(request, port)
# Simply return whatever is in the fixture
response = {}
for target in request:
if target in self.proto:
response.update({target: self.proto[target]})
else:
response.update({"err_msg": "module not support"})
return copy.deepcopy(response)
async def _send(self, request, port=9999):
proto = self.proto
# collect child ids from context
try:
child_ids = request["context"]["child_ids"]

View File

@ -11,9 +11,11 @@ from kasa.transports.basetransport import BaseTransport
class FakeSmartProtocol(SmartProtocol):
def __init__(self, info, fixture_name, *, is_child=False):
def __init__(self, info, fixture_name, *, is_child=False, verbatim=False):
super().__init__(
transport=FakeSmartTransport(info, fixture_name, is_child=is_child),
transport=FakeSmartTransport(
info, fixture_name, is_child=is_child, verbatim=verbatim
),
)
async def query(self, request, retry_count: int = 3):
@ -34,6 +36,7 @@ class FakeSmartTransport(BaseTransport):
fix_incomplete_fixture_lists=True,
is_child=False,
get_child_fixtures=True,
verbatim=False,
):
super().__init__(
config=DeviceConfig(
@ -64,6 +67,13 @@ class FakeSmartTransport(BaseTransport):
self.warn_fixture_missing_methods = warn_fixture_missing_methods
self.fix_incomplete_fixture_lists = fix_incomplete_fixture_lists
# When True verbatim will bypass any extra processing of missing
# methods and is used to test the fixture creation itself.
self.verbatim = verbatim
if verbatim:
self.warn_fixture_missing_methods = False
self.fix_incomplete_fixture_lists = False
@property
def default_port(self):
"""Default port for the transport."""
@ -444,10 +454,10 @@ class FakeSmartTransport(BaseTransport):
return await self._handle_control_child(request_dict["params"])
params = request_dict.get("params", {})
if method == "component_nego" or method[:4] == "get_":
if method in {"component_nego", "qs_component_nego"} or method[:4] == "get_":
if method in info:
result = copy.deepcopy(info[method])
if "start_index" in result and "sum" in result:
if result and "start_index" in result and "sum" in result:
list_key = next(
iter([key for key in result if isinstance(result[key], list)])
)
@ -473,6 +483,12 @@ class FakeSmartTransport(BaseTransport):
]
return {"result": result, "error_code": 0}
if self.verbatim:
return {
"error_code": SmartErrorCode.PARAMS_ERROR.value,
"method": method,
}
if (
# FIXTURE_MISSING is for service calls not in place when
# SMART fixtures started to be generated

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import copy
from json import loads as json_loads
from typing import Any
from kasa import Credentials, DeviceConfig, SmartProtocol
from kasa.protocols.smartcameraprotocol import SmartCameraProtocol
@ -11,9 +12,11 @@ from .fakeprotocol_smart import FakeSmartTransport
class FakeSmartCameraProtocol(SmartCameraProtocol):
def __init__(self, info, fixture_name, *, is_child=False):
def __init__(self, info, fixture_name, *, is_child=False, verbatim=False):
super().__init__(
transport=FakeSmartCameraTransport(info, fixture_name, is_child=is_child),
transport=FakeSmartCameraTransport(
info, fixture_name, is_child=is_child, verbatim=verbatim
),
)
async def query(self, request, retry_count: int = 3):
@ -30,6 +33,7 @@ class FakeSmartCameraTransport(BaseTransport):
*,
list_return_size=10,
is_child=False,
verbatim=False,
):
super().__init__(
config=DeviceConfig(
@ -41,6 +45,9 @@ class FakeSmartCameraTransport(BaseTransport):
),
)
self.fixture_name = fixture_name
# When True verbatim will bypass any extra processing of missing
# methods and is used to test the fixture creation itself.
self.verbatim = verbatim
if not is_child:
self.info = copy.deepcopy(info)
self.child_protocols = FakeSmartTransport._get_child_protocols(
@ -70,11 +77,11 @@ class FakeSmartCameraTransport(BaseTransport):
responses = []
for request in params["requests"]:
response = await self._send_request(request) # type: ignore[arg-type]
response["method"] = request["method"] # type: ignore[index]
responses.append(response)
# Devices do not continue after error
if response["error_code"] != 0:
break
response["method"] = request["method"] # type: ignore[index]
responses.append(response)
return {"result": {"responses": responses}, "error_code": 0}
else:
return await self._send_request(request_dict)
@ -129,6 +136,15 @@ class FakeSmartCameraTransport(BaseTransport):
],
}
@staticmethod
def _get_second_key(request_dict: dict[str, Any]) -> str:
assert (
len(request_dict) == 2
), f"Unexpected dict {request_dict}, should be length 2"
it = iter(request_dict)
next(it, None)
return next(it)
async def _send_request(self, request_dict: dict):
method = request_dict["method"]
@ -175,6 +191,14 @@ class FakeSmartCameraTransport(BaseTransport):
return {"error_code": -1}
break
return {"error_code": 0}
elif method == "get":
module = self._get_second_key(request_dict)
get_method = f"get_{module}"
if get_method in info:
result = copy.deepcopy(info[get_method]["get"])
return {**result, "error_code": 0}
else:
return {"error_code": -1}
elif method[:3] == "get":
params = request_dict.get("params")
if method in info:

View File

@ -1,13 +1,16 @@
from __future__ import annotations
import copy
import glob
import json
import os
from pathlib import Path
from typing import Iterable, NamedTuple
from kasa.device_factory import _get_device_type_from_sys_info
import pytest
from kasa.device_type import DeviceType
from kasa.iot import IotDevice
from kasa.smart.smartdevice import SmartDevice
from kasa.smartcamera.smartcamera import SmartCamera
@ -171,7 +174,10 @@ def filter_fixtures(
in device_type
)
elif fixture_data.protocol == "IOT":
return _get_device_type_from_sys_info(fixture_data.data) in device_type
return (
IotDevice._get_device_type_from_sys_info(fixture_data.data)
in device_type
)
elif fixture_data.protocol == "SMARTCAMERA":
info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"]
return SmartCamera._get_device_type_from_sysinfo(info) in device_type
@ -206,3 +212,14 @@ def filter_fixtures(
print(f"\t{value.name}")
filtered.sort()
return filtered
@pytest.fixture(
params=filter_fixtures("all fixture infos"),
ids=idgenerator,
)
def fixture_info(request, mocker):
"""Return raw discovery file contents as JSON. Used for discovery tests."""
fixture_info = request.param
fixture_data = copy.deepcopy(fixture_info.data)
return FixtureInfo(fixture_info.name, fixture_info.protocol, fixture_data)

View File

@ -19,9 +19,9 @@ from kasa import (
)
from kasa.device_factory import (
Device,
IotDevice,
SmartCamera,
SmartDevice,
_get_device_type_from_sys_info,
connect,
get_device_class_from_family,
get_protocol,
@ -182,12 +182,12 @@ async def test_device_types(dev: Device):
res = SmartCamera._get_device_type_from_sysinfo(dev.sys_info)
elif isinstance(dev, SmartDevice):
assert dev._discovery_info
device_type = cast(str, dev._discovery_info["result"]["device_type"])
device_type = cast(str, dev._discovery_info["device_type"])
res = SmartDevice._get_device_type_from_components(
list(dev._components.keys()), device_type
)
else:
res = _get_device_type_from_sys_info(dev._last_update)
res = IotDevice._get_device_type_from_sys_info(dev._last_update)
assert dev.device_type == res

103
tests/test_devtools.py Normal file
View File

@ -0,0 +1,103 @@
"""Module for dump_devinfo tests."""
import pytest
from devtools.dump_devinfo import get_legacy_fixture, get_smart_fixtures
from kasa.iot import IotDevice
from kasa.protocols import IotProtocol
from kasa.smart import SmartDevice
from kasa.smartcamera import SmartCamera
from .conftest import (
FixtureInfo,
get_device_for_fixture,
parametrize,
)
smart_fixtures = parametrize(
"smart fixtures", protocol_filter={"SMART"}, fixture_name="fixture_info"
)
smartcamera_fixtures = parametrize(
"smartcamera fixtures", protocol_filter={"SMARTCAMERA"}, fixture_name="fixture_info"
)
iot_fixtures = parametrize(
"iot fixtures", protocol_filter={"IOT"}, fixture_name="fixture_info"
)
async def test_fixture_names(fixture_info: FixtureInfo):
"""Test that device info gets the right fixture names."""
if fixture_info.protocol in {"SMARTCAMERA"}:
device_info = SmartCamera._get_device_info(
fixture_info.data, fixture_info.data.get("discovery_result")
)
elif fixture_info.protocol in {"SMART"}:
device_info = SmartDevice._get_device_info(
fixture_info.data, fixture_info.data.get("discovery_result")
)
elif fixture_info.protocol in {"SMART.CHILD"}:
device_info = SmartDevice._get_device_info(fixture_info.data, None)
else:
device_info = IotDevice._get_device_info(fixture_info.data, None)
region = f"({device_info.region})" if device_info.region else ""
expected = f"{device_info.long_name}{region}_{device_info.hardware_version}_{device_info.firmware_version}.json"
assert fixture_info.name == expected
@smart_fixtures
async def test_smart_fixtures(fixture_info: FixtureInfo):
"""Test that smart fixtures are created the same."""
dev = await get_device_for_fixture(fixture_info, verbatim=True)
assert isinstance(dev, SmartDevice)
if dev.children:
pytest.skip("Test not currently implemented for devices with children.")
fixtures = await get_smart_fixtures(
dev.protocol,
discovery_info=fixture_info.data.get("discovery_result"),
batch_size=5,
)
fixture_result = fixtures[0]
assert fixture_info.data == fixture_result.data
@smartcamera_fixtures
async def test_smartcamera_fixtures(fixture_info: FixtureInfo):
"""Test that smartcamera fixtures are created the same."""
dev = await get_device_for_fixture(fixture_info, verbatim=True)
assert isinstance(dev, SmartCamera)
if dev.children:
pytest.skip("Test not currently implemented for devices with children.")
fixtures = await get_smart_fixtures(
dev.protocol,
discovery_info=fixture_info.data.get("discovery_result"),
batch_size=5,
)
fixture_result = fixtures[0]
assert fixture_info.data == fixture_result.data
@iot_fixtures
async def test_iot_fixtures(fixture_info: FixtureInfo):
"""Test that iot fixtures are created the same."""
# Iot fixtures often do not have enough data to perform a device update()
# without missing info being added to suppress the update
dev = await get_device_for_fixture(
fixture_info, verbatim=True, update_after_init=False
)
assert isinstance(dev.protocol, IotProtocol)
fixture = await get_legacy_fixture(
dev.protocol, discovery_info=fixture_info.data.get("discovery_result")
)
fixture_result = fixture
created_fixture = {
key: val for key, val in fixture_result.data.items() if "err_code" not in val
}
saved_fixture = {
key: val for key, val in fixture_info.data.items() if "err_code" not in val
}
assert saved_fixture == created_fixture

View File

@ -39,7 +39,7 @@ from kasa.discover import (
json_dumps,
)
from kasa.exceptions import AuthenticationError, UnsupportedDeviceError
from kasa.iot import IotDevice
from kasa.iot import IotDevice, IotPlug
from kasa.transports.aestransport import AesEncyptionSession
from kasa.transports.xortransport import XorEncryption, XorTransport
@ -119,10 +119,11 @@ async def test_type_detection_lightstrip(dev: Device):
assert d.device_type == DeviceType.LightStrip
async def test_type_unknown():
async def test_type_unknown(caplog):
invalid_info = {"system": {"get_sysinfo": {"type": "nosuchtype"}}}
with pytest.raises(UnsupportedDeviceError):
Discover._get_device_class(invalid_info)
assert Discover._get_device_class(invalid_info) is IotPlug
msg = "Unknown device type nosuchtype, falling back to plug"
assert msg in caplog.text
@pytest.mark.parametrize("custom_port", [123, None])
@ -266,7 +267,6 @@ INVALIDS = [
"Unable to find the device type field",
{"system": {"get_sysinfo": {"missing_type": 1}}},
),
("Unknown device type: foo", {"system": {"get_sysinfo": {"type": "foo"}}}),
]