Merge branch 'master' into feat/parent_child_updates

This commit is contained in:
Steven B 2024-07-04 13:39:10 +01:00 committed by GitHub
commit 2b6500fa05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 682 additions and 30 deletions

View File

@ -65,6 +65,7 @@ Some newer Kasa devices require authentication. These are marked with <sup>*</su
- **KP400** - **KP400**
- Hardware: 1.0 (US) / Firmware: 1.0.10 - Hardware: 1.0 (US) / Firmware: 1.0.10
- Hardware: 2.0 (US) / Firmware: 1.0.6 - Hardware: 2.0 (US) / Firmware: 1.0.6
- Hardware: 3.0 (US) / Firmware: 1.0.3
### Wall Switches ### Wall Switches
@ -81,6 +82,7 @@ Some newer Kasa devices require authentication. These are marked with <sup>*</su
- **KP405** - **KP405**
- Hardware: 1.0 (US) / Firmware: 1.0.5 - Hardware: 1.0 (US) / Firmware: 1.0.5
- **KS200M** - **KS200M**
- Hardware: 1.0 (US) / Firmware: 1.0.11
- Hardware: 1.0 (US) / Firmware: 1.0.8 - Hardware: 1.0 (US) / Firmware: 1.0.8
- **KS205** - **KS205**
- Hardware: 1.0 (US) / Firmware: 1.0.2<sup>\*</sup> - Hardware: 1.0 (US) / Firmware: 1.0.2<sup>\*</sup>
@ -88,6 +90,7 @@ Some newer Kasa devices require authentication. These are marked with <sup>*</su
- Hardware: 1.0 (US) / Firmware: 1.0.4 - Hardware: 1.0 (US) / Firmware: 1.0.4
- **KS225** - **KS225**
- Hardware: 1.0 (US) / Firmware: 1.0.2<sup>\*</sup> - Hardware: 1.0 (US) / Firmware: 1.0.2<sup>\*</sup>
- Hardware: 1.0 (US) / Firmware: 1.1.0<sup>\*</sup>
- **KS230** - **KS230**
- Hardware: 1.0 (US) / Firmware: 1.0.14 - Hardware: 1.0 (US) / Firmware: 1.0.14
- **KS240** - **KS240**

View File

