Update dump_devinfo to collect child device info (#796)

Will create separate fixture files if the model of the child devices
differs from the parent (i.e. hubs). Otherwise the child device info
will be under `child_devices`
This commit is contained in:
Steven B 2024-02-28 16:04:57 +00:00 committed by GitHub
parent 75c60eb97c
commit 24344b71f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 286 additions and 61 deletions

View File

@ -32,9 +32,14 @@ from kasa import (
from kasa.discover import DiscoveryResult from kasa.discover import DiscoveryResult
from kasa.exceptions import SmartErrorCode from kasa.exceptions import SmartErrorCode
from kasa.smart import SmartDevice from kasa.smart import SmartDevice
from kasa.smartprotocol import _ChildProtocolWrapper
Call = namedtuple("Call", "module method") Call = namedtuple("Call", "module method")
SmartCall = namedtuple("SmartCall", "module request should_succeed") SmartCall = namedtuple("SmartCall", "module request should_succeed child_device_id")
FixtureResult = namedtuple("FixtureResult", "filename, folder, data")
SMART_FOLDER = "kasa/tests/fixtures/smart/"
IOT_FOLDER = "kasa/tests/fixtures/"
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -69,6 +74,10 @@ def scrub(res):
"parent_device_id", # for hub children "parent_device_id", # for hub children
"setup_code", # matter "setup_code", # matter
"setup_payload", # matter "setup_payload", # matter
"mfi_setup_code", # mfi_ for homekit
"mfi_setup_id",
"mfi_token_token",
"mfi_token_uuid",
] ]
for k, v in res.items(): for k, v in res.items():
@ -105,6 +114,8 @@ def scrub(res):
v = "#MASKED_NAME#" v = "#MASKED_NAME#"
elif isinstance(res[k], int): elif isinstance(res[k], int):
v = 0 v = 0
elif k == "device_id" and "SCRUBBED" in v:
pass # already scrubbed
elif k == "device_id" and len(v) > 40: elif k == "device_id" and len(v) > 40:
# retain the last two chars when scrubbing child ids # retain the last two chars when scrubbing child ids
end = v[-2:] end = v[-2:]
@ -130,27 +141,30 @@ def default_to_regular(d):
async def handle_device(basedir, autosave, device: Device, batch_size: int): async def handle_device(basedir, autosave, device: Device, batch_size: int):
"""Create a fixture for a single device instance.""" """Create a fixture for a single device instance."""
if isinstance(device, SmartDevice): if isinstance(device, SmartDevice):
filename, copy_folder, final = await get_smart_fixture(device, batch_size) fixture_results: List[FixtureResult] = await get_smart_fixtures(
else: device, batch_size
filename, copy_folder, final = await get_legacy_fixture(device)
save_filename = Path(basedir) / copy_folder / filename
pprint(scrub(final))
if autosave:
save = "y"
else:
save = click.prompt(
f"Do you want to save the above content to {save_filename} (y/n)"
) )
if save == "y":
click.echo(f"Saving info to {save_filename}")
with open(save_filename, "w") as f:
json.dump(final, f, sort_keys=True, indent=4)
f.write("\n")
else: else:
click.echo("Not saving.") fixture_results = [await get_legacy_fixture(device)]
for fixture_result in fixture_results:
save_filename = Path(basedir) / fixture_result.folder / fixture_result.filename
pprint(scrub(fixture_result.data))
if autosave:
save = "y"
else:
save = click.prompt(
f"Do you want to save the above content to {save_filename} (y/n)"
)
if save == "y":
click.echo(f"Saving info to {save_filename}")
with open(save_filename, "w") as f:
json.dump(fixture_result.data, f, sort_keys=True, indent=4)
f.write("\n")
else:
click.echo("Not saving.")
@click.command() @click.command()
@ -181,7 +195,27 @@ async def handle_device(basedir, autosave, device: Device, batch_size: int):
"--batch-size", default=5, help="Number of batched requests to send at once" "--batch-size", default=5, help="Number of batched requests to send at once"
) )
@click.option("-d", "--debug", is_flag=True) @click.option("-d", "--debug", is_flag=True)
async def cli(host, target, basedir, autosave, debug, username, password, batch_size): @click.option(
"-di",
"--discovery-info",
help=(
"Bypass discovery by passing an accurate discovery result json escaped string."
+ " Do not use this flag unless you are sure you know what it means."
),
)
@click.option("--port", help="Port override")
async def cli(
host,
target,
basedir,
autosave,
debug,
username,
password,
batch_size,
discovery_info,
port,
):
"""Generate devinfo files for devices. """Generate devinfo files for devices.
Use --host (for a single device) or --target (for a complete network). Use --host (for a single device) or --target (for a complete network).
@ -191,8 +225,30 @@ async def cli(host, target, basedir, autosave, debug, username, password, batch_
credentials = Credentials(username=username, password=password) credentials = Credentials(username=username, password=password)
if host is not None: if host is not None:
click.echo("Host given, performing discovery on %s." % host) if discovery_info:
device = await Discover.discover_single(host, credentials=credentials) click.echo("Host and discovery info given, trying connect on %s." % host)
from kasa import ConnectionType, DeviceConfig
di = json.loads(discovery_info)
dr = DiscoveryResult(**di)
connection_type = ConnectionType.from_values(
dr.device_type,
dr.mgt_encrypt_schm.encrypt_type,
dr.mgt_encrypt_schm.lv,
)
dc = DeviceConfig(
host=host,
connection_type=connection_type,
port_override=port,
credentials=credentials,
)
device = await Device.connect(config=dc)
device.update_from_discover_info(dr.get_dict())
else:
click.echo("Host given, performing discovery on %s." % host)
device = await Discover.discover_single(
host, credentials=credentials, port=port
)
await handle_device(basedir, autosave, device, batch_size) await handle_device(basedir, autosave, device, batch_size)
else: else:
click.echo( click.echo(
@ -270,8 +326,8 @@ async def get_legacy_fixture(device):
sw_version = sysinfo["sw_ver"] sw_version = sysinfo["sw_ver"]
sw_version = sw_version.split(" ", maxsplit=1)[0] sw_version = sw_version.split(" ", maxsplit=1)[0]
save_filename = f"{model}_{hw_version}_{sw_version}.json" save_filename = f"{model}_{hw_version}_{sw_version}.json"
copy_folder = "kasa/tests/fixtures/" copy_folder = IOT_FOLDER
return save_filename, copy_folder, final return FixtureResult(filename=save_filename, folder=copy_folder, data=final)
def _echo_error(msg: str): def _echo_error(msg: str):
@ -289,8 +345,15 @@ async def _make_requests_or_exit(
requests: List[SmartRequest], requests: List[SmartRequest],
name: str, name: str,
batch_size: int, batch_size: int,
*,
child_device_id: str,
) -> Dict[str, Dict]: ) -> Dict[str, Dict]:
final = {} final = {}
protocol = (
device.protocol
if child_device_id == ""
else _ChildProtocolWrapper(child_device_id, device.protocol)
)
try: try:
end = len(requests) end = len(requests)
step = batch_size # Break the requests down as there seems to be a size limit step = batch_size # Break the requests down as there seems to be a size limit
@ -300,9 +363,7 @@ async def _make_requests_or_exit(
request: Union[List[SmartRequest], SmartRequest] = ( request: Union[List[SmartRequest], SmartRequest] = (
requests_step[0] if len(requests_step) == 1 else requests_step requests_step[0] if len(requests_step) == 1 else requests_step
) )
responses = await device.protocol.query( responses = await protocol.query(SmartRequest._create_request_dict(request))
SmartRequest._create_request_dict(request)
)
for method, result in responses.items(): for method, result in responses.items():
final[method] = result final[method] = result
return final return final
@ -331,38 +392,36 @@ async def _make_requests_or_exit(
await device.protocol.close() await device.protocol.close()
async def get_smart_fixture(device: SmartDevice, batch_size: int): async def get_smart_test_calls(device: SmartDevice):
"""Get fixture for new TAPO style protocol.""" """Get the list of test calls to make."""
test_calls = []
successes = []
child_device_components = {}
extra_test_calls = [ extra_test_calls = [
SmartCall( SmartCall(
module="temp_humidity_records", module="temp_humidity_records",
request=SmartRequest.get_raw_request("get_temp_humidity_records"), request=SmartRequest.get_raw_request("get_temp_humidity_records"),
should_succeed=False, should_succeed=False,
), child_device_id="",
SmartCall(
module="child_device_list",
request=SmartRequest.get_raw_request("get_child_device_list"),
should_succeed=False,
),
SmartCall(
module="child_device_component_list",
request=SmartRequest.get_raw_request("get_child_device_component_list"),
should_succeed=False,
), ),
SmartCall( SmartCall(
module="trigger_logs", module="trigger_logs",
request=SmartRequest.get_raw_request( request=SmartRequest.get_raw_request(
"get_trigger_logs", SmartRequest.GetTriggerLogsParams(5, 0) "get_trigger_logs", SmartRequest.GetTriggerLogsParams()
), ),
should_succeed=False, should_succeed=False,
child_device_id="",
), ),
] ]
successes = []
click.echo("Testing component_nego call ..", nl=False) click.echo("Testing component_nego call ..", nl=False)
responses = await _make_requests_or_exit( responses = await _make_requests_or_exit(
device, [SmartRequest.component_nego()], "component_nego call", batch_size device,
[SmartRequest.component_nego()],
"component_nego call",
batch_size=1,
child_device_id="",
) )
component_info_response = responses["component_nego"] component_info_response = responses["component_nego"]
click.echo(click.style("OK", fg="green")) click.echo(click.style("OK", fg="green"))
@ -371,35 +430,127 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int):
module="component_nego", module="component_nego",
request=SmartRequest("component_nego"), request=SmartRequest("component_nego"),
should_succeed=True, should_succeed=True,
child_device_id="",
) )
) )
components = {
item["id"]: item["ver_code"]
for item in component_info_response["component_list"]
}
test_calls = [] if "child_device" in components:
should_succeed = [] child_components = await _make_requests_or_exit(
device,
[SmartRequest.get_child_device_component_list()],
"child device component list",
batch_size=1,
child_device_id="",
)
successes.append(
SmartCall(
module="child_component_list",
request=SmartRequest.get_child_device_component_list(),
should_succeed=True,
child_device_id="",
)
)
test_calls.append(
SmartCall(
module="child_device_list",
request=SmartRequest.get_child_device_list(),
should_succeed=True,
child_device_id="",
)
)
# Get list of child components to call
if "control_child" in components:
child_device_components = {
child_component_list["device_id"]: {
item["id"]: item["ver_code"]
for item in child_component_list["component_list"]
}
for child_component_list in child_components[
"get_child_device_component_list"
]["child_component_list"]
}
for item in component_info_response["component_list"]: # Get component calls
component_id = item["id"] for component_id, ver_code in components.items():
ver_code = item["ver_code"] if component_id == "child_device":
continue
if (requests := get_component_requests(component_id, ver_code)) is not None: if (requests := get_component_requests(component_id, ver_code)) is not None:
component_test_calls = [ component_test_calls = [
SmartCall(module=component_id, request=request, should_succeed=True) SmartCall(
module=component_id,
request=request,
should_succeed=True,
child_device_id="",
)
for request in requests for request in requests
] ]
test_calls.extend(component_test_calls) test_calls.extend(component_test_calls)
should_succeed.extend(component_test_calls)
else: else:
click.echo(f"Skipping {component_id}..", nl=False) click.echo(f"Skipping {component_id}..", nl=False)
click.echo(click.style("UNSUPPORTED", fg="yellow")) click.echo(click.style("UNSUPPORTED", fg="yellow"))
test_calls.extend(extra_test_calls) test_calls.extend(extra_test_calls)
# Child component calls
for child_device_id, child_components in child_device_components.items():
for component_id, ver_code in child_components.items():
if (requests := get_component_requests(component_id, ver_code)) is not None:
component_test_calls = [
SmartCall(
module=component_id,
request=request,
should_succeed=True,
child_device_id=child_device_id,
)
for request in requests
]
test_calls.extend(component_test_calls)
else:
click.echo(f"Skipping {component_id}..", nl=False)
click.echo(click.style("UNSUPPORTED", fg="yellow"))
# Add the extra calls for each child
for extra_call in extra_test_calls:
extra_child_call = extra_call._replace(child_device_id=child_device_id)
test_calls.append(extra_child_call)
return test_calls, successes
def get_smart_child_fixture(response):
"""Get a seperate fixture for the child device."""
info = response["get_device_info"]
hw_version = info["hw_ver"]
sw_version = info["fw_ver"]
sw_version = sw_version.split(" ", maxsplit=1)[0]
model = info["model"]
if region := info.get("specs"):
model += f"({region})"
save_filename = f"{model}_{hw_version}_{sw_version}.json"
return FixtureResult(filename=save_filename, folder=SMART_FOLDER, data=response)
async def get_smart_fixtures(device: SmartDevice, batch_size: int):
"""Get fixture for new TAPO style protocol."""
test_calls, successes = await get_smart_test_calls(device)
for test_call in test_calls: for test_call in test_calls:
click.echo(f"Testing {test_call.module}..", nl=False) click.echo(f"Testing {test_call.module}..", nl=False)
try: try:
click.echo(f"Testing {test_call}..", nl=False) click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query( if test_call.child_device_id == "":
SmartRequest._create_request_dict(test_call.request) response = await device.protocol.query(
) SmartRequest._create_request_dict(test_call.request)
)
else:
cp = _ChildProtocolWrapper(test_call.child_device_id, device.protocol)
response = await cp.query(
SmartRequest._create_request_dict(test_call.request)
)
except AuthenticationError as ex: except AuthenticationError as ex:
_echo_error( _echo_error(
f"Unable to query the device due to an authentication error: {ex}", f"Unable to query the device due to an authentication error: {ex}",
@ -413,6 +564,7 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int):
in [ in [
SmartErrorCode.UNKNOWN_METHOD_ERROR, SmartErrorCode.UNKNOWN_METHOD_ERROR,
SmartErrorCode.TRANSPORT_NOT_AVAILABLE_ERROR, SmartErrorCode.TRANSPORT_NOT_AVAILABLE_ERROR,
SmartErrorCode.UNSPECIFIC_ERROR,
] ]
): ):
click.echo(click.style("FAIL - EXPECTED", fg="green")) click.echo(click.style("FAIL - EXPECTED", fg="green"))
@ -430,13 +582,57 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int):
finally: finally:
await device.protocol.close() await device.protocol.close()
requests = [] device_requests: Dict[str, List[SmartRequest]] = {}
for succ in successes: for success in successes:
requests.append(succ.request) device_request = device_requests.setdefault(success.child_device_id, [])
device_request.append(success.request)
scrubbed_device_ids = {
device_id: f"SCRUBBED_CHILD_DEVICE_ID_{index}"
for index, device_id in enumerate(device_requests.keys())
if device_id != ""
}
final = await _make_requests_or_exit( final = await _make_requests_or_exit(
device, requests, "all successes at once", batch_size device,
device_requests[""],
"all successes at once",
batch_size,
child_device_id="",
) )
fixture_results = []
for child_device_id, requests in device_requests.items():
if child_device_id == "":
continue
response = await _make_requests_or_exit(
device,
requests,
"all child successes at once",
batch_size,
child_device_id=child_device_id,
)
scrubbed = scrubbed_device_ids[child_device_id]
if "get_device_info" in response and "device_id" in response["get_device_info"]:
response["get_device_info"]["device_id"] = scrubbed
# If the child is a different model to the parent create a seperate fixture
if (
"get_device_info" in response
and (child_model := response["get_device_info"].get("model"))
and child_model != final["get_device_info"]["model"]
):
fixture_results.append(get_smart_child_fixture(response))
else:
cd = final.setdefault("child_devices", {})
cd[scrubbed] = response
# Scrub the device ids in the parent
if gc := final.get("get_child_device_component_list"):
for child in gc["child_component_list"]:
device_id = child["device_id"]
child["device_id"] = scrubbed_device_ids[device_id]
for child in final["get_child_device_list"]["child_device_list"]:
device_id = child["device_id"]
child["device_id"] = scrubbed_device_ids[device_id]
# Need to recreate a DiscoverResult here because we don't want the aliases # Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device. # in the fixture, we want the actual field names as returned by the device.
@ -454,8 +650,11 @@ async def get_smart_fixture(device: SmartDevice, batch_size: int):
sw_version = sw_version.split(" ", maxsplit=1)[0] sw_version = sw_version.split(" ", maxsplit=1)[0]
save_filename = f"{model}_{hw_version}_{sw_version}.json" save_filename = f"{model}_{hw_version}_{sw_version}.json"
copy_folder = "kasa/tests/fixtures/smart/" copy_folder = SMART_FOLDER
return save_filename, copy_folder, final fixture_results.insert(
0, FixtureResult(filename=save_filename, folder=copy_folder, data=final)
)
return fixture_results
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -75,6 +75,13 @@ class SmartRequest:
start_index: int = 0 start_index: int = 0
@dataclass
class GetScheduleRulesParams(SmartRequestParams):
"""Get Rules Params."""
start_index: int = 0
schedule_mode: str = ""
@dataclass @dataclass
class GetTriggerLogsParams(SmartRequestParams): class GetTriggerLogsParams(SmartRequestParams):
"""Trigger Logs params.""" """Trigger Logs params."""
@ -166,6 +173,16 @@ class SmartRequest:
"""Get device time.""" """Get device time."""
return SmartRequest("get_device_time") return SmartRequest("get_device_time")
@staticmethod
def get_child_device_list() -> "SmartRequest":
"""Get child device list."""
return SmartRequest("get_child_device_list")
@staticmethod
def get_child_device_component_list() -> "SmartRequest":
"""Get child device component list."""
return SmartRequest("get_child_device_component_list")
@staticmethod @staticmethod
def get_wireless_scan_info( def get_wireless_scan_info(
params: Optional[GetRulesParams] = None, params: Optional[GetRulesParams] = None,
@ -179,7 +196,7 @@ class SmartRequest:
def get_schedule_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": def get_schedule_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest":
"""Get schedule rules.""" """Get schedule rules."""
return SmartRequest( return SmartRequest(
"get_schedule_rules", params or SmartRequest.GetRulesParams() "get_schedule_rules", params or SmartRequest.GetScheduleRulesParams()
) )
@staticmethod @staticmethod
@ -381,4 +398,13 @@ COMPONENT_REQUESTS = {
SmartRequest.get_raw_request("get_alarm_configure"), SmartRequest.get_raw_request("get_alarm_configure"),
], ],
"alarm_logs": [SmartRequest.get_raw_request("get_alarm_triggers")], "alarm_logs": [SmartRequest.get_raw_request("get_alarm_triggers")],
"child_device": [
SmartRequest.get_raw_request("get_child_device_list"),
SmartRequest.get_raw_request("get_child_device_component_list"),
],
"control_child": [],
"homekit": [SmartRequest.get_raw_request("get_homekit_info")],
"dimmer_calibration": [],
"fan_control": [],
"overheat_protection": [],
} }