Use rich for prettier output, if available (#403)

Use rich for prettier output, if available.

This does not add a new dependency, but rather uses rich if it's installed.
This commit is contained in:
Teemu R 2023-02-18 17:31:06 +01:00 committed by GitHub
parent 334ba1713a
commit dd044130d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 152 additions and 141 deletions

View File

@ -3,12 +3,12 @@
import json
from collections import Counter, defaultdict
from pprint import pformat as pf
from pprint import pprint as pp
import click
import dpkt
from dpkt.ethernet import ETH_TYPE_IP, Ethernet
from kasa.cli import echo
from kasa.protocol import TPLinkSmartHomeProtocol
@ -36,21 +36,20 @@ def read_payloads_from_file(file):
try:
decrypted = TPLinkSmartHomeProtocol.decrypt(data[4:])
except Exception as ex:
click.echo(
click.style(f"Unable to decrypt the data, ignoring: {ex}", fg="red")
)
echo(f"[red]Unable to decrypt the data, ignoring: {ex}[/red]")
continue
if not decrypted: # skip empty payloads
continue
try:
json_payload = json.loads(decrypted)
except Exception as ex:
click.echo(
click.style(f"Unable to parse payload, ignoring: {ex}", fg="red")
)
except Exception as ex: # this can happen when the response is split into multiple tcp segments
echo(f"[red]Unable to parse payload '{decrypted}', ignoring: {ex}[/red]")
continue
if not json_payload: # ignore empty payloads
click.echo(click.style("Got empty payload, ignoring", fg="red"))
echo("[red]Got empty payload, ignoring[/red]")
continue
yield json_payload
@ -67,7 +66,7 @@ def parse_pcap(file):
for module, cmds in json_payload.items():
seen_items["modules"][module] += 1
if "err_code" in cmds:
click.echo(click.style("Got error for module: %s" % cmds, fg="red"))
echo("[red]Got error for module: %s[/red]" % cmds)
continue
for cmd, response in cmds.items():
@ -76,30 +75,24 @@ def parse_pcap(file):
if response is None:
continue
direction = ">>"
style = {}
if response is None:
print("got none as response for %s, weird?" % (cmd))
echo(f"got none as response for {cmd} %s, weird?")
continue
is_success = "[green]+[/green]"
if "err_code" in response:
direction = "<<"
if response["err_code"] != 0:
seen_items["errorcodes"][response["err_code"]] += 1
seen_items["errors"][response["err_msg"]] += 1
print(response)
style = {"bold": True, "fg": "red"}
else:
style = {"fg": "green"}
is_success = "[red]![/red]"
context_str = f" [ctx: {context}]" if context else ""
click.echo(
click.style(
f"{direction}{context_str} {module}.{cmd}: {pf(response)}",
**style,
)
echo(
f"[{is_success}] {direction}{context_str} {module}.{cmd}: {pf(response)}"
)
pp(seen_items)
echo(pf(seen_items))
if __name__ == "__main__":

View File

@ -1,12 +1,33 @@
"""python-kasa cli tool."""
import asyncio
import json
import logging
import re
import sys
from functools import wraps
from pprint import pformat as pf
from typing import cast
from typing import Any, Dict, cast
import asyncclick as click
try:
from rich import print as echo
except ImportError:
def _strip_rich_formatting(echo_func):
"""Strip rich formatting from messages."""
@wraps(echo_func)
def wrapper(message=None, *args, **kwargs):
if message is not None:
message = re.sub(r"\[/?.+?]", "", message)
echo_func(message, *args, **kwargs)
return wrapper
echo = _strip_rich_formatting(click.echo)
from kasa import (
Discover,
SmartBulb,
@ -69,32 +90,44 @@ async def cli(ctx, host, alias, target, debug, type):
ctx.obj = SmartDevice(None)
return
if debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging_config: Dict[str, Any] = {
"level": logging.DEBUG if debug > 0 else logging.INFO
}
try:
from rich.logging import RichHandler
rich_config = {
"show_time": False,
}
logging_config["handlers"] = [RichHandler(**rich_config)]
logging_config["format"] = "%(message)s"
except ImportError:
pass
# The configuration should be converted to use dictConfig, but this keeps mypy happy for now
logging.basicConfig(**logging_config) # type: ignore
if ctx.invoked_subcommand == "discover":
return
if alias is not None and host is None:
click.echo(f"Alias is given, using discovery to find host {alias}")
echo(f"Alias is given, using discovery to find host {alias}")
host = await find_host_from_alias(alias=alias, target=target)
if host:
click.echo(f"Found hostname is {host}")
echo(f"Found hostname is {host}")
else:
click.echo(f"No device with name {alias} found")
echo(f"No device with name {alias} found")
return
if host is None:
click.echo("No host name given, trying discovery..")
echo("No host name given, trying discovery..")
await ctx.invoke(discover)
return
if type is not None:
dev = TYPE_TO_CLASS[type](host)
else:
click.echo("No --type defined, discovering..")
echo("No --type defined, discovering..")
dev = await Discover.discover_single(host)
await dev.update()
@ -114,11 +147,11 @@ def wifi(dev):
@pass_dev
async def scan(dev):
"""Scan for available wifi networks."""
click.echo("Scanning for wifi networks, wait a second..")
echo("Scanning for wifi networks, wait a second..")
devs = await dev.wifi_scan()
click.echo(f"Found {len(devs)} wifi networks!")
echo(f"Found {len(devs)} wifi networks!")
for dev in devs:
click.echo(f"\t {dev}")
echo(f"\t {dev}")
return devs
@ -130,9 +163,9 @@ async def scan(dev):
@pass_dev
async def join(dev: SmartDevice, ssid, password, keytype):
"""Join the given wifi network."""
click.echo(f"Asking the device to connect to {ssid}..")
echo(f"Asking the device to connect to {ssid}..")
res = await dev.wifi_join(ssid, password, keytype=keytype)
click.echo(
echo(
f"Response: {res} - if the device is not able to join the network, it will revert back to its previous state."
)
@ -145,7 +178,7 @@ async def join(dev: SmartDevice, ssid, password, keytype):
async def discover(ctx, timeout):
"""Discover devices in the network."""
target = ctx.parent.params["target"]
click.echo(f"Discovering devices on {target} for {timeout} seconds")
echo(f"Discovering devices on {target} for {timeout} seconds")
sem = asyncio.Semaphore()
async def print_discovered(dev: SmartDevice):
@ -153,7 +186,7 @@ async def discover(ctx, timeout):
async with sem:
ctx.obj = dev
await ctx.invoke(state)
click.echo()
echo()
await Discover.discover(
target=target, timeout=timeout, on_discovered=print_discovered
@ -176,8 +209,8 @@ async def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attem
@pass_dev
async def sysinfo(dev):
"""Print out full system information."""
click.echo(click.style("== System info ==", bold=True))
click.echo(pf(dev.sys_info))
echo("== System info ==")
echo(pf(dev.sys_info))
return dev.sys_info
@ -185,56 +218,42 @@ async def sysinfo(dev):
@pass_dev
async def state(dev: SmartDevice):
"""Print out device state and versions."""
click.echo(click.style(f"== {dev.alias} - {dev.model} ==", bold=True))
click.echo(f"\tHost: {dev.host}")
click.echo(
click.style(
"\tDevice state: {}\n".format("ON" if dev.is_on else "OFF"),
fg="green" if dev.is_on else "red",
)
)
echo(f"[bold]== {dev.alias} - {dev.model} ==[/bold]")
echo(f"\tHost: {dev.host}")
echo(f"\tDevice state: {dev.is_on}")
if dev.is_strip:
click.echo(click.style("\t== Plugs ==", bold=True))
echo("\t[bold]== Plugs ==[/bold]")
for plug in dev.children: # type: ignore
is_on = plug.is_on
alias = plug.alias
click.echo(
click.style(
"\t* Socket '{}' state: {} on_since: {}".format(
alias, ("ON" if is_on else "OFF"), plug.on_since
),
fg="green" if is_on else "red",
)
)
click.echo()
echo(f"\t* Socket '{plug.alias}' state: {plug.is_on} since {plug.on_since}")
echo()
click.echo(click.style("\t== Generic information ==", bold=True))
click.echo(f"\tTime: {dev.time} (tz: {dev.timezone}")
click.echo(f"\tHardware: {dev.hw_info['hw_ver']}")
click.echo(f"\tSoftware: {dev.hw_info['sw_ver']}")
click.echo(f"\tMAC (rssi): {dev.mac} ({dev.rssi})")
click.echo(f"\tLocation: {dev.location}")
echo("\t[bold]== Generic information ==[/bold]")
echo(f"\tTime: {dev.time} (tz: {dev.timezone}")
echo(f"\tHardware: {dev.hw_info['hw_ver']}")
echo(f"\tSoftware: {dev.hw_info['sw_ver']}")
echo(f"\tMAC (rssi): {dev.mac} ({dev.rssi})")
echo(f"\tLocation: {dev.location}")
click.echo(click.style("\n\t== Device specific information ==", bold=True))
echo("\n\t[bold]== Device specific information ==[/bold]")
for info_name, info_data in dev.state_information.items():
if isinstance(info_data, list):
click.echo(f"\t{info_name}:")
echo(f"\t{info_name}:")
for item in info_data:
click.echo(f"\t\t{item}")
echo(f"\t\t{item}")
else:
click.echo(f"\t{info_name}: {info_data}")
echo(f"\t{info_name}: {info_data}")
if dev.has_emeter:
click.echo(click.style("\n\t== Current State ==", bold=True))
echo("\n\t[bold]== Current State ==[/bold]")
emeter_status = dev.emeter_realtime
click.echo(f"\t{emeter_status}")
echo(f"\t{emeter_status}")
click.echo(click.style("\n\t== Modules ==", bold=True))
echo("\n\t[bold]== Modules ==[/bold]")
for module in dev.modules.values():
if module.is_supported:
click.echo(click.style(f"\t+ {module}", fg="green"))
echo(f"\t[green]+ {module}[/green]")
else:
click.echo(click.style(f"\t- {module}", fg="red"))
echo(f"\t[red]- {module}[/red]")
@cli.command()
@ -245,20 +264,20 @@ async def alias(dev, new_alias, index):
"""Get or set the device (or plug) alias."""
if index is not None:
if not dev.is_strip:
click.echo("Index can only used for power strips!")
echo("Index can only used for power strips!")
return
dev = cast(SmartStrip, dev)
dev = dev.get_plug_by_index(index)
if new_alias is not None:
click.echo(f"Setting alias to {new_alias}")
echo(f"Setting alias to {new_alias}")
res = await dev.set_alias(new_alias)
return res
click.echo(f"Alias: {dev.alias}")
echo(f"Alias: {dev.alias}")
if dev.is_strip:
for plug in dev.children:
click.echo(f" * {plug.alias}")
echo(f" * {plug.alias}")
@cli.command()
@ -274,8 +293,7 @@ async def raw_command(dev: SmartDevice, module, command, parameters):
parameters = ast.literal_eval(parameters)
res = await dev._query_helper(module, command, parameters)
click.echo(res)
echo(json.dumps(res))
return res
@ -289,41 +307,41 @@ async def emeter(dev: SmartDevice, year, month, erase):
Daily and monthly data provided in CSV format.
"""
click.echo(click.style("== Emeter ==", bold=True))
echo("[bold]== Emeter ==[/bold]")
if not dev.has_emeter:
click.echo("Device has no emeter")
echo("Device has no emeter")
return
if erase:
click.echo("Erasing emeter statistics..")
click.echo(await dev.erase_emeter_stats())
echo("Erasing emeter statistics..")
echo(await dev.erase_emeter_stats())
return
if year:
click.echo(f"== For year {year.year} ==")
click.echo("Month, usage (kWh)")
echo(f"== For year {year.year} ==")
echo("Month, usage (kWh)")
usage_data = await dev.get_emeter_monthly(year=year.year)
elif month:
click.echo(f"== For month {month.month} of {month.year} ==")
click.echo("Day, usage (kWh)")
echo(f"== For month {month.month} of {month.year} ==")
echo("Day, usage (kWh)")
usage_data = await dev.get_emeter_daily(year=month.year, month=month.month)
else:
# Call with no argument outputs summary data and returns
emeter_status = dev.emeter_realtime
click.echo("Current: %s A" % emeter_status["current"])
click.echo("Voltage: %s V" % emeter_status["voltage"])
click.echo("Power: %s W" % emeter_status["power"])
click.echo("Total consumption: %s kWh" % emeter_status["total"])
echo("Current: %s A" % emeter_status["current"])
echo("Voltage: %s V" % emeter_status["voltage"])
echo("Power: %s W" % emeter_status["power"])
echo("Total consumption: %s kWh" % emeter_status["total"])
click.echo("Today: %s kWh" % dev.emeter_today)
click.echo("This month: %s kWh" % dev.emeter_this_month)
echo("Today: %s kWh" % dev.emeter_today)
echo("This month: %s kWh" % dev.emeter_this_month)
return
# output any detailed usage data
for index, usage in usage_data.items():
click.echo(f"{index}, {usage}")
echo(f"{index}, {usage}")
@cli.command()
@ -336,32 +354,32 @@ async def usage(dev: SmartDevice, year, month, erase):
Daily and monthly data provided in CSV format.
"""
click.echo(click.style("== Usage ==", bold=True))
echo("[bold]== Usage ==[/bold]")
usage = dev.modules["usage"]
if erase:
click.echo("Erasing usage statistics..")
click.echo(await usage.erase_stats())
echo("Erasing usage statistics..")
echo(await usage.erase_stats())
return
if year:
click.echo(f"== For year {year.year} ==")
click.echo("Month, usage (minutes)")
usage_data = await usage.get_monthstat(year.year)
echo(f"== For year {year.year} ==")
echo("Month, usage (minutes)")
usage_data = await usage.get_monthstat(year=year.year)
elif month:
click.echo(f"== For month {month.month} of {month.year} ==")
click.echo("Day, usage (minutes)")
echo(f"== For month {month.month} of {month.year} ==")
echo("Day, usage (minutes)")
usage_data = await usage.get_daystat(year=month.year, month=month.month)
else:
# Call with no argument outputs summary data and returns
click.echo("Today: %s minutes" % usage.usage_today)
click.echo("This month: %s minutes" % usage.usage_this_month)
echo("Today: %s minutes" % usage.usage_today)
echo("This month: %s minutes" % usage.usage_this_month)
return
# output any detailed usage data
for index, usage in usage_data.items():
click.echo(f"{index}, {usage}")
echo(f"{index}, {usage}")
@cli.command()
@ -371,13 +389,13 @@ async def usage(dev: SmartDevice, year, month, erase):
async def brightness(dev: SmartBulb, brightness: int, transition: int):
"""Get or set brightness."""
if not dev.is_dimmable:
click.echo("This device does not support brightness.")
echo("This device does not support brightness.")
return
if brightness is None:
click.echo(f"Brightness: {dev.brightness}")
echo(f"Brightness: {dev.brightness}")
else:
click.echo(f"Setting brightness to {brightness}")
echo(f"Setting brightness to {brightness}")
return await dev.set_brightness(brightness, transition=transition)
@ -390,21 +408,21 @@ async def brightness(dev: SmartBulb, brightness: int, transition: int):
async def temperature(dev: SmartBulb, temperature: int, transition: int):
"""Get or set color temperature."""
if not dev.is_variable_color_temp:
click.echo("Device does not support color temperature")
echo("Device does not support color temperature")
return
if temperature is None:
click.echo(f"Color temperature: {dev.color_temp}")
echo(f"Color temperature: {dev.color_temp}")
valid_temperature_range = dev.valid_temperature_range
if valid_temperature_range != (0, 0):
click.echo("(min: {}, max: {})".format(*valid_temperature_range))
echo("(min: {}, max: {})".format(*valid_temperature_range))
else:
click.echo(
echo(
"Temperature range unknown, please open a github issue"
f" or a pull request for model '{dev.model}'"
)
else:
click.echo(f"Setting color temperature to {temperature}")
echo(f"Setting color temperature to {temperature}")
return await dev.set_color_temp(temperature, transition=transition)
@ -415,7 +433,7 @@ async def temperature(dev: SmartBulb, temperature: int, transition: int):
async def effect(dev, ctx, effect):
"""Set an effect."""
if not dev.has_effects:
click.echo("Device does not support effects")
echo("Device does not support effects")
return
if effect is None:
raise click.BadArgumentUsage(
@ -425,7 +443,7 @@ async def effect(dev, ctx, effect):
if effect not in dev.effect_list:
raise click.BadArgumentUsage(f"Effect must be one of: {dev.effect_list}", ctx)
click.echo(f"Setting Effect: {effect}")
echo(f"Setting Effect: {effect}")
return await dev.set_effect(effect)
@ -439,15 +457,15 @@ async def effect(dev, ctx, effect):
async def hsv(dev, ctx, h, s, v, transition):
"""Get or set color in HSV."""
if not dev.is_color:
click.echo("Device does not support colors")
echo("Device does not support colors")
return
if h is None or s is None or v is None:
click.echo(f"Current HSV: {dev.hsv}")
echo(f"Current HSV: {dev.hsv}")
elif s is None or v is None:
raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx)
else:
click.echo(f"Setting HSV: {h} {s} {v}")
echo(f"Setting HSV: {h} {s} {v}")
return await dev.set_hsv(h, s, v, transition=transition)
@ -457,10 +475,10 @@ async def hsv(dev, ctx, h, s, v, transition):
async def led(dev, state):
"""Get or set (Plug's) led state."""
if state is not None:
click.echo(f"Turning led to {state}")
echo(f"Turning led to {state}")
return await dev.set_led(state)
else:
click.echo(f"LED state: {dev.led}")
echo(f"LED state: {dev.led}")
@cli.command()
@ -468,7 +486,7 @@ async def led(dev, state):
async def time(dev):
"""Get the device time."""
res = dev.time
click.echo(f"Current time: {res}")
echo(f"Current time: {res}")
return res
@ -481,7 +499,7 @@ async def on(dev: SmartDevice, index: int, name: str, transition: int):
"""Turn the device on."""
if index is not None or name is not None:
if not dev.is_strip:
click.echo("Index and name are only for power strips!")
echo("Index and name are only for power strips!")
return
dev = cast(SmartStrip, dev)
@ -490,7 +508,7 @@ async def on(dev: SmartDevice, index: int, name: str, transition: int):
elif name:
dev = dev.get_plug_by_name(name)
click.echo(f"Turning on {dev.alias}")
echo(f"Turning on {dev.alias}")
return await dev.turn_on(transition=transition)
@ -503,7 +521,7 @@ async def off(dev: SmartDevice, index: int, name: str, transition: int):
"""Turn the device off."""
if index is not None or name is not None:
if not dev.is_strip:
click.echo("Index and name are only for power strips!")
echo("Index and name are only for power strips!")
return
dev = cast(SmartStrip, dev)
@ -512,7 +530,7 @@ async def off(dev: SmartDevice, index: int, name: str, transition: int):
elif name:
dev = dev.get_plug_by_name(name)
click.echo(f"Turning off {dev.alias}")
echo(f"Turning off {dev.alias}")
return await dev.turn_off(transition=transition)
@ -521,7 +539,7 @@ async def off(dev: SmartDevice, index: int, name: str, transition: int):
@pass_dev
async def reboot(plug, delay):
"""Reboot the device."""
click.echo("Rebooting the device..")
echo("Rebooting the device..")
return await plug.reboot(delay)
@ -540,7 +558,7 @@ def _schedule_list(dev, type):
for rule in sched.rules:
print(rule)
else:
click.echo(f"No rules of type {type}")
echo(f"No rules of type {type}")
@schedule.command(name="delete")
@ -551,10 +569,10 @@ async def delete_rule(dev, id):
schedule = dev.modules["schedule"]
rule_to_delete = next(filter(lambda rule: (rule.id == id), schedule.rules), None)
if rule_to_delete:
click.echo(f"Deleting rule id {id}")
echo(f"Deleting rule id {id}")
await schedule.delete_rule(rule_to_delete)
else:
click.echo(f"No rule with id {id} was found")
echo(f"No rule with id {id} was found")
@cli.group(invoke_without_command=True)
@ -570,7 +588,7 @@ async def presets(ctx):
def presets_list(dev: SmartBulb):
"""List presets."""
if not dev.is_bulb:
click.echo("Presets only supported on bulbs")
echo("Presets only supported on bulbs")
return
for preset in dev.presets:
@ -592,7 +610,7 @@ async def presets_modify(
if preset.index == index:
break
else:
click.echo(f"No preset found for index {index}")
echo(f"No preset found for index {index}")
return
if brightness is not None:
@ -604,7 +622,7 @@ async def presets_modify(
if temperature is not None:
preset.color_temp = temperature
click.echo(f"Going to save preset: {preset}")
echo(f"Going to save preset: {preset}")
await dev.save_preset(preset)
@ -617,7 +635,7 @@ async def presets_modify(
async def turn_on_behavior(dev: SmartBulb, type, last, preset):
"""Modify bulb turn-on behavior."""
settings = await dev.get_turn_on_behavior()
click.echo(f"Current turn on behavior: {settings}")
echo(f"Current turn on behavior: {settings}")
# Return if we are not setting the value
if not type and not last and not preset:
@ -625,16 +643,16 @@ async def turn_on_behavior(dev: SmartBulb, type, last, preset):
# If we are setting the value, the type has to be specified
if (last or preset) and type is None:
click.echo("To set the behavior, you need to define --type")
echo("To set the behavior, you need to define --type")
return
behavior = getattr(settings, type)
if last:
click.echo(f"Going to set {type} to last")
echo(f"Going to set {type} to last")
behavior.preset = None
elif preset is not None:
click.echo(f"Going to set {type} to preset {preset}")
echo(f"Going to set {type} to preset {preset}")
behavior.preset = preset
await dev.set_turn_on_behavior(settings)

View File

@ -33,9 +33,9 @@ async def test_state(dev, turn_on):
await dev.update()
if dev.is_on:
assert "Device state: ON" in res.output
assert "Device state: True" in res.output
else:
assert "Device state: OFF" in res.output
assert "Device state: False" in res.output
async def test_alias(dev):