Improve CLI Discovery output (#583)

- Show discovery results for unsupported devices and devices that fail to authenticate.
- Rename `--show-unsupported` to `--verbose`.
- Remove separate `--timeout` parameter from cli discovery so it's not confused with `--timeout` now added to cli command.
- Add tests.
This commit is contained in:
sdb9696
2023-12-19 12:50:33 +00:00
committed by GitHub
parent 2e6c41d039
commit 209391c422
7 changed files with 298 additions and 55 deletions

View File

@@ -18,8 +18,15 @@ from kasa import (
SmartBulb,
SmartDevice,
SmartStrip,
UnsupportedDeviceException,
)
from kasa.device_factory import DEVICE_TYPE_TO_CLASS
from kasa.discover import DiscoveryResult
try:
from pydantic.v1 import ValidationError
except ImportError:
from pydantic import ValidationError
try:
from rich import print as _do_echo
@@ -241,7 +248,7 @@ async def cli(
if host is None:
echo("No host name given, trying discovery..")
return await ctx.invoke(discover, timeout=discovery_timeout)
return await ctx.invoke(discover)
if type is not None:
device_type = DeviceType.from_value(type)
@@ -300,21 +307,21 @@ async def join(dev: SmartDevice, ssid, password, keytype):
@cli.command()
@click.option("--timeout", default=3, required=False)
@click.option(
"--show-unsupported",
envvar="KASA_SHOW_UNSUPPORTED",
"--verbose",
envvar="KASA_VERBOSE",
required=False,
default=False,
is_flag=True,
help="Print out discovered unsupported devices",
help="Be more verbose on output",
)
@click.pass_context
async def discover(ctx, timeout, show_unsupported):
async def discover(ctx, verbose):
"""Discover devices in the network."""
target = ctx.parent.params["target"]
username = ctx.parent.params["username"]
password = ctx.parent.params["password"]
timeout = ctx.parent.params["discovery_timeout"]
credentials = Credentials(username, password)
@@ -323,24 +330,37 @@ async def discover(ctx, timeout, show_unsupported):
unsupported = []
auth_failed = []
async def print_unsupported(data: str):
unsupported.append(data)
if show_unsupported:
echo(f"Found unsupported device (tapo/unknown encryption): {data}")
echo()
async def print_unsupported(unsupported_exception: UnsupportedDeviceException):
unsupported.append(unsupported_exception)
async with sem:
if unsupported_exception.discovery_result:
echo("== Unsupported device ==")
_echo_discovery_info(unsupported_exception.discovery_result)
echo()
else:
echo("== Unsupported device ==")
echo(f"\t{unsupported_exception}")
echo()
echo(f"Discovering devices on {target} for {timeout} seconds")
async def print_discovered(dev: SmartDevice):
try:
await dev.update()
async with sem:
async with sem:
try:
await dev.update()
except AuthenticationException:
auth_failed.append(dev._discovery_info)
echo("== Authentication failed for device ==")
_echo_discovery_info(dev._discovery_info)
echo()
else:
discovered[dev.host] = dev.internal_state
ctx.obj = dev
await ctx.invoke(state)
echo()
except AuthenticationException as aex:
auth_failed.append(str(aex))
if verbose:
echo()
_echo_discovery_info(dev._discovery_info)
echo()
await Discover.discover(
target=target,
@@ -352,22 +372,50 @@ async def discover(ctx, timeout, show_unsupported):
echo(f"Found {len(discovered)} devices")
if unsupported:
echo(
f"Found {len(unsupported)} unsupported devices"
+ (
""
if show_unsupported
else ", to show them use: kasa discover --show-unsupported"
)
)
echo(f"Found {len(unsupported)} unsupported devices")
if auth_failed:
echo(f"Found {len(auth_failed)} devices that failed to authenticate")
for fail in auth_failed:
echo(fail)
return discovered
def _echo_dictionary(discovery_info: dict):
echo("\t[bold]== Discovery information ==[/bold]")
for key, value in discovery_info.items():
key_name = " ".join(x.capitalize() or "_" for x in key.split("_"))
key_name_and_spaces = "{:<15}".format(key_name + ":")
echo(f"\t{key_name_and_spaces}{value}")
def _echo_discovery_info(discovery_info):
if "system" in discovery_info and "get_sysinfo" in discovery_info["system"]:
_echo_dictionary(discovery_info["system"]["get_sysinfo"])
return
try:
dr = DiscoveryResult(**discovery_info)
except ValidationError:
_echo_dictionary(discovery_info)
return
echo("\t[bold]== Discovery Result ==[/bold]")
echo(f"\tDevice Type: {dr.device_type}")
echo(f"\tDevice Model: {dr.device_model}")
echo(f"\tIP: {dr.ip}")
echo(f"\tMAC: {dr.mac}")
echo(f"\tDevice Id (hash): {dr.device_id}")
echo(f"\tOwner (hash): {dr.owner}")
echo(f"\tHW Ver: {dr.hw_ver}")
echo(f"\tIs Support IOT Cloud: {dr.is_support_iot_cloud})")
echo(f"\tOBD Src: {dr.obd_src}")
echo(f"\tFactory Default: {dr.factory_default}")
echo("\t\t== Encryption Scheme ==")
echo(f"\t\tEncrypt Type: {dr.mgt_encrypt_schm.encrypt_type}")
echo(f"\t\tIs Support HTTPS: {dr.mgt_encrypt_schm.is_support_https}")
echo(f"\t\tHTTP Port: {dr.mgt_encrypt_schm.http_port}")
echo(f"\t\tLV (Login Level): {dr.mgt_encrypt_schm.lv}")
async def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3):
"""Discover a device identified by its alias."""
for _attempt in range(1, attempts):