From 24344b71f511ac049bfd8a17bea50e09b2c98be2 Mon Sep 17 00:00:00 2001 From: Steven B <51370195+sdb9696@users.noreply.github.com> Date: Wed, 28 Feb 2024 16:04:57 +0000 Subject: [PATCH] Update dump_devinfo to collect child device info (#796) Will create separate fixture files if the model of the child devices differs from the parent (i.e. hubs). Otherwise the child device info will be under `child_devices` --- devtools/dump_devinfo.py | 319 ++++++++++++++++++++++++------ devtools/helpers/smartrequests.py | 28 ++- 2 files changed, 286 insertions(+), 61 deletions(-) diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index 8e012606..ebfe3b1b 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -32,9 +32,14 @@ from kasa import ( from kasa.discover import DiscoveryResult from kasa.exceptions import SmartErrorCode from kasa.smart import SmartDevice +from kasa.smartprotocol import _ChildProtocolWrapper Call = namedtuple("Call", "module method") -SmartCall = namedtuple("SmartCall", "module request should_succeed") +SmartCall = namedtuple("SmartCall", "module request should_succeed child_device_id") +FixtureResult = namedtuple("FixtureResult", "filename, folder, data") + +SMART_FOLDER = "kasa/tests/fixtures/smart/" +IOT_FOLDER = "kasa/tests/fixtures/" _LOGGER = logging.getLogger(__name__) @@ -69,6 +74,10 @@ def scrub(res): "parent_device_id", # for hub children "setup_code", # matter "setup_payload", # matter + "mfi_setup_code", # mfi_ for homekit + "mfi_setup_id", + "mfi_token_token", + "mfi_token_uuid", ] for k, v in res.items(): @@ -105,6 +114,8 @@ def scrub(res): v = "#MASKED_NAME#" elif isinstance(res[k], int): v = 0 + elif k == "device_id" and "SCRUBBED" in v: + pass # already scrubbed elif k == "device_id" and len(v) > 40: # retain the last two chars when scrubbing child ids end = v[-2:] @@ -130,27 +141,30 @@ def default_to_regular(d): async def handle_device(basedir, autosave, device: Device, batch_size: int): """Create a fixture for a single device instance.""" if isinstance(device, SmartDevice): - filename, copy_folder, final = await get_smart_fixture(device, batch_size) - else: - filename, copy_folder, final = await get_legacy_fixture(device) - - save_filename = Path(basedir) / copy_folder / filename - - pprint(scrub(final)) - if autosave: - save = "y" - else: - save = click.prompt( - f"Do you want to save the above content to {save_filename} (y/n)" + fixture_results: List[FixtureResult] = await get_smart_fixtures( + device, batch_size ) - if save == "y": - click.echo(f"Saving info to {save_filename}") - - with open(save_filename, "w") as f: - json.dump(final, f, sort_keys=True, indent=4) - f.write("\n") else: - click.echo("Not saving.") + fixture_results = [await get_legacy_fixture(device)] + + for fixture_result in fixture_results: + save_filename = Path(basedir) / fixture_result.folder / fixture_result.filename + + pprint(scrub(fixture_result.data)) + if autosave: + save = "y" + else: + save = click.prompt( + f"Do you want to save the above content to {save_filename} (y/n)" + ) + if save == "y": + click.echo(f"Saving info to {save_filename}") + + with open(save_filename, "w") as f: + json.dump(fixture_result.data, f, sort_keys=True, indent=4) + f.write("\n") + else: + click.echo("Not saving.") @click.command() @@ -181,7 +195,27 @@ async def handle_device(basedir, autosave, device: Device, batch_size: int): "--batch-size", default=5, help="Number of batched requests to send at once" ) @click.option("-d", "--debug", is_flag=True) -async def cli(host, target, basedir, autosave, debug, username, password, batch_size): +@click.option( + "-di", + "--discovery-info", + help=( + "Bypass discovery by passing an accurate discovery result json escaped string." + + " Do not use this flag unless you are sure you know what it means." + ), +) +@click.option("--port", help="Port override") +async def cli( + host, + target, + basedir, + autosave, + debug, + username, + password, + batch_size, + discovery_info, + port, +): """Generate devinfo files for devices. Use --host (for a single device) or --target (for a complete network). @@ -191,8 +225,30 @@ async def cli(host, target, basedir, autosave, debug, username, password, batch_ credentials = Credentials(username=username, password=password) if host is not None: - click.echo("Host given, performing discovery on %s." % host) - device = await Discover.discover_single(host, credentials=credentials) + if discovery_info: + click.echo("Host and discovery info given, trying connect on %s." % host) + from kasa import ConnectionType, DeviceConfig + + di = json.loads(discovery_info) + dr = DiscoveryResult(**di) + connection_type = ConnectionType.from_values( + dr.device_type, + dr.mgt_encrypt_schm.encrypt_type, + dr.mgt_encrypt_schm.lv, + ) + dc = DeviceConfig( + host=host, + connection_type=connection_type, + port_override=port, + credentials=credentials, + ) + device = await Device.connect(config=dc) + device.update_from_discover_info(dr.get_dict()) + else: + click.echo("Host given, performing discovery on %s." % host) + device = await Discover.discover_single( + host, credentials=credentials, port=port + ) await handle_device(basedir, autosave, device, batch_size) else: click.echo( @@ -270,8 +326,8 @@ async def get_legacy_fixture(device): sw_version = sysinfo["sw_ver"] sw_version = sw_version.split(" ", maxsplit=1)[0] save_filename = f"{model}_{hw_version}_{sw_version}.json" - copy_folder = "kasa/tests/fixtures/" - return save_filename, copy_folder, final + copy_folder = IOT_FOLDER + return FixtureResult(filename=save_filename, folder=copy_folder, data=final) def _echo_error(msg: str): @@ -289,8 +345,15 @@ async def _make_requests_or_exit( requests: List[SmartRequest], name: str, batch_size: int, + *, + child_device_id: str, ) -> Dict[str, Dict]: final = {} + protocol = ( + device.protocol + if child_device_id == "" + else _ChildProtocolWrapper(child_device_id, device.protocol) + ) try: end = len(requests) step = batch_size # Break the requests down as there seems to be a size limit @@ -300,9 +363,7 @@ async def _make_requests_or_exit( request: Union[List[SmartRequest], SmartRequest] = ( requests_step[0] if len(requests_step) == 1 else requests_step ) - responses = await device.protocol.query( - SmartRequest._create_request_dict(request) - ) + responses = await protocol.query(SmartRequest._create_request_dict(request)) for method, result in responses.items(): final[method] = result return final @@ -331,38 +392,36 @@ async def _make_requests_or_exit( await device.protocol.close() -async def get_smart_fixture(device: SmartDevice, batch_size: int): - """Get fixture for new TAPO style protocol.""" +async def get_smart_test_calls(device: SmartDevice): + """Get the list of test calls to make.""" + test_calls = [] + successes = [] + child_device_components = {} + extra_test_calls = [ SmartCall( module="temp_humidity_records", request=SmartRequest.get_raw_request("get_temp_humidity_records"), should_succeed=False, - ), - SmartCall( - module="child_device_list", - request=SmartRequest.get_raw_request("get_child_device_list"), - should_succeed=False, - ), - SmartCall( - module="child_device_component_list", - request=SmartRequest.get_raw_request("get_child_device_component_list"), - should_succeed=False, + child_device_id="", ), SmartCall( module="trigger_logs", request=SmartRequest.get_raw_request( - "get_trigger_logs", SmartRequest.GetTriggerLogsParams(5, 0) + "get_trigger_logs", SmartRequest.GetTriggerLogsParams() ), should_succeed=False, + child_device_id="", ), ] - successes = [] - click.echo("Testing component_nego call ..", nl=False) responses = await _make_requests_or_exit( - device, [SmartRequest.component_nego()], "component_nego call", batch_size + device, + [SmartRequest.component_nego()], + "component_nego call", + batch_size=1, + child_device_id="", ) component_info_response = responses["component_nego"] click.echo(click.style("OK", fg="green")) @@ -371,35 +430,127 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int): module="component_nego", request=SmartRequest("component_nego"), should_succeed=True, + child_device_id="", ) ) + components = { + item["id"]: item["ver_code"] + for item in component_info_response["component_list"] + } - test_calls = [] - should_succeed = [] + if "child_device" in components: + child_components = await _make_requests_or_exit( + device, + [SmartRequest.get_child_device_component_list()], + "child device component list", + batch_size=1, + child_device_id="", + ) + successes.append( + SmartCall( + module="child_component_list", + request=SmartRequest.get_child_device_component_list(), + should_succeed=True, + child_device_id="", + ) + ) + test_calls.append( + SmartCall( + module="child_device_list", + request=SmartRequest.get_child_device_list(), + should_succeed=True, + child_device_id="", + ) + ) + # Get list of child components to call + if "control_child" in components: + child_device_components = { + child_component_list["device_id"]: { + item["id"]: item["ver_code"] + for item in child_component_list["component_list"] + } + for child_component_list in child_components[ + "get_child_device_component_list" + ]["child_component_list"] + } - for item in component_info_response["component_list"]: - component_id = item["id"] - ver_code = item["ver_code"] + # Get component calls + for component_id, ver_code in components.items(): + if component_id == "child_device": + continue if (requests := get_component_requests(component_id, ver_code)) is not None: component_test_calls = [ - SmartCall(module=component_id, request=request, should_succeed=True) + SmartCall( + module=component_id, + request=request, + should_succeed=True, + child_device_id="", + ) for request in requests ] test_calls.extend(component_test_calls) - should_succeed.extend(component_test_calls) else: click.echo(f"Skipping {component_id}..", nl=False) click.echo(click.style("UNSUPPORTED", fg="yellow")) test_calls.extend(extra_test_calls) + # Child component calls + for child_device_id, child_components in child_device_components.items(): + for component_id, ver_code in child_components.items(): + if (requests := get_component_requests(component_id, ver_code)) is not None: + component_test_calls = [ + SmartCall( + module=component_id, + request=request, + should_succeed=True, + child_device_id=child_device_id, + ) + for request in requests + ] + test_calls.extend(component_test_calls) + else: + click.echo(f"Skipping {component_id}..", nl=False) + click.echo(click.style("UNSUPPORTED", fg="yellow")) + # Add the extra calls for each child + for extra_call in extra_test_calls: + extra_child_call = extra_call._replace(child_device_id=child_device_id) + test_calls.append(extra_child_call) + + return test_calls, successes + + +def get_smart_child_fixture(response): + """Get a seperate fixture for the child device.""" + info = response["get_device_info"] + hw_version = info["hw_ver"] + sw_version = info["fw_ver"] + sw_version = sw_version.split(" ", maxsplit=1)[0] + model = info["model"] + if region := info.get("specs"): + model += f"({region})" + + save_filename = f"{model}_{hw_version}_{sw_version}.json" + return FixtureResult(filename=save_filename, folder=SMART_FOLDER, data=response) + + +async def get_smart_fixtures(device: SmartDevice, batch_size: int): + """Get fixture for new TAPO style protocol.""" + test_calls, successes = await get_smart_test_calls(device) + for test_call in test_calls: click.echo(f"Testing {test_call.module}..", nl=False) try: click.echo(f"Testing {test_call}..", nl=False) - response = await device.protocol.query( - SmartRequest._create_request_dict(test_call.request) - ) + if test_call.child_device_id == "": + response = await device.protocol.query( + SmartRequest._create_request_dict(test_call.request) + ) + else: + cp = _ChildProtocolWrapper(test_call.child_device_id, device.protocol) + response = await cp.query( + SmartRequest._create_request_dict(test_call.request) + ) except AuthenticationError as ex: _echo_error( f"Unable to query the device due to an authentication error: {ex}", @@ -413,6 +564,7 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int): in [ SmartErrorCode.UNKNOWN_METHOD_ERROR, SmartErrorCode.TRANSPORT_NOT_AVAILABLE_ERROR, + SmartErrorCode.UNSPECIFIC_ERROR, ] ): click.echo(click.style("FAIL - EXPECTED", fg="green")) @@ -430,13 +582,57 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int): finally: await device.protocol.close() - requests = [] - for succ in successes: - requests.append(succ.request) + device_requests: Dict[str, List[SmartRequest]] = {} + for success in successes: + device_request = device_requests.setdefault(success.child_device_id, []) + device_request.append(success.request) + + scrubbed_device_ids = { + device_id: f"SCRUBBED_CHILD_DEVICE_ID_{index}" + for index, device_id in enumerate(device_requests.keys()) + if device_id != "" + } final = await _make_requests_or_exit( - device, requests, "all successes at once", batch_size + device, + device_requests[""], + "all successes at once", + batch_size, + child_device_id="", ) + fixture_results = [] + for child_device_id, requests in device_requests.items(): + if child_device_id == "": + continue + response = await _make_requests_or_exit( + device, + requests, + "all child successes at once", + batch_size, + child_device_id=child_device_id, + ) + scrubbed = scrubbed_device_ids[child_device_id] + if "get_device_info" in response and "device_id" in response["get_device_info"]: + response["get_device_info"]["device_id"] = scrubbed + # If the child is a different model to the parent create a seperate fixture + if ( + "get_device_info" in response + and (child_model := response["get_device_info"].get("model")) + and child_model != final["get_device_info"]["model"] + ): + fixture_results.append(get_smart_child_fixture(response)) + else: + cd = final.setdefault("child_devices", {}) + cd[scrubbed] = response + + # Scrub the device ids in the parent + if gc := final.get("get_child_device_component_list"): + for child in gc["child_component_list"]: + device_id = child["device_id"] + child["device_id"] = scrubbed_device_ids[device_id] + for child in final["get_child_device_list"]["child_device_list"]: + device_id = child["device_id"] + child["device_id"] = scrubbed_device_ids[device_id] # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. @@ -454,8 +650,11 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int): sw_version = sw_version.split(" ", maxsplit=1)[0] save_filename = f"{model}_{hw_version}_{sw_version}.json" - copy_folder = "kasa/tests/fixtures/smart/" - return save_filename, copy_folder, final + copy_folder = SMART_FOLDER + fixture_results.insert( + 0, FixtureResult(filename=save_filename, folder=copy_folder, data=final) + ) + return fixture_results if __name__ == "__main__": diff --git a/devtools/helpers/smartrequests.py b/devtools/helpers/smartrequests.py index 29298e2e..1ece6c87 100644 --- a/devtools/helpers/smartrequests.py +++ b/devtools/helpers/smartrequests.py @@ -75,6 +75,13 @@ class SmartRequest: start_index: int = 0 + @dataclass + class GetScheduleRulesParams(SmartRequestParams): + """Get Rules Params.""" + + start_index: int = 0 + schedule_mode: str = "" + @dataclass class GetTriggerLogsParams(SmartRequestParams): """Trigger Logs params.""" @@ -166,6 +173,16 @@ class SmartRequest: """Get device time.""" return SmartRequest("get_device_time") + @staticmethod + def get_child_device_list() -> "SmartRequest": + """Get child device list.""" + return SmartRequest("get_child_device_list") + + @staticmethod + def get_child_device_component_list() -> "SmartRequest": + """Get child device component list.""" + return SmartRequest("get_child_device_component_list") + @staticmethod def get_wireless_scan_info( params: Optional[GetRulesParams] = None, @@ -179,7 +196,7 @@ class SmartRequest: def get_schedule_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": """Get schedule rules.""" return SmartRequest( - "get_schedule_rules", params or SmartRequest.GetRulesParams() + "get_schedule_rules", params or SmartRequest.GetScheduleRulesParams() ) @staticmethod @@ -381,4 +398,13 @@ COMPONENT_REQUESTS = { SmartRequest.get_raw_request("get_alarm_configure"), ], "alarm_logs": [SmartRequest.get_raw_request("get_alarm_triggers")], + "child_device": [ + SmartRequest.get_raw_request("get_child_device_list"), + SmartRequest.get_raw_request("get_child_device_component_list"), + ], + "control_child": [], + "homekit": [SmartRequest.get_raw_request("get_homekit_info")], + "dimmer_calibration": [], + "fan_control": [], + "overheat_protection": [], }