Merge remote-tracking branch 'upstream/master' into feat/parent_child_updates

This commit is contained in:
sdb9696
2024-07-02 14:40:08 +01:00
31 changed files with 1360 additions and 206 deletions

View File

@@ -234,8 +234,8 @@ class FakeIotTransport(BaseTransport):
return 9999
@property
def credentials_hash(self) -> str:
return ""
def credentials_hash(self) -> None:
return None
def set_alias(self, x, child_ids=None):
if child_ids is None:

View File

@@ -250,18 +250,31 @@ class FakeSmartTransport(BaseTransport):
info["get_dynamic_light_effect_rules"]["enable"] = params["enable"]
if params["enable"]:
info["get_device_info"]["dynamic_light_effect_id"] = params["id"]
info["get_dynamic_light_effect_rules"]["current_rule_id"] = params["enable"]
info["get_dynamic_light_effect_rules"]["current_rule_id"] = params["id"]
else:
if "dynamic_light_effect_id" in info["get_device_info"]:
del info["get_device_info"]["dynamic_light_effect_id"]
if "current_rule_id" in info["get_dynamic_light_effect_rules"]:
del info["get_dynamic_light_effect_rules"]["current_rule_id"]
def _set_edit_dynamic_light_effect_rule(self, info, params):
"""Edit dynamic light effect rule."""
rules = info["get_dynamic_light_effect_rules"]["rule_list"]
for rule in rules:
if rule["id"] == params["id"]:
rule.update(params)
return
raise Exception("Unable to find rule with id")
def _set_light_strip_effect(self, info, params):
"""Set or remove values as per the device behaviour."""
info["get_device_info"]["lighting_effect"]["enable"] = params["enable"]
info["get_device_info"]["lighting_effect"]["name"] = params["name"]
info["get_device_info"]["lighting_effect"]["id"] = params["id"]
# Brightness is not always available
if (brightness := params.get("brightness")) is not None:
info["get_device_info"]["lighting_effect"]["brightness"] = brightness
info["get_lighting_effect"] = copy.deepcopy(params)
def _set_led_info(self, info, params):
@@ -365,6 +378,9 @@ class FakeSmartTransport(BaseTransport):
elif method == "set_dynamic_light_effect_rule_enable":
self._set_dynamic_light_effect(info, params)
return {"error_code": 0}
elif method == "edit_dynamic_light_effect_rule":
self._set_edit_dynamic_light_effect_rule(info, params)
return {"error_code": 0}
elif method == "set_lighting_effect":
self._set_light_strip_effect(info, params)
return {"error_code": 0}

View File

