python-kasa/kasa/cli/discover.py
Steven B. bf8f0adabe
Return raw discovery result in cli discover raw
Adds an `on_discovered_raw` callback to Discover and a cli command `discover raw`, which returns the raw json before it is serialized to a `DiscoveryResult` and before any attempt is made to create a device class.
2024-12-10 22:42:14 +00:00

335 lines
11 KiB
Python

"""Module for cli discovery commands."""
from __future__ import annotations
import asyncio
from pprint import pformat as pf
import asyncclick as click
from kasa import (
AuthenticationError,
Credentials,
Device,
Discover,
UnsupportedDeviceError,
)
from kasa.discover import (
NEW_DISCOVERY_REDACTORS,
ConnectAttempt,
DiscoveredRaw,
DiscoveryResult,
)
from kasa.iot.iotdevice import _extract_sys_info
from kasa.protocols.iotprotocol import REDACTORS as IOT_REDACTORS
from kasa.protocols.protocol import redact_data
from ..json import dumps as json_dumps
from .common import echo, error
@click.group(invoke_without_command=True)
@click.pass_context
async def discover(ctx):
    """Discover devices in the network."""
    # NOTE: the docstring above doubles as the click help text, so it is left
    # untouched. When a subcommand was named there is nothing to do here; a
    # bare `discover` is routed to the `detail` command.
    if ctx.invoked_subcommand is not None:
        return None
    return await ctx.invoke(detail)
@discover.command()
@click.pass_context
async def detail(ctx):
    """Discover devices in the network using udp broadcasts."""
    # Function-scope import, as in the original layout of this command.
    from .device import state

    unsupported_devices = []
    auth_failures = []
    # Semaphore with the default count; every callback holds it while it
    # updates a device and prints (presumably to keep output from interleaving).
    output_lock = asyncio.Semaphore()

    async def print_unsupported(unsupported_exception: UnsupportedDeviceError) -> None:
        # Record the failure first so the summary count is right even if
        # printing is still waiting on the lock.
        unsupported_devices.append(unsupported_exception)
        async with output_lock:
            result = unsupported_exception.discovery_result
            echo("== Unsupported device ==")
            if result:
                _echo_discovery_info(result)
            else:
                # No discovery payload: fall back to the exception text.
                echo(f"\t{unsupported_exception}")
            echo()

    async def print_discovered(dev: Device) -> None:
        async with output_lock:
            try:
                await dev.update()
            except AuthenticationError:
                info = dev._discovery_info
                auth_failures.append(info)
                echo("== Authentication failed for device ==")
                _echo_discovery_info(info)
                echo()
            else:
                # Hand the device to the parent context and reuse the
                # `state` command to print it.
                ctx.parent.obj = dev
                await ctx.parent.invoke(state)
            echo()

    discovered = await _discover(
        ctx, print_discovered=print_discovered, print_unsupported=print_unsupported
    )

    # The summary lines are only printed when no single host was requested.
    if not ctx.parent.parent.params["host"]:
        echo(f"Found {len(discovered)} devices")
        if unsupported_devices:
            echo(f"Found {len(unsupported_devices)} unsupported devices")
        if auth_failures:
            echo(f"Found {len(auth_failures)} devices that failed to authenticate")

    return discovered
@discover.command()
@click.option(
    "--redact/--no-redact",
    default=False,
    is_flag=True,
    type=bool,
    help="Set flag to redact sensitive data from raw output.",
)
@click.pass_context
async def raw(ctx, redact: bool):
    """Return raw discovery data returned from devices."""

    def print_raw(discovered: DiscoveredRaw):
        # Callback for each raw discovery payload: optionally redact the
        # response in place, then dump the whole structure as JSON.
        if redact:
            # The redactor set is chosen by the port recorded in the metadata.
            on_port_2 = discovered["meta"]["port"] == Discover.DISCOVERY_PORT_2
            redactors = NEW_DISCOVERY_REDACTORS if on_port_2 else IOT_REDACTORS
            response = discovered["discovery_response"]
            discovered["discovery_response"] = redact_data(response, redactors)
        echo(json_dumps(discovered, indent=True))

    # do_echo=False suppresses the broadcast-discovery banner in _discover.
    return await _discover(ctx, print_raw=print_raw, do_echo=False)
@discover.command()
@click.pass_context
async def list(ctx):
    """List devices in the network in a table using udp broadcasts."""
    # NOTE: the function name is the click command name, so it keeps
    # shadowing the builtin `list` for the rest of this module.
    row_lock = asyncio.Semaphore()

    async def print_unsupported(unsupported_exception: UnsupportedDeviceError):
        # Unsupported devices only get a row when the exception carries a host.
        host = unsupported_exception.host
        if host:
            echo(f"{host:<15} UNSUPPORTED DEVICE")

    async def print_discovered(dev: Device):
        conn = dev.config.connection_type
        family = conn.device_family.value
        encryption = conn.encryption_type.value
        # The row prefix is built before taking the lock; only the update
        # and the final echo happen while holding it.
        infostr = f"{dev.host:<15} {family:<20} {encryption:<7}"
        async with row_lock:
            try:
                await dev.update()
            except AuthenticationError:
                echo(f"{infostr} - Authentication failed")
            else:
                echo(f"{infostr} {dev.alias}")

    # Table header, using the same column widths as the rows above.
    echo(f"{'HOST':<15} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} {'ALIAS'}")
    return await _discover(
        ctx,
        print_discovered=print_discovered,
        print_unsupported=print_unsupported,
        do_echo=False,
    )
