From dd044130d402193db3d630b6f89c7ddbc4369042 Mon Sep 17 00:00:00 2001 From: Teemu R Date: Sat, 18 Feb 2023 17:31:06 +0100 Subject: [PATCH] Use rich for prettier output, if available (#403) Use rich for prettier output, if available. This does not add a new dependency, but rather uses rich if it's installed. --- devtools/parse_pcap.py | 37 +++--- kasa/cli.py | 252 ++++++++++++++++++++++------------------- kasa/tests/test_cli.py | 4 +- 3 files changed, 152 insertions(+), 141 deletions(-) diff --git a/devtools/parse_pcap.py b/devtools/parse_pcap.py index 305fcc57..3044aee3 100644 --- a/devtools/parse_pcap.py +++ b/devtools/parse_pcap.py @@ -3,12 +3,12 @@ import json from collections import Counter, defaultdict from pprint import pformat as pf -from pprint import pprint as pp import click import dpkt from dpkt.ethernet import ETH_TYPE_IP, Ethernet +from kasa.cli import echo from kasa.protocol import TPLinkSmartHomeProtocol @@ -36,21 +36,20 @@ def read_payloads_from_file(file): try: decrypted = TPLinkSmartHomeProtocol.decrypt(data[4:]) except Exception as ex: - click.echo( - click.style(f"Unable to decrypt the data, ignoring: {ex}", fg="red") - ) + echo(f"[red]Unable to decrypt the data, ignoring: {ex}[/red]") + continue + + if not decrypted: # skip empty payloads continue try: json_payload = json.loads(decrypted) - except Exception as ex: - click.echo( - click.style(f"Unable to parse payload, ignoring: {ex}", fg="red") - ) + except Exception as ex: # this can happen when the response is split into multiple tcp segments + echo(f"[red]Unable to parse payload '{decrypted}', ignoring: {ex}[/red]") continue if not json_payload: # ignore empty payloads - click.echo(click.style("Got empty payload, ignoring", fg="red")) + echo("[red]Got empty payload, ignoring[/red]") continue yield json_payload @@ -67,7 +66,7 @@ def parse_pcap(file): for module, cmds in json_payload.items(): seen_items["modules"][module] += 1 if "err_code" in cmds: - click.echo(click.style("Got error for module: %s" % cmds, fg="red")) + echo("[red]Got error for module: %s[/red]" % cmds) continue for cmd, response in cmds.items(): @@ -76,30 +75,24 @@ def parse_pcap(file): if response is None: continue direction = ">>" - style = {} if response is None: - print("got none as response for %s, weird?" % (cmd)) + echo(f"got none as response for {cmd} %s, weird?") continue + is_success = "[green]+[/green]" if "err_code" in response: direction = "<<" if response["err_code"] != 0: seen_items["errorcodes"][response["err_code"]] += 1 seen_items["errors"][response["err_msg"]] += 1 - print(response) - style = {"bold": True, "fg": "red"} - else: - style = {"fg": "green"} + is_success = "[red]![/red]" context_str = f" [ctx: {context}]" if context else "" - click.echo( - click.style( - f"{direction}{context_str} {module}.{cmd}: {pf(response)}", - **style, - ) + echo( + f"[{is_success}] {direction}{context_str} {module}.{cmd}: {pf(response)}" ) - pp(seen_items) + echo(pf(seen_items)) if __name__ == "__main__": diff --git a/kasa/cli.py b/kasa/cli.py index 4fd3990b..f72759a5 100755 --- a/kasa/cli.py +++ b/kasa/cli.py @@ -1,12 +1,33 @@ """python-kasa cli tool.""" import asyncio +import json import logging +import re import sys +from functools import wraps from pprint import pformat as pf -from typing import cast +from typing import Any, Dict, cast import asyncclick as click +try: + from rich import print as echo +except ImportError: + + def _strip_rich_formatting(echo_func): + """Strip rich formatting from messages.""" + + @wraps(echo_func) + def wrapper(message=None, *args, **kwargs): + if message is not None: + message = re.sub(r"\[/?.+?]", "", message) + echo_func(message, *args, **kwargs) + + return wrapper + + echo = _strip_rich_formatting(click.echo) + + from kasa import ( Discover, SmartBulb, @@ -69,32 +90,44 @@ async def cli(ctx, host, alias, target, debug, type): ctx.obj = SmartDevice(None) return - if debug: - logging.basicConfig(level=logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) + logging_config: Dict[str, Any] = { + "level": logging.DEBUG if debug > 0 else logging.INFO + } + try: + from rich.logging import RichHandler + + rich_config = { + "show_time": False, + } + logging_config["handlers"] = [RichHandler(**rich_config)] + logging_config["format"] = "%(message)s" + except ImportError: + pass + + # The configuration should be converted to use dictConfig, but this keeps mypy happy for now + logging.basicConfig(**logging_config) # type: ignore if ctx.invoked_subcommand == "discover": return if alias is not None and host is None: - click.echo(f"Alias is given, using discovery to find host {alias}") + echo(f"Alias is given, using discovery to find host {alias}") host = await find_host_from_alias(alias=alias, target=target) if host: - click.echo(f"Found hostname is {host}") + echo(f"Found hostname is {host}") else: - click.echo(f"No device with name {alias} found") + echo(f"No device with name {alias} found") return if host is None: - click.echo("No host name given, trying discovery..") + echo("No host name given, trying discovery..") await ctx.invoke(discover) return if type is not None: dev = TYPE_TO_CLASS[type](host) else: - click.echo("No --type defined, discovering..") + echo("No --type defined, discovering..") dev = await Discover.discover_single(host) await dev.update() @@ -114,11 +147,11 @@ def wifi(dev): @pass_dev async def scan(dev): """Scan for available wifi networks.""" - click.echo("Scanning for wifi networks, wait a second..") + echo("Scanning for wifi networks, wait a second..") devs = await dev.wifi_scan() - click.echo(f"Found {len(devs)} wifi networks!") + echo(f"Found {len(devs)} wifi networks!") for dev in devs: - click.echo(f"\t {dev}") + echo(f"\t {dev}") return devs @@ -130,9 +163,9 @@ async def scan(dev): @pass_dev async def join(dev: SmartDevice, ssid, password, keytype): """Join the given wifi network.""" - click.echo(f"Asking the device to connect to {ssid}..") + echo(f"Asking the device to connect to {ssid}..") res = await dev.wifi_join(ssid, password, keytype=keytype) - click.echo( + echo( f"Response: {res} - if the device is not able to join the network, it will revert back to its previous state." ) @@ -145,7 +178,7 @@ async def join(dev: SmartDevice, ssid, password, keytype): async def discover(ctx, timeout): """Discover devices in the network.""" target = ctx.parent.params["target"] - click.echo(f"Discovering devices on {target} for {timeout} seconds") + echo(f"Discovering devices on {target} for {timeout} seconds") sem = asyncio.Semaphore() async def print_discovered(dev: SmartDevice): @@ -153,7 +186,7 @@ async def discover(ctx, timeout): async with sem: ctx.obj = dev await ctx.invoke(state) - click.echo() + echo() await Discover.discover( target=target, timeout=timeout, on_discovered=print_discovered @@ -176,8 +209,8 @@ async def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attem @pass_dev async def sysinfo(dev): """Print out full system information.""" - click.echo(click.style("== System info ==", bold=True)) - click.echo(pf(dev.sys_info)) + echo("== System info ==") + echo(pf(dev.sys_info)) return dev.sys_info @@ -185,56 +218,42 @@ async def sysinfo(dev): @pass_dev async def state(dev: SmartDevice): """Print out device state and versions.""" - click.echo(click.style(f"== {dev.alias} - {dev.model} ==", bold=True)) - click.echo(f"\tHost: {dev.host}") - click.echo( - click.style( - "\tDevice state: {}\n".format("ON" if dev.is_on else "OFF"), - fg="green" if dev.is_on else "red", - ) - ) + echo(f"[bold]== {dev.alias} - {dev.model} ==[/bold]") + echo(f"\tHost: {dev.host}") + echo(f"\tDevice state: {dev.is_on}") if dev.is_strip: - click.echo(click.style("\t== Plugs ==", bold=True)) + echo("\t[bold]== Plugs ==[/bold]") for plug in dev.children: # type: ignore - is_on = plug.is_on - alias = plug.alias - click.echo( - click.style( - "\t* Socket '{}' state: {} on_since: {}".format( - alias, ("ON" if is_on else "OFF"), plug.on_since - ), - fg="green" if is_on else "red", - ) - ) - click.echo() + echo(f"\t* Socket '{plug.alias}' state: {plug.is_on} since {plug.on_since}") + echo() - click.echo(click.style("\t== Generic information ==", bold=True)) - click.echo(f"\tTime: {dev.time} (tz: {dev.timezone}") - click.echo(f"\tHardware: {dev.hw_info['hw_ver']}") - click.echo(f"\tSoftware: {dev.hw_info['sw_ver']}") - click.echo(f"\tMAC (rssi): {dev.mac} ({dev.rssi})") - click.echo(f"\tLocation: {dev.location}") + echo("\t[bold]== Generic information ==[/bold]") + echo(f"\tTime: {dev.time} (tz: {dev.timezone}") + echo(f"\tHardware: {dev.hw_info['hw_ver']}") + echo(f"\tSoftware: {dev.hw_info['sw_ver']}") + echo(f"\tMAC (rssi): {dev.mac} ({dev.rssi})") + echo(f"\tLocation: {dev.location}") - click.echo(click.style("\n\t== Device specific information ==", bold=True)) + echo("\n\t[bold]== Device specific information ==[/bold]") for info_name, info_data in dev.state_information.items(): if isinstance(info_data, list): - click.echo(f"\t{info_name}:") + echo(f"\t{info_name}:") for item in info_data: - click.echo(f"\t\t{item}") + echo(f"\t\t{item}") else: - click.echo(f"\t{info_name}: {info_data}") + echo(f"\t{info_name}: {info_data}") if dev.has_emeter: - click.echo(click.style("\n\t== Current State ==", bold=True)) + echo("\n\t[bold]== Current State ==[/bold]") emeter_status = dev.emeter_realtime - click.echo(f"\t{emeter_status}") + echo(f"\t{emeter_status}") - click.echo(click.style("\n\t== Modules ==", bold=True)) + echo("\n\t[bold]== Modules ==[/bold]") for module in dev.modules.values(): if module.is_supported: - click.echo(click.style(f"\t+ {module}", fg="green")) + echo(f"\t[green]+ {module}[/green]") else: - click.echo(click.style(f"\t- {module}", fg="red")) + echo(f"\t[red]- {module}[/red]") @cli.command() @@ -245,20 +264,20 @@ async def alias(dev, new_alias, index): """Get or set the device (or plug) alias.""" if index is not None: if not dev.is_strip: - click.echo("Index can only used for power strips!") + echo("Index can only used for power strips!") return dev = cast(SmartStrip, dev) dev = dev.get_plug_by_index(index) if new_alias is not None: - click.echo(f"Setting alias to {new_alias}") + echo(f"Setting alias to {new_alias}") res = await dev.set_alias(new_alias) return res - click.echo(f"Alias: {dev.alias}") + echo(f"Alias: {dev.alias}") if dev.is_strip: for plug in dev.children: - click.echo(f" * {plug.alias}") + echo(f" * {plug.alias}") @cli.command() @@ -274,8 +293,7 @@ async def raw_command(dev: SmartDevice, module, command, parameters): parameters = ast.literal_eval(parameters) res = await dev._query_helper(module, command, parameters) - - click.echo(res) + echo(json.dumps(res)) return res @@ -289,41 +307,41 @@ async def emeter(dev: SmartDevice, year, month, erase): Daily and monthly data provided in CSV format. """ - click.echo(click.style("== Emeter ==", bold=True)) + echo("[bold]== Emeter ==[/bold]") if not dev.has_emeter: - click.echo("Device has no emeter") + echo("Device has no emeter") return if erase: - click.echo("Erasing emeter statistics..") - click.echo(await dev.erase_emeter_stats()) + echo("Erasing emeter statistics..") + echo(await dev.erase_emeter_stats()) return if year: - click.echo(f"== For year {year.year} ==") - click.echo("Month, usage (kWh)") + echo(f"== For year {year.year} ==") + echo("Month, usage (kWh)") usage_data = await dev.get_emeter_monthly(year=year.year) elif month: - click.echo(f"== For month {month.month} of {month.year} ==") - click.echo("Day, usage (kWh)") + echo(f"== For month {month.month} of {month.year} ==") + echo("Day, usage (kWh)") usage_data = await dev.get_emeter_daily(year=month.year, month=month.month) else: # Call with no argument outputs summary data and returns emeter_status = dev.emeter_realtime - click.echo("Current: %s A" % emeter_status["current"]) - click.echo("Voltage: %s V" % emeter_status["voltage"]) - click.echo("Power: %s W" % emeter_status["power"]) - click.echo("Total consumption: %s kWh" % emeter_status["total"]) + echo("Current: %s A" % emeter_status["current"]) + echo("Voltage: %s V" % emeter_status["voltage"]) + echo("Power: %s W" % emeter_status["power"]) + echo("Total consumption: %s kWh" % emeter_status["total"]) - click.echo("Today: %s kWh" % dev.emeter_today) - click.echo("This month: %s kWh" % dev.emeter_this_month) + echo("Today: %s kWh" % dev.emeter_today) + echo("This month: %s kWh" % dev.emeter_this_month) return # output any detailed usage data for index, usage in usage_data.items(): - click.echo(f"{index}, {usage}") + echo(f"{index}, {usage}") @cli.command() @@ -336,32 +354,32 @@ async def usage(dev: SmartDevice, year, month, erase): Daily and monthly data provided in CSV format. """ - click.echo(click.style("== Usage ==", bold=True)) + echo("[bold]== Usage ==[/bold]") usage = dev.modules["usage"] if erase: - click.echo("Erasing usage statistics..") - click.echo(await usage.erase_stats()) + echo("Erasing usage statistics..") + echo(await usage.erase_stats()) return if year: - click.echo(f"== For year {year.year} ==") - click.echo("Month, usage (minutes)") - usage_data = await usage.get_monthstat(year.year) + echo(f"== For year {year.year} ==") + echo("Month, usage (minutes)") + usage_data = await usage.get_monthstat(year=year.year) elif month: - click.echo(f"== For month {month.month} of {month.year} ==") - click.echo("Day, usage (minutes)") + echo(f"== For month {month.month} of {month.year} ==") + echo("Day, usage (minutes)") usage_data = await usage.get_daystat(year=month.year, month=month.month) else: # Call with no argument outputs summary data and returns - click.echo("Today: %s minutes" % usage.usage_today) - click.echo("This month: %s minutes" % usage.usage_this_month) + echo("Today: %s minutes" % usage.usage_today) + echo("This month: %s minutes" % usage.usage_this_month) return # output any detailed usage data for index, usage in usage_data.items(): - click.echo(f"{index}, {usage}") + echo(f"{index}, {usage}") @cli.command() @@ -371,13 +389,13 @@ async def usage(dev: SmartDevice, year, month, erase): async def brightness(dev: SmartBulb, brightness: int, transition: int): """Get or set brightness.""" if not dev.is_dimmable: - click.echo("This device does not support brightness.") + echo("This device does not support brightness.") return if brightness is None: - click.echo(f"Brightness: {dev.brightness}") + echo(f"Brightness: {dev.brightness}") else: - click.echo(f"Setting brightness to {brightness}") + echo(f"Setting brightness to {brightness}") return await dev.set_brightness(brightness, transition=transition) @@ -390,21 +408,21 @@ async def brightness(dev: SmartBulb, brightness: int, transition: int): async def temperature(dev: SmartBulb, temperature: int, transition: int): """Get or set color temperature.""" if not dev.is_variable_color_temp: - click.echo("Device does not support color temperature") + echo("Device does not support color temperature") return if temperature is None: - click.echo(f"Color temperature: {dev.color_temp}") + echo(f"Color temperature: {dev.color_temp}") valid_temperature_range = dev.valid_temperature_range if valid_temperature_range != (0, 0): - click.echo("(min: {}, max: {})".format(*valid_temperature_range)) + echo("(min: {}, max: {})".format(*valid_temperature_range)) else: - click.echo( + echo( "Temperature range unknown, please open a github issue" f" or a pull request for model '{dev.model}'" ) else: - click.echo(f"Setting color temperature to {temperature}") + echo(f"Setting color temperature to {temperature}") return await dev.set_color_temp(temperature, transition=transition) @@ -415,7 +433,7 @@ async def temperature(dev: SmartBulb, temperature: int, transition: int): async def effect(dev, ctx, effect): """Set an effect.""" if not dev.has_effects: - click.echo("Device does not support effects") + echo("Device does not support effects") return if effect is None: raise click.BadArgumentUsage( @@ -425,7 +443,7 @@ async def effect(dev, ctx, effect): if effect not in dev.effect_list: raise click.BadArgumentUsage(f"Effect must be one of: {dev.effect_list}", ctx) - click.echo(f"Setting Effect: {effect}") + echo(f"Setting Effect: {effect}") return await dev.set_effect(effect) @@ -439,15 +457,15 @@ async def effect(dev, ctx, effect): async def hsv(dev, ctx, h, s, v, transition): """Get or set color in HSV.""" if not dev.is_color: - click.echo("Device does not support colors") + echo("Device does not support colors") return if h is None or s is None or v is None: - click.echo(f"Current HSV: {dev.hsv}") + echo(f"Current HSV: {dev.hsv}") elif s is None or v is None: raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx) else: - click.echo(f"Setting HSV: {h} {s} {v}") + echo(f"Setting HSV: {h} {s} {v}") return await dev.set_hsv(h, s, v, transition=transition) @@ -457,10 +475,10 @@ async def hsv(dev, ctx, h, s, v, transition): async def led(dev, state): """Get or set (Plug's) led state.""" if state is not None: - click.echo(f"Turning led to {state}") + echo(f"Turning led to {state}") return await dev.set_led(state) else: - click.echo(f"LED state: {dev.led}") + echo(f"LED state: {dev.led}") @cli.command() @@ -468,7 +486,7 @@ async def led(dev, state): async def time(dev): """Get the device time.""" res = dev.time - click.echo(f"Current time: {res}") + echo(f"Current time: {res}") return res @@ -481,7 +499,7 @@ async def on(dev: SmartDevice, index: int, name: str, transition: int): """Turn the device on.""" if index is not None or name is not None: if not dev.is_strip: - click.echo("Index and name are only for power strips!") + echo("Index and name are only for power strips!") return dev = cast(SmartStrip, dev) @@ -490,7 +508,7 @@ async def on(dev: SmartDevice, index: int, name: str, transition: int): elif name: dev = dev.get_plug_by_name(name) - click.echo(f"Turning on {dev.alias}") + echo(f"Turning on {dev.alias}") return await dev.turn_on(transition=transition) @@ -503,7 +521,7 @@ async def off(dev: SmartDevice, index: int, name: str, transition: int): """Turn the device off.""" if index is not None or name is not None: if not dev.is_strip: - click.echo("Index and name are only for power strips!") + echo("Index and name are only for power strips!") return dev = cast(SmartStrip, dev) @@ -512,7 +530,7 @@ async def off(dev: SmartDevice, index: int, name: str, transition: int): elif name: dev = dev.get_plug_by_name(name) - click.echo(f"Turning off {dev.alias}") + echo(f"Turning off {dev.alias}") return await dev.turn_off(transition=transition) @@ -521,7 +539,7 @@ async def off(dev: SmartDevice, index: int, name: str, transition: int): @pass_dev async def reboot(plug, delay): """Reboot the device.""" - click.echo("Rebooting the device..") + echo("Rebooting the device..") return await plug.reboot(delay) @@ -540,7 +558,7 @@ def _schedule_list(dev, type): for rule in sched.rules: print(rule) else: - click.echo(f"No rules of type {type}") + echo(f"No rules of type {type}") @schedule.command(name="delete") @@ -551,10 +569,10 @@ async def delete_rule(dev, id): schedule = dev.modules["schedule"] rule_to_delete = next(filter(lambda rule: (rule.id == id), schedule.rules), None) if rule_to_delete: - click.echo(f"Deleting rule id {id}") + echo(f"Deleting rule id {id}") await schedule.delete_rule(rule_to_delete) else: - click.echo(f"No rule with id {id} was found") + echo(f"No rule with id {id} was found") @cli.group(invoke_without_command=True) @@ -570,7 +588,7 @@ async def presets(ctx): def presets_list(dev: SmartBulb): """List presets.""" if not dev.is_bulb: - click.echo("Presets only supported on bulbs") + echo("Presets only supported on bulbs") return for preset in dev.presets: @@ -592,7 +610,7 @@ async def presets_modify( if preset.index == index: break else: - click.echo(f"No preset found for index {index}") + echo(f"No preset found for index {index}") return if brightness is not None: @@ -604,7 +622,7 @@ async def presets_modify( if temperature is not None: preset.color_temp = temperature - click.echo(f"Going to save preset: {preset}") + echo(f"Going to save preset: {preset}") await dev.save_preset(preset) @@ -617,7 +635,7 @@ async def presets_modify( async def turn_on_behavior(dev: SmartBulb, type, last, preset): """Modify bulb turn-on behavior.""" settings = await dev.get_turn_on_behavior() - click.echo(f"Current turn on behavior: {settings}") + echo(f"Current turn on behavior: {settings}") # Return if we are not setting the value if not type and not last and not preset: @@ -625,16 +643,16 @@ async def turn_on_behavior(dev: SmartBulb, type, last, preset): # If we are setting the value, the type has to be specified if (last or preset) and type is None: - click.echo("To set the behavior, you need to define --type") + echo("To set the behavior, you need to define --type") return behavior = getattr(settings, type) if last: - click.echo(f"Going to set {type} to last") + echo(f"Going to set {type} to last") behavior.preset = None elif preset is not None: - click.echo(f"Going to set {type} to preset {preset}") + echo(f"Going to set {type} to preset {preset}") behavior.preset = preset await dev.set_turn_on_behavior(settings) diff --git a/kasa/tests/test_cli.py b/kasa/tests/test_cli.py index 762f0503..42058378 100644 --- a/kasa/tests/test_cli.py +++ b/kasa/tests/test_cli.py @@ -33,9 +33,9 @@ async def test_state(dev, turn_on): await dev.update() if dev.is_on: - assert "Device state: ON" in res.output + assert "Device state: True" in res.output else: - assert "Device state: OFF" in res.output + assert "Device state: False" in res.output async def test_alias(dev):