@@ -0,0 +1,436 @@
{
"component_nego": {
"component_list": [
{
"id": "device",
"ver_code": 2
},
{
"id": "light_strip",
"ver_code": 1
},
{
"id": "light_strip_lighting_effect",
"ver_code": 2
},
{
"id": "firmware",
"ver_code": 2
},
{
"id": "quick_setup",
"ver_code": 3
},
{
"id": "inherit",
"ver_code": 1
},
{
"id": "time",
"ver_code": 1
},
{
"id": "wireless",
"ver_code": 1
},
{
"id": "schedule",
"ver_code": 2
},
{
"id": "countdown",
"ver_code": 2
},
{
"id": "antitheft",
"ver_code": 1
},
{
"id": "account",
"ver_code": 1
},
{
"id": "synchronize",
"ver_code": 1
},
{
"id": "sunrise_sunset",
"ver_code": 1
},
{
"id": "brightness",
"ver_code": 1
},
{
"id": "cloud_connect",
"ver_code": 1
},
{
"id": "color_temperature",
"ver_code": 1
},
{
"id": "default_states",
"ver_code": 1
},
{
"id": "preset",
"ver_code": 3
},
{
"id": "color",
"ver_code": 1
},
{
"id": "on_off_gradually",
"ver_code": 1
},
{
"id": "device_local_time",
"ver_code": 1
},
{
"id": "iot_cloud",
"ver_code": 1
},
{
"id": "music_rhythm",
"ver_code": 3
},
{
"id": "bulb_quick_control",
"ver_code": 1
},
{
"id": "localSmart",
"ver_code": 1
},
{
"id": "segment",
"ver_code": 1
},
{
"id": "segment_effect",
"ver_code": 1
}
]
},
"discovery_result": {
"device_id": "00000000000000000000000000000000",
"device_model": "L920-5(EU)",
"device_type": "SMART.TAPOBULB",
"factory_default": false,
"ip": "127.0.0.123",
"is_support_iot_cloud": true,
"mac": "1C-61-B4-00-00-00",
"mgt_encrypt_schm": {
"encrypt_type": "KLAP",
"http_port": 80,
"is_support_https": false,
"lv": 2
},
"obd_src": "tplink",
"owner": "00000000000000000000000000000000"
},
"get_antitheft_rules": {
"antitheft_rule_max_count": 1,
"enable": false,
"rule_list": []
},
"get_auto_update_info": {
"enable": false,
"random_range": 120,
"time": 180
},
"get_connect_cloud_state": {
"status": 1
},
"get_countdown_rules": {
"countdown_rule_max_count": 1,
"enable": false,
"rule_list": []
},
"get_device_info": {
"avatar": "light_strip",
"brightness": 65,
"color_temp": 0,
"color_temp_range": [
9000,
9000
],
"default_states": {
"state": {
"brightness": 65,
"color_temp": 0,
"hue": 9,
"saturation": 67
},
"type": "last_states"
},
"device_id": "0000000000000000000000000000000000000000",
"device_on": false,
"fw_id": "00000000000000000000000000000000",
"fw_ver": "1.1.3 Build 231229 Rel.164316",
"has_set_location_info": false,
"hue": 9,
"hw_id": "00000000000000000000000000000000",
"hw_ver": "1.0",
"ip": "127.0.0.123",
"lang": "de_DE",
"lighting_effect": {
"brightness": 65,
"custom": 0,
"display_colors": [
[
136,
98,
100
],
[
350,
97,
100
]
],
"enable": 0,
"id": "TapoStrip_5zkiG6avJ1IbhjiZbRlWvh",
"name": "Christmas"
},
"mac": "1C-61-B4-00-00-00",
"model": "L920",
"music_rhythm_enable": false,
"music_rhythm_mode": "single_lamp",
"nickname": "I01BU0tFRF9OQU1FIw==",
"oem_id": "00000000000000000000000000000000",
"overheated": false,
"region": "Europe/Berlin",
"rssi": -56,
"saturation": 67,
"segment_effect": {
"brightness": 97,
"custom": 0,
"display_colors": [],
"enable": 0,
"id": "",
"name": "Lightning"
},
"signal_level": 2,
"specs": "",
"ssid": "I01BU0tFRF9TU0lEIw==",
"time_diff": 60,
"type": "SMART.TAPOBULB"
},
"get_device_segment": {
"segment": 50
},
"get_device_time": {
"region": "Europe/Berlin",
"time_diff": 60,
"timestamp": 1719920893
},
"get_device_usage": {
"power_usage": {
"past30": 20,
"past7": 20,
"today": 0
},
"saved_power": {
"past30": 319,
"past7": 319,
"today": 0
},
"time_usage": {
"past30": 339,
"past7": 339,
"today": 0
}
},
"get_fw_download_state": {
"auto_upgrade": false,
"download_progress": 0,
"reboot_time": 5,
"status": 0,
"upgrade_time": 5
},
"get_inherit_info": null,
"get_lighting_effect": {
"backgrounds": [
[
136,
98,
75
],
[
136,
0,
0
],
[
350,
0,
100
],
[
350,
97,
94
]
],
"brightness": 65,
"brightness_range": [
50,
100
],
"custom": 0,
"display_colors": [
[
136,
98,
100
],
[
350,
97,
100
]
],
"duration": 5000,
"enable": 0,
"expansion_strategy": 1,
"fadeoff": 2000,
"hue_range": [
136,
146
],
"id": "TapoStrip_5zkiG6avJ1IbhjiZbRlWvh",
"init_states": [
[
136,
0,
100
]
],
"name": "Christmas",
"random_seed": 100,
"saturation_range": [
90,
100
],
"segments": [
0
],
"transition": 0,
"type": "random"
},
"get_next_event": {},
"get_on_off_gradually_info": {
"enable": true
},
"get_preset_rules": {
"start_index": 0,
"states": [
{
"brightness": 50,
"color_temp": 9000,
"hue": 0,
"saturation": 100
},
{
"brightness": 100,
"color_temp": 0,
"hue": 240,
"saturation": 100
},
{
"brightness": 100,
"color_temp": 0,
"hue": 0,
"saturation": 100
},
{
"brightness": 100,
"color_temp": 0,
"hue": 120,
"saturation": 100
},
{
"brightness": 100,
"color_temp": 0,
"hue": 277,
"saturation": 86
},
{
"brightness": 100,
"color_temp": 0,
"hue": 60,
"saturation": 100
},
{
"brightness": 100,
"color_temp": 0,
"hue": 300,
"saturation": 100
}
],
"sum": 7
},
"get_schedule_rules": {
"enable": false,
"rule_list": [],
"schedule_rule_max_count": 24,
"start_index": 0,
"sum": 0
},
"get_segment_effect_rule": {
"brightness": 97,
"custom": 0,
"display_colors": [],
"enable": 0,
"id": "",
"name": "Lightning"
},
"get_wireless_scan_info": {
"ap_list": [
{
"bssid": "000000000000",
"channel": 0,
"cipher_type": 2,
"key_type": "wpa2_psk",
"signal_level": 2,
"ssid": "I01BU0tFRF9TU0lEIw=="
}
],
"start_index": 0,
"sum": 1,
"wep_supported": false
},
"qs_component_nego": {
"component_list": [
{
"id": "quick_setup",
"ver_code": 3
},
{
"id": "sunrise_sunset",
"ver_code": 1
},
{
"id": "inherit",
"ver_code": 1
},
{
"id": "iot_cloud",
"ver_code": 1
},
{
"id": "firmware",
"ver_code": 2
}
],
"extra_info": {
"device_model": "L920",
"device_type": "SMART.TAPOBULB",
"is_klap": true
}
}
}