async def _discover(
    ctx, *, print_discovered=None, print_unsupported=None, print_raw=None, do_echo=True
):
    """Run discovery using the options stored on the grandparent click context.

    Args:
        ctx: Click context of the calling subcommand; options are read from
            ``ctx.parent.parent.params``.
        print_discovered: Passed as ``on_discovered`` to ``Discover.discover``.
            It is not forwarded when a single host is queried.
        print_unsupported: Passed as ``on_unsupported`` to both discovery calls.
        print_raw: Passed as ``on_discovered_raw`` to both discovery calls.
        do_echo: When true, announce the broadcast discovery before it starts.

    Returns:
        Whatever ``Discover.discover_single`` returns when a host option is
        set; otherwise the mapping returned by ``Discover.discover``, after
        the protocol of every device in it has been closed.
    """
    opts = ctx.parent.parent.params
    target, host, port = opts["target"], opts["host"], opts["port"]
    username, password = opts["username"], opts["password"]
    discovery_timeout, timeout = opts["discovery_timeout"], opts["timeout"]

    # Credentials are only built when both halves were supplied.
    credentials = Credentials(username, password) if username and password else None

    # Keyword arguments common to the single-host and the broadcast call.
    shared = {
        "port": port,
        "credentials": credentials,
        "timeout": timeout,
        "discovery_timeout": discovery_timeout,
        "on_unsupported": print_unsupported,
        "on_discovered_raw": print_raw,
    }

    if host:
        # NOTE: this banner is printed regardless of ``do_echo``.
        echo(f"Discovering device {host} for {discovery_timeout} seconds")
        return await Discover.discover_single(host, **shared)

    if do_echo:
        echo(f"Discovering devices on {target} for {discovery_timeout} seconds")

    devices = await Discover.discover(
        target=target, on_discovered=print_discovered, **shared
    )
    # Close each device's protocol before handing the mapping back.
    for found in devices.values():
        await found.protocol.close()
    return devices
@discover.command()
@click.pass_context
async def config(ctx):
    """Bypass udp discovery and try to show connection config for a device.
    Bypasses udp discovery and shows the parameters required to connect
    directly to the device.
    """
    opts = ctx.parent.parent.params
    username, password = opts["username"], opts["password"]
    timeout = opts["timeout"]
    host, port = opts["host"], opts["port"]

    if not host:
        error("--host option must be supplied to discover config")

    credentials = Credentials(username, password) if username and password else None

    # Human-readable endpoint for the attempt log; the port is only shown
    # when one was given.
    port_suffix = f":{port}" if port else ""
    host_port = host + port_suffix

    def on_attempt(connect_attempt: ConnectAttempt, success: bool) -> None:
        # Report each protocol/transport/device-class combination tried.
        protocol_cls, transport_cls, device_cls = connect_attempt
        combo = " + ".join(
            cls.__name__ for cls in (protocol_cls, transport_cls, device_cls)
        )
        outcome = "succeeded" if success else "failed"
        echo(f"Attempt to connect to {host_port} with {combo} {outcome}")

    connected = await Discover.try_connect_all(
        host, credentials=credentials, timeout=timeout, port=port, on_attempt=on_attempt
    )
    if not connected:
        error(f"Unable to connect to {host}")
        return

    conn = connected.config.connection_type
    https_flag = "--https" if conn.https else "--no-https"
    echo("Managed to connect, cli options to connect are:")
    echo(
        f"--device-family {conn.device_family.value} "
        f"--encrypt-type {conn.encryption_type.value} "
        f"{https_flag}"
    )
def _echo_dictionary(discovery_info: dict) -> None:
    """Echo each key/value pair of ``discovery_info`` under a bold heading.

    Keys are split on underscores and each part is capitalised; an empty
    part (from a leading, trailing or doubled underscore) is shown as ``_``.
    The resulting label, including its colon, is left-aligned to 15 columns.
    """
    echo("\t[bold]== Discovery information ==[/bold]")
    for raw_key, value in discovery_info.items():
        words = [part.capitalize() or "_" for part in raw_key.split("_")]
        label = " ".join(words) + ":"
        echo(f"\t{label:<15}{value}")
