python-kasa/tests/fixtureinfo.py

209 lines
6.8 KiB
Python
Raw Normal View History

from __future__ import annotations
import glob
import json
import os
from pathlib import Path
from typing import Iterable, NamedTuple
from kasa.device_factory import _get_device_type_from_sys_info
from kasa.device_type import DeviceType
from kasa.smart.smartdevice import SmartDevice
from kasa.smartcamera.smartcamera import SmartCamera
class FixtureInfo(NamedTuple):
name: str
protocol: str
data: dict
class ComponentFilter(NamedTuple):
component_name: str
minimum_version: int = 0
maximum_version: int | None = None
FixtureInfo.__hash__ = lambda self: hash((self.name, self.protocol)) # type: ignore[attr-defined, method-assign]
FixtureInfo.__eq__ = lambda x, y: hash(x) == hash(y) # type: ignore[method-assign]
SUPPORTED_IOT_DEVICES = [
(device, "IOT")
for device in glob.glob(
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/*.json"
)
]
SUPPORTED_SMART_DEVICES = [
(device, "SMART")
for device in glob.glob(
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smart/*.json"
)
]
SUPPORTED_SMART_CHILD_DEVICES = [
(device, "SMART.CHILD")
for device in glob.glob(
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smart/child/*.json"
)
]
SUPPORTED_SMARTCAMERA_DEVICES = [
(device, "SMARTCAMERA")
for device in glob.glob(
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smartcamera/*.json"
)
]
SUPPORTED_DEVICES = (
SUPPORTED_IOT_DEVICES
+ SUPPORTED_SMART_DEVICES
+ SUPPORTED_SMART_CHILD_DEVICES
+ SUPPORTED_SMARTCAMERA_DEVICES
)
def idgenerator(paramtuple: FixtureInfo):
try:
return paramtuple.name + (
"" if paramtuple.protocol == "IOT" else "-" + paramtuple.protocol
)
except: # TODO: HACK as idgenerator is now used by default # noqa: E722
return None
def get_fixture_info() -> list[FixtureInfo]:
"""Return raw discovery file contents as JSON. Used for discovery tests."""
fixture_data = []
for file, protocol in SUPPORTED_DEVICES:
p = Path(file)
folder = Path(__file__).parent / "fixtures"
if protocol == "SMART":
folder = folder / "smart"
if protocol == "SMART.CHILD":
folder = folder / "smart/child"
p = folder / file
with open(p) as f:
data = json.load(f)
fixture_name = p.name
fixture_data.append(
FixtureInfo(data=data, protocol=protocol, name=fixture_name)
)
return fixture_data
FIXTURE_DATA: list[FixtureInfo] = get_fixture_info()
def filter_fixtures(
desc,
*,
data_root_filter: str | None = None,
protocol_filter: set[str] | None = None,
model_filter: set[str] | None = None,
model_startswith_filter: str | None = None,
component_filter: str | ComponentFilter | None = None,
device_type_filter: Iterable[DeviceType] | None = None,
fixture_list: list[FixtureInfo] = FIXTURE_DATA,
):
"""Filter the fixtures based on supplied parameters.
data_root_filter: return fixtures containing the supplied top
level key, i.e. discovery_result
protocol_filter: set of protocols to match, IOT, SMART, SMART.CHILD
model_filter: set of device models to match
component_filter: filter SMART fixtures that have the provided
component in component_nego details.
"""
def _model_match(fixture_data: FixtureInfo, model_filter: set[str]):
if isinstance(model_filter, str):
model_filter = {model_filter}
assert isinstance(model_filter, set), "model filter must be a set"
model_filter_list = [mf for mf in model_filter]
if (
len(model_filter_list) == 1
and (model := model_filter_list[0])
and len(model.split("_")) == 3
):
# filter string includes hw and fw, return exact match
return fixture_data.name == f"{model}.json"
file_model_region = fixture_data.name.split("_")[0]
file_model = file_model_region.split("(")[0]
return file_model in model_filter
def _model_startswith_match(fixture_data: FixtureInfo, starts_with: str):
return fixture_data.name.startswith(starts_with)
def _component_match(
fixture_data: FixtureInfo, component_filter: str | ComponentFilter
):
if (component_nego := fixture_data.data.get("component_nego")) is None:
return False
components = {
component["id"]: component["ver_code"]
for component in component_nego["component_list"]
}
if isinstance(component_filter, str):
return component_filter in components
else:
return (
(ver_code := components.get(component_filter.component_name))
and ver_code >= component_filter.minimum_version
and (
component_filter.maximum_version is None
or ver_code <= component_filter.maximum_version
)
)
def _device_type_match(fixture_data: FixtureInfo, device_type):
if fixture_data.protocol in {"SMART", "SMART.CHILD"}:
info = fixture_data.data["get_device_info"]
component_nego = fixture_data.data["component_nego"]
components = [
component["id"] for component in component_nego["component_list"]
]
return (
SmartDevice._get_device_type_from_components(components, info["type"])
in device_type
)
elif fixture_data.protocol == "IOT":
return _get_device_type_from_sys_info(fixture_data.data) in device_type
elif fixture_data.protocol == "SMARTCAMERA":
info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"]
return SmartCamera._get_device_type_from_sysinfo(info) in device_type
return False
filtered = []
if protocol_filter is None:
protocol_filter = {"IOT", "SMART", "SMARTCAMERA"}
for fixture_data in fixture_list:
if data_root_filter and data_root_filter not in fixture_data.data:
continue
if fixture_data.protocol not in protocol_filter:
continue
if model_filter is not None and not _model_match(fixture_data, model_filter):
continue
if model_startswith_filter is not None and not _model_startswith_match(
fixture_data, model_startswith_filter
):
continue
if component_filter and not _component_match(fixture_data, component_filter):
continue
if device_type_filter and not _device_type_match(
fixture_data, device_type_filter
):
continue
filtered.append(fixture_data)
if desc:
print(f"# {desc}")
for value in filtered:
print(f"\t{value.name}")
filtered.sort()
return filtered