@ -284,6 +284,15 @@ class SmartRequest:
"""Get preset rules.""" """Get preset rules."""
return SmartRequest("get_preset_rules", params or SmartRequest.GetRulesParams()) return SmartRequest("get_preset_rules", params or SmartRequest.GetRulesParams())
@staticmethod
def get_on_off_gradually_info(
params: SmartRequestParams | None = None,
) -> SmartRequest:
"""Get preset rules."""
return SmartRequest(
"get_on_off_gradually_info", params or SmartRequest.SmartRequestParams()
)
@staticmethod @staticmethod
def get_auto_light_info() -> SmartRequest: def get_auto_light_info() -> SmartRequest:
"""Get auto light info.""" """Get auto light info."""
@ -382,7 +391,7 @@ COMPONENT_REQUESTS = {
"auto_light": [SmartRequest.get_auto_light_info()], "auto_light": [SmartRequest.get_auto_light_info()],
"light_effect": [SmartRequest.get_dynamic_light_effect_rules()], "light_effect": [SmartRequest.get_dynamic_light_effect_rules()],
"bulb_quick_control": [], "bulb_quick_control": [],
"on_off_gradually": [SmartRequest.get_raw_request("get_on_off_gradually_info")], "on_off_gradually": [SmartRequest.get_on_off_gradually_info()],
"light_strip": [], "light_strip": [],
"light_strip_lighting_effect": [ "light_strip_lighting_effect": [
SmartRequest.get_raw_request("get_lighting_effect") SmartRequest.get_raw_request("get_lighting_effect")

View File

@ -19,12 +19,6 @@ class AutoOff(SmartModule):
def _initialize_features(self): def _initialize_features(self):
"""Initialize features after the initial update.""" """Initialize features after the initial update."""
if not isinstance(self.data, dict):
_LOGGER.warning(
"No data available for module, skipping %s: %s", self, self.data
)
return
self._add_feature( self._add_feature(
Feature( Feature(
self._device, self._device,

View File

@ -43,6 +43,10 @@ class BatterySensor(SmartModule):
) )
) )
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}
@property @property
def battery(self): def battery(self):
"""Return battery level.""" """Return battery level."""

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from ...exceptions import SmartErrorCode
from ...feature import Feature from ...feature import Feature
from ..smartmodule import SmartModule from ..smartmodule import SmartModule
@ -18,6 +17,13 @@ class Cloud(SmartModule):
QUERY_GETTER_NAME = "get_connect_cloud_state" QUERY_GETTER_NAME = "get_connect_cloud_state"
REQUIRED_COMPONENT = "cloud_connect" REQUIRED_COMPONENT = "cloud_connect"
def _post_update_hook(self):
"""Perform actions after a device update.
Overrides the default behaviour to disable a module if the query returns
an error because the logic here is to treat that as not connected.
"""
def __init__(self, device: SmartDevice, module: str): def __init__(self, device: SmartDevice, module: str):
super().__init__(device, module) super().__init__(device, module)
@ -37,6 +43,6 @@ class Cloud(SmartModule):
@property @property
def is_connected(self): def is_connected(self):
"""Return True if device is connected to the cloud.""" """Return True if device is connected to the cloud."""
if isinstance(self.data, SmartErrorCode): if self._has_data_error():
return False return False
return self.data["status"] == 0 return self.data["status"] == 0

View File

@ -10,6 +10,13 @@ class DeviceModule(SmartModule):
REQUIRED_COMPONENT = "device" REQUIRED_COMPONENT = "device"
def _post_update_hook(self):
"""Perform actions after a device update.
Overrides the default behaviour to disable a module if the query returns
an error because this module is critical.
"""
def query(self) -> dict: def query(self) -> dict:
"""Query to execute during the update cycle.""" """Query to execute during the update cycle."""
query = { query = {

View File

@ -13,7 +13,6 @@ from typing import TYPE_CHECKING, Any, Callable, Optional
from async_timeout import timeout as asyncio_timeout from async_timeout import timeout as asyncio_timeout
from pydantic.v1 import BaseModel, Field, validator from pydantic.v1 import BaseModel, Field, validator
from ...exceptions import SmartErrorCode
from ...feature import Feature from ...feature import Feature
from ..smartmodule import SmartModule from ..smartmodule import SmartModule
@ -123,6 +122,13 @@ class Firmware(SmartModule):
req["get_auto_update_info"] = None req["get_auto_update_info"] = None
return req return req
def _post_update_hook(self):
"""Perform actions after a device update.
Overrides the default behaviour to disable a module if the query returns
an error because some of the module still functions.
"""
@property @property
def current_firmware(self) -> str: def current_firmware(self) -> str:
"""Return the current firmware version.""" """Return the current firmware version."""
@ -136,11 +142,11 @@ class Firmware(SmartModule):
@property @property
def firmware_update_info(self): def firmware_update_info(self):
"""Return latest firmware information.""" """Return latest firmware information."""
fw = self.data.get("get_latest_fw") or self.data if not self._device.is_cloud_connected or self._has_data_error():
if not self._device.is_cloud_connected or isinstance(fw, SmartErrorCode):
# Error in response, probably disconnected from the cloud. # Error in response, probably disconnected from the cloud.
return UpdateInfo(type=0, need_to_upgrade=False) return UpdateInfo(type=0, need_to_upgrade=False)
fw = self.data.get("get_latest_fw") or self.data
return UpdateInfo.parse_obj(fw) return UpdateInfo.parse_obj(fw)
@property @property

View File

@ -14,6 +14,10 @@ class FrostProtection(SmartModule):
REQUIRED_COMPONENT = "frost_protection" REQUIRED_COMPONENT = "frost_protection"
QUERY_GETTER_NAME = "get_frost_protection" QUERY_GETTER_NAME = "get_frost_protection"
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}
@property @property
def enabled(self) -> bool: def enabled(self) -> bool:
"""Return True if frost protection is on.""" """Return True if frost protection is on."""

View File

@ -45,6 +45,10 @@ class HumiditySensor(SmartModule):
) )
) )
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}
@property @property
def humidity(self): def humidity(self):
"""Return current humidity in percentage.""" """Return current humidity in percentage."""

View File

