python-kasa/kasa/cli/main.py
Steven B 983aacbc24
Structure cli into a package (#1038)
PR with just the initial structural changes for the cli to be a package.
Subsequent PR will break out `main.py` into modules. Doing it in two
stages ensure that the commit history will be continuous for `cli.py` >
`cli/main.py`
2024-07-04 14:52:01 +02:00

1383 lines
41 KiB
Python
Executable File

"""python-kasa cli tool."""
from __future__ import annotations
import ast
import asyncio
import json
import logging
import re
import sys
from contextlib import asynccontextmanager, contextmanager
from datetime import datetime
from functools import singledispatch, update_wrapper, wraps
from pprint import pformat as pf
from typing import Any, Final, cast
import asyncclick as click
from pydantic.v1 import ValidationError
from kasa import (
AuthenticationError,
Credentials,
Device,
DeviceConfig,
DeviceConnectionParameters,
DeviceEncryptionType,
DeviceFamily,
Discover,
Feature,
KasaException,
Module,
UnsupportedDeviceError,
)
from kasa.discover import DiscoveryResult
from kasa.iot import (
IotBulb,
IotDevice,
IotDimmer,
IotLightStrip,
IotPlug,
IotStrip,
IotWallSwitch,
)
from kasa.iot.iotstrip import IotStripPlug
from kasa.iot.modules import Usage
from kasa.smart import SmartDevice
try:
from rich import print as _do_echo
except ImportError:
# Strip out rich formatting if rich is not installed
# but only lower case tags to avoid stripping out
# raw data from the device that is printed from
# the device state.
rich_formatting = re.compile(r"\[/?[a-z]+]")
def _strip_rich_formatting(echo_func):
"""Strip rich formatting from messages."""
@wraps(echo_func)
def wrapper(message=None, *args, **kwargs):
if message is not None:
message = rich_formatting.sub("", message)
echo_func(message, *args, **kwargs)
return wrapper
_do_echo = _strip_rich_formatting(click.echo)
# echo is set to _do_echo so that it can be reset to _do_echo later after
# --json has set it to _nop_echo
echo = _do_echo
def error(msg: str):
"""Print an error and exit."""
echo(f"[bold red]{msg}[/bold red]")
sys.exit(1)
# Value for optional options if passed without a value
OPTIONAL_VALUE_FLAG: Final = "_FLAG_"
TYPE_TO_CLASS = {
"plug": IotPlug,
"switch": IotWallSwitch,
"bulb": IotBulb,
"dimmer": IotDimmer,
"strip": IotStrip,
"lightstrip": IotLightStrip,
"iot.plug": IotPlug,
"iot.switch": IotWallSwitch,
"iot.bulb": IotBulb,
"iot.dimmer": IotDimmer,
"iot.strip": IotStrip,
"iot.lightstrip": IotLightStrip,
"smart.plug": SmartDevice,
"smart.bulb": SmartDevice,
}
ENCRYPT_TYPES = [encrypt_type.value for encrypt_type in DeviceEncryptionType]
DEVICE_FAMILY_TYPES = [device_family_type.value for device_family_type in DeviceFamily]
# Block list of commands which require no update
SKIP_UPDATE_COMMANDS = ["raw-command", "command"]
pass_dev = click.make_pass_decorator(Device) # type: ignore[type-abstract]
def CatchAllExceptions(cls):
"""Capture all exceptions and prints them nicely.
Idea from https://stackoverflow.com/a/44347763 and
https://stackoverflow.com/questions/52213375
"""
def _handle_exception(debug, exc):
if isinstance(exc, click.ClickException):
raise
# Handle exit request from click.
if isinstance(exc, click.exceptions.Exit):
sys.exit(exc.exit_code)
echo(f"Raised error: {exc}")
if debug:
raise
echo("Run with --debug enabled to see stacktrace")
sys.exit(1)
class _CommandCls(cls):
_debug = False
async def make_context(self, info_name, args, parent=None, **extra):
self._debug = any(
[arg for arg in args if arg in ["--debug", "-d", "--verbose", "-v"]]
)
try:
return await super().make_context(
info_name, args, parent=parent, **extra
)
except Exception as exc:
_handle_exception(self._debug, exc)
async def invoke(self, ctx):
try:
return await super().invoke(ctx)
except Exception as exc:
_handle_exception(self._debug, exc)
return _CommandCls
def json_formatter_cb(result, **kwargs):
"""Format and output the result as JSON, if requested."""
if not kwargs.get("json"):
return
@singledispatch
def to_serializable(val):
"""Regular obj-to-string for json serialization.
The singledispatch trick is from hynek: https://hynek.me/articles/serialization/
"""
return str(val)
@to_serializable.register(Device)
def _device_to_serializable(val: Device):
"""Serialize smart device data, just using the last update raw payload."""
return val.internal_state
json_content = json.dumps(result, indent=4, default=to_serializable)
print(json_content)
def pass_dev_or_child(wrapped_function):
"""Pass the device or child to the click command based on the child options."""
child_help = (
"Child ID or alias for controlling sub-devices. "
"If no value provided will show an interactive prompt allowing you to "
"select a child."
)
child_index_help = "Child index controlling sub-devices"
@contextmanager
def patched_device_update(parent: Device, child: Device):
try:
orig_update = child.update
# patch child update method. Can be removed once update can be called
# directly on child devices
child.update = parent.update # type: ignore[method-assign]
yield child
finally:
child.update = orig_update # type: ignore[method-assign]
@click.pass_obj
@click.pass_context
@click.option(
"--child",
"--name",
is_flag=False,
flag_value=OPTIONAL_VALUE_FLAG,
default=None,
required=False,
type=click.STRING,
help=child_help,
)
@click.option(
"--child-index",
"--index",
required=False,
default=None,
type=click.INT,
help=child_index_help,
)
async def wrapper(ctx: click.Context, dev, *args, child, child_index, **kwargs):
if child := await _get_child_device(dev, child, child_index, ctx.info_name):
ctx.obj = ctx.with_resource(patched_device_update(dev, child))
dev = child
return await ctx.invoke(wrapped_function, dev, *args, **kwargs)
# Update wrapper function to look like wrapped function
return update_wrapper(wrapper, wrapped_function)
async def _get_child_device(
device: Device, child_option, child_index_option, info_command
) -> Device | None:
def _list_children():
return "\n".join(
[
f"{idx}: {child.device_id} ({child.alias})"
for idx, child in enumerate(device.children)
]
)
if child_option is None and child_index_option is None:
return None
if info_command in SKIP_UPDATE_COMMANDS:
# The device hasn't had update called (e.g. for cmd_command)
# The way child devices are accessed requires a ChildDevice to
# wrap the communications. Doing this properly would require creating
# a common interfaces for both IOT and SMART child devices.
# As a stop-gap solution, we perform an update instead.
await device.update()
if not device.children:
error(f"Device: {device.host} does not have children")
if child_option is not None and child_index_option is not None:
raise click.BadOptionUsage(
"child", "Use either --child or --child-index, not both."
)
if child_option is not None:
if child_option is OPTIONAL_VALUE_FLAG:
msg = _list_children()
child_index_option = click.prompt(
f"\n{msg}\nEnter the index number of the child device",
type=click.IntRange(0, len(device.children) - 1),
)
elif child := device.get_child_device(child_option):
echo(f"Targeting child device {child.alias}")
return child
else:
error(
"No child device found with device_id or name: "
f"{child_option} children are:\n{_list_children()}"
)
if child_index_option + 1 > len(device.children) or child_index_option < 0:
error(
f"Invalid index {child_index_option}, "
f"device has {len(device.children)} children"
)
child_by_index = device.children[child_index_option]
echo(f"Targeting child device {child_by_index.alias}")
return child_by_index
@click.group(
invoke_without_command=True,
cls=CatchAllExceptions(click.Group),
result_callback=json_formatter_cb,
)
@click.option(
"--host",
envvar="KASA_HOST",
required=False,
help="The host name or IP address of the device to connect to.",
)
@click.option(
"--port",
envvar="KASA_PORT",
required=False,
type=int,
help="The port of the device to connect to.",
)
@click.option(
"--alias",
envvar="KASA_NAME",
required=False,
help="The device name, or alias, of the device to connect to.",
)
@click.option(
"--target",
envvar="KASA_TARGET",
default="255.255.255.255",
required=False,
show_default=True,
help="The broadcast address to be used for discovery.",
)
@click.option(
"-v",
"--verbose",
envvar="KASA_VERBOSE",
required=False,
default=False,
is_flag=True,
help="Be more verbose on output",
)
@click.option(
"-d",
"--debug",
envvar="KASA_DEBUG",
default=False,
is_flag=True,
help="Print debug output",
)
@click.option(
"--type",
envvar="KASA_TYPE",
default=None,
type=click.Choice(list(TYPE_TO_CLASS), case_sensitive=False),
)
@click.option(
"--json/--no-json",
envvar="KASA_JSON",
default=False,
is_flag=True,
help="Output raw device response as JSON.",
)
@click.option(
"-e",
"--encrypt-type",
envvar="KASA_ENCRYPT_TYPE",
default=None,
type=click.Choice(ENCRYPT_TYPES, case_sensitive=False),
)
@click.option(
"--device-family",
envvar="KASA_DEVICE_FAMILY",
default="SMART.TAPOPLUG",
type=click.Choice(DEVICE_FAMILY_TYPES, case_sensitive=False),
)
@click.option(
"-lv",
"--login-version",
envvar="KASA_LOGIN_VERSION",
default=2,
type=int,
)
@click.option(
"--timeout",
envvar="KASA_TIMEOUT",
default=5,
required=False,
show_default=True,
help="Timeout for device communications.",
)
@click.option(
"--discovery-timeout",
envvar="KASA_DISCOVERY_TIMEOUT",
default=5,
required=False,
show_default=True,
help="Timeout for discovery.",
)
@click.option(
"--username",
default=None,
required=False,
envvar="KASA_USERNAME",
help="Username/email address to authenticate to device.",
)
@click.option(
"--password",
default=None,
required=False,
envvar="KASA_PASSWORD",
help="Password to use to authenticate to device.",
)
@click.option(
"--credentials-hash",
default=None,
required=False,
envvar="KASA_CREDENTIALS_HASH",
help="Hashed credentials used to authenticate to the device.",
)
@click.version_option(package_name="python-kasa")
@click.pass_context
async def cli(
ctx,
host,
port,
alias,
target,
verbose,
debug,
type,
encrypt_type,
device_family,
login_version,
json,
timeout,
discovery_timeout,
username,
password,
credentials_hash,
):
"""A tool for controlling TP-Link smart home devices.""" # noqa
# no need to perform any checks if we are just displaying the help
if "--help" in sys.argv:
# Context object is required to avoid crashing on sub-groups
ctx.obj = object()
return
# If JSON output is requested, disable echo
global echo
if json:
def _nop_echo(*args, **kwargs):
pass
echo = _nop_echo
else:
# Set back to default is required if running tests with CliRunner
global _do_echo
echo = _do_echo
logging_config: dict[str, Any] = {
"level": logging.DEBUG if debug > 0 else logging.INFO
}
try:
from rich.logging import RichHandler
rich_config = {
"show_time": False,
}
logging_config["handlers"] = [RichHandler(**rich_config)]
logging_config["format"] = "%(message)s"
except ImportError:
pass
# The configuration should be converted to use dictConfig,
# but this keeps mypy happy for now
logging.basicConfig(**logging_config) # type: ignore
if ctx.invoked_subcommand == "discover":
return
if alias is not None and host is not None:
raise click.BadOptionUsage("alias", "Use either --alias or --host, not both.")
if alias is not None and host is None:
echo(f"Alias is given, using discovery to find host {alias}")
host = await find_host_from_alias(alias=alias, target=target)
if host:
echo(f"Found hostname is {host}")
else:
echo(f"No device with name {alias} found")
return
if bool(password) != bool(username):
raise click.BadOptionUsage(
"username", "Using authentication requires both --username and --password"
)
if username:
credentials = Credentials(username=username, password=password)
else:
credentials = None
if host is None:
if ctx.invoked_subcommand and ctx.invoked_subcommand != "discover":
error("Only discover is available without --host or --alias")
echo("No host name given, trying discovery..")
return await ctx.invoke(discover)
device_updated = False
if type is not None:
config = DeviceConfig(host=host, port_override=port, timeout=timeout)
dev = TYPE_TO_CLASS[type](host, config=config)
elif device_family and encrypt_type:
ctype = DeviceConnectionParameters(
DeviceFamily(device_family),
DeviceEncryptionType(encrypt_type),
login_version,
)
config = DeviceConfig(
host=host,
port_override=port,
credentials=credentials,
credentials_hash=credentials_hash,
timeout=timeout,
connection_type=ctype,
)
dev = await Device.connect(config=config)
device_updated = True
else:
dev = await Discover.discover_single(
host,
port=port,
credentials=credentials,
timeout=timeout,
discovery_timeout=discovery_timeout,
)
# Skip update on specific commands, or if device factory,
# that performs an update was used for the device.
if ctx.invoked_subcommand not in SKIP_UPDATE_COMMANDS and not device_updated:
await dev.update()
@asynccontextmanager
async def async_wrapped_device(device: Device):
try:
yield device
finally:
await device.disconnect()
ctx.obj = await ctx.with_async_resource(async_wrapped_device(dev))
if ctx.invoked_subcommand is None:
return await ctx.invoke(state)
@cli.group()
@pass_dev
def wifi(dev):
"""Commands to control wifi settings."""
@wifi.command()
@pass_dev
async def scan(dev):
"""Scan for available wifi networks."""
echo("Scanning for wifi networks, wait a second..")
devs = await dev.wifi_scan()
echo(f"Found {len(devs)} wifi networks!")
for dev in devs:
echo(f"\t {dev}")
return devs
@wifi.command()
@click.argument("ssid")
@click.option("--keytype", prompt=True)
@click.option("--password", prompt=True, hide_input=True)
@pass_dev
async def join(dev: Device, ssid: str, password: str, keytype: str):
"""Join the given wifi network."""
echo(f"Asking the device to connect to {ssid}..")
res = await dev.wifi_join(ssid, password, keytype=keytype)
echo(
f"Response: {res} - if the device is not able to join the network, "
f"it will revert back to its previous state."
)
return res
@cli.command()
@click.pass_context
async def discover(ctx):
"""Discover devices in the network."""
target = ctx.parent.params["target"]
username = ctx.parent.params["username"]
password = ctx.parent.params["password"]
discovery_timeout = ctx.parent.params["discovery_timeout"]
timeout = ctx.parent.params["timeout"]
port = ctx.parent.params["port"]
credentials = Credentials(username, password) if username and password else None
sem = asyncio.Semaphore()
discovered = dict()
unsupported = []
auth_failed = []
async def print_unsupported(unsupported_exception: UnsupportedDeviceError):
unsupported.append(unsupported_exception)
async with sem:
if unsupported_exception.discovery_result:
echo("== Unsupported device ==")
_echo_discovery_info(unsupported_exception.discovery_result)
echo()
else:
echo("== Unsupported device ==")
echo(f"\t{unsupported_exception}")
echo()
echo(f"Discovering devices on {target} for {discovery_timeout} seconds")
async def print_discovered(dev: Device):
async with sem:
try:
await dev.update()
except AuthenticationError:
auth_failed.append(dev._discovery_info)
echo("== Authentication failed for device ==")
_echo_discovery_info(dev._discovery_info)
echo()
else:
ctx.parent.obj = dev
await ctx.parent.invoke(state)
discovered[dev.host] = dev.internal_state
echo()
discovered_devices = await Discover.discover(
target=target,
discovery_timeout=discovery_timeout,
on_discovered=print_discovered,
on_unsupported=print_unsupported,
port=port,
timeout=timeout,
credentials=credentials,
)
for device in discovered_devices.values():
await device.protocol.close()
echo(f"Found {len(discovered)} devices")
if unsupported:
echo(f"Found {len(unsupported)} unsupported devices")
if auth_failed:
echo(f"Found {len(auth_failed)} devices that failed to authenticate")
return discovered
def _echo_dictionary(discovery_info: dict):
echo("\t[bold]== Discovery information ==[/bold]")
for key, value in discovery_info.items():
key_name = " ".join(x.capitalize() or "_" for x in key.split("_"))
key_name_and_spaces = "{:<15}".format(key_name + ":")
echo(f"\t{key_name_and_spaces}{value}")
def _echo_discovery_info(discovery_info):
# We don't have discovery info when all connection params are passed manually
if discovery_info is None:
return
if "system" in discovery_info and "get_sysinfo" in discovery_info["system"]:
_echo_dictionary(discovery_info["system"]["get_sysinfo"])
return
try:
dr = DiscoveryResult(**discovery_info)
except ValidationError:
_echo_dictionary(discovery_info)
return
echo("\t[bold]== Discovery Result ==[/bold]")
echo(f"\tDevice Type: {dr.device_type}")
echo(f"\tDevice Model: {dr.device_model}")
echo(f"\tIP: {dr.ip}")
echo(f"\tMAC: {dr.mac}")
echo(f"\tDevice Id (hash): {dr.device_id}")
echo(f"\tOwner (hash): {dr.owner}")
echo(f"\tHW Ver: {dr.hw_ver}")
echo(f"\tSupports IOT Cloud: {dr.is_support_iot_cloud}")
echo(f"\tOBD Src: {dr.obd_src}")
echo(f"\tFactory Default: {dr.factory_default}")
echo(f"\tEncrypt Type: {dr.mgt_encrypt_schm.encrypt_type}")
echo(f"\tSupports HTTPS: {dr.mgt_encrypt_schm.is_support_https}")
echo(f"\tHTTP Port: {dr.mgt_encrypt_schm.http_port}")
echo(f"\tLV (Login Level): {dr.mgt_encrypt_schm.lv}")
async def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3):
"""Discover a device identified by its alias."""
for _attempt in range(1, attempts):
found_devs = await Discover.discover(target=target, timeout=timeout)
for _ip, dev in found_devs.items():
if dev.alias.lower() == alias.lower():
host = dev.host
return host
return None
@cli.command()
@pass_dev_or_child
async def sysinfo(dev):
"""Print out full system information."""
echo("== System info ==")
echo(pf(dev.sys_info))
return dev.sys_info
def _echo_features(
features: dict[str, Feature],
title: str,
category: Feature.Category | None = None,
verbose: bool = False,
indent: str = "\t",
):
"""Print out a listing of features and their values."""
if category is not None:
features = {
id_: feat for id_, feat in features.items() if feat.category == category
}
echo(f"{indent}[bold]{title}[/bold]")
for _, feat in features.items():
try:
echo(f"{indent}{feat}")
if verbose:
echo(f"{indent}\tType: {feat.type}")
echo(f"{indent}\tCategory: {feat.category}")
echo(f"{indent}\tIcon: {feat.icon}")
except Exception as ex:
echo(f"{indent}{feat.name} ({feat.id}): [red]got exception ({ex})[/red]")
def _echo_all_features(features, *, verbose=False, title_prefix=None, indent=""):
"""Print out all features by category."""
if title_prefix is not None:
echo(f"[bold]\n{indent}== {title_prefix} ==[/bold]")
echo()
_echo_features(
features,
title="== Primary features ==",
category=Feature.Category.Primary,
verbose=verbose,
indent=indent,
)
echo()
_echo_features(
features,
title="== Information ==",
category=Feature.Category.Info,
verbose=verbose,
indent=indent,
)
echo()
_echo_features(
features,
title="== Configuration ==",
category=Feature.Category.Config,
verbose=verbose,
indent=indent,
)
echo()
_echo_features(
features,
title="== Debug ==",
category=Feature.Category.Debug,
verbose=verbose,
indent=indent,
)
@cli.command()
@pass_dev_or_child
@click.pass_context
async def state(ctx, dev: Device):
"""Print out device state and versions."""
verbose = ctx.parent.params.get("verbose", False) if ctx.parent else False
echo(f"[bold]== {dev.alias} - {dev.model} ==[/bold]")
echo(f"Host: {dev.host}")
echo(f"Port: {dev.port}")
echo(f"Device state: {dev.is_on}")
echo(f"Time: {dev.time} (tz: {dev.timezone}")
echo(f"Hardware: {dev.hw_info['hw_ver']}")
echo(f"Software: {dev.hw_info['sw_ver']}")
echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
if verbose:
echo(f"Location: {dev.location}")
echo()
_echo_all_features(dev.features, verbose=verbose)
if verbose:
echo("\n[bold]== Modules ==[/bold]")
for module in dev.modules.values():
echo(f"[green]+ {module}[/green]")
if dev.children:
echo("\n[bold]== Children ==[/bold]")
for child in dev.children:
_echo_all_features(
child.features,
title_prefix=f"{child.alias} ({child.model})",
verbose=verbose,
indent="\t",
)
if verbose:
echo(f"\n\t[bold]== Child {child.alias} Modules ==[/bold]")
for module in child.modules.values():
echo(f"\t[green]+ {module}[/green]")
echo()
if verbose:
echo("\n\t[bold]== Protocol information ==[/bold]")
echo(f"\tCredentials hash: {dev.credentials_hash}")
echo()
_echo_discovery_info(dev._discovery_info)
return dev.internal_state
@cli.command()
@click.argument("new_alias", required=False, default=None)
@pass_dev_or_child
async def alias(dev, new_alias):
"""Get or set the device (or plug) alias."""
if new_alias is not None:
echo(f"Setting alias to {new_alias}")
res = await dev.set_alias(new_alias)
await dev.update()
echo(f"Alias set to: {dev.alias}")
return res
echo(f"Alias: {dev.alias}")
if dev.children:
for plug in dev.children:
echo(f" * {plug.alias}")
return dev.alias
@cli.command()
@click.pass_context
@click.argument("module")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
async def raw_command(ctx, module, command, parameters):
"""Run a raw command on the device."""
logging.warning("Deprecated, use 'kasa command --module %s %s'", module, command)
return await ctx.forward(cmd_command)
@cli.command(name="command")
@click.option("--module", required=False, help="Module for IOT protocol.")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
@pass_dev_or_child
async def cmd_command(dev: Device, module, command, parameters):
"""Run a raw command on the device."""
if parameters is not None:
parameters = ast.literal_eval(parameters)
if isinstance(dev, IotDevice):
res = await dev._query_helper(module, command, parameters)
elif isinstance(dev, SmartDevice):
res = await dev._query_helper(command, parameters)
else:
raise KasaException("Unexpected device type %s.", dev)
echo(json.dumps(res))
return res
@cli.command()
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
@click.pass_context
async def emeter(ctx: click.Context, index, name, year, month, erase):
"""Query emeter for historical consumption."""
logging.warning("Deprecated, use 'kasa energy'")
return await ctx.invoke(
energy, child_index=index, child=name, year=year, month=month, erase=erase
)
@cli.command()
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
@pass_dev_or_child
async def energy(dev: Device, year, month, erase):
"""Query energy module for historical consumption.
Daily and monthly data provided in CSV format.
"""
echo("[bold]== Emeter ==[/bold]")
if not dev.has_emeter:
error("Device has no emeter")
return
if (year or month or erase) and not isinstance(dev, IotDevice):
error("Device has no historical statistics")
return
else:
dev = cast(IotDevice, dev)
if erase:
echo("Erasing emeter statistics..")
return await dev.erase_emeter_stats()
if year:
echo(f"== For year {year.year} ==")
echo("Month, usage (kWh)")
usage_data = await dev.get_emeter_monthly(year=year.year)
elif month:
echo(f"== For month {month.month} of {month.year} ==")
echo("Day, usage (kWh)")
usage_data = await dev.get_emeter_daily(year=month.year, month=month.month)
else:
# Call with no argument outputs summary data and returns
if isinstance(dev, IotStripPlug):
emeter_status = await dev.get_emeter_realtime()
else:
emeter_status = dev.emeter_realtime
echo("Current: %s A" % emeter_status["current"])
echo("Voltage: %s V" % emeter_status["voltage"])
echo("Power: %s W" % emeter_status["power"])
echo("Total consumption: %s kWh" % emeter_status["total"])
echo("Today: %s kWh" % dev.emeter_today)
echo("This month: %s kWh" % dev.emeter_this_month)
return emeter_status
# output any detailed usage data
for index, usage in usage_data.items():
echo(f"{index}, {usage}")
return usage_data
@cli.command()
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
@pass_dev_or_child
async def usage(dev: Device, year, month, erase):
"""Query usage for historical consumption.
Daily and monthly data provided in CSV format.
"""
echo("[bold]== Usage ==[/bold]")
usage = cast(Usage, dev.modules["usage"])
if erase:
echo("Erasing usage statistics..")
return await usage.erase_stats()
if year:
echo(f"== For year {year.year} ==")
echo("Month, usage (minutes)")
usage_data = await usage.get_monthstat(year=year.year)
elif month:
echo(f"== For month {month.month} of {month.year} ==")
echo("Day, usage (minutes)")
usage_data = await usage.get_daystat(year=month.year, month=month.month)
else:
# Call with no argument outputs summary data and returns
echo("Today: %s minutes" % usage.usage_today)
echo("This month: %s minutes" % usage.usage_this_month)
return usage
# output any detailed usage data
for index, usage in usage_data.items():
echo(f"{index}, {usage}")
return usage_data
@cli.command()
@click.argument("brightness", type=click.IntRange(0, 100), default=None, required=False)
@click.option("--transition", type=int, required=False)
@pass_dev_or_child
async def brightness(dev: Device, brightness: int, transition: int):
"""Get or set brightness."""
if not (light := dev.modules.get(Module.Light)) or not light.is_dimmable:
error("This device does not support brightness.")
return
if brightness is None:
echo(f"Brightness: {light.brightness}")
return light.brightness
else:
echo(f"Setting brightness to {brightness}")
return await light.set_brightness(brightness, transition=transition)
@cli.command()
@click.argument(
"temperature", type=click.IntRange(2500, 9000), default=None, required=False
)
@click.option("--transition", type=int, required=False)
@pass_dev_or_child
async def temperature(dev: Device, temperature: int, transition: int):
"""Get or set color temperature."""
if not (light := dev.modules.get(Module.Light)) or not light.is_variable_color_temp:
error("Device does not support color temperature")
return
if temperature is None:
echo(f"Color temperature: {light.color_temp}")
valid_temperature_range = light.valid_temperature_range
if valid_temperature_range != (0, 0):
echo("(min: {}, max: {})".format(*valid_temperature_range))
else:
echo(
"Temperature range unknown, please open a github issue"
f" or a pull request for model '{dev.model}'"
)
return light.valid_temperature_range
else:
echo(f"Setting color temperature to {temperature}")
return await light.set_color_temp(temperature, transition=transition)
@cli.command()
@click.argument("effect", type=click.STRING, default=None, required=False)
@click.pass_context
@pass_dev_or_child
async def effect(dev: Device, ctx, effect):
"""Set an effect."""
if not (light_effect := dev.modules.get(Module.LightEffect)):
error("Device does not support effects")
return
if effect is None:
echo(
f"Light effect: {light_effect.effect}\n"
+ f"Available Effects: {light_effect.effect_list}"
)
return light_effect.effect
if effect not in light_effect.effect_list:
raise click.BadArgumentUsage(
f"Effect must be one of: {light_effect.effect_list}", ctx
)
echo(f"Setting Effect: {effect}")
return await light_effect.set_effect(effect)
@cli.command()
@click.argument("h", type=click.IntRange(0, 360), default=None, required=False)
@click.argument("s", type=click.IntRange(0, 100), default=None, required=False)
@click.argument("v", type=click.IntRange(0, 100), default=None, required=False)
@click.option("--transition", type=int, required=False)
@click.pass_context
@pass_dev_or_child
async def hsv(dev: Device, ctx, h, s, v, transition):
"""Get or set color in HSV."""
if not (light := dev.modules.get(Module.Light)) or not light.is_color:
error("Device does not support colors")
return
if h is None and s is None and v is None:
echo(f"Current HSV: {light.hsv}")
return light.hsv
elif s is None or v is None:
raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx)
else:
echo(f"Setting HSV: {h} {s} {v}")
return await light.set_hsv(h, s, v, transition=transition)
@cli.command()
@click.argument("state", type=bool, required=False)
@pass_dev_or_child
async def led(dev: Device, state):
"""Get or set (Plug's) led state."""
if not (led := dev.modules.get(Module.Led)):
error("Device does not support led.")
return
if state is not None:
echo(f"Turning led to {state}")
return await led.set_led(state)
else:
echo(f"LED state: {led.led}")
return led.led
@cli.group(invoke_without_command=True)
@click.pass_context
async def time(ctx: click.Context):
"""Get and set time."""
if ctx.invoked_subcommand is None:
await ctx.invoke(time_get)
@time.command(name="get")
@pass_dev
async def time_get(dev: Device):
"""Get the device time."""
res = dev.time
echo(f"Current time: {res}")
return res
@time.command(name="sync")
@pass_dev
async def time_sync(dev: Device):
"""Set the device time to current time."""
if not isinstance(dev, SmartDevice):
raise NotImplementedError("setting time currently only implemented on smart")
if (time := dev.modules.get(Module.Time)) is None:
echo("Device does not have time module")
return
echo("Old time: %s" % time.time)
local_tz = datetime.now().astimezone().tzinfo
await time.set_time(datetime.now(tz=local_tz))
await dev.update()
echo("New time: %s" % time.time)
@cli.command()
@click.option("--transition", type=int, required=False)
@pass_dev_or_child
async def on(dev: Device, transition: int):
"""Turn the device on."""
echo(f"Turning on {dev.alias}")
return await dev.turn_on(transition=transition)
@cli.command
@click.option("--transition", type=int, required=False)
@pass_dev_or_child
async def off(dev: Device, transition: int):
"""Turn the device off."""
echo(f"Turning off {dev.alias}")
return await dev.turn_off(transition=transition)
@cli.command()
@click.option("--transition", type=int, required=False)
@pass_dev_or_child
async def toggle(dev: Device, transition: int):
"""Toggle the device on/off."""
if dev.is_on:
echo(f"Turning off {dev.alias}")
return await dev.turn_off(transition=transition)
echo(f"Turning on {dev.alias}")
return await dev.turn_on(transition=transition)
@cli.command()
@click.option("--delay", default=1)
@pass_dev
async def reboot(plug, delay):
"""Reboot the device."""
echo("Rebooting the device..")
return await plug.reboot(delay)
@cli.group()
@pass_dev
async def schedule(dev):
"""Scheduling commands."""
@schedule.command(name="list")
@pass_dev_or_child
@click.argument("type", default="schedule")
async def _schedule_list(dev, type):
"""Return the list of schedule actions for the given type."""
sched = dev.modules[type]
for rule in sched.rules:
print(rule)
else:
error(f"No rules of type {type}")
return sched.rules
@schedule.command(name="delete")
@pass_dev_or_child
@click.option("--id", type=str, required=True)
async def delete_rule(dev, id):
"""Delete rule from device."""
schedule = dev.modules["schedule"]
rule_to_delete = next(filter(lambda rule: (rule.id == id), schedule.rules), None)
if rule_to_delete:
echo(f"Deleting rule id {id}")
return await schedule.delete_rule(rule_to_delete)
else:
error(f"No rule with id {id} was found")
@cli.group(invoke_without_command=True)
@pass_dev_or_child
@click.pass_context
async def presets(ctx, dev):
"""List and modify bulb setting presets."""
if ctx.invoked_subcommand is None:
return await ctx.invoke(presets_list)
@presets.command(name="list")
@pass_dev_or_child
def presets_list(dev: Device):
"""List presets."""
if not (light_preset := dev.modules.get(Module.LightPreset)):
error("Presets not supported on device")
return
for preset in light_preset.preset_states_list:
echo(preset)
return light_preset.preset_states_list
@presets.command(name="modify")
@click.argument("index", type=int)
@click.option("--brightness", type=int)
@click.option("--hue", type=int)
@click.option("--saturation", type=int)
@click.option("--temperature", type=int)
@pass_dev_or_child
async def presets_modify(dev: Device, index, brightness, hue, saturation, temperature):
"""Modify a preset."""
for preset in dev.presets:
if preset.index == index:
break
else:
error(f"No preset found for index {index}")
return
if brightness is not None:
preset.brightness = brightness
if hue is not None:
preset.hue = hue
if saturation is not None:
preset.saturation = saturation
if temperature is not None:
preset.color_temp = temperature
echo(f"Going to save preset: {preset}")
return await dev.save_preset(preset)
@cli.command()
@pass_dev_or_child
@click.option("--type", type=click.Choice(["soft", "hard"], case_sensitive=False))
@click.option("--last", is_flag=True)
@click.option("--preset", type=int)
async def turn_on_behavior(dev: Device, type, last, preset):
"""Modify bulb turn-on behavior."""
if not dev.is_bulb or not isinstance(dev, IotBulb):
error("Presets only supported on iot bulbs")
return
settings = await dev.get_turn_on_behavior()
echo(f"Current turn on behavior: {settings}")
# Return if we are not setting the value
if not type and not last and not preset:
return settings
# If we are setting the value, the type has to be specified
if (last or preset) and type is None:
echo("To set the behavior, you need to define --type")
return
behavior = getattr(settings, type)
if last:
echo(f"Going to set {type} to last")
behavior.preset = None
elif preset is not None:
echo(f"Going to set {type} to preset {preset}")
behavior.preset = preset
return await dev.set_turn_on_behavior(settings)
@cli.command()
@pass_dev
@click.option(
"--username", required=True, prompt=True, help="New username to set on the device"
)
@click.option(
"--password", required=True, prompt=True, help="New password to set on the device"
)
async def update_credentials(dev, username, password):
"""Update device credentials for authenticated devices."""
if not isinstance(dev, SmartDevice):
error("Credentials can only be updated on authenticated devices.")
click.confirm("Do you really want to replace the existing credentials?", abort=True)
return await dev.update_credentials(username, password)
@cli.command()
@pass_dev_or_child
async def shell(dev: Device):
"""Open interactive shell."""
echo("Opening shell for %s" % dev)
from ptpython.repl import embed
logging.getLogger("parso").setLevel(logging.WARNING) # prompt parsing
logging.getLogger("asyncio").setLevel(logging.WARNING)
loop = asyncio.get_event_loop()
try:
await embed( # type: ignore[func-returns-value]
globals=globals(),
locals=locals(),
return_asyncio_coroutine=True,
patch_stdout=True,
)
except EOFError:
loop.stop()
@cli.command(name="feature")
@click.argument("name", required=False)
@click.argument("value", required=False)
@pass_dev_or_child
@click.pass_context
async def feature(
ctx: click.Context,
dev: Device,
name: str,
value,
):
"""Access and modify features.
If no *name* is given, lists available features and their values.
If only *name* is given, the value of named feature is returned.
If both *name* and *value* are set, the described setting is changed.
"""
verbose = ctx.parent.params.get("verbose", False) if ctx.parent else False
if not name:
_echo_all_features(dev.features, verbose=verbose, indent="")
if dev.children:
for child_dev in dev.children:
_echo_all_features(
child_dev.features,
verbose=verbose,
title_prefix=f"Child {child_dev.alias}",
indent="\t",
)
return
if name not in dev.features:
error(f"No feature by name '{name}'")
return
feat = dev.features[name]
if value is None:
unit = f" {feat.unit}" if feat.unit else ""
echo(f"{feat.name} ({name}): {feat.value}{unit}")
return feat.value
value = ast.literal_eval(value)
echo(f"Changing {name} from {feat.value} to {value}")
response = await dev.features[name].set_value(value)
await dev.update()
echo(f"New state: {feat.value}")
return response
if __name__ == "__main__":
cli()