async++, small powerstrip improvements (#46)

* async++, small powerstrip improvements

* use asyncclick instead of click, allows defining the commands with async def to avoid manual eventloop/asyncio.run handling
* improve powerstrip support:
  * new powerstrip api: turn_{on,off}_by_{name,index} methods
  * cli: fix on/off for powerstrip using the new apis
* add missing update()s for cli's hsv, led, temperature (fixes #43)
* prettyprint the received payloads when debug mode in use
* cli: debug mode can be activated now with '-d'

* update requirements_test.txt

* remove outdated click-datetime, replace click with asyncclick

* debug is a flag

* make smartstripplug to inherit the sysinfo from its parent, allows for simple access of general plug properties

* proper bound checking for index accesses, allow controlling the plug at index 0

* remove the mess of turn_{on,off}_by_{name,index}, get_plug_by_{name,index} are enough.

* adapt cli to use that
* allow changing the alias per index

* use f-strings consistently everywhere in the cli

* add tests for get_plug_by_{index,name}
This commit is contained in:
Teemu R
2020-04-21 20:46:13 +02:00
committed by GitHub
parent 852ae494af
commit 3fe578cf26
11 changed files with 203 additions and 120 deletions

View File

@@ -4,12 +4,13 @@ import json
import logging
import re
from pprint import pformat as pf
from typing import cast
import click
import asyncclick as click
from kasa import Discover, SmartBulb, SmartDevice, SmartPlug, SmartStrip
from kasa import Discover, SmartBulb, SmartDevice, SmartStrip
click.anyio_backend = "asyncio"
from kasa import SmartPlug # noqa: E402; noqa: E402
pass_dev = click.make_pass_decorator(SmartDevice)
@@ -33,13 +34,13 @@ pass_dev = click.make_pass_decorator(SmartDevice)
required=False,
help="The broadcast address to be used for discovery.",
)
@click.option("--debug/--normal", default=False)
@click.option("-d", "--debug", default=False, is_flag=True)
@click.option("--bulb", default=False, is_flag=True)
@click.option("--plug", default=False, is_flag=True)
@click.option("--strip", default=False, is_flag=True)
@click.version_option()
@click.pass_context
def cli(ctx, host, alias, target, debug, bulb, plug, strip):
async def cli(ctx, host, alias, target, debug, bulb, plug, strip):
"""A cli tool for controlling TP-Link smart home plugs.""" # noqa
if debug:
logging.basicConfig(level=logging.DEBUG)
@@ -51,7 +52,7 @@ def cli(ctx, host, alias, target, debug, bulb, plug, strip):
if alias is not None and host is None:
click.echo(f"Alias is given, using discovery to find host {alias}")
host = find_host_from_alias(alias=alias, target=target)
host = await find_host_from_alias(alias=alias, target=target)
if host:
click.echo(f"Found hostname is {host}")
else:
@@ -60,12 +61,12 @@ def cli(ctx, host, alias, target, debug, bulb, plug, strip):
if host is None:
click.echo("No host name given, trying discovery..")
ctx.invoke(discover)
await ctx.invoke(discover)
return
else:
if not bulb and not plug and not strip:
click.echo("No --strip nor --bulb nor --plug given, discovering..")
dev = asyncio.run(Discover.discover_single(host))
dev = await Discover.discover_single(host)
elif bulb:
dev = SmartBulb(host)
elif plug:
@@ -78,7 +79,7 @@ def cli(ctx, host, alias, target, debug, bulb, plug, strip):
ctx.obj = dev
if ctx.invoked_subcommand is None:
ctx.invoke(state)
await ctx.invoke(state)
@cli.group()
@@ -89,10 +90,10 @@ def wifi(dev):
@wifi.command()
@pass_dev
def scan(dev):
async def scan(dev):
"""Scan for available wifi networks."""
click.echo("Scanning for wifi networks, wait a second..")
devs = asyncio.run(dev.wifi_scan())
devs = await dev.wifi_scan()
click.echo(f"Found {len(devs)} wifi networks!")
for dev in devs:
click.echo(f"\t {dev}")
@@ -103,10 +104,10 @@ def scan(dev):
@click.option("--password", prompt=True, hide_input=True)
@click.option("--keytype", default=3)
@pass_dev
def join(dev: SmartDevice, ssid, password, keytype):
async def join(dev: SmartDevice, ssid, password, keytype):
"""Join the given wifi network."""
click.echo("Asking the device to connect to {ssid}.." % (ssid))
res = asyncio.run(dev.wifi_join(ssid, password, keytype=keytype))
res = await dev.wifi_join(ssid, password, keytype=keytype)
click.echo(
f"Response: {res} - if the device is not able to join the network, it will revert back to its previous state."
)
@@ -115,7 +116,7 @@ def join(dev: SmartDevice, ssid, password, keytype):
@cli.command()
@click.option("--scrub/--no-scrub", default=True)
@click.pass_context
def dump_discover(ctx, scrub):
async def dump_discover(ctx, scrub):
"""Dump discovery information.
Useful for dumping into a file to be added to the test suite.
@@ -132,7 +133,7 @@ def dump_discover(ctx, scrub):
"latitude",
"longitude",
]
devs = asyncio.run(Discover.discover(target=target, return_raw=True))
devs = await Discover.discover(target=target, return_raw=True)
if scrub:
click.echo("Scrubbing personal data before writing")
for dev in devs.values():
@@ -160,35 +161,35 @@ def dump_discover(ctx, scrub):
@click.option("--discover-only", default=False)
@click.option("--dump-raw", is_flag=True)
@click.pass_context
def discover(ctx, timeout, discover_only, dump_raw):
async def discover(ctx, timeout, discover_only, dump_raw):
"""Discover devices in the network."""
target = ctx.parent.params["target"]
click.echo(f"Discovering devices for {timeout} seconds")
found_devs = asyncio.run(
Discover.discover(target=target, timeout=timeout, return_raw=dump_raw)
found_devs = await Discover.discover(
target=target, timeout=timeout, return_raw=dump_raw
)
if not discover_only:
for ip, dev in found_devs.items():
asyncio.run(dev.update())
await dev.update()
if dump_raw:
click.echo(dev)
continue
ctx.obj = dev
ctx.invoke(state)
print()
await ctx.invoke(state)
click.echo()
return found_devs
def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3):
async def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3):
"""Discover a device identified by its alias."""
host = None
click.echo(
f"Trying to discover {alias} using {attempts} attempts of {timeout} seconds"
)
for attempt in range(1, attempts):
click.echo(f"Attempt {attempt} of {attempts}")
found_devs = Discover.discover(target=target, timeout=timeout).items()
found_devs = await Discover.discover(target=target, timeout=timeout)
found_devs = found_devs.items()
for ip, dev in found_devs:
if dev.alias.lower() == alias.lower():
host = dev.host
@@ -198,9 +199,9 @@ def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3)
@cli.command()
@pass_dev
def sysinfo(dev):
async def sysinfo(dev):
"""Print out full system information."""
asyncio.run(dev.update())
await dev.update()
click.echo(click.style("== System info ==", bold=True))
click.echo(pf(dev.sys_info))
@@ -208,9 +209,9 @@ def sysinfo(dev):
@cli.command()
@pass_dev
@click.pass_context
def state(ctx, dev: SmartDevice):
async def state(ctx, dev: SmartDevice):
"""Print out device state and versions."""
asyncio.run(dev.update())
await dev.update()
click.echo(click.style(f"== {dev.alias} - {dev.model} ==", bold=True))
click.echo(
@@ -221,12 +222,13 @@ def state(ctx, dev: SmartDevice):
)
if dev.is_strip:
for plug in dev.plugs: # type: ignore
asyncio.run(plug.update())
is_on = plug.is_on
alias = plug.alias
click.echo(
click.style(
" * {} state: {}".format(alias, ("ON" if is_on else "OFF")),
" * Socket '{}' state: {} on_since: {}".format(
alias, ("ON" if is_on else "OFF"), plug.on_since
),
fg="green" if is_on else "red",
)
)
@@ -235,25 +237,37 @@ def state(ctx, dev: SmartDevice):
for k, v in dev.state_information.items():
click.echo(f"{k}: {v}")
click.echo(click.style("== Generic information ==", bold=True))
click.echo(f"Time: {asyncio.run(dev.get_time())}")
click.echo(f"Time: {await dev.get_time()}")
click.echo(f"Hardware: {dev.hw_info['hw_ver']}")
click.echo(f"Software: {dev.hw_info['sw_ver']}")
click.echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
click.echo(f"Location: {dev.location}")
ctx.invoke(emeter)
await ctx.invoke(emeter)
@cli.command()
@pass_dev
@click.argument("new_alias", required=False, default=None)
def alias(dev, new_alias):
"""Get or set the device alias."""
@click.option("--index", type=int)
async def alias(dev, new_alias, index):
"""Get or set the device (or plug) alias."""
await dev.update()
if index is not None:
if not dev.is_strip:
click.echo("Index can only used for power strips!")
return
dev = cast(SmartStrip, dev)
dev = dev.get_plug_by_index(index)
if new_alias is not None:
click.echo(f"Setting alias to {new_alias}")
asyncio.run(dev.set_alias(new_alias))
click.echo(await dev.set_alias(new_alias))
click.echo(f"Alias: {dev.alias}")
if dev.is_strip:
for plug in dev.plugs:
click.echo(f" * {plug.alias}")
@cli.command()
@@ -261,14 +275,14 @@ def alias(dev, new_alias):
@click.argument("module")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
def raw_command(dev: SmartDevice, module, command, parameters):
async def raw_command(dev: SmartDevice, module, command, parameters):
"""Run a raw command on the device."""
import ast
if parameters is not None:
parameters = ast.literal_eval(parameters)
res = asyncio.run(dev._query_helper(module, command, parameters))
asyncio.run(dev.update())
res = await dev._query_helper(module, command, parameters)
await dev.update() # TODO: is this needed?
click.echo(res)
@@ -277,34 +291,34 @@ def raw_command(dev: SmartDevice, module, command, parameters):
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
def emeter(dev, year, month, erase):
async def emeter(dev, year, month, erase):
"""Query emeter for historical consumption."""
click.echo(click.style("== Emeter ==", bold=True))
asyncio.run(dev.update())
await dev.update()
if not dev.has_emeter:
click.echo("Device has no emeter")
return
if erase:
click.echo("Erasing emeter statistics..")
asyncio.run(dev.erase_emeter_stats())
click.echo(await dev.erase_emeter_stats())
return
if year:
click.echo(f"== For year {year.year} ==")
emeter_status = asyncio.run(dev.get_emeter_monthly(year.year))
emeter_status = await dev.get_emeter_monthly(year.year)
elif month:
click.echo(f"== For month {month.month} of {month.year} ==")
emeter_status = asyncio.run(
dev.get_emeter_daily(year=month.year, month=month.month)
)
emeter_status = await dev.get_emeter_daily(year=month.year, month=month.month)
else:
emeter_status = asyncio.run(dev.get_emeter_realtime())
emeter_status = await dev.get_emeter_realtime()
click.echo("== Current State ==")
if isinstance(emeter_status, list):
for plug in emeter_status:
click.echo("Plug %d: %s" % (emeter_status.index(plug) + 1, plug))
index = emeter_status.index(plug) + 1
click.echo(f"Plug {index}: {plug}")
else:
click.echo(str(emeter_status))
@@ -312,9 +326,9 @@ def emeter(dev, year, month, erase):
@cli.command()
@click.argument("brightness", type=click.IntRange(0, 100), default=None, required=False)
@pass_dev
def brightness(dev, brightness):
async def brightness(dev, brightness):
"""Get or set brightness."""
asyncio.run(dev.update())
await dev.update()
if not dev.is_dimmable:
click.echo("This device does not support brightness.")
return
@@ -322,7 +336,7 @@ def brightness(dev, brightness):
click.echo(f"Brightness: {dev.brightness}")
else:
click.echo(f"Setting brightness to {brightness}")
asyncio.run(dev.set_brightness(brightness))
click.echo(await dev.set_brightness(brightness))
@cli.command()
@@ -330,8 +344,9 @@ def brightness(dev, brightness):
"temperature", type=click.IntRange(2500, 9000), default=None, required=False
)
@pass_dev
def temperature(dev: SmartBulb, temperature):
async def temperature(dev: SmartBulb, temperature):
"""Get or set color temperature."""
await dev.update()
if temperature is None:
click.echo(f"Color temperature: {dev.color_temp}")
valid_temperature_range = dev.valid_temperature_range
@@ -353,67 +368,87 @@ def temperature(dev: SmartBulb, temperature):
@click.argument("v", type=click.IntRange(0, 100), default=None, required=False)
@click.pass_context
@pass_dev
def hsv(dev, ctx, h, s, v):
async def hsv(dev, ctx, h, s, v):
"""Get or set color in HSV. (Bulb only)."""
await dev.update()
if h is None or s is None or v is None:
click.echo(f"Current HSV: {dev.hsv}")
elif s is None or v is None:
raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx)
else:
click.echo(f"Setting HSV: {h} {s} {v}")
asyncio.run(dev.set_hsv(h, s, v))
click.echo(await dev.set_hsv(h, s, v))
@cli.command()
@click.argument("state", type=bool, required=False)
@pass_dev
def led(dev, state):
async def led(dev, state):
"""Get or set (Plug's) led state."""
await dev.update()
if state is not None:
click.echo(f"Turning led to {state}")
asyncio.run(dev.set_led(state))
click.echo(await dev.set_led(state))
else:
click.echo(f"LED state: {dev.led}")
@cli.command()
@pass_dev
def time(dev):
async def time(dev):
"""Get the device time."""
click.echo(asyncio.run(dev.get_time()))
click.echo(await dev.get_time())
@cli.command()
@click.argument("index", type=int, required=False)
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@pass_dev
def on(plug, index):
async def on(dev: SmartDevice, index, name):
"""Turn the device on."""
click.echo("Turning on..")
if index is None:
asyncio.run(plug.turn_on())
else:
asyncio.run(plug.turn_on(index=(index - 1)))
await dev.update()
if index is not None or name is not None:
if not dev.is_strip:
click.echo("Index and name are only for power strips!")
return
dev = cast(SmartStrip, dev)
if index is not None:
dev = dev.get_plug_by_index(index)
elif name:
dev = dev.get_plug_by_name(name)
click.echo(f"Turning on {dev.alias}")
await dev.turn_on()
@cli.command()
@click.argument("index", type=int, required=False)
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@pass_dev
def off(plug, index):
async def off(dev, index, name):
"""Turn the device off."""
click.echo("Turning off..")
if index is None:
asyncio.run(plug.turn_off())
else:
asyncio.run(plug.turn_off(index=(index - 1)))
await dev.update()
if index is not None or name is not None:
if not dev.is_strip:
click.echo("Index and name are only for power strips!")
return
dev = cast(SmartStrip, dev)
if index is not None:
dev = dev.get_plug_by_index(index)
elif name:
dev = dev.get_plug_by_name(name)
click.echo(f"Turning off {dev.alias}")
await dev.turn_off()
@cli.command()
@click.option("--delay", default=1)
@pass_dev
def reboot(plug, delay):
async def reboot(plug, delay):
"""Reboot the device."""
click.echo("Rebooting the device..")
asyncio.run(plug.reboot(delay))
click.echo(await plug.reboot(delay))
if __name__ == "__main__":