@ -140,7 +140,7 @@ class LightPreset(SmartModule, LightPresetInterface):
"""Query to execute during the update cycle.""" """Query to execute during the update cycle."""
if self._state_in_sysinfo: # Child lights can have states in the child info if self._state_in_sysinfo: # Child lights can have states in the child info
return {} return {}
return {self.QUERY_GETTER_NAME: None} return {self.QUERY_GETTER_NAME: {"start_index": 0}}
async def _check_supported(self): async def _check_supported(self):
"""Additional check to see if the module is supported by the device. """Additional check to see if the module is supported by the device.

View File

@ -230,7 +230,7 @@ class LightTransition(SmartModule):
if self._state_in_sysinfo: if self._state_in_sysinfo:
return {} return {}
else: else:
return {self.QUERY_GETTER_NAME: None} return {self.QUERY_GETTER_NAME: {}}
async def _check_supported(self): async def _check_supported(self):
"""Additional check to see if the module is supported by the device.""" """Additional check to see if the module is supported by the device."""

View File

@ -32,6 +32,10 @@ class ReportMode(SmartModule):
) )
) )
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}
@property @property
def report_interval(self): def report_interval(self):
"""Reporting interval of a sensor device.""" """Reporting interval of a sensor device."""

View File

@ -58,6 +58,10 @@ class TemperatureSensor(SmartModule):
) )
) )
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}
@property @property
def temperature(self): def temperature(self):
"""Return current humidity in percentage.""" """Return current humidity in percentage."""

View File

@ -201,11 +201,20 @@ class SmartDevice(Device):
self._children[info["device_id"]]._update_internal_state(info) self._children[info["device_id"]]._update_internal_state(info)
# Call handle update for modules that want to update internal data # Call handle update for modules that want to update internal data
for module in self._modules.values(): errors = []
module._post_update_hook() for module_name, module in self._modules.items():
if not self._handle_module_post_update_hook(module):
errors.append(module_name)
for error in errors:
self._modules.pop(error)
for child in self._children.values(): for child in self._children.values():
for child_module in child._modules.values(): errors = []
child_module._post_update_hook() for child_module_name, child_module in child._modules.items():
if not self._handle_module_post_update_hook(child_module):
errors.append(child_module_name)
for error in errors:
child._modules.pop(error)
# We can first initialize the features after the first update. # We can first initialize the features after the first update.
# We make here an assumption that every device has at least a single feature. # We make here an assumption that every device has at least a single feature.
@ -214,6 +223,19 @@ class SmartDevice(Device):
_LOGGER.debug("Got an update: %s", self._last_update) _LOGGER.debug("Got an update: %s", self._last_update)
def _handle_module_post_update_hook(self, module: SmartModule) -> bool:
try:
module._post_update_hook()
return True
except Exception as ex:
_LOGGER.error(
"Error processing %s for device %s, module will be unavailable: %s",
module.name,
self.host,
ex,
)
return False
async def _initialize_modules(self): async def _initialize_modules(self):
"""Initialize modules based on component negotiation response.""" """Initialize modules based on component negotiation response."""
from .smartmodule import SmartModule from .smartmodule import SmartModule

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import logging import logging
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from ..exceptions import KasaException from ..exceptions import DeviceError, KasaException, SmartErrorCode
from ..module import Module from ..module import Module
if TYPE_CHECKING: if TYPE_CHECKING:
@ -41,6 +41,14 @@ class SmartModule(Module):
"""Name of the module.""" """Name of the module."""
return getattr(self, "NAME", self.__class__.__name__) return getattr(self, "NAME", self.__class__.__name__)
def _post_update_hook(self): # noqa: B027
"""Perform actions after a device update.
Any modules overriding this should ensure that self.data is
accessed unless the module should remain active despite errors.
"""
assert self.data # noqa: S101
def query(self) -> dict: def query(self) -> dict:
"""Query to execute during the update cycle. """Query to execute during the update cycle.
@ -87,6 +95,11 @@ class SmartModule(Module):
filtered_data = {k: v for k, v in dev._last_update.items() if k in q_keys} filtered_data = {k: v for k, v in dev._last_update.items() if k in q_keys}
for data_item in filtered_data:
if isinstance(filtered_data[data_item], SmartErrorCode):
raise DeviceError(
f"{data_item} for {self.name}", error_code=filtered_data[data_item]
)
if len(filtered_data) == 1: if len(filtered_data) == 1:
return next(iter(filtered_data.values())) return next(iter(filtered_data.values()))
@ -110,3 +123,10 @@ class SmartModule(Module):
color_temp_range but only supports one value. color_temp_range but only supports one value.
""" """
return True return True
def _has_data_error(self) -> bool:
try:
assert self.data # noqa: S101
return False
except DeviceError:
return True

