mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-23 05:07:09 +00:00
b220beb811
To fix intermittent issues with [windows CI](https://github.com/python-kasa/python-kasa/actions/runs/9952477932/job/27493918272?pr=1068). Probably better to use monotonic here anyway. ``` FAILED kasa/tests/test_smartdevice.py::test_update_module_update_delays[L530E(EU)_3.0_1.1.6.json-SMART] - ValueError: Clock moved backwards. Refusing to generate ID. ```
431 lines
14 KiB
Python
431 lines
14 KiB
Python
"""Tests for SMART devices."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from typing import Any, cast
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from freezegun.api import FrozenDateTimeFactory
|
|
from pytest_mock import MockerFixture
|
|
|
|
from kasa import Device, KasaException, Module
|
|
from kasa.exceptions import SmartErrorCode
|
|
from kasa.smart import SmartDevice
|
|
|
|
from .conftest import (
|
|
device_smart,
|
|
get_device_for_fixture_protocol,
|
|
get_parent_and_child_modules,
|
|
)
|
|
|
|
|
|
@device_smart
|
|
async def test_try_get_response(dev: SmartDevice, caplog):
|
|
mock_response: dict = {
|
|
"get_device_info": SmartErrorCode.PARAMS_ERROR,
|
|
}
|
|
caplog.set_level(logging.DEBUG)
|
|
dev._try_get_response(mock_response, "get_device_info", {})
|
|
msg = "Error PARAMS_ERROR(-1008) getting request get_device_info for device 127.0.0.123"
|
|
assert msg in caplog.text
|
|
|
|
|
|
@device_smart
|
|
async def test_update_no_device_info(dev: SmartDevice, mocker: MockerFixture):
|
|
mock_response: dict = {
|
|
"get_device_usage": {},
|
|
"get_device_time": {},
|
|
}
|
|
msg = f"get_device_info not found in {mock_response} for device 127.0.0.123"
|
|
with (
|
|
mocker.patch.object(dev.protocol, "query", return_value=mock_response),
|
|
pytest.raises(KasaException, match=msg),
|
|
):
|
|
await dev.update()
|
|
|
|
|
|
@device_smart
|
|
async def test_initial_update(dev: SmartDevice, mocker: MockerFixture):
|
|
"""Test the initial update cycle."""
|
|
# As the fixture data is already initialized, we reset the state for testing
|
|
dev._components_raw = None
|
|
dev._components = {}
|
|
dev._modules = {}
|
|
dev._features = {}
|
|
dev._children = {}
|
|
dev._last_update = {}
|
|
dev._last_update_time = None
|
|
|
|
negotiate = mocker.spy(dev, "_negotiate")
|
|
initialize_modules = mocker.spy(dev, "_initialize_modules")
|
|
initialize_features = mocker.spy(dev, "_initialize_features")
|
|
|
|
# Perform two updates and verify that initialization is only done once
|
|
await dev.update()
|
|
await dev.update()
|
|
|
|
negotiate.assert_called_once()
|
|
assert dev._components_raw is not None
|
|
initialize_modules.assert_called_once()
|
|
assert dev.modules
|
|
initialize_features.assert_called_once()
|
|
assert dev.features
|
|
|
|
|
|
@device_smart
|
|
async def test_negotiate(dev: SmartDevice, mocker: MockerFixture):
|
|
"""Test that the initial negotiation performs expected steps."""
|
|
# As the fixture data is already initialized, we reset the state for testing
|
|
dev._components_raw = None
|
|
dev._children = {}
|
|
|
|
query = mocker.spy(dev.protocol, "query")
|
|
initialize_children = mocker.spy(dev, "_initialize_children")
|
|
await dev._negotiate()
|
|
|
|
# Check that we got the initial negotiation call
|
|
query.assert_any_call(
|
|
{
|
|
"component_nego": None,
|
|
"get_device_info": None,
|
|
"get_connect_cloud_state": None,
|
|
}
|
|
)
|
|
assert dev._components_raw
|
|
|
|
# Check the children are created, if device supports them
|
|
if "child_device" in dev._components:
|
|
initialize_children.assert_called_once()
|
|
query.assert_any_call(
|
|
{
|
|
"get_child_device_component_list": None,
|
|
"get_child_device_list": None,
|
|
}
|
|
)
|
|
assert len(dev._children) == dev.internal_state["get_child_device_list"]["sum"]
|
|
|
|
|
|
@device_smart
|
|
async def test_update_module_queries(dev: SmartDevice, mocker: MockerFixture):
|
|
"""Test that the regular update uses queries from all supported modules."""
|
|
# We need to have some modules initialized by now
|
|
assert dev._modules
|
|
# Reset last update so all modules will query
|
|
for mod in dev._modules.values():
|
|
mod._last_update_time = None
|
|
|
|
device_queries: dict[SmartDevice, dict[str, Any]] = {}
|
|
for mod in dev._modules.values():
|
|
device_queries.setdefault(mod._device, {}).update(mod.query())
|
|
# Hubs do not query child modules by default.
|
|
if dev.device_type != Device.Type.Hub:
|
|
for child in dev.children:
|
|
for mod in child.modules.values():
|
|
device_queries.setdefault(mod._device, {}).update(mod.query())
|
|
|
|
spies = {}
|
|
for device in device_queries:
|
|
spies[device] = mocker.spy(device.protocol, "query")
|
|
|
|
await dev.update()
|
|
for device in device_queries:
|
|
if device_queries[device]:
|
|
# Need assert any here because the child device updates use the parent's protocol
|
|
spies[device].assert_any_call(device_queries[device])
|
|
else:
|
|
spies[device].assert_not_called()
|
|
|
|
|
|
@device_smart
|
|
async def test_update_module_errors(dev: SmartDevice, mocker: MockerFixture):
|
|
"""Test that modules that error are disabled / removed."""
|
|
# We need to have some modules initialized by now
|
|
assert dev._modules
|
|
|
|
critical_modules = {Module.DeviceModule, Module.ChildDevice}
|
|
not_disabling_modules = {Module.Cloud}
|
|
|
|
new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol)
|
|
|
|
module_queries = {
|
|
modname: q
|
|
for modname, module in dev._modules.items()
|
|
if (q := module.query()) and modname not in critical_modules
|
|
}
|
|
child_module_queries = {
|
|
modname: q
|
|
for child in dev.children
|
|
for modname, module in child._modules.items()
|
|
if (q := module.query()) and modname not in critical_modules
|
|
}
|
|
all_queries_names = {
|
|
key for mod_query in module_queries.values() for key in mod_query
|
|
}
|
|
all_child_queries_names = {
|
|
key for mod_query in child_module_queries.values() for key in mod_query
|
|
}
|
|
|
|
async def _query(request, *args, **kwargs):
|
|
responses = await dev.protocol._query(request, *args, **kwargs)
|
|
for k in responses:
|
|
if k in all_queries_names:
|
|
responses[k] = SmartErrorCode.PARAMS_ERROR
|
|
return responses
|
|
|
|
async def _child_query(self, request, *args, **kwargs):
|
|
responses = await child_protocols[self._device_id]._query(
|
|
request, *args, **kwargs
|
|
)
|
|
for k in responses:
|
|
if k in all_child_queries_names:
|
|
responses[k] = SmartErrorCode.PARAMS_ERROR
|
|
return responses
|
|
|
|
mocker.patch.object(new_dev.protocol, "query", side_effect=_query)
|
|
|
|
from kasa.smartprotocol import _ChildProtocolWrapper
|
|
|
|
child_protocols = {
|
|
cast(_ChildProtocolWrapper, child.protocol)._device_id: child.protocol
|
|
for child in dev.children
|
|
}
|
|
# children not created yet so cannot patch.object
|
|
mocker.patch("kasa.smartprotocol._ChildProtocolWrapper.query", new=_child_query)
|
|
|
|
await new_dev.update()
|
|
for modname in module_queries:
|
|
no_disable = modname in not_disabling_modules
|
|
mod_present = modname in new_dev._modules
|
|
assert (
|
|
mod_present is no_disable
|
|
), f"{modname} present {mod_present} when no_disable {no_disable}"
|
|
|
|
for modname in child_module_queries:
|
|
no_disable = modname in not_disabling_modules
|
|
mod_present = any(modname in child._modules for child in new_dev.children)
|
|
assert (
|
|
mod_present is no_disable
|
|
), f"{modname} present {mod_present} when no_disable {no_disable}"
|
|
|
|
|
|
@device_smart
|
|
async def test_update_module_update_delays(
|
|
dev: SmartDevice,
|
|
mocker: MockerFixture,
|
|
caplog: pytest.LogCaptureFixture,
|
|
freezer: FrozenDateTimeFactory,
|
|
):
|
|
"""Test that modules that disabled / removed on query failures."""
|
|
# We need to have some modules initialized by now
|
|
assert dev._modules
|
|
|
|
new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol)
|
|
await new_dev.update()
|
|
first_update_time = time.monotonic()
|
|
assert new_dev._last_update_time == first_update_time
|
|
for module in new_dev.modules.values():
|
|
if module.query():
|
|
assert module._last_update_time == first_update_time
|
|
|
|
seconds = 0
|
|
tick = 30
|
|
while seconds <= 180:
|
|
seconds += tick
|
|
freezer.tick(tick)
|
|
|
|
now = time.monotonic()
|
|
await new_dev.update()
|
|
for module in new_dev.modules.values():
|
|
mod_delay = module.MINIMUM_UPDATE_INTERVAL_SECS
|
|
if module.query():
|
|
expected_update_time = (
|
|
now if mod_delay == 0 else now - (seconds % mod_delay)
|
|
)
|
|
|
|
assert (
|
|
module._last_update_time == expected_update_time
|
|
), f"Expected update time {expected_update_time} after {seconds} seconds for {module.name} with delay {mod_delay} got {module._last_update_time}"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("first_update"),
|
|
[
|
|
pytest.param(True, id="First update true"),
|
|
pytest.param(False, id="First update false"),
|
|
],
|
|
)
|
|
@device_smart
|
|
async def test_update_module_query_errors(
|
|
dev: SmartDevice,
|
|
mocker: MockerFixture,
|
|
caplog: pytest.LogCaptureFixture,
|
|
freezer: FrozenDateTimeFactory,
|
|
first_update,
|
|
):
|
|
"""Test that modules that disabled / removed on query failures."""
|
|
# We need to have some modules initialized by now
|
|
assert dev._modules
|
|
|
|
first_update_queries = {"get_device_info", "get_connect_cloud_state"}
|
|
|
|
critical_modules = {Module.DeviceModule, Module.ChildDevice}
|
|
not_disabling_modules = {Module.Cloud}
|
|
|
|
new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol)
|
|
if not first_update:
|
|
await new_dev.update()
|
|
freezer.tick(
|
|
max(module.MINIMUM_UPDATE_INTERVAL_SECS for module in dev._modules.values())
|
|
)
|
|
|
|
module_queries = {
|
|
modname: q
|
|
for modname, module in dev._modules.items()
|
|
if (q := module.query()) and modname not in critical_modules
|
|
}
|
|
|
|
async def _query(request, *args, **kwargs):
|
|
if (
|
|
"component_nego" in request
|
|
or "get_child_device_component_list" in request
|
|
or "control_child" in request
|
|
):
|
|
return await dev.protocol._query(request, *args, **kwargs)
|
|
if len(request) == 1 and "get_device_info" in request:
|
|
return await dev.protocol._query(request, *args, **kwargs)
|
|
|
|
raise TimeoutError("Dummy timeout")
|
|
|
|
from kasa.smartprotocol import _ChildProtocolWrapper
|
|
|
|
child_protocols = {
|
|
cast(_ChildProtocolWrapper, child.protocol)._device_id: child.protocol
|
|
for child in dev.children
|
|
}
|
|
|
|
async def _child_query(self, request, *args, **kwargs):
|
|
return await child_protocols[self._device_id]._query(request, *args, **kwargs)
|
|
|
|
mocker.patch.object(new_dev.protocol, "query", side_effect=_query)
|
|
# children not created yet so cannot patch.object
|
|
mocker.patch("kasa.smartprotocol._ChildProtocolWrapper.query", new=_child_query)
|
|
|
|
await new_dev.update()
|
|
msg = f"Error querying {new_dev.host} for modules"
|
|
assert msg in caplog.text
|
|
for modname in module_queries:
|
|
no_disable = modname in not_disabling_modules
|
|
mod_present = modname in new_dev._modules
|
|
assert (
|
|
mod_present is no_disable
|
|
), f"{modname} present {mod_present} when no_disable {no_disable}"
|
|
for mod_query in module_queries[modname]:
|
|
if not first_update or mod_query not in first_update_queries:
|
|
msg = f"Error querying {new_dev.host} individually for module query '{mod_query}"
|
|
assert msg in caplog.text
|
|
|
|
|
|
async def test_get_modules():
|
|
"""Test getting modules for child and parent modules."""
|
|
dummy_device = await get_device_for_fixture_protocol(
|
|
"KS240(US)_1.0_1.0.5.json", "SMART"
|
|
)
|
|
from kasa.smart.modules import Cloud
|
|
|
|
# Modules on device
|
|
module = dummy_device.modules.get("Cloud")
|
|
assert module
|
|
assert module._device == dummy_device
|
|
assert isinstance(module, Cloud)
|
|
|
|
module = dummy_device.modules.get(Module.Cloud)
|
|
assert module
|
|
assert module._device == dummy_device
|
|
assert isinstance(module, Cloud)
|
|
|
|
# Modules on child
|
|
module = dummy_device.modules.get("Fan")
|
|
assert module is None
|
|
module = next(get_parent_and_child_modules(dummy_device, "Fan"))
|
|
assert module
|
|
assert module._device != dummy_device
|
|
assert module._device._parent == dummy_device
|
|
|
|
# Invalid modules
|
|
module = dummy_device.modules.get("DummyModule")
|
|
assert module is None
|
|
|
|
module = dummy_device.modules.get(Module.IotAmbientLight)
|
|
assert module is None
|
|
|
|
|
|
@device_smart
|
|
async def test_smartdevice_cloud_connection(dev: SmartDevice, mocker: MockerFixture):
|
|
"""Test is_cloud_connected property."""
|
|
assert isinstance(dev, SmartDevice)
|
|
assert "cloud_connect" in dev._components
|
|
|
|
is_connected = (
|
|
(cc := dev._last_update.get("get_connect_cloud_state"))
|
|
and not isinstance(cc, SmartErrorCode)
|
|
and cc["status"] == 0
|
|
)
|
|
|
|
assert dev.is_cloud_connected == is_connected
|
|
last_update = dev._last_update
|
|
|
|
for child in dev.children:
|
|
mocker.patch.object(child.protocol, "query", return_value=child._last_update)
|
|
|
|
last_update["get_connect_cloud_state"] = {"status": 0}
|
|
with patch.object(dev.protocol, "query", return_value=last_update):
|
|
await dev.update()
|
|
assert dev.is_cloud_connected is True
|
|
|
|
last_update["get_connect_cloud_state"] = {"status": 1}
|
|
with patch.object(dev.protocol, "query", return_value=last_update):
|
|
await dev.update()
|
|
assert dev.is_cloud_connected is False
|
|
|
|
last_update["get_connect_cloud_state"] = SmartErrorCode.UNKNOWN_METHOD_ERROR
|
|
with patch.object(dev.protocol, "query", return_value=last_update):
|
|
await dev.update()
|
|
assert dev.is_cloud_connected is False
|
|
|
|
# Test for no cloud_connect component during device initialisation
|
|
component_list = [
|
|
val
|
|
for val in dev._components_raw["component_list"]
|
|
if val["id"] not in {"cloud_connect"}
|
|
]
|
|
initial_response = {
|
|
"component_nego": {"component_list": component_list},
|
|
"get_connect_cloud_state": last_update["get_connect_cloud_state"],
|
|
"get_device_info": last_update["get_device_info"],
|
|
}
|
|
|
|
new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol)
|
|
|
|
first_call = True
|
|
|
|
async def side_effect_func(*args, **kwargs):
|
|
nonlocal first_call
|
|
resp = (
|
|
initial_response
|
|
if first_call
|
|
else await new_dev.protocol._query(*args, **kwargs)
|
|
)
|
|
first_call = False
|
|
return resp
|
|
|
|
with patch.object(
|
|
new_dev.protocol,
|
|
"query",
|
|
side_effect=side_effect_func,
|
|
):
|
|
await new_dev.update()
|
|
assert new_dev.is_cloud_connected is False
|