def _echo_discovery_info(discovery_info) -> None:
    """Echo a human-readable summary of a device's discovery information.

    Three shapes of input are handled, in this order:

    * ``None`` - nothing is printed.
    * A payload from which ``_extract_sys_info`` returns something truthy -
      that sysinfo is printed as a plain key/value listing.
    * Anything else is parsed with ``DiscoveryResult.from_dict`` and printed
      field by field; if parsing raises, the raw input is printed as a
      key/value listing instead.

    Fields whose value is falsy are omitted from the field-by-field output.
    """
    # We don't have discovery info when all connection params are passed manually
    if discovery_info is None:
        return

    if sysinfo := _extract_sys_info(discovery_info):
        _echo_dictionary(sysinfo)
        return

    try:
        dr = DiscoveryResult.from_dict(discovery_info)
    except Exception:
        # Deliberate best-effort: show the unparsed data rather than fail.
        _echo_dictionary(discovery_info)
        return

    def _conditional_echo(label, value):
        # Print "label:   value" with values aligned, skipping falsy values.
        if value:
            ws = " " * (19 - len(label))
            echo(f"\t{label}:{ws}{value}")

    echo("\t[bold]== Discovery Result ==[/bold]")
    _conditional_echo("Device Type", dr.device_type)
    _conditional_echo("Device Model", dr.device_model)
    _conditional_echo("Device Name", dr.device_name)
    _conditional_echo("IP", dr.ip)
    _conditional_echo("MAC", dr.mac)
    _conditional_echo("Device Id (hash)", dr.device_id)
    _conditional_echo("Owner (hash)", dr.owner)
    _conditional_echo("FW Ver", dr.firmware_version)
    # Both hardware-version fields share one label; falsy ones are skipped.
    _conditional_echo("HW Ver", dr.hw_ver)
    _conditional_echo("HW Ver", dr.hardware_version)
    _conditional_echo("Supports IOT Cloud", dr.is_support_iot_cloud)
    # Fixed: this row used to print ``dr.owner`` a second time.
    # NOTE(review): the ``obd_src`` field is not visible from this file, so
    # it is read defensively - confirm against DiscoveryResult.
    _conditional_echo("OBD Src", getattr(dr, "obd_src", None))
    _conditional_echo("Factory Default", dr.factory_default)
    _conditional_echo("Encrypt Type", dr.encrypt_type)
    if mgt_encrypt_schm := dr.mgt_encrypt_schm:
        _conditional_echo("Encrypt Type", mgt_encrypt_schm.encrypt_type)
        _conditional_echo("Supports HTTPS", mgt_encrypt_schm.is_support_https)
        _conditional_echo("HTTP Port", mgt_encrypt_schm.http_port)
        _conditional_echo("Login version", mgt_encrypt_schm.lv)
    _conditional_echo("Encrypt info", pf(dr.encrypt_info) if dr.encrypt_info else None)
    _conditional_echo("Decrypted", pf(dr.decrypted_data) if dr.decrypted_data else None)
async def find_dev_from_alias(
    alias: str,
    credentials: Credentials | None,
    target: str = "255.255.255.255",
    timeout: int = 5,
    attempts: int = 3,
) -> Device | None:
    """Discover a device identified by its alias.

    Discovery is run in a background task for up to ``attempts`` rounds.
    Each newly seen host is updated and its alias is compared
    case-insensitively with ``alias``. The call returns as soon as a match
    is found, without waiting for the current round to finish.

    Args:
        alias: Alias to look for (compared case-insensitively).
        credentials: Credentials passed on to ``Discover.discover``.
        target: Discovery target address passed on to ``Discover.discover``.
        timeout: Passed as ``timeout`` to ``Discover.discover``.
        attempts: Maximum number of discovery rounds to run.

    Returns:
        The first matching device, or ``None`` if no device matched. The
        returned device's protocol has already been closed.

    Raises:
        Exception: Whatever the discovery task raised, if it failed before
            any matching device was found.
    """
    found_event = asyncio.Event()
    found_device = []
    seen_hosts = set()

    async def on_discovered(dev: Device):
        # A host can be reported again in later rounds; query it only once.
        if dev.host in seen_hosts:
            return
        seen_hosts.add(dev.host)
        try:
            await dev.update()
        except Exception as ex:
            echo(f"Error querying device {dev.host}: {ex}")
            return
        finally:
            await dev.protocol.close()
        if not dev.alias:
            echo(f"Skipping device {dev.host} with no alias")
            return
        if dev.alias.lower() == alias.lower():
            found_device.append(dev)
            found_event.set()

    async def do_discover():
        try:
            # Fixed off-by-one: range(1, attempts) ran one round too few
            # (and none at all for attempts=1).
            for _ in range(attempts):
                await Discover.discover(
                    target=target,
                    timeout=timeout,
                    credentials=credentials,
                    on_discovered=on_discovered,
                )
                if found_event.is_set():
                    break
        finally:
            # Always release the waiter, including when discovery raises;
            # otherwise the caller would wait on the event forever.
            found_event.set()

    # Keep a reference so the task cannot be garbage-collected mid-flight.
    discover_task = asyncio.create_task(do_discover())
    await found_event.wait()

    if found_device:
        return found_device[0]
    if discover_task.done():
        # Nothing matched: surface a discovery failure instead of hiding it.
        discover_task.result()
    return None