mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-08 22:07:06 +00:00
a01247d48f
Python 3.11 ships with latest Debian Bookworm. pypy is not that widely used with this library based on statistics. It could be added back when pypy supports python 3.11.
227 lines
7.3 KiB
Python
227 lines
7.3 KiB
Python
from __future__ import annotations
|
|
|
|
import copy
|
|
import glob
|
|
import json
|
|
import os
|
|
from collections.abc import Iterable
|
|
from pathlib import Path
|
|
from typing import NamedTuple
|
|
|
|
import pytest
|
|
|
|
from kasa.device_type import DeviceType
|
|
from kasa.iot import IotDevice
|
|
from kasa.smart.smartdevice import SmartDevice
|
|
from kasa.smartcamera.smartcamera import SmartCamera
|
|
|
|
|
|
class FixtureInfo(NamedTuple):
|
|
name: str
|
|
protocol: str
|
|
data: dict
|
|
|
|
|
|
class ComponentFilter(NamedTuple):
|
|
component_name: str
|
|
minimum_version: int = 0
|
|
maximum_version: int | None = None
|
|
|
|
|
|
FixtureInfo.__hash__ = lambda self: hash((self.name, self.protocol)) # type: ignore[attr-defined, method-assign]
|
|
FixtureInfo.__eq__ = lambda x, y: hash(x) == hash(y) # type: ignore[method-assign]
|
|
|
|
|
|
SUPPORTED_IOT_DEVICES = [
|
|
(device, "IOT")
|
|
for device in glob.glob(
|
|
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/*.json"
|
|
)
|
|
]
|
|
|
|
SUPPORTED_SMART_DEVICES = [
|
|
(device, "SMART")
|
|
for device in glob.glob(
|
|
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smart/*.json"
|
|
)
|
|
]
|
|
|
|
SUPPORTED_SMART_CHILD_DEVICES = [
|
|
(device, "SMART.CHILD")
|
|
for device in glob.glob(
|
|
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smart/child/*.json"
|
|
)
|
|
]
|
|
|
|
SUPPORTED_SMARTCAMERA_DEVICES = [
|
|
(device, "SMARTCAMERA")
|
|
for device in glob.glob(
|
|
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smartcamera/*.json"
|
|
)
|
|
]
|
|
|
|
SUPPORTED_DEVICES = (
|
|
SUPPORTED_IOT_DEVICES
|
|
+ SUPPORTED_SMART_DEVICES
|
|
+ SUPPORTED_SMART_CHILD_DEVICES
|
|
+ SUPPORTED_SMARTCAMERA_DEVICES
|
|
)
|
|
|
|
|
|
def idgenerator(paramtuple: FixtureInfo):
|
|
try:
|
|
return paramtuple.name + (
|
|
"" if paramtuple.protocol == "IOT" else "-" + paramtuple.protocol
|
|
)
|
|
except: # TODO: HACK as idgenerator is now used by default # noqa: E722
|
|
return None
|
|
|
|
|
|
def get_fixture_info() -> list[FixtureInfo]:
|
|
"""Return raw discovery file contents as JSON. Used for discovery tests."""
|
|
fixture_data = []
|
|
for file, protocol in SUPPORTED_DEVICES:
|
|
p = Path(file)
|
|
folder = Path(__file__).parent / "fixtures"
|
|
if protocol == "SMART":
|
|
folder = folder / "smart"
|
|
if protocol == "SMART.CHILD":
|
|
folder = folder / "smart/child"
|
|
p = folder / file
|
|
|
|
with open(p) as f:
|
|
data = json.load(f)
|
|
|
|
fixture_name = p.name
|
|
fixture_data.append(
|
|
FixtureInfo(data=data, protocol=protocol, name=fixture_name)
|
|
)
|
|
return fixture_data
|
|
|
|
|
|
FIXTURE_DATA: list[FixtureInfo] = get_fixture_info()
|
|
|
|
|
|
def filter_fixtures(
|
|
desc,
|
|
*,
|
|
data_root_filter: str | None = None,
|
|
protocol_filter: set[str] | None = None,
|
|
model_filter: set[str] | None = None,
|
|
model_startswith_filter: str | None = None,
|
|
component_filter: str | ComponentFilter | None = None,
|
|
device_type_filter: Iterable[DeviceType] | None = None,
|
|
fixture_list: list[FixtureInfo] = FIXTURE_DATA,
|
|
):
|
|
"""Filter the fixtures based on supplied parameters.
|
|
|
|
data_root_filter: return fixtures containing the supplied top
|
|
level key, i.e. discovery_result
|
|
protocol_filter: set of protocols to match, IOT, SMART, SMART.CHILD
|
|
model_filter: set of device models to match
|
|
component_filter: filter SMART fixtures that have the provided
|
|
component in component_nego details.
|
|
"""
|
|
|
|
def _model_match(fixture_data: FixtureInfo, model_filter: set[str]):
|
|
if isinstance(model_filter, str):
|
|
model_filter = {model_filter}
|
|
assert isinstance(model_filter, set), "model filter must be a set"
|
|
model_filter_list = [mf for mf in model_filter]
|
|
if (
|
|
len(model_filter_list) == 1
|
|
and (model := model_filter_list[0])
|
|
and len(model.split("_")) == 3
|
|
):
|
|
# filter string includes hw and fw, return exact match
|
|
return fixture_data.name == f"{model}.json"
|
|
file_model_region = fixture_data.name.split("_")[0]
|
|
file_model = file_model_region.split("(")[0]
|
|
return file_model in model_filter
|
|
|
|
def _model_startswith_match(fixture_data: FixtureInfo, starts_with: str):
|
|
return fixture_data.name.startswith(starts_with)
|
|
|
|
def _component_match(
|
|
fixture_data: FixtureInfo, component_filter: str | ComponentFilter
|
|
):
|
|
if (component_nego := fixture_data.data.get("component_nego")) is None:
|
|
return False
|
|
components = {
|
|
component["id"]: component["ver_code"]
|
|
for component in component_nego["component_list"]
|
|
}
|
|
if isinstance(component_filter, str):
|
|
return component_filter in components
|
|
else:
|
|
return (
|
|
(ver_code := components.get(component_filter.component_name))
|
|
and ver_code >= component_filter.minimum_version
|
|
and (
|
|
component_filter.maximum_version is None
|
|
or ver_code <= component_filter.maximum_version
|
|
)
|
|
)
|
|
|
|
def _device_type_match(fixture_data: FixtureInfo, device_type):
|
|
if fixture_data.protocol in {"SMART", "SMART.CHILD"}:
|
|
info = fixture_data.data["get_device_info"]
|
|
component_nego = fixture_data.data["component_nego"]
|
|
components = [
|
|
component["id"] for component in component_nego["component_list"]
|
|
]
|
|
return (
|
|
SmartDevice._get_device_type_from_components(components, info["type"])
|
|
in device_type
|
|
)
|
|
elif fixture_data.protocol == "IOT":
|
|
return (
|
|
IotDevice._get_device_type_from_sys_info(fixture_data.data)
|
|
in device_type
|
|
)
|
|
elif fixture_data.protocol == "SMARTCAMERA":
|
|
info = fixture_data.data["getDeviceInfo"]["device_info"]["basic_info"]
|
|
return SmartCamera._get_device_type_from_sysinfo(info) in device_type
|
|
return False
|
|
|
|
filtered = []
|
|
if protocol_filter is None:
|
|
protocol_filter = {"IOT", "SMART", "SMARTCAMERA"}
|
|
for fixture_data in fixture_list:
|
|
if data_root_filter and data_root_filter not in fixture_data.data:
|
|
continue
|
|
if fixture_data.protocol not in protocol_filter:
|
|
continue
|
|
if model_filter is not None and not _model_match(fixture_data, model_filter):
|
|
continue
|
|
if model_startswith_filter is not None and not _model_startswith_match(
|
|
fixture_data, model_startswith_filter
|
|
):
|
|
continue
|
|
if component_filter and not _component_match(fixture_data, component_filter):
|
|
continue
|
|
if device_type_filter and not _device_type_match(
|
|
fixture_data, device_type_filter
|
|
):
|
|
continue
|
|
|
|
filtered.append(fixture_data)
|
|
|
|
if desc:
|
|
print(f"# {desc}")
|
|
for value in filtered:
|
|
print(f"\t{value.name}")
|
|
filtered.sort()
|
|
return filtered
|
|
|
|
|
|
@pytest.fixture(
|
|
params=filter_fixtures("all fixture infos"),
|
|
ids=idgenerator,
|
|
)
|
|
def fixture_info(request, mocker):
|
|
"""Return raw discovery file contents as JSON. Used for discovery tests."""
|
|
fixture_info = request.param
|
|
fixture_data = copy.deepcopy(fixture_info.data)
|
|
return FixtureInfo(fixture_info.name, fixture_info.protocol, fixture_data)
|