Add known smart requests to dump_devinfo (#597)

* Add known smart requests to dump_devinfo

* Move smartrequest.py to devtools

* Update post-review
This commit is contained in:
sdb9696 2024-01-02 17:20:53 +00:00 committed by GitHub
parent 5dafc1d1ed
commit ae5ad3e8c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 486 additions and 59 deletions

1
devtools/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Devtools package."""

View File

@ -14,14 +14,18 @@ import logging
import re import re
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from pprint import pprint from pprint import pprint
from typing import Dict, List
import asyncclick as click import asyncclick as click
from devtools.helpers.smartrequests import COMPONENT_REQUESTS, SmartRequest
from kasa import AuthenticationException, Credentials, Discover, SmartDevice from kasa import AuthenticationException, Credentials, Discover, SmartDevice
from kasa.discover import DiscoveryResult from kasa.discover import DiscoveryResult
from kasa.exceptions import SmartErrorCode
from kasa.tapo.tapodevice import TapoDevice from kasa.tapo.tapodevice import TapoDevice
Call = namedtuple("Call", "module method") Call = namedtuple("Call", "module method")
SmartCall = namedtuple("SmartCall", "module request should_succeed")
def scrub(res): def scrub(res):
@ -46,11 +50,19 @@ def scrub(res):
"oem_id", "oem_id",
"nickname", "nickname",
"alias", "alias",
"bssid",
"channel",
] ]
for k, v in res.items(): for k, v in res.items():
if isinstance(v, collections.abc.Mapping): if isinstance(v, collections.abc.Mapping):
res[k] = scrub(res.get(k)) res[k] = scrub(res.get(k))
elif (
isinstance(v, list)
and len(v) > 0
and isinstance(v[0], collections.abc.Mapping)
):
res[k] = [scrub(vi) for vi in v]
else: else:
if k in keys_to_scrub: if k in keys_to_scrub:
if k in ["latitude", "latitude_i", "longitude", "longitude_i"]: if k in ["latitude", "latitude_i", "longitude", "longitude_i"]:
@ -64,6 +76,8 @@ def scrub(res):
v = base64.b64encode(b"#MASKED_NAME#").decode() v = base64.b64encode(b"#MASKED_NAME#").decode()
elif k in ["alias"]: elif k in ["alias"]:
v = "#MASKED_NAME#" v = "#MASKED_NAME#"
elif isinstance(res[k], int):
v = 0
else: else:
v = re.sub(r"\w", "0", v) v = re.sub(r"\w", "0", v)
@ -179,7 +193,7 @@ async def get_legacy_fixture(device):
) )
) )
if device._discovery_info: if device._discovery_info and not device._discovery_info.get("system"):
# Need to recreate a DiscoverResult here because we don't want the aliases # Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device. # in the fixture, we want the actual field names as returned by the device.
dr = DiscoveryResult(**device._discovery_info) dr = DiscoveryResult(**device._discovery_info)
@ -200,59 +214,22 @@ async def get_legacy_fixture(device):
return save_filename, copy_folder, final return save_filename, copy_folder, final
async def get_smart_fixture(device: SmartDevice): async def _make_requests_or_exit(
"""Get fixture for new TAPO style protocol.""" device: SmartDevice, requests: List[SmartRequest], name: str
items = [ ) -> Dict[str, Dict]:
Call(module="component_nego", method="component_nego"), final = {}
Call(module="device_info", method="get_device_info"),
Call(module="device_usage", method="get_device_usage"),
Call(module="device_time", method="get_device_time"),
Call(module="energy_usage", method="get_energy_usage"),
Call(module="current_power", method="get_current_power"),
Call(module="temp_humidity_records", method="get_temp_humidity_records"),
Call(module="child_device_list", method="get_child_device_list"),
Call(
module="trigger_logs",
method={"get_trigger_logs": {"page_size": 5, "start_id": 0}},
),
Call(
module="child_device_component_list",
method="get_child_device_component_list",
),
]
successes = []
for test_call in items:
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(test_call.method)
except AuthenticationException as ex:
click.echo(
click.style(
f"Unable to query the device due to an authentication error: {ex}",
bold=True,
fg="red",
)
)
exit(1)
except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL not suported", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)
requests = []
for succ in successes:
requests.append({"method": succ.method})
final_query = {"multipleRequest": {"requests": requests}}
try: try:
responses = await device.protocol.query(final_query) end = len(requests)
step = 10 # Break the requests down as there seems to be a size limit
for i in range(0, end, step):
x = i
requests_step = requests[x : x + step]
responses = await device.protocol.query(
SmartRequest._create_request_dict(requests_step)
)
for method, result in responses.items():
final[method] = result
return final
except AuthenticationException as ex: except AuthenticationException as ex:
click.echo( click.echo(
click.style( click.style(
@ -264,14 +241,112 @@ async def get_smart_fixture(device: SmartDevice):
exit(1) exit(1)
except Exception as ex: except Exception as ex:
click.echo( click.echo(
click.style( click.style(f"Unable to query {name} at once: {ex}", bold=True, fg="red")
f"Unable to query all successes at once: {ex}", bold=True, fg="red"
)
) )
exit(1) exit(1)
final = {}
for method, result in responses.items():
final[method] = result async def get_smart_fixture(device: TapoDevice):
"""Get fixture for new TAPO style protocol."""
extra_test_calls = [
SmartCall(
module="temp_humidity_records",
request=SmartRequest.get_raw_request("get_temp_humidity_records"),
should_succeed=False,
),
SmartCall(
module="child_device_list",
request=SmartRequest.get_raw_request("get_child_device_list"),
should_succeed=False,
),
SmartCall(
module="child_device_component_list",
request=SmartRequest.get_raw_request("get_child_device_component_list"),
should_succeed=False,
),
SmartCall(
module="trigger_logs",
request=SmartRequest.get_raw_request(
"get_trigger_logs", SmartRequest.GetTriggerLogsParams(5, 0)
),
should_succeed=False,
),
]
successes = []
click.echo("Testing component_nego call ..", nl=False)
responses = await _make_requests_or_exit(
device, [SmartRequest.component_nego()], "component_nego call"
)
component_info_response = responses["component_nego"]
click.echo(click.style("OK", fg="green"))
successes.append(
SmartCall(
module="component_nego",
request=SmartRequest("component_nego"),
should_succeed=True,
)
)
test_calls = []
should_succeed = []
for item in component_info_response["component_list"]:
component_id = item["id"]
if requests := COMPONENT_REQUESTS.get(component_id):
component_test_calls = [
SmartCall(module=component_id, request=request, should_succeed=True)
for request in requests
]
test_calls.extend(component_test_calls)
should_succeed.extend(component_test_calls)
elif component_id not in COMPONENT_REQUESTS:
click.echo(f"Skipping {component_id}..", nl=False)
click.echo(click.style("UNSUPPORTED", fg="yellow"))
test_calls.extend(extra_test_calls)
for test_call in test_calls:
click.echo(f"Testing {test_call.module}..", nl=False)
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(
SmartRequest._create_request_dict(test_call.request)
)
except AuthenticationException as ex:
click.echo(
click.style(
f"Unable to query the device due to an authentication error: {ex}",
bold=True,
fg="red",
)
)
exit(1)
except Exception as ex:
if (
not test_call.should_succeed
and hasattr(ex, "error_code")
and ex.error_code == SmartErrorCode.UNKNOWN_METHOD_ERROR
):
click.echo(click.style("FAIL - EXPECTED", fg="green"))
else:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL no response", fg="red"))
else:
if not test_call.should_succeed:
click.echo(click.style("OK - EXPECTED FAIL", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)
requests = []
for succ in successes:
requests.append(succ.request)
final = await _make_requests_or_exit(device, requests, "all successes at once")
# Need to recreate a DiscoverResult here because we don't want the aliases # Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device. # in the fixture, we want the actual field names as returned by the device.

View File

@ -0,0 +1 @@
"""Helpers package."""

View File

@ -0,0 +1,350 @@
"""SmartRequest helper classes and functions for new SMART/TAPO devices.
List of known requests with associated parameter classes.
Other requests that are known but not currently implemented
or tested are:
get_child_device_component_list
get_child_device_list
control_child
get_device_running_info - seems to be a subset of get_device_info
get_tss_info
get_raw_dvi
get_homekit_info
fw_download
sync_env
account_sync
device_reset
close_device_ble
heart_beat
"""
import logging
from dataclasses import asdict, dataclass
from typing import List, Optional, Union
_LOGGER = logging.getLogger(__name__)
logging.getLogger("httpx").propagate = False
class SmartRequest:
"""Class to represent a smart protocol request."""
def __init__(self, method_name: str, params: Optional["SmartRequestParams"] = None):
self.method_name = method_name
if params:
self.params = params.to_dict()
else:
self.params = None
def __repr__(self):
return f"SmartRequest({self.method_name})"
def to_dict(self):
"""Return the request as a dict suitable for passing to query()."""
return {self.method_name: self.params}
@dataclass
class SmartRequestParams:
"""Base class for Smart request params.
The to_dict() method of this class omits null values which
is required by the devices.
"""
def to_dict(self):
"""Return the params as a dict with values of None ommited."""
return asdict(
self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None}
)
@dataclass
class DeviceOnParams(SmartRequestParams):
"""Get Rules Params."""
device_on: bool
@dataclass
class GetRulesParams(SmartRequestParams):
"""Get Rules Params."""
start_index: int = 0
@dataclass
class GetTriggerLogsParams(SmartRequestParams):
"""Trigger Logs params."""
page_size: int = 5
start_id: int = 0
@dataclass
class LedStatusParams(SmartRequestParams):
"""LED Status params."""
led_rule: Optional[str] = None
@staticmethod
def from_bool(state: bool):
"""Set the led_rule from the state."""
rule = "always" if state else "never"
return SmartRequest.LedStatusParams(led_rule=rule)
@dataclass
class LightInfoParams(SmartRequestParams):
"""LightInfo params."""
brightness: Optional[int] = None
color_temp: Optional[int] = None
hue: Optional[int] = None
saturation: Optional[int] = None
@dataclass
class DynamicLightEffectParams(SmartRequestParams):
"""LightInfo params."""
enable: bool
id: Optional[str] = None
@staticmethod
def get_raw_request(
method: str, params: Optional[SmartRequestParams] = None
) -> "SmartRequest":
"""Send a raw request to the device."""
return SmartRequest(method, params)
@staticmethod
def component_nego() -> "SmartRequest":
"""Get quick setup component info."""
return SmartRequest("component_nego")
@staticmethod
def get_device_info() -> "SmartRequest":
"""Get device info."""
return SmartRequest("get_device_info")
@staticmethod
def get_device_usage() -> "SmartRequest":
"""Get device usage."""
return SmartRequest("get_device_usage")
@staticmethod
def device_info_list() -> List["SmartRequest"]:
"""Get device info list."""
return [
SmartRequest.get_device_info(),
SmartRequest.get_device_usage(),
]
@staticmethod
def get_auto_update_info() -> "SmartRequest":
"""Get auto update info."""
return SmartRequest("get_auto_update_info")
@staticmethod
def firmware_info_list() -> List["SmartRequest"]:
"""Get info list."""
return [
SmartRequest.get_auto_update_info(),
SmartRequest.get_raw_request("get_fw_download_state"),
SmartRequest.get_raw_request("get_latest_fw"),
]
@staticmethod
def qs_component_nego() -> "SmartRequest":
"""Get quick setup component info."""
return SmartRequest("qs_component_nego")
@staticmethod
def get_device_time() -> "SmartRequest":
"""Get device time."""
return SmartRequest("get_device_time")
@staticmethod
def get_wireless_scan_info() -> "SmartRequest":
"""Get wireless scan info."""
return SmartRequest("get_wireless_scan_info")
@staticmethod
def get_schedule_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get schedule rules."""
return SmartRequest(
"get_schedule_rules", params or SmartRequest.GetRulesParams()
)
@staticmethod
def get_next_event(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get next scheduled event."""
return SmartRequest("get_next_event", params or SmartRequest.GetRulesParams())
@staticmethod
def schedule_info_list() -> List["SmartRequest"]:
"""Get schedule info list."""
return [
SmartRequest.get_schedule_rules(),
SmartRequest.get_next_event(),
]
@staticmethod
def get_countdown_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get countdown rules."""
return SmartRequest(
"get_countdown_rules", params or SmartRequest.GetRulesParams()
)
@staticmethod
def get_antitheft_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get antitheft rules."""
return SmartRequest(
"get_antitheft_rules", params or SmartRequest.GetRulesParams()
)
@staticmethod
def get_led_info(params: Optional[LedStatusParams] = None) -> "SmartRequest":
"""Get led info."""
return SmartRequest("get_led_info", params or SmartRequest.LedStatusParams())
@staticmethod
def get_auto_off_config(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get auto off config."""
return SmartRequest(
"get_auto_off_config", params or SmartRequest.GetRulesParams()
)
@staticmethod
def get_delay_action_info() -> "SmartRequest":
"""Get delay action info."""
return SmartRequest("get_delay_action_info")
@staticmethod
def auto_off_list() -> List["SmartRequest"]:
"""Get energy usage."""
return [
SmartRequest.get_auto_off_config(),
SmartRequest.get_delay_action_info(), # May not live here
]
@staticmethod
def get_energy_usage() -> "SmartRequest":
"""Get energy usage."""
return SmartRequest("get_energy_usage")
@staticmethod
def energy_monitoring_list() -> List["SmartRequest"]:
"""Get energy usage."""
return [
SmartRequest("get_energy_usage"),
SmartRequest.get_raw_request("get_electricity_price_config"),
]
@staticmethod
def get_current_power() -> "SmartRequest":
"""Get current power."""
return SmartRequest("get_current_power")
@staticmethod
def power_protection_list() -> List["SmartRequest"]:
"""Get power protection info list."""
return [
SmartRequest.get_current_power(),
SmartRequest.get_raw_request("get_max_power"),
SmartRequest.get_raw_request("get_protection_power"),
]
@staticmethod
def get_preset_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get preset rules."""
return SmartRequest("get_preset_rules", params or SmartRequest.GetRulesParams())
@staticmethod
def get_auto_light_info() -> "SmartRequest":
"""Get auto light info."""
return SmartRequest("get_auto_light_info")
@staticmethod
def get_dynamic_light_effect_rules(
params: Optional[GetRulesParams] = None
) -> "SmartRequest":
"""Get dynamic light effect rules."""
return SmartRequest(
"get_dynamic_light_effect_rules", params or SmartRequest.GetRulesParams()
)
@staticmethod
def set_device_on(params: DeviceOnParams) -> "SmartRequest":
"""Set device on state."""
return SmartRequest("set_device_info", params)
@staticmethod
def set_light_info(params: LightInfoParams) -> "SmartRequest":
"""Set color temperature."""
return SmartRequest("set_device_info", params)
@staticmethod
def set_dynamic_light_effect_rule_enable(
params: DynamicLightEffectParams
) -> "SmartRequest":
"""Enable dynamic light effect rule."""
return SmartRequest("set_dynamic_light_effect_rule_enable", params)
@staticmethod
def get_component_info_requests(component_nego_response) -> List["SmartRequest"]:
"""Get a list of requests based on the component info response."""
request_list = []
for component in component_nego_response["component_list"]:
if requests := COMPONENT_REQUESTS.get(component["id"]):
request_list.extend(requests)
return request_list
@staticmethod
def _create_request_dict(
smart_request: Union["SmartRequest", List["SmartRequest"]]
) -> dict:
"""Create request dict to be passed to SmartProtocol.query()."""
if isinstance(smart_request, list):
request = {}
for sr in smart_request:
request[sr.method_name] = sr.params
else:
request = smart_request.to_dict()
return request
COMPONENT_REQUESTS = {
"device": SmartRequest.device_info_list(),
"firmware": SmartRequest.firmware_info_list(),
"quick_setup": [SmartRequest.qs_component_nego()],
"inherit": [SmartRequest.get_raw_request("get_inherit_info")],
"time": [SmartRequest.get_device_time()],
"wireless": [SmartRequest.get_wireless_scan_info()],
"schedule": SmartRequest.schedule_info_list(),
"countdown": [SmartRequest.get_countdown_rules()],
"antitheft": [SmartRequest.get_antitheft_rules()],
"account": None,
"synchronize": None, # sync_env
"sunrise_sunset": None, # for schedules
"led": [SmartRequest.get_led_info()],
"cloud_connect": [SmartRequest.get_raw_request("get_connect_cloud_state")],
"iot_cloud": None,
"device_local_time": None,
"default_states": None, # in device_info
"auto_off": [SmartRequest.get_auto_off_config()],
"localSmart": None,
"energy_monitoring": SmartRequest.energy_monitoring_list(),
"power_protection": SmartRequest.power_protection_list(),
"current_protection": None, # overcurrent in device_info
"matter": None,
"preset": [SmartRequest.get_preset_rules()],
"brightness": None, # in device_info
"color": None, # in device_info
"color_temperature": None, # in device_info
"auto_light": [SmartRequest.get_auto_light_info()],
"light_effect": [SmartRequest.get_dynamic_light_effect_rules()],
"bulb_quick_control": None,
"on_off_gradually": [SmartRequest.get_raw_request("get_on_off_gradually_info")],
}