mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-08 22:07:06 +00:00
Refactor test framework (#794)
This is in preparation for tests based on supporting features amongst other tweaks: - Consolidates the filtering logic that was split across `filter_model` and `filter_fixture` - Allows filtering `dev` fixture by `component` - Consolidates fixtures missing method warnings into one warning - Does not raise exceptions from `FakeSmartTransport` for missing methods (required for KS240)
This commit is contained in:
parent
996322cea8
commit
97680bdcee
@ -1,341 +1,17 @@
|
|||||||
import asyncio
|
import warnings
|
||||||
import glob
|
from typing import Dict
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from json import dumps as json_dumps
|
|
||||||
from os.path import basename
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Dict, Optional
|
|
||||||
from unittest.mock import MagicMock
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
import pytest # type: ignore # see https://github.com/pytest-dev/pytest/issues/3342
|
import pytest # type: ignore # see https://github.com/pytest-dev/pytest/issues/3342
|
||||||
|
|
||||||
from kasa import (
|
from kasa import (
|
||||||
Credentials,
|
|
||||||
Device,
|
|
||||||
DeviceConfig,
|
DeviceConfig,
|
||||||
Discover,
|
|
||||||
SmartProtocol,
|
SmartProtocol,
|
||||||
)
|
)
|
||||||
from kasa.iot import IotBulb, IotDimmer, IotLightStrip, IotPlug, IotStrip
|
|
||||||
from kasa.protocol import BaseTransport
|
from kasa.protocol import BaseTransport
|
||||||
from kasa.smart import SmartBulb, SmartDevice
|
|
||||||
from kasa.xortransport import XorEncryption
|
|
||||||
|
|
||||||
from .fakeprotocol_iot import FakeIotProtocol
|
from .device_fixtures import * # noqa: F403
|
||||||
from .fakeprotocol_smart import FakeSmartProtocol
|
from .discovery_fixtures import * # noqa: F403
|
||||||
|
|
||||||
SUPPORTED_IOT_DEVICES = [
|
|
||||||
(device, "IOT")
|
|
||||||
for device in glob.glob(
|
|
||||||
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/*.json"
|
|
||||||
)
|
|
||||||
]
|
|
||||||
|
|
||||||
SUPPORTED_SMART_DEVICES = [
|
|
||||||
(device, "SMART")
|
|
||||||
for device in glob.glob(
|
|
||||||
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smart/*.json"
|
|
||||||
)
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
SUPPORTED_DEVICES = SUPPORTED_IOT_DEVICES + SUPPORTED_SMART_DEVICES
|
|
||||||
|
|
||||||
# Tapo bulbs
|
|
||||||
BULBS_SMART_VARIABLE_TEMP = {"L530E", "L930-5"}
|
|
||||||
BULBS_SMART_LIGHT_STRIP = {"L900-5", "L900-10", "L920-5", "L930-5"}
|
|
||||||
BULBS_SMART_COLOR = {"L530E", *BULBS_SMART_LIGHT_STRIP}
|
|
||||||
BULBS_SMART_DIMMABLE = {"L510B", "L510E"}
|
|
||||||
BULBS_SMART = (
|
|
||||||
BULBS_SMART_VARIABLE_TEMP.union(BULBS_SMART_COLOR)
|
|
||||||
.union(BULBS_SMART_DIMMABLE)
|
|
||||||
.union(BULBS_SMART_LIGHT_STRIP)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Kasa (IOT-prefixed) bulbs
|
|
||||||
BULBS_IOT_LIGHT_STRIP = {"KL400L5", "KL430", "KL420L5"}
|
|
||||||
BULBS_IOT_VARIABLE_TEMP = {
|
|
||||||
"LB120",
|
|
||||||
"LB130",
|
|
||||||
"KL120",
|
|
||||||
"KL125",
|
|
||||||
"KL130",
|
|
||||||
"KL135",
|
|
||||||
"KL430",
|
|
||||||
}
|
|
||||||
BULBS_IOT_COLOR = {"LB130", "KL125", "KL130", "KL135", *BULBS_IOT_LIGHT_STRIP}
|
|
||||||
BULBS_IOT_DIMMABLE = {"KL50", "KL60", "LB100", "LB110", "KL110"}
|
|
||||||
BULBS_IOT = (
|
|
||||||
BULBS_IOT_VARIABLE_TEMP.union(BULBS_IOT_COLOR)
|
|
||||||
.union(BULBS_IOT_DIMMABLE)
|
|
||||||
.union(BULBS_IOT_LIGHT_STRIP)
|
|
||||||
)
|
|
||||||
|
|
||||||
BULBS_VARIABLE_TEMP = {*BULBS_SMART_VARIABLE_TEMP, *BULBS_IOT_VARIABLE_TEMP}
|
|
||||||
BULBS_COLOR = {*BULBS_SMART_COLOR, *BULBS_IOT_COLOR}
|
|
||||||
|
|
||||||
|
|
||||||
LIGHT_STRIPS = {*BULBS_SMART_LIGHT_STRIP, *BULBS_IOT_LIGHT_STRIP}
|
|
||||||
BULBS = {
|
|
||||||
*BULBS_IOT,
|
|
||||||
*BULBS_SMART,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
PLUGS_IOT = {
|
|
||||||
"HS100",
|
|
||||||
"HS103",
|
|
||||||
"HS105",
|
|
||||||
"HS110",
|
|
||||||
"HS200",
|
|
||||||
"HS210",
|
|
||||||
"EP10",
|
|
||||||
"KP100",
|
|
||||||
"KP105",
|
|
||||||
"KP115",
|
|
||||||
"KP125",
|
|
||||||
"KP401",
|
|
||||||
"KS200M",
|
|
||||||
}
|
|
||||||
# P135 supports dimming, but its not currently support
|
|
||||||
# by the library
|
|
||||||
PLUGS_SMART = {
|
|
||||||
"P100",
|
|
||||||
"P110",
|
|
||||||
"KP125M",
|
|
||||||
"EP25",
|
|
||||||
"KS205",
|
|
||||||
"P125M",
|
|
||||||
"S505",
|
|
||||||
"TP15",
|
|
||||||
}
|
|
||||||
PLUGS = {
|
|
||||||
*PLUGS_IOT,
|
|
||||||
*PLUGS_SMART,
|
|
||||||
}
|
|
||||||
STRIPS_IOT = {"HS107", "HS300", "KP303", "KP200", "KP400", "EP40"}
|
|
||||||
STRIPS_SMART = {"P300", "TP25"}
|
|
||||||
STRIPS = {*STRIPS_IOT, *STRIPS_SMART}
|
|
||||||
|
|
||||||
DIMMERS_IOT = {"ES20M", "HS220", "KS220M", "KS230", "KP405"}
|
|
||||||
DIMMERS_SMART = {"KS225", "S500D", "P135"}
|
|
||||||
DIMMERS = {
|
|
||||||
*DIMMERS_IOT,
|
|
||||||
*DIMMERS_SMART,
|
|
||||||
}
|
|
||||||
|
|
||||||
HUBS_SMART = {"H100"}
|
|
||||||
|
|
||||||
WITH_EMETER_IOT = {"HS110", "HS300", "KP115", "KP125", *BULBS_IOT}
|
|
||||||
WITH_EMETER_SMART = {"P110", "KP125M", "EP25"}
|
|
||||||
WITH_EMETER = {*WITH_EMETER_IOT, *WITH_EMETER_SMART}
|
|
||||||
|
|
||||||
DIMMABLE = {*BULBS, *DIMMERS}
|
|
||||||
|
|
||||||
ALL_DEVICES_IOT = BULBS_IOT.union(PLUGS_IOT).union(STRIPS_IOT).union(DIMMERS_IOT)
|
|
||||||
ALL_DEVICES_SMART = (
|
|
||||||
BULBS_SMART.union(PLUGS_SMART)
|
|
||||||
.union(STRIPS_SMART)
|
|
||||||
.union(DIMMERS_SMART)
|
|
||||||
.union(HUBS_SMART)
|
|
||||||
)
|
|
||||||
ALL_DEVICES = ALL_DEVICES_IOT.union(ALL_DEVICES_SMART)
|
|
||||||
|
|
||||||
IP_MODEL_CACHE: Dict[str, str] = {}
|
|
||||||
|
|
||||||
|
|
||||||
def _make_unsupported(device_family, encrypt_type):
|
|
||||||
return {
|
|
||||||
"result": {
|
|
||||||
"device_id": "xx",
|
|
||||||
"owner": "xx",
|
|
||||||
"device_type": device_family,
|
|
||||||
"device_model": "P110(EU)",
|
|
||||||
"ip": "127.0.0.1",
|
|
||||||
"mac": "48-22xxx",
|
|
||||||
"is_support_iot_cloud": True,
|
|
||||||
"obd_src": "tplink",
|
|
||||||
"factory_default": False,
|
|
||||||
"mgt_encrypt_schm": {
|
|
||||||
"is_support_https": False,
|
|
||||||
"encrypt_type": encrypt_type,
|
|
||||||
"http_port": 80,
|
|
||||||
"lv": 2,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"error_code": 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
UNSUPPORTED_DEVICES = {
|
|
||||||
"unknown_device_family": _make_unsupported("SMART.TAPOXMASTREE", "AES"),
|
|
||||||
"wrong_encryption_iot": _make_unsupported("IOT.SMARTPLUGSWITCH", "AES"),
|
|
||||||
"wrong_encryption_smart": _make_unsupported("SMART.TAPOBULB", "IOT"),
|
|
||||||
"unknown_encryption": _make_unsupported("IOT.SMARTPLUGSWITCH", "FOO"),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def idgenerator(paramtuple):
|
|
||||||
try:
|
|
||||||
return basename(paramtuple[0]) + (
|
|
||||||
"" if paramtuple[1] == "IOT" else "-" + paramtuple[1]
|
|
||||||
)
|
|
||||||
except: # TODO: HACK as idgenerator is now used by default # noqa: E722
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def filter_model(desc, model_filter, protocol_filter=None):
|
|
||||||
if protocol_filter is None:
|
|
||||||
protocol_filter = {"IOT", "SMART"}
|
|
||||||
filtered = list()
|
|
||||||
for file, protocol in SUPPORTED_DEVICES:
|
|
||||||
if protocol in protocol_filter:
|
|
||||||
file_model_region = basename(file).split("_")[0]
|
|
||||||
file_model = file_model_region.split("(")[0]
|
|
||||||
for model in model_filter:
|
|
||||||
if model == file_model:
|
|
||||||
filtered.append((file, protocol))
|
|
||||||
|
|
||||||
filtered_basenames = [basename(f) + "-" + p for f, p in filtered]
|
|
||||||
print(f"# {desc}")
|
|
||||||
for file in filtered_basenames:
|
|
||||||
print(f"\t{file}")
|
|
||||||
return filtered
|
|
||||||
|
|
||||||
|
|
||||||
def parametrize(desc, devices, protocol_filter=None, ids=None):
|
|
||||||
if ids is None:
|
|
||||||
ids = idgenerator
|
|
||||||
return pytest.mark.parametrize(
|
|
||||||
"dev", filter_model(desc, devices, protocol_filter), indirect=True, ids=ids
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
has_emeter = parametrize("has emeter", WITH_EMETER, protocol_filter={"SMART", "IOT"})
|
|
||||||
no_emeter = parametrize(
|
|
||||||
"no emeter", ALL_DEVICES - WITH_EMETER, protocol_filter={"SMART", "IOT"}
|
|
||||||
)
|
|
||||||
has_emeter_iot = parametrize("has emeter iot", WITH_EMETER_IOT, protocol_filter={"IOT"})
|
|
||||||
no_emeter_iot = parametrize(
|
|
||||||
"no emeter iot", ALL_DEVICES_IOT - WITH_EMETER_IOT, protocol_filter={"IOT"}
|
|
||||||
)
|
|
||||||
|
|
||||||
bulb = parametrize("bulbs", BULBS, protocol_filter={"SMART", "IOT"})
|
|
||||||
plug = parametrize("plugs", PLUGS, protocol_filter={"IOT"})
|
|
||||||
strip = parametrize("strips", STRIPS, protocol_filter={"SMART", "IOT"})
|
|
||||||
dimmer = parametrize("dimmers", DIMMERS, protocol_filter={"IOT"})
|
|
||||||
lightstrip = parametrize("lightstrips", LIGHT_STRIPS, protocol_filter={"IOT"})
|
|
||||||
|
|
||||||
# bulb types
|
|
||||||
dimmable = parametrize("dimmable", DIMMABLE, protocol_filter={"IOT"})
|
|
||||||
non_dimmable = parametrize("non-dimmable", BULBS - DIMMABLE, protocol_filter={"IOT"})
|
|
||||||
variable_temp = parametrize(
|
|
||||||
"variable color temp", BULBS_VARIABLE_TEMP, protocol_filter={"SMART", "IOT"}
|
|
||||||
)
|
|
||||||
non_variable_temp = parametrize(
|
|
||||||
"non-variable color temp",
|
|
||||||
BULBS - BULBS_VARIABLE_TEMP,
|
|
||||||
protocol_filter={"SMART", "IOT"},
|
|
||||||
)
|
|
||||||
color_bulb = parametrize("color bulbs", BULBS_COLOR, protocol_filter={"SMART", "IOT"})
|
|
||||||
non_color_bulb = parametrize(
|
|
||||||
"non-color bulbs", BULBS - BULBS_COLOR, protocol_filter={"SMART", "IOT"}
|
|
||||||
)
|
|
||||||
|
|
||||||
color_bulb_iot = parametrize(
|
|
||||||
"color bulbs iot", BULBS_IOT_COLOR, protocol_filter={"IOT"}
|
|
||||||
)
|
|
||||||
variable_temp_iot = parametrize(
|
|
||||||
"variable color temp iot", BULBS_IOT_VARIABLE_TEMP, protocol_filter={"IOT"}
|
|
||||||
)
|
|
||||||
bulb_iot = parametrize("bulb devices iot", BULBS_IOT, protocol_filter={"IOT"})
|
|
||||||
|
|
||||||
strip_iot = parametrize("strip devices iot", STRIPS_IOT, protocol_filter={"IOT"})
|
|
||||||
strip_smart = parametrize(
|
|
||||||
"strip devices smart", STRIPS_SMART, protocol_filter={"SMART"}
|
|
||||||
)
|
|
||||||
|
|
||||||
plug_smart = parametrize("plug devices smart", PLUGS_SMART, protocol_filter={"SMART"})
|
|
||||||
bulb_smart = parametrize("bulb devices smart", BULBS_SMART, protocol_filter={"SMART"})
|
|
||||||
dimmers_smart = parametrize(
|
|
||||||
"dimmer devices smart", DIMMERS_SMART, protocol_filter={"SMART"}
|
|
||||||
)
|
|
||||||
hubs_smart = parametrize("hubs smart", HUBS_SMART, protocol_filter={"SMART"})
|
|
||||||
device_smart = parametrize(
|
|
||||||
"devices smart", ALL_DEVICES_SMART, protocol_filter={"SMART"}
|
|
||||||
)
|
|
||||||
device_iot = parametrize("devices iot", ALL_DEVICES_IOT, protocol_filter={"IOT"})
|
|
||||||
|
|
||||||
|
|
||||||
def get_fixture_data():
|
|
||||||
"""Return raw discovery file contents as JSON. Used for discovery tests."""
|
|
||||||
fixture_data = {}
|
|
||||||
for file, protocol in SUPPORTED_DEVICES:
|
|
||||||
p = Path(file)
|
|
||||||
if not p.is_absolute():
|
|
||||||
folder = Path(__file__).parent / "fixtures"
|
|
||||||
if protocol == "SMART":
|
|
||||||
folder = folder / "smart"
|
|
||||||
p = folder / file
|
|
||||||
|
|
||||||
with open(p) as f:
|
|
||||||
fixture_data[basename(p)] = json.load(f)
|
|
||||||
return fixture_data
|
|
||||||
|
|
||||||
|
|
||||||
FIXTURE_DATA = get_fixture_data()
|
|
||||||
|
|
||||||
|
|
||||||
def filter_fixtures(desc, root_filter):
|
|
||||||
filtered = {}
|
|
||||||
for key, val in FIXTURE_DATA.items():
|
|
||||||
if root_filter in val:
|
|
||||||
filtered[key] = val
|
|
||||||
|
|
||||||
print(f"# {desc}")
|
|
||||||
for key in filtered:
|
|
||||||
print(f"\t{key}")
|
|
||||||
return filtered
|
|
||||||
|
|
||||||
|
|
||||||
def parametrize_discovery(desc, root_key):
|
|
||||||
filtered_fixtures = filter_fixtures(desc, root_key)
|
|
||||||
return pytest.mark.parametrize(
|
|
||||||
"all_fixture_data",
|
|
||||||
filtered_fixtures.values(),
|
|
||||||
indirect=True,
|
|
||||||
ids=filtered_fixtures.keys(),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
new_discovery = parametrize_discovery("new discovery", "discovery_result")
|
|
||||||
|
|
||||||
|
|
||||||
def check_categories():
|
|
||||||
"""Check that every fixture file is categorized."""
|
|
||||||
categorized_fixtures = set(
|
|
||||||
dimmer.args[1]
|
|
||||||
+ strip.args[1]
|
|
||||||
+ plug.args[1]
|
|
||||||
+ bulb.args[1]
|
|
||||||
+ lightstrip.args[1]
|
|
||||||
+ plug_smart.args[1]
|
|
||||||
+ bulb_smart.args[1]
|
|
||||||
+ dimmers_smart.args[1]
|
|
||||||
+ hubs_smart.args[1]
|
|
||||||
)
|
|
||||||
diff = set(SUPPORTED_DEVICES) - set(categorized_fixtures)
|
|
||||||
if diff:
|
|
||||||
for file, protocol in diff:
|
|
||||||
print(
|
|
||||||
f"No category for file {file} protocol {protocol}, add to the corresponding set (BULBS, PLUGS, ..)"
|
|
||||||
)
|
|
||||||
raise Exception(f"Missing category for {diff}")
|
|
||||||
|
|
||||||
|
|
||||||
check_categories()
|
|
||||||
|
|
||||||
# Parametrize tests to run with device both on and off
|
# Parametrize tests to run with device both on and off
|
||||||
turn_on = pytest.mark.parametrize("turn_on", [True, False])
|
turn_on = pytest.mark.parametrize("turn_on", [True, False])
|
||||||
@ -348,241 +24,6 @@ async def handle_turn_on(dev, turn_on):
|
|||||||
await dev.turn_off()
|
await dev.turn_off()
|
||||||
|
|
||||||
|
|
||||||
def device_for_file(model, protocol):
|
|
||||||
if protocol == "SMART":
|
|
||||||
for d in PLUGS_SMART:
|
|
||||||
if d in model:
|
|
||||||
return SmartDevice
|
|
||||||
for d in BULBS_SMART:
|
|
||||||
if d in model:
|
|
||||||
return SmartBulb
|
|
||||||
for d in DIMMERS_SMART:
|
|
||||||
if d in model:
|
|
||||||
return SmartBulb
|
|
||||||
for d in STRIPS_SMART:
|
|
||||||
if d in model:
|
|
||||||
return SmartDevice
|
|
||||||
for d in HUBS_SMART:
|
|
||||||
if d in model:
|
|
||||||
return SmartDevice
|
|
||||||
else:
|
|
||||||
for d in STRIPS_IOT:
|
|
||||||
if d in model:
|
|
||||||
return IotStrip
|
|
||||||
|
|
||||||
for d in PLUGS_IOT:
|
|
||||||
if d in model:
|
|
||||||
return IotPlug
|
|
||||||
|
|
||||||
# Light strips are recognized also as bulbs, so this has to go first
|
|
||||||
for d in BULBS_IOT_LIGHT_STRIP:
|
|
||||||
if d in model:
|
|
||||||
return IotLightStrip
|
|
||||||
|
|
||||||
for d in BULBS_IOT:
|
|
||||||
if d in model:
|
|
||||||
return IotBulb
|
|
||||||
|
|
||||||
for d in DIMMERS_IOT:
|
|
||||||
if d in model:
|
|
||||||
return IotDimmer
|
|
||||||
|
|
||||||
raise Exception("Unable to find type for %s", model)
|
|
||||||
|
|
||||||
|
|
||||||
async def _update_and_close(d):
|
|
||||||
await d.update()
|
|
||||||
await d.protocol.close()
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
async def _discover_update_and_close(ip, username, password):
|
|
||||||
if username and password:
|
|
||||||
credentials = Credentials(username=username, password=password)
|
|
||||||
else:
|
|
||||||
credentials = None
|
|
||||||
d = await Discover.discover_single(ip, timeout=10, credentials=credentials)
|
|
||||||
return await _update_and_close(d)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_device_for_file(file, protocol):
|
|
||||||
# if the wanted file is not an absolute path, prepend the fixtures directory
|
|
||||||
p = Path(file)
|
|
||||||
if not p.is_absolute():
|
|
||||||
folder = Path(__file__).parent / "fixtures"
|
|
||||||
if protocol == "SMART":
|
|
||||||
folder = folder / "smart"
|
|
||||||
p = folder / file
|
|
||||||
|
|
||||||
def load_file():
|
|
||||||
with open(p) as f:
|
|
||||||
return json.load(f)
|
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
sysinfo = await loop.run_in_executor(None, load_file)
|
|
||||||
|
|
||||||
model = basename(file)
|
|
||||||
d = device_for_file(model, protocol)(host="127.0.0.123")
|
|
||||||
if protocol == "SMART":
|
|
||||||
d.protocol = FakeSmartProtocol(sysinfo)
|
|
||||||
else:
|
|
||||||
d.protocol = FakeIotProtocol(sysinfo)
|
|
||||||
await _update_and_close(d)
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=SUPPORTED_DEVICES, ids=idgenerator)
|
|
||||||
async def dev(request):
|
|
||||||
"""Device fixture.
|
|
||||||
|
|
||||||
Provides a device (given --ip) or parametrized fixture for the supported devices.
|
|
||||||
The initial update is called automatically before returning the device.
|
|
||||||
"""
|
|
||||||
file, protocol = request.param
|
|
||||||
|
|
||||||
ip = request.config.getoption("--ip")
|
|
||||||
username = request.config.getoption("--username")
|
|
||||||
password = request.config.getoption("--password")
|
|
||||||
if ip:
|
|
||||||
model = IP_MODEL_CACHE.get(ip)
|
|
||||||
d = None
|
|
||||||
if not model:
|
|
||||||
d = await _discover_update_and_close(ip, username, password)
|
|
||||||
IP_MODEL_CACHE[ip] = model = d.model
|
|
||||||
if model not in file:
|
|
||||||
pytest.skip(f"skipping file {file}")
|
|
||||||
dev: Device = (
|
|
||||||
d if d else await _discover_update_and_close(ip, username, password)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
dev: Device = await get_device_for_file(file, protocol)
|
|
||||||
|
|
||||||
yield dev
|
|
||||||
|
|
||||||
await dev.disconnect()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def discovery_mock(all_fixture_data, mocker):
|
|
||||||
@dataclass
|
|
||||||
class _DiscoveryMock:
|
|
||||||
ip: str
|
|
||||||
default_port: int
|
|
||||||
discovery_port: int
|
|
||||||
discovery_data: dict
|
|
||||||
query_data: dict
|
|
||||||
device_type: str
|
|
||||||
encrypt_type: str
|
|
||||||
login_version: Optional[int] = None
|
|
||||||
port_override: Optional[int] = None
|
|
||||||
|
|
||||||
if "discovery_result" in all_fixture_data:
|
|
||||||
discovery_data = {"result": all_fixture_data["discovery_result"]}
|
|
||||||
device_type = all_fixture_data["discovery_result"]["device_type"]
|
|
||||||
encrypt_type = all_fixture_data["discovery_result"]["mgt_encrypt_schm"][
|
|
||||||
"encrypt_type"
|
|
||||||
]
|
|
||||||
login_version = all_fixture_data["discovery_result"]["mgt_encrypt_schm"].get(
|
|
||||||
"lv"
|
|
||||||
)
|
|
||||||
datagram = (
|
|
||||||
b"\x02\x00\x00\x01\x01[\x00\x00\x00\x00\x00\x00W\xcev\xf8"
|
|
||||||
+ json_dumps(discovery_data).encode()
|
|
||||||
)
|
|
||||||
dm = _DiscoveryMock(
|
|
||||||
"127.0.0.123",
|
|
||||||
80,
|
|
||||||
20002,
|
|
||||||
discovery_data,
|
|
||||||
all_fixture_data,
|
|
||||||
device_type,
|
|
||||||
encrypt_type,
|
|
||||||
login_version,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
sys_info = all_fixture_data["system"]["get_sysinfo"]
|
|
||||||
discovery_data = {"system": {"get_sysinfo": sys_info}}
|
|
||||||
device_type = sys_info.get("mic_type") or sys_info.get("type")
|
|
||||||
encrypt_type = "XOR"
|
|
||||||
login_version = None
|
|
||||||
datagram = XorEncryption.encrypt(json_dumps(discovery_data))[4:]
|
|
||||||
dm = _DiscoveryMock(
|
|
||||||
"127.0.0.123",
|
|
||||||
9999,
|
|
||||||
9999,
|
|
||||||
discovery_data,
|
|
||||||
all_fixture_data,
|
|
||||||
device_type,
|
|
||||||
encrypt_type,
|
|
||||||
login_version,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def mock_discover(self):
|
|
||||||
port = (
|
|
||||||
dm.port_override
|
|
||||||
if dm.port_override and dm.discovery_port != 20002
|
|
||||||
else dm.discovery_port
|
|
||||||
)
|
|
||||||
self.datagram_received(
|
|
||||||
datagram,
|
|
||||||
(dm.ip, port),
|
|
||||||
)
|
|
||||||
|
|
||||||
mocker.patch("kasa.discover._DiscoverProtocol.do_discover", mock_discover)
|
|
||||||
mocker.patch(
|
|
||||||
"socket.getaddrinfo",
|
|
||||||
side_effect=lambda *_, **__: [(None, None, None, None, (dm.ip, 0))],
|
|
||||||
)
|
|
||||||
|
|
||||||
if "component_nego" in dm.query_data:
|
|
||||||
proto = FakeSmartProtocol(dm.query_data)
|
|
||||||
else:
|
|
||||||
proto = FakeIotProtocol(dm.query_data)
|
|
||||||
|
|
||||||
async def _query(request, retry_count: int = 3):
|
|
||||||
return await proto.query(request)
|
|
||||||
|
|
||||||
mocker.patch("kasa.IotProtocol.query", side_effect=_query)
|
|
||||||
mocker.patch("kasa.SmartProtocol.query", side_effect=_query)
|
|
||||||
|
|
||||||
yield dm
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def discovery_data(all_fixture_data):
|
|
||||||
"""Return raw discovery file contents as JSON. Used for discovery tests."""
|
|
||||||
if "discovery_result" in all_fixture_data:
|
|
||||||
return {"result": all_fixture_data["discovery_result"]}
|
|
||||||
else:
|
|
||||||
return {"system": {"get_sysinfo": all_fixture_data["system"]["get_sysinfo"]}}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=FIXTURE_DATA.values(), ids=FIXTURE_DATA.keys(), scope="session")
|
|
||||||
def all_fixture_data(request):
|
|
||||||
"""Return raw fixture file contents as JSON. Used for discovery tests."""
|
|
||||||
fixture_data = request.param
|
|
||||||
return fixture_data
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=UNSUPPORTED_DEVICES.values(), ids=UNSUPPORTED_DEVICES.keys())
|
|
||||||
def unsupported_device_info(request, mocker):
|
|
||||||
"""Return unsupported devices for cli and discovery tests."""
|
|
||||||
discovery_data = request.param
|
|
||||||
host = "127.0.0.1"
|
|
||||||
|
|
||||||
async def mock_discover(self):
|
|
||||||
if discovery_data:
|
|
||||||
data = (
|
|
||||||
b"\x02\x00\x00\x01\x01[\x00\x00\x00\x00\x00\x00W\xcev\xf8"
|
|
||||||
+ json_dumps(discovery_data).encode()
|
|
||||||
)
|
|
||||||
self.datagram_received(data, (host, 20002))
|
|
||||||
|
|
||||||
mocker.patch("kasa.discover._DiscoverProtocol.do_discover", mock_discover)
|
|
||||||
|
|
||||||
yield discovery_data
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def dummy_protocol():
|
def dummy_protocol():
|
||||||
"""Return a smart protocol instance with a mocking-ready dummy transport."""
|
"""Return a smart protocol instance with a mocking-ready dummy transport."""
|
||||||
@ -611,6 +52,22 @@ def dummy_protocol():
|
|||||||
return protocol
|
return protocol
|
||||||
|
|
||||||
|
|
||||||
|
def pytest_configure():
|
||||||
|
pytest.fixtures_missing_methods = {}
|
||||||
|
|
||||||
|
|
||||||
|
def pytest_sessionfinish(session, exitstatus):
|
||||||
|
msg = "\n"
|
||||||
|
for fixture, methods in sorted(pytest.fixtures_missing_methods.items()):
|
||||||
|
method_list = ", ".join(methods)
|
||||||
|
msg += f"Fixture {fixture} missing: {method_list}\n"
|
||||||
|
|
||||||
|
warnings.warn(
|
||||||
|
UserWarning(msg),
|
||||||
|
stacklevel=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def pytest_addoption(parser):
|
def pytest_addoption(parser):
|
||||||
parser.addoption(
|
parser.addoption(
|
||||||
"--ip", action="store", default=None, help="run against device on given ip"
|
"--ip", action="store", default=None, help="run against device on given ip"
|
||||||
|
367
kasa/tests/device_fixtures.py
Normal file
367
kasa/tests/device_fixtures.py
Normal file
@ -0,0 +1,367 @@
|
|||||||
|
from typing import Dict, Set
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kasa import (
|
||||||
|
Credentials,
|
||||||
|
Device,
|
||||||
|
Discover,
|
||||||
|
)
|
||||||
|
from kasa.iot import IotBulb, IotDimmer, IotLightStrip, IotPlug, IotStrip
|
||||||
|
from kasa.smart import SmartBulb, SmartDevice
|
||||||
|
|
||||||
|
from .fakeprotocol_iot import FakeIotProtocol
|
||||||
|
from .fakeprotocol_smart import FakeSmartProtocol
|
||||||
|
from .fixtureinfo import FIXTURE_DATA, FixtureInfo, filter_fixtures, idgenerator
|
||||||
|
|
||||||
|
# Tapo bulbs
|
||||||
|
BULBS_SMART_VARIABLE_TEMP = {"L530E", "L930-5"}
|
||||||
|
BULBS_SMART_LIGHT_STRIP = {"L900-5", "L900-10", "L920-5", "L930-5"}
|
||||||
|
BULBS_SMART_COLOR = {"L530E", *BULBS_SMART_LIGHT_STRIP}
|
||||||
|
BULBS_SMART_DIMMABLE = {"L510B", "L510E"}
|
||||||
|
BULBS_SMART = (
|
||||||
|
BULBS_SMART_VARIABLE_TEMP.union(BULBS_SMART_COLOR)
|
||||||
|
.union(BULBS_SMART_DIMMABLE)
|
||||||
|
.union(BULBS_SMART_LIGHT_STRIP)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Kasa (IOT-prefixed) bulbs
|
||||||
|
BULBS_IOT_LIGHT_STRIP = {"KL400L5", "KL430", "KL420L5"}
|
||||||
|
BULBS_IOT_VARIABLE_TEMP = {
|
||||||
|
"LB120",
|
||||||
|
"LB130",
|
||||||
|
"KL120",
|
||||||
|
"KL125",
|
||||||
|
"KL130",
|
||||||
|
"KL135",
|
||||||
|
"KL430",
|
||||||
|
}
|
||||||
|
BULBS_IOT_COLOR = {"LB130", "KL125", "KL130", "KL135", *BULBS_IOT_LIGHT_STRIP}
|
||||||
|
BULBS_IOT_DIMMABLE = {"KL50", "KL60", "LB100", "LB110", "KL110"}
|
||||||
|
BULBS_IOT = (
|
||||||
|
BULBS_IOT_VARIABLE_TEMP.union(BULBS_IOT_COLOR)
|
||||||
|
.union(BULBS_IOT_DIMMABLE)
|
||||||
|
.union(BULBS_IOT_LIGHT_STRIP)
|
||||||
|
)
|
||||||
|
|
||||||
|
BULBS_VARIABLE_TEMP = {*BULBS_SMART_VARIABLE_TEMP, *BULBS_IOT_VARIABLE_TEMP}
|
||||||
|
BULBS_COLOR = {*BULBS_SMART_COLOR, *BULBS_IOT_COLOR}
|
||||||
|
|
||||||
|
|
||||||
|
LIGHT_STRIPS = {*BULBS_SMART_LIGHT_STRIP, *BULBS_IOT_LIGHT_STRIP}
|
||||||
|
BULBS = {
|
||||||
|
*BULBS_IOT,
|
||||||
|
*BULBS_SMART,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
PLUGS_IOT = {
|
||||||
|
"HS100",
|
||||||
|
"HS103",
|
||||||
|
"HS105",
|
||||||
|
"HS110",
|
||||||
|
"HS200",
|
||||||
|
"HS210",
|
||||||
|
"EP10",
|
||||||
|
"KP100",
|
||||||
|
"KP105",
|
||||||
|
"KP115",
|
||||||
|
"KP125",
|
||||||
|
"KP401",
|
||||||
|
"KS200M",
|
||||||
|
}
|
||||||
|
# P135 supports dimming, but its not currently support
|
||||||
|
# by the library
|
||||||
|
PLUGS_SMART = {
|
||||||
|
"P100",
|
||||||
|
"P110",
|
||||||
|
"KP125M",
|
||||||
|
"EP25",
|
||||||
|
"KS205",
|
||||||
|
"P125M",
|
||||||
|
"S505",
|
||||||
|
"TP15",
|
||||||
|
}
|
||||||
|
PLUGS = {
|
||||||
|
*PLUGS_IOT,
|
||||||
|
*PLUGS_SMART,
|
||||||
|
}
|
||||||
|
STRIPS_IOT = {"HS107", "HS300", "KP303", "KP200", "KP400", "EP40"}
|
||||||
|
STRIPS_SMART = {"P300", "TP25"}
|
||||||
|
STRIPS = {*STRIPS_IOT, *STRIPS_SMART}
|
||||||
|
|
||||||
|
DIMMERS_IOT = {"ES20M", "HS220", "KS220M", "KS230", "KP405"}
|
||||||
|
DIMMERS_SMART = {"KS225", "S500D", "P135"}
|
||||||
|
DIMMERS = {
|
||||||
|
*DIMMERS_IOT,
|
||||||
|
*DIMMERS_SMART,
|
||||||
|
}
|
||||||
|
|
||||||
|
HUBS_SMART = {"H100"}
|
||||||
|
|
||||||
|
WITH_EMETER_IOT = {"HS110", "HS300", "KP115", "KP125", *BULBS_IOT}
|
||||||
|
WITH_EMETER_SMART = {"P110", "KP125M", "EP25"}
|
||||||
|
WITH_EMETER = {*WITH_EMETER_IOT, *WITH_EMETER_SMART}
|
||||||
|
|
||||||
|
DIMMABLE = {*BULBS, *DIMMERS}
|
||||||
|
|
||||||
|
ALL_DEVICES_IOT = BULBS_IOT.union(PLUGS_IOT).union(STRIPS_IOT).union(DIMMERS_IOT)
|
||||||
|
ALL_DEVICES_SMART = (
|
||||||
|
BULBS_SMART.union(PLUGS_SMART)
|
||||||
|
.union(STRIPS_SMART)
|
||||||
|
.union(DIMMERS_SMART)
|
||||||
|
.union(HUBS_SMART)
|
||||||
|
)
|
||||||
|
ALL_DEVICES = ALL_DEVICES_IOT.union(ALL_DEVICES_SMART)
|
||||||
|
|
||||||
|
IP_MODEL_CACHE: Dict[str, str] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def parametrize(
|
||||||
|
desc,
|
||||||
|
*,
|
||||||
|
model_filter=None,
|
||||||
|
protocol_filter=None,
|
||||||
|
component_filter=None,
|
||||||
|
data_root_filter=None,
|
||||||
|
ids=None,
|
||||||
|
):
|
||||||
|
if ids is None:
|
||||||
|
ids = idgenerator
|
||||||
|
return pytest.mark.parametrize(
|
||||||
|
"dev",
|
||||||
|
filter_fixtures(
|
||||||
|
desc,
|
||||||
|
model_filter=model_filter,
|
||||||
|
protocol_filter=protocol_filter,
|
||||||
|
component_filter=component_filter,
|
||||||
|
data_root_filter=data_root_filter,
|
||||||
|
),
|
||||||
|
indirect=True,
|
||||||
|
ids=ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
has_emeter = parametrize(
|
||||||
|
"has emeter", model_filter=WITH_EMETER, protocol_filter={"SMART", "IOT"}
|
||||||
|
)
|
||||||
|
no_emeter = parametrize(
|
||||||
|
"no emeter",
|
||||||
|
model_filter=ALL_DEVICES - WITH_EMETER,
|
||||||
|
protocol_filter={"SMART", "IOT"},
|
||||||
|
)
|
||||||
|
has_emeter_iot = parametrize(
|
||||||
|
"has emeter iot", model_filter=WITH_EMETER_IOT, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
no_emeter_iot = parametrize(
|
||||||
|
"no emeter iot",
|
||||||
|
model_filter=ALL_DEVICES_IOT - WITH_EMETER_IOT,
|
||||||
|
protocol_filter={"IOT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
bulb = parametrize("bulbs", model_filter=BULBS, protocol_filter={"SMART", "IOT"})
|
||||||
|
plug = parametrize("plugs", model_filter=PLUGS, protocol_filter={"IOT"})
|
||||||
|
strip = parametrize("strips", model_filter=STRIPS, protocol_filter={"SMART", "IOT"})
|
||||||
|
dimmer = parametrize("dimmers", model_filter=DIMMERS, protocol_filter={"IOT"})
|
||||||
|
lightstrip = parametrize(
|
||||||
|
"lightstrips", model_filter=LIGHT_STRIPS, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# bulb types
|
||||||
|
dimmable = parametrize("dimmable", model_filter=DIMMABLE, protocol_filter={"IOT"})
|
||||||
|
non_dimmable = parametrize(
|
||||||
|
"non-dimmable", model_filter=BULBS - DIMMABLE, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
variable_temp = parametrize(
|
||||||
|
"variable color temp",
|
||||||
|
model_filter=BULBS_VARIABLE_TEMP,
|
||||||
|
protocol_filter={"SMART", "IOT"},
|
||||||
|
)
|
||||||
|
non_variable_temp = parametrize(
|
||||||
|
"non-variable color temp",
|
||||||
|
model_filter=BULBS - BULBS_VARIABLE_TEMP,
|
||||||
|
protocol_filter={"SMART", "IOT"},
|
||||||
|
)
|
||||||
|
color_bulb = parametrize(
|
||||||
|
"color bulbs", model_filter=BULBS_COLOR, protocol_filter={"SMART", "IOT"}
|
||||||
|
)
|
||||||
|
non_color_bulb = parametrize(
|
||||||
|
"non-color bulbs",
|
||||||
|
model_filter=BULBS - BULBS_COLOR,
|
||||||
|
protocol_filter={"SMART", "IOT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
color_bulb_iot = parametrize(
|
||||||
|
"color bulbs iot", model_filter=BULBS_IOT_COLOR, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
variable_temp_iot = parametrize(
|
||||||
|
"variable color temp iot",
|
||||||
|
model_filter=BULBS_IOT_VARIABLE_TEMP,
|
||||||
|
protocol_filter={"IOT"},
|
||||||
|
)
|
||||||
|
bulb_iot = parametrize(
|
||||||
|
"bulb devices iot", model_filter=BULBS_IOT, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
|
||||||
|
strip_iot = parametrize(
|
||||||
|
"strip devices iot", model_filter=STRIPS_IOT, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
strip_smart = parametrize(
|
||||||
|
"strip devices smart", model_filter=STRIPS_SMART, protocol_filter={"SMART"}
|
||||||
|
)
|
||||||
|
|
||||||
|
plug_smart = parametrize(
|
||||||
|
"plug devices smart", model_filter=PLUGS_SMART, protocol_filter={"SMART"}
|
||||||
|
)
|
||||||
|
bulb_smart = parametrize(
|
||||||
|
"bulb devices smart", model_filter=BULBS_SMART, protocol_filter={"SMART"}
|
||||||
|
)
|
||||||
|
dimmers_smart = parametrize(
|
||||||
|
"dimmer devices smart", model_filter=DIMMERS_SMART, protocol_filter={"SMART"}
|
||||||
|
)
|
||||||
|
hubs_smart = parametrize(
|
||||||
|
"hubs smart", model_filter=HUBS_SMART, protocol_filter={"SMART"}
|
||||||
|
)
|
||||||
|
device_smart = parametrize(
|
||||||
|
"devices smart", model_filter=ALL_DEVICES_SMART, protocol_filter={"SMART"}
|
||||||
|
)
|
||||||
|
device_iot = parametrize(
|
||||||
|
"devices iot", model_filter=ALL_DEVICES_IOT, protocol_filter={"IOT"}
|
||||||
|
)
|
||||||
|
|
||||||
|
brightness = parametrize("brightness smart", component_filter="brightness")
|
||||||
|
|
||||||
|
|
||||||
|
def check_categories():
|
||||||
|
"""Check that every fixture file is categorized."""
|
||||||
|
categorized_fixtures = set(
|
||||||
|
dimmer.args[1]
|
||||||
|
+ strip.args[1]
|
||||||
|
+ plug.args[1]
|
||||||
|
+ bulb.args[1]
|
||||||
|
+ lightstrip.args[1]
|
||||||
|
+ plug_smart.args[1]
|
||||||
|
+ bulb_smart.args[1]
|
||||||
|
+ dimmers_smart.args[1]
|
||||||
|
+ hubs_smart.args[1]
|
||||||
|
)
|
||||||
|
diffs: Set[FixtureInfo] = set(FIXTURE_DATA) - set(categorized_fixtures)
|
||||||
|
if diffs:
|
||||||
|
print(diffs)
|
||||||
|
for diff in diffs:
|
||||||
|
print(
|
||||||
|
f"No category for file {diff.name} protocol {diff.protocol}, add to the corresponding set (BULBS, PLUGS, ..)"
|
||||||
|
)
|
||||||
|
raise Exception(f"Missing category for {diff.name}")
|
||||||
|
|
||||||
|
|
||||||
|
check_categories()
|
||||||
|
|
||||||
|
|
||||||
|
def device_for_fixture_name(model, protocol):
|
||||||
|
if protocol == "SMART":
|
||||||
|
for d in PLUGS_SMART:
|
||||||
|
if d in model:
|
||||||
|
return SmartDevice
|
||||||
|
for d in BULBS_SMART:
|
||||||
|
if d in model:
|
||||||
|
return SmartBulb
|
||||||
|
for d in DIMMERS_SMART:
|
||||||
|
if d in model:
|
||||||
|
return SmartBulb
|
||||||
|
for d in STRIPS_SMART:
|
||||||
|
if d in model:
|
||||||
|
return SmartDevice
|
||||||
|
for d in HUBS_SMART:
|
||||||
|
if d in model:
|
||||||
|
return SmartDevice
|
||||||
|
else:
|
||||||
|
for d in STRIPS_IOT:
|
||||||
|
if d in model:
|
||||||
|
return IotStrip
|
||||||
|
|
||||||
|
for d in PLUGS_IOT:
|
||||||
|
if d in model:
|
||||||
|
return IotPlug
|
||||||
|
|
||||||
|
# Light strips are recognized also as bulbs, so this has to go first
|
||||||
|
for d in BULBS_IOT_LIGHT_STRIP:
|
||||||
|
if d in model:
|
||||||
|
return IotLightStrip
|
||||||
|
|
||||||
|
for d in BULBS_IOT:
|
||||||
|
if d in model:
|
||||||
|
return IotBulb
|
||||||
|
|
||||||
|
for d in DIMMERS_IOT:
|
||||||
|
if d in model:
|
||||||
|
return IotDimmer
|
||||||
|
|
||||||
|
raise Exception("Unable to find type for %s", model)
|
||||||
|
|
||||||
|
|
||||||
|
async def _update_and_close(d):
|
||||||
|
await d.update()
|
||||||
|
await d.protocol.close()
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
async def _discover_update_and_close(ip, username, password):
|
||||||
|
if username and password:
|
||||||
|
credentials = Credentials(username=username, password=password)
|
||||||
|
else:
|
||||||
|
credentials = None
|
||||||
|
d = await Discover.discover_single(ip, timeout=10, credentials=credentials)
|
||||||
|
return await _update_and_close(d)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_device_for_fixture(fixture_data: FixtureInfo):
|
||||||
|
# if the wanted file is not an absolute path, prepend the fixtures directory
|
||||||
|
|
||||||
|
d = device_for_fixture_name(fixture_data.name, fixture_data.protocol)(
|
||||||
|
host="127.0.0.123"
|
||||||
|
)
|
||||||
|
if fixture_data.protocol == "SMART":
|
||||||
|
d.protocol = FakeSmartProtocol(fixture_data.data, fixture_data.name)
|
||||||
|
else:
|
||||||
|
d.protocol = FakeIotProtocol(fixture_data.data)
|
||||||
|
await _update_and_close(d)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
async def get_device_for_fixture_protocol(fixture, protocol):
|
||||||
|
finfo = FixtureInfo(name=fixture, protocol=protocol, data={})
|
||||||
|
for fixture_info in FIXTURE_DATA:
|
||||||
|
if finfo == fixture_info:
|
||||||
|
return await get_device_for_fixture(fixture_info)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(params=FIXTURE_DATA, ids=idgenerator)
|
||||||
|
async def dev(request):
|
||||||
|
"""Device fixture.
|
||||||
|
|
||||||
|
Provides a device (given --ip) or parametrized fixture for the supported devices.
|
||||||
|
The initial update is called automatically before returning the device.
|
||||||
|
"""
|
||||||
|
fixture_data: FixtureInfo = request.param
|
||||||
|
|
||||||
|
ip = request.config.getoption("--ip")
|
||||||
|
username = request.config.getoption("--username")
|
||||||
|
password = request.config.getoption("--password")
|
||||||
|
if ip:
|
||||||
|
model = IP_MODEL_CACHE.get(ip)
|
||||||
|
d = None
|
||||||
|
if not model:
|
||||||
|
d = await _discover_update_and_close(ip, username, password)
|
||||||
|
IP_MODEL_CACHE[ip] = model = d.model
|
||||||
|
if model not in fixture_data.name:
|
||||||
|
pytest.skip(f"skipping file {fixture_data.name}")
|
||||||
|
dev: Device = (
|
||||||
|
d if d else await _discover_update_and_close(ip, username, password)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
dev: Device = await get_device_for_fixture(fixture_data)
|
||||||
|
|
||||||
|
yield dev
|
||||||
|
|
||||||
|
await dev.disconnect()
|
173
kasa/tests/discovery_fixtures.py
Normal file
173
kasa/tests/discovery_fixtures.py
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from json import dumps as json_dumps
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from kasa.xortransport import XorEncryption
|
||||||
|
|
||||||
|
from .fakeprotocol_iot import FakeIotProtocol
|
||||||
|
from .fakeprotocol_smart import FakeSmartProtocol
|
||||||
|
from .fixtureinfo import FIXTURE_DATA, FixtureInfo, filter_fixtures, idgenerator
|
||||||
|
|
||||||
|
|
||||||
|
def _make_unsupported(device_family, encrypt_type):
|
||||||
|
return {
|
||||||
|
"result": {
|
||||||
|
"device_id": "xx",
|
||||||
|
"owner": "xx",
|
||||||
|
"device_type": device_family,
|
||||||
|
"device_model": "P110(EU)",
|
||||||
|
"ip": "127.0.0.1",
|
||||||
|
"mac": "48-22xxx",
|
||||||
|
"is_support_iot_cloud": True,
|
||||||
|
"obd_src": "tplink",
|
||||||
|
"factory_default": False,
|
||||||
|
"mgt_encrypt_schm": {
|
||||||
|
"is_support_https": False,
|
||||||
|
"encrypt_type": encrypt_type,
|
||||||
|
"http_port": 80,
|
||||||
|
"lv": 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"error_code": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
UNSUPPORTED_DEVICES = {
|
||||||
|
"unknown_device_family": _make_unsupported("SMART.TAPOXMASTREE", "AES"),
|
||||||
|
"wrong_encryption_iot": _make_unsupported("IOT.SMARTPLUGSWITCH", "AES"),
|
||||||
|
"wrong_encryption_smart": _make_unsupported("SMART.TAPOBULB", "IOT"),
|
||||||
|
"unknown_encryption": _make_unsupported("IOT.SMARTPLUGSWITCH", "FOO"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parametrize_discovery(desc, root_key):
|
||||||
|
filtered_fixtures = filter_fixtures(desc, data_root_filter=root_key)
|
||||||
|
return pytest.mark.parametrize(
|
||||||
|
"discovery_mock",
|
||||||
|
filtered_fixtures,
|
||||||
|
indirect=True,
|
||||||
|
ids=idgenerator,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
new_discovery = parametrize_discovery("new discovery", "discovery_result")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(params=FIXTURE_DATA, ids=idgenerator)
|
||||||
|
def discovery_mock(request, mocker):
|
||||||
|
fixture_info: FixtureInfo = request.param
|
||||||
|
fixture_data = fixture_info.data
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _DiscoveryMock:
|
||||||
|
ip: str
|
||||||
|
default_port: int
|
||||||
|
discovery_port: int
|
||||||
|
discovery_data: dict
|
||||||
|
query_data: dict
|
||||||
|
device_type: str
|
||||||
|
encrypt_type: str
|
||||||
|
login_version: Optional[int] = None
|
||||||
|
port_override: Optional[int] = None
|
||||||
|
|
||||||
|
if "discovery_result" in fixture_data:
|
||||||
|
discovery_data = {"result": fixture_data["discovery_result"]}
|
||||||
|
device_type = fixture_data["discovery_result"]["device_type"]
|
||||||
|
encrypt_type = fixture_data["discovery_result"]["mgt_encrypt_schm"][
|
||||||
|
"encrypt_type"
|
||||||
|
]
|
||||||
|
login_version = fixture_data["discovery_result"]["mgt_encrypt_schm"].get("lv")
|
||||||
|
datagram = (
|
||||||
|
b"\x02\x00\x00\x01\x01[\x00\x00\x00\x00\x00\x00W\xcev\xf8"
|
||||||
|
+ json_dumps(discovery_data).encode()
|
||||||
|
)
|
||||||
|
dm = _DiscoveryMock(
|
||||||
|
"127.0.0.123",
|
||||||
|
80,
|
||||||
|
20002,
|
||||||
|
discovery_data,
|
||||||
|
fixture_data,
|
||||||
|
device_type,
|
||||||
|
encrypt_type,
|
||||||
|
login_version,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
sys_info = fixture_data["system"]["get_sysinfo"]
|
||||||
|
discovery_data = {"system": {"get_sysinfo": sys_info}}
|
||||||
|
device_type = sys_info.get("mic_type") or sys_info.get("type")
|
||||||
|
encrypt_type = "XOR"
|
||||||
|
login_version = None
|
||||||
|
datagram = XorEncryption.encrypt(json_dumps(discovery_data))[4:]
|
||||||
|
dm = _DiscoveryMock(
|
||||||
|
"127.0.0.123",
|
||||||
|
9999,
|
||||||
|
9999,
|
||||||
|
discovery_data,
|
||||||
|
fixture_data,
|
||||||
|
device_type,
|
||||||
|
encrypt_type,
|
||||||
|
login_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def mock_discover(self):
|
||||||
|
port = (
|
||||||
|
dm.port_override
|
||||||
|
if dm.port_override and dm.discovery_port != 20002
|
||||||
|
else dm.discovery_port
|
||||||
|
)
|
||||||
|
self.datagram_received(
|
||||||
|
datagram,
|
||||||
|
(dm.ip, port),
|
||||||
|
)
|
||||||
|
|
||||||
|
mocker.patch("kasa.discover._DiscoverProtocol.do_discover", mock_discover)
|
||||||
|
mocker.patch(
|
||||||
|
"socket.getaddrinfo",
|
||||||
|
side_effect=lambda *_, **__: [(None, None, None, None, (dm.ip, 0))],
|
||||||
|
)
|
||||||
|
|
||||||
|
if fixture_info.protocol == "SMART":
|
||||||
|
proto = FakeSmartProtocol(fixture_data, fixture_info.name)
|
||||||
|
else:
|
||||||
|
proto = FakeIotProtocol(fixture_data)
|
||||||
|
|
||||||
|
async def _query(request, retry_count: int = 3):
|
||||||
|
return await proto.query(request)
|
||||||
|
|
||||||
|
mocker.patch("kasa.IotProtocol.query", side_effect=_query)
|
||||||
|
mocker.patch("kasa.SmartProtocol.query", side_effect=_query)
|
||||||
|
|
||||||
|
yield dm
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(params=FIXTURE_DATA, ids=idgenerator)
|
||||||
|
def discovery_data(request, mocker):
|
||||||
|
"""Return raw discovery file contents as JSON. Used for discovery tests."""
|
||||||
|
fixture_info = request.param
|
||||||
|
mocker.patch("kasa.IotProtocol.query", return_value=fixture_info.data)
|
||||||
|
mocker.patch("kasa.SmartProtocol.query", return_value=fixture_info.data)
|
||||||
|
if "discovery_result" in fixture_info.data:
|
||||||
|
return {"result": fixture_info.data["discovery_result"]}
|
||||||
|
else:
|
||||||
|
return {"system": {"get_sysinfo": fixture_info.data["system"]["get_sysinfo"]}}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(params=UNSUPPORTED_DEVICES.values(), ids=UNSUPPORTED_DEVICES.keys())
|
||||||
|
def unsupported_device_info(request, mocker):
|
||||||
|
"""Return unsupported devices for cli and discovery tests."""
|
||||||
|
discovery_data = request.param
|
||||||
|
host = "127.0.0.1"
|
||||||
|
|
||||||
|
async def mock_discover(self):
|
||||||
|
if discovery_data:
|
||||||
|
data = (
|
||||||
|
b"\x02\x00\x00\x01\x01[\x00\x00\x00\x00\x00\x00W\xcev\xf8"
|
||||||
|
+ json_dumps(discovery_data).encode()
|
||||||
|
)
|
||||||
|
self.datagram_received(data, (host, 20002))
|
||||||
|
|
||||||
|
mocker.patch("kasa.discover._DiscoverProtocol.do_discover", mock_discover)
|
||||||
|
|
||||||
|
yield discovery_data
|
@ -129,6 +129,7 @@ class FakeIotProtocol(IotProtocol):
|
|||||||
config=DeviceConfig("127.0.0.123"),
|
config=DeviceConfig("127.0.0.123"),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
info = copy.deepcopy(info)
|
||||||
self.discovery_data = info
|
self.discovery_data = info
|
||||||
self.writer = None
|
self.writer = None
|
||||||
self.reader = None
|
self.reader = None
|
||||||
|
@ -1,14 +1,17 @@
|
|||||||
import warnings
|
import copy
|
||||||
from json import loads as json_loads
|
from json import loads as json_loads
|
||||||
|
|
||||||
from kasa import Credentials, DeviceConfig, KasaException, SmartProtocol
|
import pytest
|
||||||
|
|
||||||
|
from kasa import Credentials, DeviceConfig, SmartProtocol
|
||||||
|
from kasa.exceptions import SmartErrorCode
|
||||||
from kasa.protocol import BaseTransport
|
from kasa.protocol import BaseTransport
|
||||||
|
|
||||||
|
|
||||||
class FakeSmartProtocol(SmartProtocol):
|
class FakeSmartProtocol(SmartProtocol):
|
||||||
def __init__(self, info):
|
def __init__(self, info, fixture_name):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
transport=FakeSmartTransport(info),
|
transport=FakeSmartTransport(info, fixture_name),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def query(self, request, retry_count: int = 3):
|
async def query(self, request, retry_count: int = 3):
|
||||||
@ -18,7 +21,7 @@ class FakeSmartProtocol(SmartProtocol):
|
|||||||
|
|
||||||
|
|
||||||
class FakeSmartTransport(BaseTransport):
|
class FakeSmartTransport(BaseTransport):
|
||||||
def __init__(self, info):
|
def __init__(self, info, fixture_name):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
config=DeviceConfig(
|
config=DeviceConfig(
|
||||||
"127.0.0.123",
|
"127.0.0.123",
|
||||||
@ -28,7 +31,8 @@ class FakeSmartTransport(BaseTransport):
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
self.info = info
|
self.fixture_name = fixture_name
|
||||||
|
self.info = copy.deepcopy(info)
|
||||||
self.components = {
|
self.components = {
|
||||||
comp["id"]: comp["ver_code"]
|
comp["id"]: comp["ver_code"]
|
||||||
for comp in self.info["component_nego"]["component_list"]
|
for comp in self.info["component_nego"]["component_list"]
|
||||||
@ -90,11 +94,14 @@ class FakeSmartTransport(BaseTransport):
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
"get_support_alarm_type_list": ("alarm", {
|
"get_support_alarm_type_list": (
|
||||||
|
"alarm",
|
||||||
|
{
|
||||||
"alarm_type_list": [
|
"alarm_type_list": [
|
||||||
"Doorbell Ring 1",
|
"Doorbell Ring 1",
|
||||||
]
|
]
|
||||||
}),
|
},
|
||||||
|
),
|
||||||
"get_device_usage": ("device", {}),
|
"get_device_usage": ("device", {}),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,18 +156,26 @@ class FakeSmartTransport(BaseTransport):
|
|||||||
elif method == "component_nego" or method[:4] == "get_":
|
elif method == "component_nego" or method[:4] == "get_":
|
||||||
if method in info:
|
if method in info:
|
||||||
return {"result": info[method], "error_code": 0}
|
return {"result": info[method], "error_code": 0}
|
||||||
elif (
|
if (
|
||||||
|
# FIXTURE_MISSING is for service calls not in place when
|
||||||
|
# SMART fixtures started to be generated
|
||||||
missing_result := self.FIXTURE_MISSING_MAP.get(method)
|
missing_result := self.FIXTURE_MISSING_MAP.get(method)
|
||||||
) and missing_result[0] in self.components:
|
) and missing_result[0] in self.components:
|
||||||
warnings.warn(
|
retval = {"result": missing_result[1], "error_code": 0}
|
||||||
UserWarning(
|
|
||||||
f"Fixture missing expected method {method}, try to regenerate"
|
|
||||||
),
|
|
||||||
stacklevel=1,
|
|
||||||
)
|
|
||||||
return {"result": missing_result[1], "error_code": 0}
|
|
||||||
else:
|
else:
|
||||||
raise KasaException(f"Fixture doesn't support {method}")
|
# PARAMS error returned for KS240 when get_device_usage called
|
||||||
|
# on parent device. Could be any error code though.
|
||||||
|
# TODO: Try to figure out if there's a way to prevent the KS240 smartdevice
|
||||||
|
# calling the unsupported device in the first place.
|
||||||
|
retval = {
|
||||||
|
"error_code": SmartErrorCode.PARAMS_ERROR.value,
|
||||||
|
"method": "get_device_usage",
|
||||||
|
}
|
||||||
|
# Reduce warning spam by consolidating and reporting at the end of the run
|
||||||
|
if self.fixture_name not in pytest.fixtures_missing_methods:
|
||||||
|
pytest.fixtures_missing_methods[self.fixture_name] = set()
|
||||||
|
pytest.fixtures_missing_methods[self.fixture_name].add(method)
|
||||||
|
return retval
|
||||||
elif method == "set_qs_info":
|
elif method == "set_qs_info":
|
||||||
return {"error_code": 0}
|
return {"error_code": 0}
|
||||||
elif method[:4] == "set_":
|
elif method[:4] == "set_":
|
||||||
|
118
kasa/tests/fixtureinfo.py
Normal file
118
kasa/tests/fixtureinfo.py
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
import glob
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Dict, List, NamedTuple, Optional, Set
|
||||||
|
|
||||||
|
|
||||||
|
class FixtureInfo(NamedTuple):
|
||||||
|
name: str
|
||||||
|
protocol: str
|
||||||
|
data: Dict
|
||||||
|
|
||||||
|
|
||||||
|
FixtureInfo.__hash__ = lambda x: hash((x.name, x.protocol)) # type: ignore[attr-defined, method-assign]
|
||||||
|
FixtureInfo.__eq__ = lambda x, y: hash(x) == hash(y) # type: ignore[method-assign]
|
||||||
|
|
||||||
|
|
||||||
|
SUPPORTED_IOT_DEVICES = [
|
||||||
|
(device, "IOT")
|
||||||
|
for device in glob.glob(
|
||||||
|
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/*.json"
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
SUPPORTED_SMART_DEVICES = [
|
||||||
|
(device, "SMART")
|
||||||
|
for device in glob.glob(
|
||||||
|
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/smart/*.json"
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
SUPPORTED_DEVICES = SUPPORTED_IOT_DEVICES + SUPPORTED_SMART_DEVICES
|
||||||
|
|
||||||
|
|
||||||
|
def idgenerator(paramtuple: FixtureInfo):
|
||||||
|
try:
|
||||||
|
return paramtuple.name + (
|
||||||
|
"" if paramtuple.protocol == "IOT" else "-" + paramtuple.protocol
|
||||||
|
)
|
||||||
|
except: # TODO: HACK as idgenerator is now used by default # noqa: E722
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_fixture_info() -> List[FixtureInfo]:
|
||||||
|
"""Return raw discovery file contents as JSON. Used for discovery tests."""
|
||||||
|
fixture_data = []
|
||||||
|
for file, protocol in SUPPORTED_DEVICES:
|
||||||
|
p = Path(file)
|
||||||
|
folder = Path(__file__).parent / "fixtures"
|
||||||
|
if protocol == "SMART":
|
||||||
|
folder = folder / "smart"
|
||||||
|
p = folder / file
|
||||||
|
|
||||||
|
with open(p) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
fixture_name = p.name
|
||||||
|
fixture_data.append(
|
||||||
|
FixtureInfo(data=data, protocol=protocol, name=fixture_name)
|
||||||
|
)
|
||||||
|
return fixture_data
|
||||||
|
|
||||||
|
|
||||||
|
FIXTURE_DATA: List[FixtureInfo] = get_fixture_info()
|
||||||
|
|
||||||
|
|
||||||
|
def filter_fixtures(
|
||||||
|
desc,
|
||||||
|
*,
|
||||||
|
data_root_filter: Optional[str] = None,
|
||||||
|
protocol_filter: Optional[Set[str]] = None,
|
||||||
|
model_filter: Optional[Set[str]] = None,
|
||||||
|
component_filter: Optional[str] = None,
|
||||||
|
):
|
||||||
|
"""Filter the fixtures based on supplied parameters.
|
||||||
|
|
||||||
|
data_root_filter: return fixtures containing the supplied top
|
||||||
|
level key, i.e. discovery_result
|
||||||
|
protocol_filter: set of protocols to match, IOT or SMART
|
||||||
|
model_filter: set of device models to match
|
||||||
|
component_filter: filter SMART fixtures that have the provided
|
||||||
|
component in component_nego details.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _model_match(fixture_data: FixtureInfo, model_filter):
|
||||||
|
file_model_region = fixture_data.name.split("_")[0]
|
||||||
|
file_model = file_model_region.split("(")[0]
|
||||||
|
return file_model in model_filter
|
||||||
|
|
||||||
|
def _component_match(fixture_data: FixtureInfo, component_filter):
|
||||||
|
if (component_nego := fixture_data.data.get("component_nego")) is None:
|
||||||
|
return False
|
||||||
|
components = {
|
||||||
|
component["id"]: component["ver_code"]
|
||||||
|
for component in component_nego["component_list"]
|
||||||
|
}
|
||||||
|
return component_filter in components
|
||||||
|
|
||||||
|
filtered = []
|
||||||
|
if protocol_filter is None:
|
||||||
|
protocol_filter = {"IOT", "SMART"}
|
||||||
|
for fixture_data in FIXTURE_DATA:
|
||||||
|
if data_root_filter and data_root_filter not in fixture_data.data:
|
||||||
|
continue
|
||||||
|
if fixture_data.protocol not in protocol_filter:
|
||||||
|
continue
|
||||||
|
if model_filter is not None and not _model_match(fixture_data, model_filter):
|
||||||
|
continue
|
||||||
|
if component_filter and not _component_match(fixture_data, component_filter):
|
||||||
|
continue
|
||||||
|
|
||||||
|
filtered.append(fixture_data)
|
||||||
|
|
||||||
|
print(f"# {desc}")
|
||||||
|
for value in filtered:
|
||||||
|
print(f"\t{value.name}")
|
||||||
|
return filtered
|
@ -35,7 +35,7 @@ from kasa.iot import IotDevice
|
|||||||
from .conftest import (
|
from .conftest import (
|
||||||
device_iot,
|
device_iot,
|
||||||
device_smart,
|
device_smart,
|
||||||
get_device_for_file,
|
get_device_for_fixture_protocol,
|
||||||
handle_turn_on,
|
handle_turn_on,
|
||||||
new_discovery,
|
new_discovery,
|
||||||
turn_on,
|
turn_on,
|
||||||
@ -695,7 +695,9 @@ async def test_errors(mocker):
|
|||||||
|
|
||||||
async def test_feature(mocker):
|
async def test_feature(mocker):
|
||||||
"""Test feature command."""
|
"""Test feature command."""
|
||||||
dummy_device = await get_device_for_file("P300(EU)_1.0_1.0.13.json", "SMART")
|
dummy_device = await get_device_for_fixture_protocol(
|
||||||
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
||||||
|
)
|
||||||
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
res = await runner.invoke(
|
res = await runner.invoke(
|
||||||
@ -711,7 +713,9 @@ async def test_feature(mocker):
|
|||||||
|
|
||||||
async def test_feature_single(mocker):
|
async def test_feature_single(mocker):
|
||||||
"""Test feature command returning single value."""
|
"""Test feature command returning single value."""
|
||||||
dummy_device = await get_device_for_file("P300(EU)_1.0_1.0.13.json", "SMART")
|
dummy_device = await get_device_for_fixture_protocol(
|
||||||
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
||||||
|
)
|
||||||
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
res = await runner.invoke(
|
res = await runner.invoke(
|
||||||
@ -723,9 +727,12 @@ async def test_feature_single(mocker):
|
|||||||
assert "== Features ==" not in res.output
|
assert "== Features ==" not in res.output
|
||||||
assert res.exit_code == 0
|
assert res.exit_code == 0
|
||||||
|
|
||||||
|
|
||||||
async def test_feature_missing(mocker):
|
async def test_feature_missing(mocker):
|
||||||
"""Test feature command returning single value."""
|
"""Test feature command returning single value."""
|
||||||
dummy_device = await get_device_for_file("P300(EU)_1.0_1.0.13.json", "SMART")
|
dummy_device = await get_device_for_fixture_protocol(
|
||||||
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
||||||
|
)
|
||||||
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
res = await runner.invoke(
|
res = await runner.invoke(
|
||||||
@ -737,9 +744,12 @@ async def test_feature_missing(mocker):
|
|||||||
assert "== Features ==" not in res.output
|
assert "== Features ==" not in res.output
|
||||||
assert res.exit_code == 0
|
assert res.exit_code == 0
|
||||||
|
|
||||||
|
|
||||||
async def test_feature_set(mocker):
|
async def test_feature_set(mocker):
|
||||||
"""Test feature command's set value."""
|
"""Test feature command's set value."""
|
||||||
dummy_device = await get_device_for_file("P300(EU)_1.0_1.0.13.json", "SMART")
|
dummy_device = await get_device_for_fixture_protocol(
|
||||||
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
||||||
|
)
|
||||||
led_setter = mocker.patch("kasa.smart.modules.ledmodule.LedModule.set_led")
|
led_setter = mocker.patch("kasa.smart.modules.ledmodule.LedModule.set_led")
|
||||||
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
||||||
|
|
||||||
@ -757,7 +767,9 @@ async def test_feature_set(mocker):
|
|||||||
|
|
||||||
async def test_feature_set_child(mocker):
|
async def test_feature_set_child(mocker):
|
||||||
"""Test feature command's set value."""
|
"""Test feature command's set value."""
|
||||||
dummy_device = await get_device_for_file("P300(EU)_1.0_1.0.13.json", "SMART")
|
dummy_device = await get_device_for_fixture_protocol(
|
||||||
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
||||||
|
)
|
||||||
setter = mocker.patch("kasa.smart.smartdevice.SmartDevice.set_state")
|
setter = mocker.patch("kasa.smart.smartdevice.SmartDevice.set_state")
|
||||||
|
|
||||||
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
||||||
|
@ -20,9 +20,8 @@ from kasa.deviceconfig import (
|
|||||||
from kasa.discover import DiscoveryResult
|
from kasa.discover import DiscoveryResult
|
||||||
|
|
||||||
|
|
||||||
def _get_connection_type_device_class(the_fixture_data):
|
def _get_connection_type_device_class(discovery_info):
|
||||||
if "discovery_result" in the_fixture_data:
|
if "result" in discovery_info:
|
||||||
discovery_info = {"result": the_fixture_data["discovery_result"]}
|
|
||||||
device_class = Discover._get_device_class(discovery_info)
|
device_class = Discover._get_device_class(discovery_info)
|
||||||
dr = DiscoveryResult(**discovery_info["result"])
|
dr = DiscoveryResult(**discovery_info["result"])
|
||||||
|
|
||||||
@ -33,21 +32,18 @@ def _get_connection_type_device_class(the_fixture_data):
|
|||||||
connection_type = ConnectionType.from_values(
|
connection_type = ConnectionType.from_values(
|
||||||
DeviceFamilyType.IotSmartPlugSwitch.value, EncryptType.Xor.value
|
DeviceFamilyType.IotSmartPlugSwitch.value, EncryptType.Xor.value
|
||||||
)
|
)
|
||||||
device_class = Discover._get_device_class(the_fixture_data)
|
device_class = Discover._get_device_class(discovery_info)
|
||||||
|
|
||||||
return connection_type, device_class
|
return connection_type, device_class
|
||||||
|
|
||||||
|
|
||||||
async def test_connect(
|
async def test_connect(
|
||||||
all_fixture_data: dict,
|
discovery_data,
|
||||||
mocker,
|
mocker,
|
||||||
):
|
):
|
||||||
"""Test that if the protocol is passed in it gets set correctly."""
|
"""Test that if the protocol is passed in it gets set correctly."""
|
||||||
host = "127.0.0.1"
|
host = "127.0.0.1"
|
||||||
ctype, device_class = _get_connection_type_device_class(all_fixture_data)
|
ctype, device_class = _get_connection_type_device_class(discovery_data)
|
||||||
|
|
||||||
mocker.patch("kasa.IotProtocol.query", return_value=all_fixture_data)
|
|
||||||
mocker.patch("kasa.SmartProtocol.query", return_value=all_fixture_data)
|
|
||||||
|
|
||||||
config = DeviceConfig(
|
config = DeviceConfig(
|
||||||
host=host, credentials=Credentials("foor", "bar"), connection_type=ctype
|
host=host, credentials=Credentials("foor", "bar"), connection_type=ctype
|
||||||
@ -67,34 +63,32 @@ async def test_connect(
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("custom_port", [123, None])
|
@pytest.mark.parametrize("custom_port", [123, None])
|
||||||
async def test_connect_custom_port(all_fixture_data: dict, mocker, custom_port):
|
async def test_connect_custom_port(discovery_data: dict, mocker, custom_port):
|
||||||
"""Make sure that connect returns an initialized SmartDevice instance."""
|
"""Make sure that connect returns an initialized SmartDevice instance."""
|
||||||
host = "127.0.0.1"
|
host = "127.0.0.1"
|
||||||
|
|
||||||
ctype, _ = _get_connection_type_device_class(all_fixture_data)
|
ctype, _ = _get_connection_type_device_class(discovery_data)
|
||||||
config = DeviceConfig(
|
config = DeviceConfig(
|
||||||
host=host,
|
host=host,
|
||||||
port_override=custom_port,
|
port_override=custom_port,
|
||||||
connection_type=ctype,
|
connection_type=ctype,
|
||||||
credentials=Credentials("dummy_user", "dummy_password"),
|
credentials=Credentials("dummy_user", "dummy_password"),
|
||||||
)
|
)
|
||||||
default_port = 80 if "discovery_result" in all_fixture_data else 9999
|
default_port = 80 if "result" in discovery_data else 9999
|
||||||
|
|
||||||
|
ctype, _ = _get_connection_type_device_class(discovery_data)
|
||||||
|
|
||||||
ctype, _ = _get_connection_type_device_class(all_fixture_data)
|
|
||||||
mocker.patch("kasa.IotProtocol.query", return_value=all_fixture_data)
|
|
||||||
mocker.patch("kasa.SmartProtocol.query", return_value=all_fixture_data)
|
|
||||||
dev = await connect(config=config)
|
dev = await connect(config=config)
|
||||||
assert issubclass(dev.__class__, Device)
|
assert issubclass(dev.__class__, Device)
|
||||||
assert dev.port == custom_port or dev.port == default_port
|
assert dev.port == custom_port or dev.port == default_port
|
||||||
|
|
||||||
|
|
||||||
async def test_connect_logs_connect_time(
|
async def test_connect_logs_connect_time(
|
||||||
all_fixture_data: dict, caplog: pytest.LogCaptureFixture, mocker
|
discovery_data: dict,
|
||||||
|
caplog: pytest.LogCaptureFixture,
|
||||||
):
|
):
|
||||||
"""Test that the connect time is logged when debug logging is enabled."""
|
"""Test that the connect time is logged when debug logging is enabled."""
|
||||||
ctype, _ = _get_connection_type_device_class(all_fixture_data)
|
ctype, _ = _get_connection_type_device_class(discovery_data)
|
||||||
mocker.patch("kasa.IotProtocol.query", return_value=all_fixture_data)
|
|
||||||
mocker.patch("kasa.SmartProtocol.query", return_value=all_fixture_data)
|
|
||||||
|
|
||||||
host = "127.0.0.1"
|
host = "127.0.0.1"
|
||||||
config = DeviceConfig(
|
config = DeviceConfig(
|
||||||
@ -107,13 +101,13 @@ async def test_connect_logs_connect_time(
|
|||||||
assert "seconds to update" in caplog.text
|
assert "seconds to update" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
async def test_connect_query_fails(all_fixture_data: dict, mocker):
|
async def test_connect_query_fails(discovery_data, mocker):
|
||||||
"""Make sure that connect fails when query fails."""
|
"""Make sure that connect fails when query fails."""
|
||||||
host = "127.0.0.1"
|
host = "127.0.0.1"
|
||||||
mocker.patch("kasa.IotProtocol.query", side_effect=KasaException)
|
mocker.patch("kasa.IotProtocol.query", side_effect=KasaException)
|
||||||
mocker.patch("kasa.SmartProtocol.query", side_effect=KasaException)
|
mocker.patch("kasa.SmartProtocol.query", side_effect=KasaException)
|
||||||
|
|
||||||
ctype, _ = _get_connection_type_device_class(all_fixture_data)
|
ctype, _ = _get_connection_type_device_class(discovery_data)
|
||||||
config = DeviceConfig(
|
config = DeviceConfig(
|
||||||
host=host, credentials=Credentials("foor", "bar"), connection_type=ctype
|
host=host, credentials=Credentials("foor", "bar"), connection_type=ctype
|
||||||
)
|
)
|
||||||
@ -125,14 +119,11 @@ async def test_connect_query_fails(all_fixture_data: dict, mocker):
|
|||||||
assert close_mock.call_count == 1
|
assert close_mock.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
async def test_connect_http_client(all_fixture_data, mocker):
|
async def test_connect_http_client(discovery_data, mocker):
|
||||||
"""Make sure that discover_single returns an initialized SmartDevice instance."""
|
"""Make sure that discover_single returns an initialized SmartDevice instance."""
|
||||||
host = "127.0.0.1"
|
host = "127.0.0.1"
|
||||||
|
|
||||||
ctype, _ = _get_connection_type_device_class(all_fixture_data)
|
ctype, _ = _get_connection_type_device_class(discovery_data)
|
||||||
|
|
||||||
mocker.patch("kasa.IotProtocol.query", return_value=all_fixture_data)
|
|
||||||
mocker.patch("kasa.SmartProtocol.query", return_value=all_fixture_data)
|
|
||||||
|
|
||||||
http_client = aiohttp.ClientSession()
|
http_client = aiohttp.ClientSession()
|
||||||
|
|
||||||
@ -142,6 +133,7 @@ async def test_connect_http_client(all_fixture_data, mocker):
|
|||||||
dev = await connect(config=config)
|
dev = await connect(config=config)
|
||||||
if ctype.encryption_type != EncryptType.Xor:
|
if ctype.encryption_type != EncryptType.Xor:
|
||||||
assert dev.protocol._transport._http_client.client != http_client
|
assert dev.protocol._transport._http_client.client != http_client
|
||||||
|
await dev.disconnect()
|
||||||
|
|
||||||
config = DeviceConfig(
|
config = DeviceConfig(
|
||||||
host=host,
|
host=host,
|
||||||
@ -152,3 +144,5 @@ async def test_connect_http_client(all_fixture_data, mocker):
|
|||||||
dev = await connect(config=config)
|
dev = await connect(config=config)
|
||||||
if ctype.encryption_type != EncryptType.Xor:
|
if ctype.encryption_type != EncryptType.Xor:
|
||||||
assert dev.protocol._transport._http_client.client == http_client
|
assert dev.protocol._transport._http_client.client == http_client
|
||||||
|
await dev.disconnect()
|
||||||
|
await http_client.close()
|
||||||
|
@ -299,8 +299,9 @@ async def test_discover_single_authentication(discovery_mock, mocker):
|
|||||||
|
|
||||||
|
|
||||||
@new_discovery
|
@new_discovery
|
||||||
async def test_device_update_from_new_discovery_info(discovery_data):
|
async def test_device_update_from_new_discovery_info(discovery_mock):
|
||||||
"""Make sure that new discovery devices update from discovery info correctly."""
|
"""Make sure that new discovery devices update from discovery info correctly."""
|
||||||
|
discovery_data = discovery_mock.discovery_data
|
||||||
device_class = Discover._get_device_class(discovery_data)
|
device_class = Discover._get_device_class(discovery_data)
|
||||||
device = device_class("127.0.0.1")
|
device = device_class("127.0.0.1")
|
||||||
discover_info = DiscoveryResult(**discovery_data["result"])
|
discover_info = DiscoveryResult(**discovery_data["result"])
|
||||||
|
12
kasa/tests/test_feature_brightness.py
Normal file
12
kasa/tests/test_feature_brightness.py
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
from kasa.smart import SmartDevice
|
||||||
|
|
||||||
|
from .conftest import (
|
||||||
|
brightness,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@brightness
|
||||||
|
async def test_brightness_component(dev: SmartDevice):
|
||||||
|
"""Placeholder to test framwework component filter."""
|
||||||
|
assert isinstance(dev, SmartDevice)
|
||||||
|
assert "brightness" in dev._components
|
@ -2,12 +2,12 @@ import asyncio
|
|||||||
|
|
||||||
import xdoctest
|
import xdoctest
|
||||||
|
|
||||||
from kasa.tests.conftest import get_device_for_file
|
from kasa.tests.conftest import get_device_for_fixture_protocol
|
||||||
|
|
||||||
|
|
||||||
def test_bulb_examples(mocker):
|
def test_bulb_examples(mocker):
|
||||||
"""Use KL130 (bulb with all features) to test the doctests."""
|
"""Use KL130 (bulb with all features) to test the doctests."""
|
||||||
p = asyncio.run(get_device_for_file("KL130(US)_1.0_1.8.11.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("KL130(US)_1.0_1.8.11.json", "IOT"))
|
||||||
mocker.patch("kasa.iot.iotbulb.IotBulb", return_value=p)
|
mocker.patch("kasa.iot.iotbulb.IotBulb", return_value=p)
|
||||||
mocker.patch("kasa.iot.iotbulb.IotBulb.update")
|
mocker.patch("kasa.iot.iotbulb.IotBulb.update")
|
||||||
res = xdoctest.doctest_module("kasa.iot.iotbulb", "all")
|
res = xdoctest.doctest_module("kasa.iot.iotbulb", "all")
|
||||||
@ -16,7 +16,7 @@ def test_bulb_examples(mocker):
|
|||||||
|
|
||||||
def test_smartdevice_examples(mocker):
|
def test_smartdevice_examples(mocker):
|
||||||
"""Use HS110 for emeter examples."""
|
"""Use HS110 for emeter examples."""
|
||||||
p = asyncio.run(get_device_for_file("HS110(EU)_1.0_1.2.5.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("HS110(EU)_1.0_1.2.5.json", "IOT"))
|
||||||
mocker.patch("kasa.iot.iotdevice.IotDevice", return_value=p)
|
mocker.patch("kasa.iot.iotdevice.IotDevice", return_value=p)
|
||||||
mocker.patch("kasa.iot.iotdevice.IotDevice.update")
|
mocker.patch("kasa.iot.iotdevice.IotDevice.update")
|
||||||
res = xdoctest.doctest_module("kasa.iot.iotdevice", "all")
|
res = xdoctest.doctest_module("kasa.iot.iotdevice", "all")
|
||||||
@ -25,7 +25,8 @@ def test_smartdevice_examples(mocker):
|
|||||||
|
|
||||||
def test_plug_examples(mocker):
|
def test_plug_examples(mocker):
|
||||||
"""Test plug examples."""
|
"""Test plug examples."""
|
||||||
p = asyncio.run(get_device_for_file("HS110(EU)_1.0_1.2.5.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("HS110(EU)_1.0_1.2.5.json", "IOT"))
|
||||||
|
# p = await get_device_for_fixture_protocol("HS110(EU)_1.0_1.2.5.json", "IOT")
|
||||||
mocker.patch("kasa.iot.iotplug.IotPlug", return_value=p)
|
mocker.patch("kasa.iot.iotplug.IotPlug", return_value=p)
|
||||||
mocker.patch("kasa.iot.iotplug.IotPlug.update")
|
mocker.patch("kasa.iot.iotplug.IotPlug.update")
|
||||||
res = xdoctest.doctest_module("kasa.iot.iotplug", "all")
|
res = xdoctest.doctest_module("kasa.iot.iotplug", "all")
|
||||||
@ -34,7 +35,7 @@ def test_plug_examples(mocker):
|
|||||||
|
|
||||||
def test_strip_examples(mocker):
|
def test_strip_examples(mocker):
|
||||||
"""Test strip examples."""
|
"""Test strip examples."""
|
||||||
p = asyncio.run(get_device_for_file("KP303(UK)_1.0_1.0.3.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("KP303(UK)_1.0_1.0.3.json", "IOT"))
|
||||||
mocker.patch("kasa.iot.iotstrip.IotStrip", return_value=p)
|
mocker.patch("kasa.iot.iotstrip.IotStrip", return_value=p)
|
||||||
mocker.patch("kasa.iot.iotstrip.IotStrip.update")
|
mocker.patch("kasa.iot.iotstrip.IotStrip.update")
|
||||||
res = xdoctest.doctest_module("kasa.iot.iotstrip", "all")
|
res = xdoctest.doctest_module("kasa.iot.iotstrip", "all")
|
||||||
@ -43,7 +44,7 @@ def test_strip_examples(mocker):
|
|||||||
|
|
||||||
def test_dimmer_examples(mocker):
|
def test_dimmer_examples(mocker):
|
||||||
"""Test dimmer examples."""
|
"""Test dimmer examples."""
|
||||||
p = asyncio.run(get_device_for_file("HS220(US)_1.0_1.5.7.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("HS220(US)_1.0_1.5.7.json", "IOT"))
|
||||||
mocker.patch("kasa.iot.iotdimmer.IotDimmer", return_value=p)
|
mocker.patch("kasa.iot.iotdimmer.IotDimmer", return_value=p)
|
||||||
mocker.patch("kasa.iot.iotdimmer.IotDimmer.update")
|
mocker.patch("kasa.iot.iotdimmer.IotDimmer.update")
|
||||||
res = xdoctest.doctest_module("kasa.iot.iotdimmer", "all")
|
res = xdoctest.doctest_module("kasa.iot.iotdimmer", "all")
|
||||||
@ -52,7 +53,7 @@ def test_dimmer_examples(mocker):
|
|||||||
|
|
||||||
def test_lightstrip_examples(mocker):
|
def test_lightstrip_examples(mocker):
|
||||||
"""Test lightstrip examples."""
|
"""Test lightstrip examples."""
|
||||||
p = asyncio.run(get_device_for_file("KL430(US)_1.0_1.0.10.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("KL430(US)_1.0_1.0.10.json", "IOT"))
|
||||||
mocker.patch("kasa.iot.iotlightstrip.IotLightStrip", return_value=p)
|
mocker.patch("kasa.iot.iotlightstrip.IotLightStrip", return_value=p)
|
||||||
mocker.patch("kasa.iot.iotlightstrip.IotLightStrip.update")
|
mocker.patch("kasa.iot.iotlightstrip.IotLightStrip.update")
|
||||||
res = xdoctest.doctest_module("kasa.iot.iotlightstrip", "all")
|
res = xdoctest.doctest_module("kasa.iot.iotlightstrip", "all")
|
||||||
@ -61,7 +62,7 @@ def test_lightstrip_examples(mocker):
|
|||||||
|
|
||||||
def test_discovery_examples(mocker):
|
def test_discovery_examples(mocker):
|
||||||
"""Test discovery examples."""
|
"""Test discovery examples."""
|
||||||
p = asyncio.run(get_device_for_file("KP303(UK)_1.0_1.0.3.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("KP303(UK)_1.0_1.0.3.json", "IOT"))
|
||||||
|
|
||||||
mocker.patch("kasa.discover.Discover.discover", return_value=[p])
|
mocker.patch("kasa.discover.Discover.discover", return_value=[p])
|
||||||
res = xdoctest.doctest_module("kasa.discover", "all")
|
res = xdoctest.doctest_module("kasa.discover", "all")
|
||||||
|
Loading…
Reference in New Issue
Block a user