Add test framework for smartcamera (#1192)

This commit is contained in:
Steven B. 2024-10-24 09:36:18 +01:00 committed by GitHub
parent 51958d8078
commit c839aaa1dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 302 additions and 15 deletions

View File

@ -2,6 +2,8 @@
from __future__ import annotations from __future__ import annotations
from typing import Any
from ..device_type import DeviceType from ..device_type import DeviceType
from ..exceptions import SmartErrorCode from ..exceptions import SmartErrorCode
from ..smart import SmartDevice from ..smart import SmartDevice
@ -10,6 +12,14 @@ from ..smart import SmartDevice
class SmartCamera(SmartDevice): class SmartCamera(SmartDevice):
"""Class for smart cameras.""" """Class for smart cameras."""
@staticmethod
def _get_device_type_from_sysinfo(sysinfo: dict[str, Any]) -> DeviceType:
"""Find type to be displayed as a supported device category."""
device_type = sysinfo["device_type"]
if device_type.endswith("HUB"):
return DeviceType.Hub
return DeviceType.Camera
async def update(self, update_children: bool = False): async def update(self, update_children: bool = False):
"""Update the device.""" """Update the device."""
initial_query = { initial_query = {
@ -26,7 +36,7 @@ class SmartCamera(SmartDevice):
basic_info = device_info["basic_info"] basic_info = device_info["basic_info"]
return { return {
"model": basic_info["device_model"], "model": basic_info["device_model"],
"type": basic_info["device_type"], "device_type": basic_info["device_type"],
"alias": basic_info["device_alias"], "alias": basic_info["device_alias"],
"fw_ver": basic_info["sw_version"], "fw_ver": basic_info["sw_version"],
"hw_ver": basic_info["hw_version"], "hw_ver": basic_info["hw_version"],
@ -61,7 +71,9 @@ class SmartCamera(SmartDevice):
@property @property
def device_type(self) -> DeviceType: def device_type(self) -> DeviceType:
"""Return the device type.""" """Return the device type."""
return DeviceType.Camera if self._device_type == DeviceType.Unknown:
self._device_type = self._get_device_type_from_sysinfo(self._info)
return self._device_type
@property @property
def alias(self) -> str | None: def alias(self) -> str | None:

View File

@ -10,11 +10,13 @@ from kasa import (
DeviceType, DeviceType,
Discover, Discover,
) )
from kasa.experimental.smartcamera import SmartCamera
from kasa.iot import IotBulb, IotDimmer, IotLightStrip, IotPlug, IotStrip, IotWallSwitch from kasa.iot import IotBulb, IotDimmer, IotLightStrip, IotPlug, IotStrip, IotWallSwitch
from kasa.smart import SmartDevice from kasa.smart import SmartDevice
from .fakeprotocol_iot import FakeIotProtocol from .fakeprotocol_iot import FakeIotProtocol
from .fakeprotocol_smart import FakeSmartProtocol from .fakeprotocol_smart import FakeSmartProtocol
from .fakeprotocol_smartcamera import FakeSmartCameraProtocol
from .fixtureinfo import ( from .fixtureinfo import (
FIXTURE_DATA, FIXTURE_DATA,
ComponentFilter, ComponentFilter,
@ -313,6 +315,17 @@ device_smart = parametrize(
device_iot = parametrize( device_iot = parametrize(
"devices iot", model_filter=ALL_DEVICES_IOT, protocol_filter={"IOT"} "devices iot", model_filter=ALL_DEVICES_IOT, protocol_filter={"IOT"}
) )
device_smartcamera = parametrize("devices smartcamera", protocol_filter={"SMARTCAMERA"})
camera_smartcamera = parametrize(
"camera smartcamera",
device_type_filter=[DeviceType.Camera],
protocol_filter={"SMARTCAMERA"},
)
hub_smartcamera = parametrize(
"hub smartcamera",
device_type_filter=[DeviceType.Hub],
protocol_filter={"SMARTCAMERA"},
)
def check_categories(): def check_categories():
@ -329,6 +342,8 @@ def check_categories():
+ hubs_smart.args[1] + hubs_smart.args[1]
+ sensors_smart.args[1] + sensors_smart.args[1]
+ thermostats_smart.args[1] + thermostats_smart.args[1]
+ camera_smartcamera.args[1]
+ hub_smartcamera.args[1]
) )
diffs: set[FixtureInfo] = set(FIXTURE_DATA) - set(categorized_fixtures) diffs: set[FixtureInfo] = set(FIXTURE_DATA) - set(categorized_fixtures)
if diffs: if diffs:
@ -344,8 +359,10 @@ check_categories()
def device_for_fixture_name(model, protocol): def device_for_fixture_name(model, protocol):
if "SMART" in protocol: if protocol in {"SMART", "SMART.CHILD"}:
return SmartDevice return SmartDevice
elif protocol == "SMARTCAMERA":
return SmartCamera
else: else:
for d in STRIPS_IOT: for d in STRIPS_IOT:
if d in model: if d in model:
@ -395,8 +412,10 @@ async def get_device_for_fixture(fixture_data: FixtureInfo) -> Device:
d = device_for_fixture_name(fixture_data.name, fixture_data.protocol)( d = device_for_fixture_name(fixture_data.name, fixture_data.protocol)(
host="127.0.0.123" host="127.0.0.123"
) )
if "SMART" in fixture_data.protocol: if fixture_data.protocol in {"SMART", "SMART.CHILD"}:
d.protocol = FakeSmartProtocol(fixture_data.data, fixture_data.name) d.protocol = FakeSmartProtocol(fixture_data.data, fixture_data.name)
elif fixture_data.protocol == "SMARTCAMERA":
d.protocol = FakeSmartCameraProtocol(fixture_data.data, fixture_data.name)
else: else:
d.protocol = FakeIotProtocol(fixture_data.data) d.protocol = FakeIotProtocol(fixture_data.data)

View File

@ -0,0 +1,217 @@
from __future__ import annotations
import copy
from json import loads as json_loads
from warnings import warn
from kasa import Credentials, DeviceConfig, SmartProtocol
from kasa.experimental.smartcameraprotocol import SmartCameraProtocol
from kasa.protocol import BaseTransport
from kasa.smart import SmartChildDevice
from .fakeprotocol_smart import FakeSmartProtocol
class FakeSmartCameraProtocol(SmartCameraProtocol):
def __init__(self, info, fixture_name):
super().__init__(
transport=FakeSmartCameraTransport(info, fixture_name),
)
async def query(self, request, retry_count: int = 3):
"""Implement query here so can still patch SmartProtocol.query."""
resp_dict = await self._query(request, retry_count)
return resp_dict
class FakeSmartCameraTransport(BaseTransport):
def __init__(
self,
info,
fixture_name,
*,
list_return_size=10,
):
super().__init__(
config=DeviceConfig(
"127.0.0.123",
credentials=Credentials(
username="dummy_user",
password="dummy_password", # noqa: S106
),
),
)
self.fixture_name = fixture_name
self.info = copy.deepcopy(info)
self.child_protocols = self._get_child_protocols()
self.list_return_size = list_return_size
@property
def default_port(self):
"""Default port for the transport."""
return 443
@property
def credentials_hash(self):
"""The hashed credentials used by the transport."""
return self._credentials.username + self._credentials.password + "camerahash"
async def send(self, request: str):
request_dict = json_loads(request)
method = request_dict["method"]
if method == "multipleRequest":
params = request_dict["params"]
responses = []
for request in params["requests"]:
response = await self._send_request(request) # type: ignore[arg-type]
# Devices do not continue after error
if response["error_code"] != 0:
break
response["method"] = request["method"] # type: ignore[index]
responses.append(response)
return {"result": {"responses": responses}, "error_code": 0}
else:
return await self._send_request(request_dict)
def _get_child_protocols(self):
child_infos = self.info.get("getChildDeviceList", {}).get(
"child_device_list", []
)
found_child_fixture_infos = []
child_protocols = {}
# imported here to avoid circular import
from .conftest import filter_fixtures
for child_info in child_infos:
if (
(device_id := child_info.get("device_id"))
and (category := child_info.get("category"))
and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP
):
hw_version = child_info["hw_ver"]
sw_version = child_info["fw_ver"]
sw_version = sw_version.split(" ")[0]
model = child_info["model"]
region = child_info["specs"]
child_fixture_name = f"{model}({region})_{hw_version}_{sw_version}"
child_fixtures = filter_fixtures(
"Child fixture",
protocol_filter={"SMART.CHILD"},
model_filter=child_fixture_name,
)
if child_fixtures:
fixture_info = next(iter(child_fixtures))
found_child_fixture_infos.append(child_info)
child_protocols[device_id] = FakeSmartProtocol(
fixture_info.data, fixture_info.name
)
else:
warn(
f"Could not find child fixture {child_fixture_name}",
stacklevel=1,
)
else:
warn(
f"Child is a cameraprotocol which needs to be implemented {child_info}",
stacklevel=1,
)
# Replace child infos with the infos that found child fixtures
if child_infos:
self.info["getChildDeviceList"]["child_device_list"] = (
found_child_fixture_infos
)
return child_protocols
async def _handle_control_child(self, params: dict):
"""Handle control_child command."""
device_id = params.get("device_id")
assert device_id in self.child_protocols, "Fixture does not have child info"
child_protocol: SmartProtocol = self.child_protocols[device_id]
request_data = params.get("request_data", {})
child_method = request_data.get("method")
child_params = request_data.get("params") # noqa: F841
resp = await child_protocol.query({child_method: child_params})
resp["error_code"] = 0
for val in resp.values():
return {
"result": {"response_data": {"result": val, "error_code": 0}},
"error_code": 0,
}
@staticmethod
def _get_param_set_value(info: dict, set_keys: list[str], value):
for key in set_keys[:-1]:
info = info[key]
info[set_keys[-1]] = value
SETTERS = {
("system", "sys", "dev_alias"): [
"getDeviceInfo",
"device_info",
"basic_info",
"device_alias",
],
("lens_mask", "lens_mask_info", "enabled"): [
"getLensMaskConfig",
"lens_mask",
"lens_mask_info",
"enabled",
],
}
async def _send_request(self, request_dict: dict):
method = request_dict["method"]
info = self.info
if method == "controlChild":
return await self._handle_control_child(
request_dict["params"]["childControl"]
)
if method == "set":
for key, val in request_dict.items():
if key != "method":
module = key
section = next(iter(val))
skey_val = val[section]
for skey, sval in skey_val.items():
section_key = skey
section_value = sval
break
if setter_keys := self.SETTERS.get((module, section, section_key)):
self._get_param_set_value(info, setter_keys, section_value)
return {"error_code": 0}
else:
return {"error_code": -1}
elif method[:3] == "get":
params = request_dict.get("params")
if method in info:
result = copy.deepcopy(info[method])
if "start_index" in result and "sum" in result:
list_key = next(
iter([key for key in result if isinstance(result[key], list)])
)
start_index = (
start_index
if (params and (start_index := params.get("start_index")))
else 0
)
result[list_key] = result[list_key][
start_index : start_index + self.list_return_size
]
return {"result": result, "error_code": 0}
else:
return {"error_code": -1}
return {"error_code": -1}
async def close(self) -> None:
pass
async def reset(self) -> None:
pass

View File

@ -4,10 +4,11 @@ import glob
import json import json
import os import os
from pathlib import Path from pathlib import Path
from typing import NamedTuple from typing import Iterable, NamedTuple
from kasa.device_factory import _get_device_type_from_sys_info from kasa.device_factory import _get_device_type_from_sys_info
from kasa.device_type import DeviceType from kasa.device_type import DeviceType
from kasa.experimental.smartcamera import SmartCamera
from kasa.smart.smartdevice import SmartDevice from kasa.smart.smartdevice import SmartDevice
@ -48,9 +49,18 @@ SUPPORTED_SMART_CHILD_DEVICES = [
) )
] ]
SUPPORTED_SMARTCAMERA_DEVICES = [
(device, "SMARTCAMERA")
for device in glob.glob(
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smartcamera/*.json"
)
]
SUPPORTED_DEVICES = ( SUPPORTED_DEVICES = (
SUPPORTED_IOT_DEVICES + SUPPORTED_SMART_DEVICES + SUPPORTED_SMART_CHILD_DEVICES SUPPORTED_IOT_DEVICES
+ SUPPORTED_SMART_DEVICES
+ SUPPORTED_SMART_CHILD_DEVICES
+ SUPPORTED_SMARTCAMERA_DEVICES
) )
@ -95,7 +105,7 @@ def filter_fixtures(
protocol_filter: set[str] | None = None, protocol_filter: set[str] | None = None,
model_filter: set[str] | None = None, model_filter: set[str] | None = None,
component_filter: str | ComponentFilter | None = None, component_filter: str | ComponentFilter | None = None,
device_type_filter: list[DeviceType] | None = None, device_type_filter: Iterable[DeviceType] | None = None,
): ):
"""Filter the fixtures based on supplied parameters. """Filter the fixtures based on supplied parameters.
@ -107,7 +117,11 @@ def filter_fixtures(
component in component_nego details. component in component_nego details.
""" """
def _model_match(fixture_data: FixtureInfo, model_filter): def _model_match(fixture_data: FixtureInfo, model_filter: set[str]):
model_filter_list = [mf for mf in model_filter]
if len(model_filter_list) == 1 and model_filter_list[0].split("_") == 3:
# return exact match
return fixture_data.name == model_filter_list[0]
file_model_region = fixture_data.name.split("_")[0] file_model_region = fixture_data.name.split("_")[0]
file_model = file_model_region.split("(")[0] file_model = file_model_region.split("(")[0]
return file_model in model_filter return file_model in model_filter
@ -134,16 +148,21 @@ def filter_fixtures(
) )
def _device_type_match(fixture_data: FixtureInfo, device_type): def _device_type_match(fixture_data: FixtureInfo, device_type):
if (component_nego := fixture_data.data.get("component_nego")) is None: if fixture_data.protocol in {"SMART", "SMART.CHILD"}:
return _get_device_type_from_sys_info(fixture_data.data) in device_type info = fixture_data.data["get_device_info"]
components = [component["id"] for component in component_nego["component_list"]] component_nego = fixture_data.data["component_nego"]
if (info := fixture_data.data.get("get_device_info")) and ( components = [
type_ := info.get("type") component["id"] for component in component_nego["component_list"]
): ]
return ( return (
SmartDevice._get_device_type_from_components(components, type_) SmartDevice._get_device_type_from_components(components, info["type"])
in device_type in device_type
) )
elif fixture_data.protocol == "IOT":
return _get_device_type_from_sys_info(fixture_data.data) in device_type
elif fixture_data.protocol == "SMARTCAMERA":
info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"]
return SmartCamera._get_device_type_from_sysinfo(info) in device_type
return False return False
filtered = [] filtered = []

View File

View File

@ -0,0 +1,20 @@
"""Tests for smart camera devices."""
from __future__ import annotations
import pytest
from kasa import Device, DeviceType
from ..conftest import device_smartcamera
@device_smartcamera
async def test_state(dev: Device):
if dev.device_type is DeviceType.Hub:
pytest.skip("Hubs cannot be switched on and off")
state = dev.is_on
await dev.set_state(not state)
await dev.update()
assert dev.is_on is not state