View File

@@ -39,3 +39,45 @@ async def test_light_effect(dev: Device, mocker: MockerFixture):
with pytest.raises(ValueError):
await light_effect.set_effect("foobar")
@light_effect
@pytest.mark.parametrize("effect_active", [True, False])
async def test_light_effect_brightness(
dev: Device, effect_active: bool, mocker: MockerFixture
):
"""Test that light module uses light_effect for brightness when active."""
light_module = dev.modules[Module.Light]
light_effect = dev.modules[Module.SmartLightEffect]
light_effect_set_brightness = mocker.spy(light_effect, "set_brightness")
mock_light_effect_call = mocker.patch.object(light_effect, "call")
brightness = dev.modules[Module.Brightness]
brightness_set_brightness = mocker.spy(brightness, "set_brightness")
mock_brightness_call = mocker.patch.object(brightness, "call")
mocker.patch.object(
type(light_effect),
"is_active",
new_callable=mocker.PropertyMock,
return_value=effect_active,
)
if effect_active: # Set the rule L1 active for testing
light_effect.data["current_rule_id"] = "L1"
await light_module.set_brightness(10)
if effect_active:
assert light_effect.is_active
assert light_effect.brightness == dev.brightness
light_effect_set_brightness.assert_called_with(10)
mock_light_effect_call.assert_called_with(
"edit_dynamic_light_effect_rule", mocker.ANY
)
else:
assert not light_effect.is_active
brightness_set_brightness.assert_called_with(10)
mock_brightness_call.assert_called_with("set_device_info", {"brightness": 10})

View File

