python-kasa/devtools/dump_devinfo.py

279 lines
8.9 KiB
Python
Raw Normal View History

"""This script generates devinfo files for the test suite.
If you have new, yet unsupported device or a device with no devinfo file under
kasa/tests/fixtures, feel free to run this script and create a PR to add the file
to the repository.
Executing this script will several modules and methods one by one,
and finally execute a query to query all of them at once.
"""
import base64
import collections.abc
import json
import logging
import re
from collections import defaultdict, namedtuple
from pprint import pprint
import asyncclick as click
from kasa import Credentials, Discover, SmartDevice
from kasa.discover import DiscoveryResult
from kasa.tapo.tapodevice import TapoDevice
Call = namedtuple("Call", "module method")
def scrub(res):
"""Remove identifiers from the given dict."""
keys_to_scrub = [
"deviceId",
"fwId",
"hwId",
"oemId",
"mac",
"mic_mac",
"latitude_i",
"longitude_i",
"latitude",
"longitude",
"owner",
"device_id",
"ip",
"ssid",
"hw_id",
"fw_id",
"oem_id",
"nickname",
"alias",
]
for k, v in res.items():
if isinstance(v, collections.abc.Mapping):
res[k] = scrub(res.get(k))
else:
if k in keys_to_scrub:
if k in ["latitude", "latitude_i", "longitude", "longitude_i"]:
v = 0
elif k in ["ip"]:
v = "127.0.0.123"
elif k in ["ssid"]:
# Need a valid base64 value here
v = base64.b64encode(b"#MASKED_SSID#").decode()
elif k in ["nickname"]:
v = base64.b64encode(b"#MASKED_NAME#").decode()
elif k in ["alias"]:
v = "#MASKED_NAME#"
else:
v = re.sub(r"\w", "0", v)
res[k] = v
return res
def default_to_regular(d):
"""Convert nested defaultdicts to regular ones.
From https://stackoverflow.com/a/26496899
"""
if isinstance(d, defaultdict):
d = {k: default_to_regular(v) for k, v in d.items()}
return d
@click.command()
@click.argument("host")
@click.option(
"--username",
default=None,
required=False,
envvar="TPLINK_CLOUD_USERNAME",
help="Username/email address to authenticate to device.",
)
@click.option(
"--password",
default=None,
required=False,
envvar="TPLINK_CLOUD_PASSWORD",
help="Password to use to authenticate to device.",
)
@click.option("-d", "--debug", is_flag=True)
async def cli(host, debug, username, password):
"""Generate devinfo file for given device."""
if debug:
logging.basicConfig(level=logging.DEBUG)
credentials = Credentials(username=username, password=password)
device = await Discover.discover_single(host, credentials=credentials)
if isinstance(device, TapoDevice):
save_filename, copy_folder, final = await get_smart_fixture(device)
else:
save_filename, copy_folder, final = await get_legacy_fixture(device)
pprint(scrub(final))
save = click.prompt(
f"Do you want to save the above content to {save_filename} (y/n)"
)
if save == "y":
click.echo(f"Saving info to {save_filename}")
with open(save_filename, "w") as f:
json.dump(final, f, sort_keys=True, indent=4)
f.write("\n")
click.echo(
f"Saved. Copy/Move {save_filename} to "
+ f"{copy_folder} to add it to the test suite"
)
else:
click.echo("Not saving.")
async def get_legacy_fixture(device):
"""Get fixture for legacy IOT style protocol."""
items = [
Call(module="system", method="get_sysinfo"),
Call(module="emeter", method="get_realtime"),
Call(module="smartlife.iot.dimmer", method="get_dimmer_parameters"),
Call(module="smartlife.iot.common.emeter", method="get_realtime"),
Call(
module="smartlife.iot.smartbulb.lightingservice", method="get_light_state"
),
Call(module="smartlife.iot.LAS", method="get_config"),
Call(module="smartlife.iot.PIR", method="get_config"),
]
successes = []
for test_call in items:
try:
click.echo(f"Testing {test_call}..", nl=False)
info = await device.protocol.query(
{test_call.module: {test_call.method: None}}
)
resp = info[test_call.module]
except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if "err_msg" in resp:
click.echo(click.style(f"FAIL {resp['err_msg']}", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append((test_call, info))
final_query = defaultdict(defaultdict)
final = defaultdict(defaultdict)
for succ, resp in successes:
final_query[succ.module][succ.method] = None
final[succ.module][succ.method] = resp
final = default_to_regular(final)
try:
final = await device.protocol.query(final_query)
except Exception as ex:
click.echo(
click.style(
f"Unable to query all successes at once: {ex}", bold=True, fg="red"
)
)
if device._discovery_info:
# Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device.
dr = DiscoveryResult(**device._discovery_info)
final["discovery_result"] = dr.dict(
by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True
)
click.echo("Got %s successes" % len(successes))
click.echo(click.style("## device info file ##", bold=True))
sysinfo = final["system"]["get_sysinfo"]
model = sysinfo["model"]
hw_version = sysinfo["hw_ver"]
sw_version = sysinfo["sw_ver"]
sw_version = sw_version.split(" ", maxsplit=1)[0]
save_filename = f"{model}_{hw_version}_{sw_version}.json"
copy_folder = "kasa/tests/fixtures/"
return save_filename, copy_folder, final
async def get_smart_fixture(device: SmartDevice):
"""Get fixture for new TAPO style protocol."""
items = [
Call(module="component_nego", method="component_nego"),
Call(module="device_info", method="get_device_info"),
Call(module="device_usage", method="get_device_usage"),
Call(module="device_time", method="get_device_time"),
Call(module="energy_usage", method="get_energy_usage"),
Call(module="current_power", method="get_current_power"),
Call(module="temp_humidity_records", method="get_temp_humidity_records"),
Call(module="child_device_list", method="get_child_device_list"),
Call(
module="trigger_logs",
method={"get_trigger_logs": {"page_size": 5, "start_id": 0}},
),
Call(
module="child_device_component_list",
method="get_child_device_component_list",
),
]
successes = []
for test_call in items:
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(test_call.method)
except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL not suported", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)
requests = []
for succ in successes:
requests.append({"method": succ.method})
final_query = {"multipleRequest": {"requests": requests}}
try:
responses = await device.protocol.query(final_query)
except Exception as ex:
click.echo(
click.style(
f"Unable to query all successes at once: {ex}", bold=True, fg="red"
)
)
final = {}
for response in responses["responses"]:
final[response["method"]] = response["result"]
# Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device.
dr = DiscoveryResult(**device._discovery_info) # type: ignore
final["discovery_result"] = dr.dict(
by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True
)
click.echo("Got %s successes" % len(successes))
click.echo(click.style("## device info file ##", bold=True))
hw_version = final["get_device_info"]["hw_ver"]
sw_version = final["get_device_info"]["fw_ver"]
model = final["discovery_result"]["device_model"]
sw_version = sw_version.split(" ", maxsplit=1)[0]
save_filename = f"{model}_{hw_version}_{sw_version}.json"
copy_folder = "kasa/tests/fixtures/smart/"
return save_filename, copy_folder, final
if __name__ == "__main__":
cli()