Update dump_devinfo to produce new TAPO/SMART fixtures (#561)

This commit is contained in:
sdb9696 2023-12-02 16:33:35 +00:00 committed by GitHub
parent 63d64ad920
commit a6b7d73d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 14 deletions

View File

@ -7,6 +7,7 @@ If you have new, yet unsupported device or a device with no devinfo file under
Executing this script will several modules and methods one by one,
and finally execute a query to query all of them at once.
"""
import base64
import collections.abc
import json
import logging
@ -16,8 +17,9 @@ from pprint import pprint
import asyncclick as click
from kasa import Credentials, Discover
from kasa import Credentials, Discover, SmartDevice
from kasa.discover import DiscoveryResult
from kasa.tapo.tapodevice import TapoDevice
Call = namedtuple("Call", "module method")
@ -53,6 +55,9 @@ def scrub(res):
v = 0
elif k in ["ip"]:
v = "127.0.0.123"
elif k in ["ssid"]:
# Need a valid base64 value here
v = base64.b64encode(b"##MASKEDNAME##").decode()
else:
v = re.sub(r"\w", "0", v)
@ -92,6 +97,28 @@ async def cli(host, debug, username, password):
if debug:
logging.basicConfig(level=logging.DEBUG)
credentials = Credentials(username=username, password=password)
device = await Discover.discover_single(host, credentials=credentials)
if isinstance(device, TapoDevice):
save_to, final = await get_smart_fixture(device)
else:
save_to, final = await get_legacy_fixture(device)
pprint(scrub(final))
save = click.prompt(f"Do you want to save the above content to {save_to} (y/n)")
if save == "y":
click.echo(f"Saving info to {save_to}")
with open(save_to, "w") as f:
json.dump(final, f, sort_keys=True, indent=4)
f.write("\n")
else:
click.echo("Not saving.")
async def get_legacy_fixture(device):
"""Get fixture for legacy IOT style protocol."""
items = [
Call(module="system", method="get_sysinfo"),
Call(module="emeter", method="get_realtime"),
@ -106,9 +133,6 @@ async def cli(host, debug, username, password):
successes = []
credentials = Credentials(username=username, password=password)
device = await Discover.discover_single(host, credentials=credentials)
for test_call in items:
try:
click.echo(f"Testing {test_call}..", nl=False)
@ -127,7 +151,6 @@ async def cli(host, debug, username, password):
final_query = defaultdict(defaultdict)
final = defaultdict(defaultdict)
for succ, resp in successes:
final_query[succ.module][succ.method] = None
final[succ.module][succ.method] = resp
@ -160,16 +183,74 @@ async def cli(host, debug, username, password):
sw_version = sysinfo["sw_ver"]
sw_version = sw_version.split(" ", maxsplit=1)[0]
save_to = f"{model}_{hw_version}_{sw_version}.json"
pprint(scrub(final))
save = click.prompt(f"Do you want to save the above content to {save_to} (y/n)")
if save == "y":
click.echo(f"Saving info to {save_to}")
return save_to, final
with open(save_to, "w") as f:
json.dump(final, f, sort_keys=True, indent=4)
f.write("\n")
else:
click.echo("Not saving.")
async def get_smart_fixture(device: SmartDevice):
"""Get fixture for new TAPO style protocol."""
items = [
Call(module="component_nego", method="component_nego"),
Call(module="device_info", method="get_device_info"),
Call(module="device_usage", method="get_device_usage"),
Call(module="device_time", method="get_device_time"),
Call(module="energy_usage", method="get_energy_usage"),
Call(module="current_power", method="get_current_power"),
Call(
module="child_device_component_list",
method="get_child_device_component_list",
),
]
successes = []
for test_call in items:
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(test_call.method)
except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL not suported", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)
requests = []
for succ in successes:
requests.append({"method": succ.method})
final_query = {"multipleRequest": {"requests": requests}}
try:
responses = await device.protocol.query(final_query)
except Exception as ex:
click.echo(
click.style(
f"Unable to query all successes at once: {ex}", bold=True, fg="red"
)
)
final = {}
for response in responses["responses"]:
final[response["method"]] = response["result"]
if device._discovery_info:
# Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device.
dr = DiscoveryResult(**device._discovery_info)
final["discovery_result"] = dr.dict(
by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True
)
click.echo("Got %s successes" % len(successes))
click.echo(click.style("## device info file ##", bold=True))
hw_version = final["get_device_info"]["hw_ver"]
sw_version = final["get_device_info"]["fw_ver"]
model = final["get_device_info"]["model"]
sw_version = sw_version.split(" ", maxsplit=1)[0]
return f"{model}.smart_{hw_version}_{sw_version}.json", final
if __name__ == "__main__":

5
kasa/tapo/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""Package for supporting tapo-branded and newer kasa devices."""
from .tapodevice import TapoDevice
from .tapoplug import TapoPlug
__all__ = ["TapoDevice", "TapoPlug"]

View File

@ -33,11 +33,13 @@ class TapoDevice(SmartDevice):
if self.credentials is None or self.credentials.username is None:
raise AuthenticationException("Tapo plug requires authentication.")
self._components = await self.protocol.query("component_nego")
self._info = await self.protocol.query("get_device_info")
self._usage = await self.protocol.query("get_device_usage")
self._time = await self.protocol.query("get_device_time")
self._last_update = self._data = {
"components": self._components,
"info": self._info,
"usage": self._usage,
"time": self._time,