View File

@ -416,6 +416,10 @@ class _ChildProtocolWrapper(SmartProtocol):
return smart_method, smart_params return smart_method, smart_params
async def query(self, request: str | dict, retry_count: int = 3) -> dict: async def query(self, request: str | dict, retry_count: int = 3) -> dict:
"""Wrap request inside control_child envelope."""
return await self._query(request, retry_count)
async def _query(self, request: str | dict, retry_count: int = 3) -> dict:
"""Wrap request inside control_child envelope.""" """Wrap request inside control_child envelope."""
method, params = self._get_method_and_params_for_request(request) method, params = self._get_method_and_params_for_request(request)
request_data = { request_data = {

View File

@ -0,0 +1,45 @@
{
"system": {
"get_sysinfo": {
"alias": "#MASKED_NAME#",
"child_num": 2,
"children": [
{
"alias": "#MASKED_NAME#",
"id": "8006521377E30159055A751347B5A5E321A8D0A100",
"next_action": {
"type": -1
},
"on_time": 4024,
"state": 1
},
{
"alias": "#MASKED_NAME#",
"id": "8006521377E30159055A751347B5A5E321A8D0A101",
"next_action": {
"type": -1
},
"on_time": 4024,
"state": 1
}
],
"deviceId": "0000000000000000000000000000000000000000",
"err_code": 0,
"feature": "TIM",
"hwId": "00000000000000000000000000000000",
"hw_ver": "3.0",
"latitude_i": 0,
"led_off": 0,
"longitude_i": 0,
"mac": "3C:52:A1:00:00:00",
"mic_type": "IOT.SMARTPLUGSWITCH",
"model": "KP400(US)",
"ntc_state": 0,
"oemId": "00000000000000000000000000000000",
"rssi": -75,
"status": "new",
"sw_ver": "1.0.3 Build 220803 Rel.172301",
"updating": 0
}
}
}

View File

@ -0,0 +1,96 @@
{
"smartlife.iot.LAS": {
"get_config": {
"devs": [
{
"dark_index": 0,
"enable": 0,
"hw_id": 0,
"level_array": [
{
"adc": 390,
"name": "cloudy",
"value": 15
},
{
"adc": 300,
"name": "overcast",
"value": 12
},
{
"adc": 222,
"name": "dawn",
"value": 9
},
{
"adc": 222,
"name": "twilight",
"value": 9
},
{
"adc": 111,
"name": "total darkness",
"value": 4
},
{
"adc": 2400,
"name": "custom",
"value": 97
}
],
"max_adc": 2450,
"min_adc": 0
}
],
"err_code": 0,
"ver": "1.0"
}
},
"smartlife.iot.PIR": {
"get_config": {
"array": [
80,
50,
20,
0
],
"cold_time": 60000,
"enable": 0,
"err_code": 0,
"max_adc": 4095,
"min_adc": 0,
"trigger_index": 1,
"version": "1.0"
}
},
"system": {
"get_sysinfo": {
"active_mode": "none",
"alias": "#MASKED_NAME#",
"dev_name": "Smart Light Switch with PIR",
"deviceId": "0000000000000000000000000000000000000000",
"err_code": 0,
"feature": "TIM",
"hwId": "00000000000000000000000000000000",
"hw_ver": "1.0",
"icon_hash": "",
"latitude_i": 0,
"led_off": 0,
"longitude_i": 0,
"mac": "3C:52:A1:00:00:00",
"mic_type": "IOT.SMARTPLUGSWITCH",
"model": "KS200M(US)",
"next_action": {
"type": -1
},
"obd_src": "tplink",
"oemId": "00000000000000000000000000000000",
"on_time": 0,
"relay_state": 0,
"rssi": -40,
"status": "new",
"sw_ver": "1.0.11 Build 230113 Rel.151038",
"updating": 0
}
}
}

View File

@ -0,0 +1,332 @@
{
"component_nego": {
"component_list": [
{
"id": "device",
"ver_code": 2
},
{
"id": "firmware",
"ver_code": 2
},
{
"id": "quick_setup",
"ver_code": 3
},
{
"id": "inherit",
"ver_code": 1
},
{
"id": "time",
"ver_code": 1
},
{
"id": "wireless",
"ver_code": 1
},
{
"id": "schedule",
"ver_code": 2
},
{
"id": "countdown",
"ver_code": 2
},
{
"id": "antitheft",
"ver_code": 1
},
{
"id": "account",
"ver_code": 1
},
{
"id": "synchronize",
"ver_code": 1
},
{
"id": "sunrise_sunset",
"ver_code": 1
},
{
"id": "led",
"ver_code": 1
},
{
"id": "cloud_connect",
"ver_code": 1
},
{
"id": "iot_cloud",
"ver_code": 1
},
{
"id": "device_local_time",
"ver_code": 1
},
{
"id": "default_states",
"ver_code": 1
},
{
"id": "brightness",
"ver_code": 1
},
{
"id": "preset",
"ver_code": 1
},
{
"id": "on_off_gradually",
"ver_code": 2
},
{
"id": "dimmer_calibration",
"ver_code": 1
},
{
"id": "overheat_protection",
"ver_code": 1
},
{
"id": "matter",
"ver_code": 2
}
]
},
"discovery_result": {
"device_id": "00000000000000000000000000000000",
"device_model": "KS225(US)",
"device_type": "SMART.KASASWITCH",
"factory_default": false,
"ip": "127.0.0.123",
"is_support_iot_cloud": true,
"mac": "3C-52-A1-00-00-00",
"mgt_encrypt_schm": {
"encrypt_type": "KLAP",
"http_port": 80,
"is_support_https": false,
"lv": 2
},
"obd_src": "tplink",
"owner": ""
},
"get_antitheft_rules": {
"antitheft_rule_max_count": 1,
"enable": false,
"rule_list": []
},
"get_auto_update_info": {
"enable": true,
"random_range": 120,
"time": 180
},
"get_connect_cloud_state": {
"status": 0
},
"get_countdown_rules": {
"countdown_rule_max_count": 1,
"enable": false,
"rule_list": []
},
"get_device_info": {
"avatar": "switch_s500d",
"brightness": 5,
"default_states": {
"re_power_type": "always_off",
"re_power_type_capability": [
"last_states",
"always_on",
"always_off"
],
"type": "last_states"
},
"device_id": "0000000000000000000000000000000000000000",
"device_on": true,
"fw_id": "00000000000000000000000000000000",
"fw_ver": "1.1.0 Build 240411 Rel.150716",
"has_set_location_info": true,
"hw_id": "00000000000000000000000000000000",
"hw_ver": "1.0",
"ip": "127.0.0.123",
"lang": "en_US",
"latitude": 0,
"longitude": 0,
"mac": "3C-52-A1-00-00-00",
"model": "KS225",
"nickname": "I01BU0tFRF9OQU1FIw==",
"oem_id": "00000000000000000000000000000000",
"on_time": 88,
"overheat_status": "normal",
"region": "America/Toronto",
"rssi": -48,
"signal_level": 3,
"specs": "",
"ssid": "I01BU0tFRF9TU0lEIw==",
"time_diff": -300,
"type": "SMART.KASASWITCH"
},
"get_device_time": {
"region": "America/Toronto",
"time_diff": -300,
"timestamp": 1720036002
},
"get_device_usage": {
"time_usage": {
"past30": 1371,
"past7": 659,
"today": 58
}
},
"get_fw_download_state": {
"auto_upgrade": false,
"download_progress": 0,
"reboot_time": 5,
"status": 0,
"upgrade_time": 5
},
"get_inherit_info": null,
"get_latest_fw": {
"fw_size": 0,
"fw_ver": "1.1.0 Build 240411 Rel.150716",
"hw_id": "",
"need_to_upgrade": false,
"oem_id": "",
"release_date": "",
"release_note": "",
"type": 0
},
"get_led_info": {
"bri_config": {
"bri_type": "overall",
"overall_bri": 50
},
"led_rule": "always",
"led_status": false,
"night_mode": {
"end_time": 350,
"night_mode_type": "sunrise_sunset",
"start_time": 1266,
"sunrise_offset": 0,
"sunset_offset": 0
}
},
"get_matter_setup_info": {
"setup_code": "00000000000",
"setup_payload": "00:0000000000000000000"
},
"get_next_event": {},
"get_on_off_gradually_info": {
"off_state": {
"duration": 1,
"enable": true,
"max_duration": 60
},
"on_state": {
"duration": 1,
"enable": true,
"max_duration": 60
}
},
"get_preset_rules": {
"brightness": [
100,
75,
50,
25,
1
]
},
"get_schedule_rules": {
"enable": false,
"rule_list": [],
"schedule_rule_max_count": 32,
"start_index": 0,
"sum": 0
},
"get_wireless_scan_info": {
"ap_list": [
{
"bssid": "000000000000",
"channel": 0,
"cipher_type": 2,
"key_type": "wpa2_psk",
"signal_level": 3,
"ssid": "I01BU0tFRF9TU0lEIw=="
},
{
"bssid": "000000000000",
"channel": 0,
"cipher_type": 2,
"key_type": "wpa2_psk",
"signal_level": 3,
"ssid": "I01BU0tFRF9TU0lEIw=="
},
{
"bssid": "000000000000",
"channel": 0,
"cipher_type": 2,
"key_type": "wpa2_psk",
"signal_level": 1,
"ssid": "I01BU0tFRF9TU0lEIw=="
},
{
"bssid": "000000000000",
"channel": 0,
"cipher_type": 2,
"key_type": "wpa2_psk",
"signal_level": 1,
"ssid": "I01BU0tFRF9TU0lEIw=="
},
{
"bssid": "000000000000",
"channel": 0,
"cipher_type": 1,
"key_type": "wpa2_psk",
"signal_level": 1,
"ssid": "I01BU0tFRF9TU0lEIw=="
}
],
"start_index": 0,
"sum": 5,
"wep_supported": false
},
"qs_component_nego": {
"component_list": [
{
"id": "quick_setup",
"ver_code": 3
},
{
"id": "sunrise_sunset",
"ver_code": 1
},
{
"id": "ble_whole_setup",
"ver_code": 1
},
{
"id": "matter",
"ver_code": 2
},
{
"id": "iot_cloud",
"ver_code": 1
},
{
"id": "inherit",
"ver_code": 1
},
{
"id": "firmware",
"ver_code": 2
}
],
"extra_info": {
"device_model": "KS225",
"device_type": "SMART.KASASWITCH",
"is_klap": true
}
}
}

View File

@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any from typing import Any, cast
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
@ -132,6 +132,78 @@ async def test_update_module_queries(dev: SmartDevice, mocker: MockerFixture):
spies[device].assert_not_called() spies[device].assert_not_called()
@device_smart
async def test_update_module_errors(dev: SmartDevice, mocker: MockerFixture):
"""Test that modules that error are disabled / removed."""
# We need to have some modules initialized by now
assert dev._modules
critical_modules = {Module.DeviceModule, Module.ChildDevice}
not_disabling_modules = {Module.Firmware, Module.Cloud}
new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol)
module_queries = {
modname: q
for modname, module in dev._modules.items()
if (q := module.query()) and modname not in critical_modules
}
child_module_queries = {
modname: q
for child in dev.children
for modname, module in child._modules.items()
if (q := module.query()) and modname not in critical_modules
}
all_queries_names = {
key for mod_query in module_queries.values() for key in mod_query
}
all_child_queries_names = {
key for mod_query in child_module_queries.values() for key in mod_query
}
async def _query(request, *args, **kwargs):
responses = await dev.protocol._query(request, *args, **kwargs)
for k in responses:
if k in all_queries_names:
responses[k] = SmartErrorCode.PARAMS_ERROR
return responses
async def _child_query(self, request, *args, **kwargs):
responses = await child_protocols[self._device_id]._query(
request, *args, **kwargs
)
for k in responses:
if k in all_child_queries_names:
responses[k] = SmartErrorCode.PARAMS_ERROR
return responses
mocker.patch.object(new_dev.protocol, "query", side_effect=_query)
from kasa.smartprotocol import _ChildProtocolWrapper
child_protocols = {
cast(_ChildProtocolWrapper, child.protocol)._device_id: child.protocol
for child in dev.children
}
# children not created yet so cannot patch.object
mocker.patch("kasa.smartprotocol._ChildProtocolWrapper.query", new=_child_query)
await new_dev.update()
for modname in module_queries:
no_disable = modname in not_disabling_modules
mod_present = modname in new_dev._modules
assert (
mod_present is no_disable
), f"{modname} present {mod_present} when no_disable {no_disable}"
for modname in child_module_queries:
no_disable = modname in not_disabling_modules
mod_present = any(modname in child._modules for child in new_dev.children)
assert (
mod_present is no_disable
), f"{modname} present {mod_present} when no_disable {no_disable}"
async def test_get_modules(): async def test_get_modules():
"""Test getting modules for child and parent modules.""" """Test getting modules for child and parent modules."""
dummy_device = await get_device_for_fixture_protocol( dummy_device = await get_device_for_fixture_protocol(
@ -181,6 +253,9 @@ async def test_smartdevice_cloud_connection(dev: SmartDevice, mocker: MockerFixt
assert dev.is_cloud_connected == is_connected assert dev.is_cloud_connected == is_connected
last_update = dev._last_update last_update = dev._last_update
for child in dev.children:
mocker.patch.object(child.protocol, "query", return_value=child._last_update)
last_update["get_connect_cloud_state"] = {"status": 0} last_update["get_connect_cloud_state"] = {"status": 0}
with patch.object(dev.protocol, "query", return_value=last_update): with patch.object(dev.protocol, "query", return_value=last_update):
await dev.update() await dev.update()
@ -207,21 +282,18 @@ async def test_smartdevice_cloud_connection(dev: SmartDevice, mocker: MockerFixt
"get_connect_cloud_state": last_update["get_connect_cloud_state"], "get_connect_cloud_state": last_update["get_connect_cloud_state"],
"get_device_info": last_update["get_device_info"], "get_device_info": last_update["get_device_info"],
} }
# Child component list is not stored on the device
if "get_child_device_list" in last_update:
child_component_list = await dev.protocol.query(
"get_child_device_component_list"
)
last_update["get_child_device_component_list"] = child_component_list[
"get_child_device_component_list"
]
new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol) new_dev = SmartDevice("127.0.0.1", protocol=dev.protocol)
first_call = True first_call = True
def side_effect_func(*_, **__): async def side_effect_func(*args, **kwargs):
nonlocal first_call nonlocal first_call
resp = initial_response if first_call else last_update resp = (
initial_response
if first_call
else await new_dev.protocol._query(*args, **kwargs)
)
first_call = False first_call = False
return resp return resp

View File

@ -1,6 +1,7 @@
import logging import logging
import pytest import pytest
import pytest_mock
from ..exceptions import ( from ..exceptions import (
SMART_RETRYABLE_ERRORS, SMART_RETRYABLE_ERRORS,
@ -19,6 +20,21 @@ DUMMY_MULTIPLE_QUERY = {
ERRORS = [e for e in SmartErrorCode if e != 0] ERRORS = [e for e in SmartErrorCode if e != 0]
async def test_smart_queries(dummy_protocol, mocker: pytest_mock.MockerFixture):
mock_response = {"result": {"great": "success"}, "error_code": 0}
mocker.patch.object(dummy_protocol._transport, "send", return_value=mock_response)
# test sending a method name as a string
resp = await dummy_protocol.query("foobar")
assert "foobar" in resp
assert resp["foobar"] == mock_response["result"]
# test sending a method name as a dict
resp = await dummy_protocol.query(DUMMY_QUERY)
assert "foobar" in resp
assert resp["foobar"] == mock_response["result"]
@pytest.mark.parametrize("error_code", ERRORS, ids=lambda e: e.name) @pytest.mark.parametrize("error_code", ERRORS, ids=lambda e: e.name)
async def test_smart_device_errors(dummy_protocol, mocker, error_code): async def test_smart_device_errors(dummy_protocol, mocker, error_code):
mock_response = {"result": {"great": "success"}, "error_code": error_code.value} mock_response = {"result": {"great": "success"}, "error_code": error_code.value}