@@ -0,0 +1,101 @@
from __future__ import annotations
from itertools import chain
import pytest
from pytest_mock import MockerFixture
from kasa import Device, Feature, Module
from kasa.smart.modules import LightEffect, LightStripEffect
from kasa.tests.device_fixtures import parametrize
light_strip_effect = parametrize(
"has light strip effect",
component_filter="light_strip_lighting_effect",
protocol_filter={"SMART"},
)
@light_strip_effect
async def test_light_strip_effect(dev: Device, mocker: MockerFixture):
"""Test light strip effect."""
light_effect = dev.modules.get(Module.LightEffect)
assert isinstance(light_effect, LightStripEffect)
brightness = dev.modules[Module.Brightness]
feature = dev.features["light_effect"]
assert feature.type == Feature.Type.Choice
call = mocker.spy(light_effect, "call")
light = dev.modules[Module.Light]
light_call = mocker.spy(light, "call")
assert feature.choices == light_effect.effect_list
assert feature.choices
for effect in chain(reversed(feature.choices), feature.choices):
await light_effect.set_effect(effect)
if effect == LightEffect.LIGHT_EFFECTS_OFF:
light_call.assert_called()
continue
# Start with the current effect data
params = light_effect.data["lighting_effect"]
enable = effect != LightEffect.LIGHT_EFFECTS_OFF
params["enable"] = enable
if enable:
params = light_effect._effect_mapping[effect]
params["enable"] = enable
params["brightness"] = brightness.brightness # use the existing brightness
call.assert_called_with("set_lighting_effect", params)
await dev.update()
assert light_effect.effect == effect
assert feature.value == effect
with pytest.raises(ValueError):
await light_effect.set_effect("foobar")
@light_strip_effect
@pytest.mark.parametrize("effect_active", [True, False])
async def test_light_effect_brightness(
dev: Device, effect_active: bool, mocker: MockerFixture
):
"""Test that light module uses light_effect for brightness when active."""
light_module = dev.modules[Module.Light]
light_effect = dev.modules[Module.SmartLightEffect]
light_effect_set_brightness = mocker.spy(light_effect, "set_brightness")
mock_light_effect_call = mocker.patch.object(light_effect, "call")
brightness = dev.modules[Module.Brightness]
brightness_set_brightness = mocker.spy(brightness, "set_brightness")
mock_brightness_call = mocker.patch.object(brightness, "call")
mocker.patch.object(
type(light_effect),
"is_active",
new_callable=mocker.PropertyMock,
return_value=effect_active,
)
await light_module.set_brightness(10)
if effect_active:
assert light_effect.is_active
assert light_effect.brightness == dev.brightness
light_effect_set_brightness.assert_called_with(10)
mock_light_effect_call.assert_called_with(
"set_lighting_effect", {"brightness": 10, "bAdjusted": True}
)
else:
assert not light_effect.is_active
brightness_set_brightness.assert_called_with(10)
mock_brightness_call.assert_called_with("set_device_info", {"brightness": 10})

View File

