diff --git a/devtools/__init__.py b/devtools/__init__.py new file mode 100644 index 00000000..49189835 --- /dev/null +++ b/devtools/__init__.py @@ -0,0 +1 @@ +"""Devtools package.""" diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index c03e97d5..985ce669 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -14,14 +14,18 @@ import logging import re from collections import defaultdict, namedtuple from pprint import pprint +from typing import Dict, List import asyncclick as click +from devtools.helpers.smartrequests import COMPONENT_REQUESTS, SmartRequest from kasa import AuthenticationException, Credentials, Discover, SmartDevice from kasa.discover import DiscoveryResult +from kasa.exceptions import SmartErrorCode from kasa.tapo.tapodevice import TapoDevice Call = namedtuple("Call", "module method") +SmartCall = namedtuple("SmartCall", "module request should_succeed") def scrub(res): @@ -46,11 +50,19 @@ def scrub(res): "oem_id", "nickname", "alias", + "bssid", + "channel", ] for k, v in res.items(): if isinstance(v, collections.abc.Mapping): res[k] = scrub(res.get(k)) + elif ( + isinstance(v, list) + and len(v) > 0 + and isinstance(v[0], collections.abc.Mapping) + ): + res[k] = [scrub(vi) for vi in v] else: if k in keys_to_scrub: if k in ["latitude", "latitude_i", "longitude", "longitude_i"]: @@ -64,6 +76,8 @@ def scrub(res): v = base64.b64encode(b"#MASKED_NAME#").decode() elif k in ["alias"]: v = "#MASKED_NAME#" + elif isinstance(res[k], int): + v = 0 else: v = re.sub(r"\w", "0", v) @@ -179,7 +193,7 @@ async def get_legacy_fixture(device): ) ) - if device._discovery_info: + if device._discovery_info and not device._discovery_info.get("system"): # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. dr = DiscoveryResult(**device._discovery_info) @@ -200,59 +214,22 @@ async def get_legacy_fixture(device): return save_filename, copy_folder, final -async def get_smart_fixture(device: SmartDevice): - """Get fixture for new TAPO style protocol.""" - items = [ - Call(module="component_nego", method="component_nego"), - Call(module="device_info", method="get_device_info"), - Call(module="device_usage", method="get_device_usage"), - Call(module="device_time", method="get_device_time"), - Call(module="energy_usage", method="get_energy_usage"), - Call(module="current_power", method="get_current_power"), - Call(module="temp_humidity_records", method="get_temp_humidity_records"), - Call(module="child_device_list", method="get_child_device_list"), - Call( - module="trigger_logs", - method={"get_trigger_logs": {"page_size": 5, "start_id": 0}}, - ), - Call( - module="child_device_component_list", - method="get_child_device_component_list", - ), - ] - - successes = [] - - for test_call in items: - try: - click.echo(f"Testing {test_call}..", nl=False) - response = await device.protocol.query(test_call.method) - except AuthenticationException as ex: - click.echo( - click.style( - f"Unable to query the device due to an authentication error: {ex}", - bold=True, - fg="red", - ) - ) - exit(1) - except Exception as ex: - click.echo(click.style(f"FAIL {ex}", fg="red")) - else: - if not response: - click.echo(click.style("FAIL not suported", fg="red")) - else: - click.echo(click.style("OK", fg="green")) - successes.append(test_call) - - requests = [] - for succ in successes: - requests.append({"method": succ.method}) - - final_query = {"multipleRequest": {"requests": requests}} - +async def _make_requests_or_exit( + device: SmartDevice, requests: List[SmartRequest], name: str +) -> Dict[str, Dict]: + final = {} try: - responses = await device.protocol.query(final_query) + end = len(requests) + step = 10 # Break the requests down as there seems to be a size limit + for i in range(0, end, step): + x = i + requests_step = requests[x : x + step] + responses = await device.protocol.query( + SmartRequest._create_request_dict(requests_step) + ) + for method, result in responses.items(): + final[method] = result + return final except AuthenticationException as ex: click.echo( click.style( @@ -264,14 +241,112 @@ async def get_smart_fixture(device: SmartDevice): exit(1) except Exception as ex: click.echo( - click.style( - f"Unable to query all successes at once: {ex}", bold=True, fg="red" - ) + click.style(f"Unable to query {name} at once: {ex}", bold=True, fg="red") ) exit(1) - final = {} - for method, result in responses.items(): - final[method] = result + + +async def get_smart_fixture(device: TapoDevice): + """Get fixture for new TAPO style protocol.""" + extra_test_calls = [ + SmartCall( + module="temp_humidity_records", + request=SmartRequest.get_raw_request("get_temp_humidity_records"), + should_succeed=False, + ), + SmartCall( + module="child_device_list", + request=SmartRequest.get_raw_request("get_child_device_list"), + should_succeed=False, + ), + SmartCall( + module="child_device_component_list", + request=SmartRequest.get_raw_request("get_child_device_component_list"), + should_succeed=False, + ), + SmartCall( + module="trigger_logs", + request=SmartRequest.get_raw_request( + "get_trigger_logs", SmartRequest.GetTriggerLogsParams(5, 0) + ), + should_succeed=False, + ), + ] + + successes = [] + + click.echo("Testing component_nego call ..", nl=False) + responses = await _make_requests_or_exit( + device, [SmartRequest.component_nego()], "component_nego call" + ) + component_info_response = responses["component_nego"] + click.echo(click.style("OK", fg="green")) + successes.append( + SmartCall( + module="component_nego", + request=SmartRequest("component_nego"), + should_succeed=True, + ) + ) + + test_calls = [] + should_succeed = [] + + for item in component_info_response["component_list"]: + component_id = item["id"] + if requests := COMPONENT_REQUESTS.get(component_id): + component_test_calls = [ + SmartCall(module=component_id, request=request, should_succeed=True) + for request in requests + ] + test_calls.extend(component_test_calls) + should_succeed.extend(component_test_calls) + elif component_id not in COMPONENT_REQUESTS: + click.echo(f"Skipping {component_id}..", nl=False) + click.echo(click.style("UNSUPPORTED", fg="yellow")) + + test_calls.extend(extra_test_calls) + + for test_call in test_calls: + click.echo(f"Testing {test_call.module}..", nl=False) + try: + click.echo(f"Testing {test_call}..", nl=False) + response = await device.protocol.query( + SmartRequest._create_request_dict(test_call.request) + ) + except AuthenticationException as ex: + click.echo( + click.style( + f"Unable to query the device due to an authentication error: {ex}", + bold=True, + fg="red", + ) + ) + exit(1) + except Exception as ex: + if ( + not test_call.should_succeed + and hasattr(ex, "error_code") + and ex.error_code == SmartErrorCode.UNKNOWN_METHOD_ERROR + ): + click.echo(click.style("FAIL - EXPECTED", fg="green")) + else: + click.echo(click.style(f"FAIL {ex}", fg="red")) + else: + if not response: + click.echo(click.style("FAIL no response", fg="red")) + else: + if not test_call.should_succeed: + click.echo(click.style("OK - EXPECTED FAIL", fg="red")) + else: + click.echo(click.style("OK", fg="green")) + successes.append(test_call) + + requests = [] + for succ in successes: + requests.append(succ.request) + + final = await _make_requests_or_exit(device, requests, "all successes at once") # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. diff --git a/devtools/helpers/__init__.py b/devtools/helpers/__init__.py new file mode 100644 index 00000000..182958c6 --- /dev/null +++ b/devtools/helpers/__init__.py @@ -0,0 +1 @@ +"""Helpers package.""" diff --git a/devtools/helpers/smartrequests.py b/devtools/helpers/smartrequests.py new file mode 100644 index 00000000..4bb50ae3 --- /dev/null +++ b/devtools/helpers/smartrequests.py @@ -0,0 +1,350 @@ +"""SmartRequest helper classes and functions for new SMART/TAPO devices. + +List of known requests with associated parameter classes. + +Other requests that are known but not currently implemented +or tested are: + +get_child_device_component_list +get_child_device_list +control_child +get_device_running_info - seems to be a subset of get_device_info + +get_tss_info +get_raw_dvi +get_homekit_info + +fw_download + +sync_env +account_sync + +device_reset +close_device_ble +heart_beat + +""" + +import logging +from dataclasses import asdict, dataclass +from typing import List, Optional, Union + +_LOGGER = logging.getLogger(__name__) +logging.getLogger("httpx").propagate = False + + +class SmartRequest: + """Class to represent a smart protocol request.""" + + def __init__(self, method_name: str, params: Optional["SmartRequestParams"] = None): + self.method_name = method_name + if params: + self.params = params.to_dict() + else: + self.params = None + + def __repr__(self): + return f"SmartRequest({self.method_name})" + + def to_dict(self): + """Return the request as a dict suitable for passing to query().""" + return {self.method_name: self.params} + + @dataclass + class SmartRequestParams: + """Base class for Smart request params. + + The to_dict() method of this class omits null values which + is required by the devices. + """ + + def to_dict(self): + """Return the params as a dict with values of None ommited.""" + return asdict( + self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None} + ) + + @dataclass + class DeviceOnParams(SmartRequestParams): + """Get Rules Params.""" + + device_on: bool + + @dataclass + class GetRulesParams(SmartRequestParams): + """Get Rules Params.""" + + start_index: int = 0 + + @dataclass + class GetTriggerLogsParams(SmartRequestParams): + """Trigger Logs params.""" + + page_size: int = 5 + start_id: int = 0 + + @dataclass + class LedStatusParams(SmartRequestParams): + """LED Status params.""" + + led_rule: Optional[str] = None + + @staticmethod + def from_bool(state: bool): + """Set the led_rule from the state.""" + rule = "always" if state else "never" + return SmartRequest.LedStatusParams(led_rule=rule) + + @dataclass + class LightInfoParams(SmartRequestParams): + """LightInfo params.""" + + brightness: Optional[int] = None + color_temp: Optional[int] = None + hue: Optional[int] = None + saturation: Optional[int] = None + + @dataclass + class DynamicLightEffectParams(SmartRequestParams): + """LightInfo params.""" + + enable: bool + id: Optional[str] = None + + @staticmethod + def get_raw_request( + method: str, params: Optional[SmartRequestParams] = None + ) -> "SmartRequest": + """Send a raw request to the device.""" + return SmartRequest(method, params) + + @staticmethod + def component_nego() -> "SmartRequest": + """Get quick setup component info.""" + return SmartRequest("component_nego") + + @staticmethod + def get_device_info() -> "SmartRequest": + """Get device info.""" + return SmartRequest("get_device_info") + + @staticmethod + def get_device_usage() -> "SmartRequest": + """Get device usage.""" + return SmartRequest("get_device_usage") + + @staticmethod + def device_info_list() -> List["SmartRequest"]: + """Get device info list.""" + return [ + SmartRequest.get_device_info(), + SmartRequest.get_device_usage(), + ] + + @staticmethod + def get_auto_update_info() -> "SmartRequest": + """Get auto update info.""" + return SmartRequest("get_auto_update_info") + + @staticmethod + def firmware_info_list() -> List["SmartRequest"]: + """Get info list.""" + return [ + SmartRequest.get_auto_update_info(), + SmartRequest.get_raw_request("get_fw_download_state"), + SmartRequest.get_raw_request("get_latest_fw"), + ] + + @staticmethod + def qs_component_nego() -> "SmartRequest": + """Get quick setup component info.""" + return SmartRequest("qs_component_nego") + + @staticmethod + def get_device_time() -> "SmartRequest": + """Get device time.""" + return SmartRequest("get_device_time") + + @staticmethod + def get_wireless_scan_info() -> "SmartRequest": + """Get wireless scan info.""" + return SmartRequest("get_wireless_scan_info") + + @staticmethod + def get_schedule_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get schedule rules.""" + return SmartRequest( + "get_schedule_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_next_event(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get next scheduled event.""" + return SmartRequest("get_next_event", params or SmartRequest.GetRulesParams()) + + @staticmethod + def schedule_info_list() -> List["SmartRequest"]: + """Get schedule info list.""" + return [ + SmartRequest.get_schedule_rules(), + SmartRequest.get_next_event(), + ] + + @staticmethod + def get_countdown_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get countdown rules.""" + return SmartRequest( + "get_countdown_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_antitheft_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get antitheft rules.""" + return SmartRequest( + "get_antitheft_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_led_info(params: Optional[LedStatusParams] = None) -> "SmartRequest": + """Get led info.""" + return SmartRequest("get_led_info", params or SmartRequest.LedStatusParams()) + + @staticmethod + def get_auto_off_config(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get auto off config.""" + return SmartRequest( + "get_auto_off_config", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_delay_action_info() -> "SmartRequest": + """Get delay action info.""" + return SmartRequest("get_delay_action_info") + + @staticmethod + def auto_off_list() -> List["SmartRequest"]: + """Get energy usage.""" + return [ + SmartRequest.get_auto_off_config(), + SmartRequest.get_delay_action_info(), # May not live here + ] + + @staticmethod + def get_energy_usage() -> "SmartRequest": + """Get energy usage.""" + return SmartRequest("get_energy_usage") + + @staticmethod + def energy_monitoring_list() -> List["SmartRequest"]: + """Get energy usage.""" + return [ + SmartRequest("get_energy_usage"), + SmartRequest.get_raw_request("get_electricity_price_config"), + ] + + @staticmethod + def get_current_power() -> "SmartRequest": + """Get current power.""" + return SmartRequest("get_current_power") + + @staticmethod + def power_protection_list() -> List["SmartRequest"]: + """Get power protection info list.""" + return [ + SmartRequest.get_current_power(), + SmartRequest.get_raw_request("get_max_power"), + SmartRequest.get_raw_request("get_protection_power"), + ] + + @staticmethod + def get_preset_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get preset rules.""" + return SmartRequest("get_preset_rules", params or SmartRequest.GetRulesParams()) + + @staticmethod + def get_auto_light_info() -> "SmartRequest": + """Get auto light info.""" + return SmartRequest("get_auto_light_info") + + @staticmethod + def get_dynamic_light_effect_rules( + params: Optional[GetRulesParams] = None + ) -> "SmartRequest": + """Get dynamic light effect rules.""" + return SmartRequest( + "get_dynamic_light_effect_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def set_device_on(params: DeviceOnParams) -> "SmartRequest": + """Set device on state.""" + return SmartRequest("set_device_info", params) + + @staticmethod + def set_light_info(params: LightInfoParams) -> "SmartRequest": + """Set color temperature.""" + return SmartRequest("set_device_info", params) + + @staticmethod + def set_dynamic_light_effect_rule_enable( + params: DynamicLightEffectParams + ) -> "SmartRequest": + """Enable dynamic light effect rule.""" + return SmartRequest("set_dynamic_light_effect_rule_enable", params) + + @staticmethod + def get_component_info_requests(component_nego_response) -> List["SmartRequest"]: + """Get a list of requests based on the component info response.""" + request_list = [] + for component in component_nego_response["component_list"]: + if requests := COMPONENT_REQUESTS.get(component["id"]): + request_list.extend(requests) + return request_list + + @staticmethod + def _create_request_dict( + smart_request: Union["SmartRequest", List["SmartRequest"]] + ) -> dict: + """Create request dict to be passed to SmartProtocol.query().""" + if isinstance(smart_request, list): + request = {} + for sr in smart_request: + request[sr.method_name] = sr.params + else: + request = smart_request.to_dict() + return request + + +COMPONENT_REQUESTS = { + "device": SmartRequest.device_info_list(), + "firmware": SmartRequest.firmware_info_list(), + "quick_setup": [SmartRequest.qs_component_nego()], + "inherit": [SmartRequest.get_raw_request("get_inherit_info")], + "time": [SmartRequest.get_device_time()], + "wireless": [SmartRequest.get_wireless_scan_info()], + "schedule": SmartRequest.schedule_info_list(), + "countdown": [SmartRequest.get_countdown_rules()], + "antitheft": [SmartRequest.get_antitheft_rules()], + "account": None, + "synchronize": None, # sync_env + "sunrise_sunset": None, # for schedules + "led": [SmartRequest.get_led_info()], + "cloud_connect": [SmartRequest.get_raw_request("get_connect_cloud_state")], + "iot_cloud": None, + "device_local_time": None, + "default_states": None, # in device_info + "auto_off": [SmartRequest.get_auto_off_config()], + "localSmart": None, + "energy_monitoring": SmartRequest.energy_monitoring_list(), + "power_protection": SmartRequest.power_protection_list(), + "current_protection": None, # overcurrent in device_info + "matter": None, + "preset": [SmartRequest.get_preset_rules()], + "brightness": None, # in device_info + "color": None, # in device_info + "color_temperature": None, # in device_info + "auto_light": [SmartRequest.get_auto_light_info()], + "light_effect": [SmartRequest.get_dynamic_light_effect_rules()], + "bulb_quick_control": None, + "on_off_gradually": [SmartRequest.get_raw_request("get_on_off_gradually_info")], +}