@@ -238,3 +238,14 @@ async def test_device_updates_deprecated(
child_spy.assert_called_once()
else:
child_spy.assert_not_called()
@has_children
async def test_parent_property(dev: Device):
"""Test a child device exposes it's parent."""
if not dev.children:
pytest.skip(f"Device {dev} fixture does not have any children")
assert dev.parent is None
for child in dev.children:
assert child.parent == dev

View File

@@ -5,6 +5,7 @@ import re
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
from pytest_mock import MockerFixture
from kasa import (
AuthenticationError,
@@ -24,6 +25,7 @@ from kasa.cli import (
cmd_command,
effect,
emeter,
energy,
hsv,
led,
raw_command,
@@ -62,7 +64,6 @@ def runner():
[
pytest.param(None, None, id="No connect params"),
pytest.param("SMART.TAPOPLUG", None, id="Only device_family"),
pytest.param(None, "KLAP", id="Only encrypt_type"),
],
)
async def test_update_called_by_cli(dev, mocker, runner, device_family, encrypt_type):
@@ -171,13 +172,16 @@ async def test_command_with_child(dev, mocker, runner):
class DummyDevice(dev.__class__):
def __init__(self):
super().__init__("127.0.0.1")
# device_type and _info initialised for repr
self._device_type = Device.Type.StripSocket
self._info = {}
async def _query_helper(*_, **__):
return {"dummy": "response"}
dummy_child = DummyDevice()
mocker.patch.object(dev, "_children", {"XYZ": dummy_child})
mocker.patch.object(dev, "_children", {"XYZ": [dummy_child]})
mocker.patch.object(dev, "get_child_device", return_value=dummy_child)
res = await runner.invoke(
@@ -314,9 +318,9 @@ async def test_emeter(dev: Device, mocker, runner):
if not dev.is_strip:
res = await runner.invoke(emeter, ["--index", "0"], obj=dev)
assert "Index and name are only for power strips!" in res.output
assert f"Device: {dev.host} does not have children" in res.output
res = await runner.invoke(emeter, ["--name", "mock"], obj=dev)
assert "Index and name are only for power strips!" in res.output
assert f"Device: {dev.host} does not have children" in res.output
if dev.is_strip and len(dev.children) > 0:
realtime_emeter = mocker.patch.object(dev.children[0], "get_emeter_realtime")
@@ -930,3 +934,110 @@ async def test_feature_set_child(mocker, runner):
assert f"Targeting child device {child_id}"
assert "Changing state from False to True" in res.output
assert res.exit_code == 0
async def test_cli_child_commands(
dev: Device, runner: CliRunner, mocker: MockerFixture
):
if not dev.children:
res = await runner.invoke(alias, ["--child-index", "0"], obj=dev)
assert f"Device: {dev.host} does not have children" in res.output
assert res.exit_code == 1
res = await runner.invoke(alias, ["--index", "0"], obj=dev)
assert f"Device: {dev.host} does not have children" in res.output
assert res.exit_code == 1
res = await runner.invoke(alias, ["--child", "Plug 2"], obj=dev)
assert f"Device: {dev.host} does not have children" in res.output
assert res.exit_code == 1
res = await runner.invoke(alias, ["--name", "Plug 2"], obj=dev)
assert f"Device: {dev.host} does not have children" in res.output
assert res.exit_code == 1
if dev.children:
child_alias = dev.children[0].alias
assert child_alias
child_device_id = dev.children[0].device_id
child_count = len(dev.children)
child_update_method = dev.children[0].update
# Test child retrieval
res = await runner.invoke(alias, ["--child-index", "0"], obj=dev)
assert f"Targeting child device {child_alias}" in res.output
assert res.exit_code == 0
res = await runner.invoke(alias, ["--index", "0"], obj=dev)
assert f"Targeting child device {child_alias}" in res.output
assert res.exit_code == 0
res = await runner.invoke(alias, ["--child", child_alias], obj=dev)
assert f"Targeting child device {child_alias}" in res.output
assert res.exit_code == 0
res = await runner.invoke(alias, ["--name", child_alias], obj=dev)
assert f"Targeting child device {child_alias}" in res.output
assert res.exit_code == 0
res = await runner.invoke(alias, ["--child", child_device_id], obj=dev)
assert f"Targeting child device {child_alias}" in res.output
assert res.exit_code == 0
res = await runner.invoke(alias, ["--name", child_device_id], obj=dev)
assert f"Targeting child device {child_alias}" in res.output
assert res.exit_code == 0
# Test invalid name and index
res = await runner.invoke(alias, ["--child-index", "-1"], obj=dev)
assert f"Invalid index -1, device has {child_count} children" in res.output
assert res.exit_code == 1
res = await runner.invoke(alias, ["--child-index", str(child_count)], obj=dev)
assert (
f"Invalid index {child_count}, device has {child_count} children"
in res.output
)
assert res.exit_code == 1
res = await runner.invoke(alias, ["--child", "foobar"], obj=dev)
assert "No child device found with device_id or name: foobar" in res.output
assert res.exit_code == 1
# Test using both options:
res = await runner.invoke(
alias, ["--child", child_alias, "--child-index", "0"], obj=dev
)
assert "Use either --child or --child-index, not both." in res.output
assert res.exit_code == 2
# Test child with no parameter interactive prompt
res = await runner.invoke(alias, ["--child"], obj=dev, input="0\n")
assert "Enter the index number of the child device:" in res.output
assert f"Alias: {child_alias}" in res.output
assert res.exit_code == 0
# Test values and updates
res = await runner.invoke(alias, ["foo", "--child", child_device_id], obj=dev)
assert "Alias set to: foo" in res.output
assert res.exit_code == 0
# Test help has command options plus child options
res = await runner.invoke(energy, ["--help"], obj=dev)
assert "--year" in res.output
assert "--child" in res.output
assert "--child-index" in res.output
assert res.exit_code == 0
# Test child update patching calls parent and is undone on exit
parent_update_spy = mocker.spy(dev, "update")
res = await runner.invoke(alias, ["bar", "--child", child_device_id], obj=dev)
assert "Alias set to: bar" in res.output
assert res.exit_code == 0
parent_update_spy.assert_called_once()
assert dev.children[0].update == child_update_method

View File

@@ -89,35 +89,39 @@ async def test_light_effect_module(dev: Device, mocker: MockerFixture):
assert light_effect_module.has_custom_effects is not None
await light_effect_module.set_effect("Off")
assert call.call_count == 1
call.assert_called()
await dev.update()
assert light_effect_module.effect == "Off"
assert feat.value == "Off"
call.reset_mock()
second_effect = effect_list[1]
await light_effect_module.set_effect(second_effect)
assert call.call_count == 2
call.assert_called()
await dev.update()
assert light_effect_module.effect == second_effect
assert feat.value == second_effect
call.reset_mock()
last_effect = effect_list[len(effect_list) - 1]
await light_effect_module.set_effect(last_effect)
assert call.call_count == 3
call.assert_called()
await dev.update()
assert light_effect_module.effect == last_effect
assert feat.value == last_effect
call.reset_mock()
# Test feature set
await feat.set_value(second_effect)
assert call.call_count == 4
call.assert_called()
await dev.update()
assert light_effect_module.effect == second_effect
assert feat.value == second_effect
call.reset_mock()
with pytest.raises(ValueError):
await light_effect_module.set_effect("foobar")
assert call.call_count == 4
call.assert_not_called()
@dimmable

View File

@@ -13,6 +13,7 @@ import pytest
from ..aestransport import AesTransport
from ..credentials import Credentials
from ..device import Device
from ..deviceconfig import DeviceConfig
from ..exceptions import KasaException
from ..iotprotocol import IotProtocol, _deprecated_TPLinkSmartHomeProtocol
@@ -512,11 +513,72 @@ def test_transport_init_signature(class_name_obj):
)
@pytest.mark.parametrize(
("transport_class", "login_version", "expected_hash"),
[
pytest.param(
AesTransport,
1,
"eyJwYXNzd29yZCI6IlFtRnkiLCJ1c2VybmFtZSI6Ik1qQXhZVFppTXpBMU0yTmpNVFF5TW1ReVl6TTJOekJpTmpJMk1UWXlNakZrTWpJNU1Ea3lPUT09In0=",
id="aes-lv-1",
),
pytest.param(
AesTransport,
2,
"eyJwYXNzd29yZDIiOiJaVFE1Tm1aa01qQXhNelprTkdKaU56Z3lPR1ZpWWpCaFlqa3lOV0l4WW1RNU56Y3lNRGhsTkE9PSIsInVzZXJuYW1lIjoiTWpBeFlUWmlNekExTTJOak1UUXlNbVF5WXpNMk56QmlOakkyTVRZeU1qRmtNakk1TURreU9RPT0ifQ==",
id="aes-lv-2",
),
pytest.param(KlapTransport, 1, "xBhMRGYWStVCVk9aSD8/6Q==", id="klap-lv-1"),
pytest.param(KlapTransport, 2, "xBhMRGYWStVCVk9aSD8/6Q==", id="klap-lv-2"),
pytest.param(
KlapTransportV2,
1,
"tEmiensOcZkP9twDEZKwU3JJl3asmseKCP7N9sfatVo=",
id="klapv2-lv-1",
),
pytest.param(
KlapTransportV2,
2,
"tEmiensOcZkP9twDEZKwU3JJl3asmseKCP7N9sfatVo=",
id="klapv2-lv-2",
),
pytest.param(XorTransport, None, None, id="xor"),
],
)
@pytest.mark.parametrize(
("credentials", "expected_blank"),
[
pytest.param(Credentials("Foo", "Bar"), False, id="credentials"),
pytest.param(None, True, id="no-credentials"),
pytest.param(Credentials(None, "Bar"), True, id="no-username"), # type: ignore[arg-type]
],
)
async def test_transport_credentials_hash(
mocker, transport_class, login_version, expected_hash, credentials, expected_blank
):
"""Test that the actual hashing doesn't break and empty credential returns an empty hash."""
host = "127.0.0.1"
params = Device.ConnectionParameters(
device_family=Device.Family.SmartTapoPlug,
encryption_type=Device.EncryptionType.Xor,
login_version=login_version,
)
config = DeviceConfig(host, credentials=credentials, connection_type=params)
transport = transport_class(config=config)
credentials_hash = transport.credentials_hash
expected = None if expected_blank else expected_hash
assert credentials_hash == expected
@pytest.mark.parametrize(
"transport_class",
[AesTransport, KlapTransport, KlapTransportV2, XorTransport, XorTransport],
)
async def test_transport_credentials_hash(mocker, transport_class):
async def test_transport_credentials_hash_from_config(mocker, transport_class):
"""Test that credentials_hash provided via config sets correctly."""
host = "127.0.0.1"
credentials = Credentials("Foo", "Bar")

View File

@@ -2,10 +2,9 @@ import logging
import pytest
from ..credentials import Credentials
from ..deviceconfig import DeviceConfig
from ..exceptions import (
SMART_RETRYABLE_ERRORS,
DeviceError,
KasaException,
SmartErrorCode,
)
@@ -93,7 +92,6 @@ async def test_smart_device_errors_in_multiple_request(
async def test_smart_device_multiple_request(
dummy_protocol, mocker, request_size, batch_size
):
host = "127.0.0.1"
requests = {}
mock_response = {
"result": {"responses": []},
@@ -109,16 +107,101 @@ async def test_smart_device_multiple_request(
send_mock = mocker.patch.object(
dummy_protocol._transport, "send", return_value=mock_response
)
config = DeviceConfig(
host, credentials=Credentials("foo", "bar"), batch_size=batch_size
)
dummy_protocol._transport._config = config
dummy_protocol._multi_request_batch_size = batch_size
await dummy_protocol.query(requests, retry_count=0)
expected_count = int(request_size / batch_size) + (request_size % batch_size > 0)
assert send_mock.call_count == expected_count
async def test_smart_device_multiple_request_json_decode_failure(
dummy_protocol, mocker
):
"""Test the logic to disable multiple requests on JSON_DECODE_FAIL_ERROR."""
requests = {}
mock_responses = []
mock_json_error = {
"result": {"responses": []},
"error_code": SmartErrorCode.JSON_DECODE_FAIL_ERROR.value,
}
for i in range(10):
method = f"get_method_{i}"
requests[method] = {"foo": "bar", "bar": "foo"}
mock_responses.append(
{"method": method, "result": {"great": "success"}, "error_code": 0}
)
send_mock = mocker.patch.object(
dummy_protocol._transport,
"send",
side_effect=[mock_json_error, *mock_responses],
)
dummy_protocol._multi_request_batch_size = 5
assert dummy_protocol._multi_request_batch_size == 5
await dummy_protocol.query(requests, retry_count=1)
assert dummy_protocol._multi_request_batch_size == 1
# Call count should be the first error + number of requests
assert send_mock.call_count == len(requests) + 1
async def test_smart_device_multiple_request_json_decode_failure_twice(
dummy_protocol, mocker
):
"""Test the logic to disable multiple requests on JSON_DECODE_FAIL_ERROR."""
requests = {}
mock_json_error = {
"result": {"responses": []},
"error_code": SmartErrorCode.JSON_DECODE_FAIL_ERROR.value,
}
for i in range(10):
method = f"get_method_{i}"
requests[method] = {"foo": "bar", "bar": "foo"}
send_mock = mocker.patch.object(
dummy_protocol._transport,
"send",
side_effect=[mock_json_error, KasaException],
)
dummy_protocol._multi_request_batch_size = 5
with pytest.raises(KasaException):
await dummy_protocol.query(requests, retry_count=1)
assert dummy_protocol._multi_request_batch_size == 1
assert send_mock.call_count == 2
async def test_smart_device_multiple_request_non_json_decode_failure(
dummy_protocol, mocker
):
"""Test the logic to disable multiple requests on JSON_DECODE_FAIL_ERROR.
Ensure other exception types behave as expected.
"""
requests = {}
mock_json_error = {
"result": {"responses": []},
"error_code": SmartErrorCode.UNKNOWN_METHOD_ERROR.value,
}
for i in range(10):
method = f"get_method_{i}"
requests[method] = {"foo": "bar", "bar": "foo"}
send_mock = mocker.patch.object(
dummy_protocol._transport,
"send",
side_effect=[mock_json_error, KasaException],
)
dummy_protocol._multi_request_batch_size = 5
with pytest.raises(DeviceError):
await dummy_protocol.query(requests, retry_count=1)
assert dummy_protocol._multi_request_batch_size == 5
assert send_mock.call_count == 1
async def test_childdevicewrapper_unwrapping(dummy_protocol, mocker):
"""Test that responseData gets unwrapped correctly."""
wrapped_protocol = _ChildProtocolWrapper("dummyid", dummy_protocol)