mirror of
https://github.com/python-kasa/python-kasa.git
synced 2024-12-23 11:43:34 +00:00
1143 lines
33 KiB
Python
Executable File
1143 lines
33 KiB
Python
Executable File
"""python-kasa cli tool."""
|
|
import ast
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
from functools import singledispatch, wraps
|
|
from pprint import pformat as pf
|
|
from typing import Any, Dict, cast
|
|
|
|
import asyncclick as click
|
|
|
|
from kasa import (
|
|
AuthenticationException,
|
|
Bulb,
|
|
ConnectionType,
|
|
Credentials,
|
|
Device,
|
|
DeviceConfig,
|
|
DeviceFamilyType,
|
|
Discover,
|
|
EncryptType,
|
|
SmartDeviceException,
|
|
UnsupportedDeviceException,
|
|
)
|
|
from kasa.discover import DiscoveryResult
|
|
from kasa.iot import IotBulb, IotDevice, IotDimmer, IotLightStrip, IotPlug, IotStrip
|
|
from kasa.smart import SmartBulb, SmartDevice, SmartPlug
|
|
|
|
try:
|
|
from pydantic.v1 import ValidationError
|
|
except ImportError:
|
|
from pydantic import ValidationError
|
|
|
|
try:
    from rich import print as _do_echo
except ImportError:
    # Strip out rich formatting if rich is not installed
    # but only lower case tags to avoid stripping out
    # raw data from the device that is printed from
    # the device state.
    rich_formatting = re.compile(r"\[/?[a-z]+]")

    def _strip_rich_formatting(echo_func):
        """Wrap *echo_func* so that rich markup tags are removed from text.

        The returned wrapper has the same call signature as *echo_func*.
        Only ``str`` messages are filtered; any other object (or ``None``)
        is handed to *echo_func* unchanged.
        """

        @wraps(echo_func)
        def wrapper(message=None, *args, **kwargs):
            # The regular expression can only be applied to text. Commands
            # also echo arbitrary objects, and running the substitution on
            # those would raise a TypeError.
            if isinstance(message, str):
                message = rich_formatting.sub("", message)
            echo_func(message, *args, **kwargs)

        return wrapper

    _do_echo = _strip_rich_formatting(click.echo)

# echo is set to _do_echo so that it can be reset to _do_echo later after
# --json has set it to _nop_echo
echo = _do_echo
|
|
|
|
|
|
# Maps every value accepted by the --type option to the device class that is
# instantiated for it. Bare names and their "iot."-prefixed forms select the
# same classes.
TYPE_TO_CLASS = {
    "plug": IotPlug,
    "bulb": IotBulb,
    "dimmer": IotDimmer,
    "strip": IotStrip,
    "lightstrip": IotLightStrip,
    "iot.plug": IotPlug,
    "iot.bulb": IotBulb,
    "iot.dimmer": IotDimmer,
    "iot.strip": IotStrip,
    "iot.lightstrip": IotLightStrip,
    "smart.plug": SmartPlug,
    "smart.bulb": SmartBulb,
}

# Choices offered by --encrypt-type: the value of every EncryptType member.
ENCRYPT_TYPES = [member.value for member in EncryptType]

# Choices offered by --device-family: the value of every DeviceFamilyType member.
DEVICE_FAMILY_TYPES = [member.value for member in DeviceFamilyType]

# Block list of commands which require no update
SKIP_UPDATE_COMMANDS = ["wifi", "raw-command", "command"]

click.anyio_backend = "asyncio"

# Decorator that injects the Device stored on the click context into a command.
pass_dev = click.make_pass_decorator(Device)
|
|
|
|
|
|
class ExceptionHandlerGroup(click.Group):
    """Click group that reports any exception as a one-line message.

    Idea from https://stackoverflow.com/a/44347763
    """

    def __call__(self, *args, **kwargs):
        """Drive ``self.main`` on the event loop, echoing any exception raised."""
        try:
            # Everything stays inside the try block so that a failure to
            # obtain the loop is reported the same way as a command failure.
            loop = asyncio.get_event_loop()
            main_coro = self.main(*args, **kwargs)
            loop.run_until_complete(main_coro)
        except Exception as ex:
            echo(f"Got error: {ex!r}")
|
|
|
|
|
|
def json_formatter_cb(result, **kwargs):
    """Print *result* as indented JSON when the ``json`` keyword is truthy.

    Does nothing (and returns ``None``) when JSON output was not requested.
    Objects the json module cannot encode are converted by a fallback:
    ``Device`` instances become their ``internal_state`` and anything else
    becomes its ``str()`` form.
    """
    json_requested = kwargs.get("json")
    if not json_requested:
        return

    def _fallback(val):
        """Convert a value that json cannot encode natively."""
        if isinstance(val, Device):
            # Serialize device data, just using the last update raw payload.
            return val.internal_state
        return str(val)

    print(json.dumps(result, indent=4, default=_fallback))
|
|
|
|
|
|
# Top-level command group. The group is run through ExceptionHandlerGroup and
# json_formatter_cb is registered as the result callback. Every option below
# can also be supplied through the environment variable named in `envvar`,
# except --username/--password/--credentials-hash which list it last.
@click.group(
    invoke_without_command=True,
    cls=ExceptionHandlerGroup,
    result_callback=json_formatter_cb,
)
@click.option(
    "--host",
    envvar="KASA_HOST",
    required=False,
    help="The host name or IP address of the device to connect to.",
)
@click.option(
    "--port",
    envvar="KASA_PORT",
    required=False,
    type=int,
    help="The port of the device to connect to.",
)
@click.option(
    "--alias",
    envvar="KASA_NAME",
    required=False,
    help="The device name, or alias, of the device to connect to.",
)
@click.option(
    "--target",
    envvar="KASA_TARGET",
    default="255.255.255.255",
    required=False,
    show_default=True,
    help="The broadcast address to be used for discovery.",
)
@click.option(
    "-v",
    "--verbose",
    envvar="KASA_VERBOSE",
    required=False,
    default=False,
    is_flag=True,
    help="Be more verbose on output",
)
@click.option(
    "-d",
    "--debug",
    envvar="KASA_DEBUG",
    default=False,
    is_flag=True,
    help="Print debug output",
)
@click.option(
    "--type",
    envvar="KASA_TYPE",
    default=None,
    type=click.Choice(list(TYPE_TO_CLASS), case_sensitive=False),
)
@click.option(
    "--json/--no-json",
    envvar="KASA_JSON",
    default=False,
    is_flag=True,
    help="Output raw device response as JSON.",
)
@click.option(
    "--encrypt-type",
    envvar="KASA_ENCRYPT_TYPE",
    default=None,
    type=click.Choice(ENCRYPT_TYPES, case_sensitive=False),
)
@click.option(
    "--device-family",
    envvar="KASA_DEVICE_FAMILY",
    default=None,
    type=click.Choice(DEVICE_FAMILY_TYPES, case_sensitive=False),
)
@click.option(
    "--login-version",
    envvar="KASA_LOGIN_VERSION",
    default=None,
    type=int,
)
@click.option(
    "--timeout",
    envvar="KASA_TIMEOUT",
    default=5,
    required=False,
    show_default=True,
    help="Timeout for device communications.",
)
@click.option(
    "--discovery-timeout",
    envvar="KASA_DISCOVERY_TIMEOUT",
    default=3,
    required=False,
    show_default=True,
    help="Timeout for discovery.",
)
@click.option(
    "--username",
    default=None,
    required=False,
    envvar="KASA_USERNAME",
    help="Username/email address to authenticate to device.",
)
@click.option(
    "--password",
    default=None,
    required=False,
    envvar="KASA_PASSWORD",
    help="Password to use to authenticate to device.",
)
@click.option(
    "--credentials-hash",
    default=None,
    required=False,
    envvar="KASA_CREDENTIALS_HASH",
    help="Hashed credentials used to authenticate to the device.",
)
@click.version_option(package_name="python-kasa")
@click.pass_context
async def cli(
    ctx,
    host,
    port,
    alias,
    target,
    verbose,
    debug,
    type,
    encrypt_type,
    device_family,
    login_version,
    json,
    timeout,
    discovery_timeout,
    username,
    password,
    credentials_hash,
):
    """A tool for controlling TP-Link smart home devices."""  # noqa
    # Overview of the callback: configure output and logging, work out which
    # device to talk to (explicit host, alias lookup or discovery), store the
    # device on ctx.obj for the sub-commands, and run `state` when no
    # sub-command was given.

    # no need to perform any checks if we are just displaying the help
    # (only the last command line token is inspected here)
    if sys.argv[-1] == "--help":
        # Context object is required to avoid crashing on sub-groups
        ctx.obj = Device(None)
        return

    # If JSON output is requested, disable echo
    global echo
    if json:

        def _nop_echo(*args, **kwargs):
            pass

        echo = _nop_echo
    else:
        # Set back to default is required if running tests with CliRunner
        global _do_echo
        echo = _do_echo

    logging_config: Dict[str, Any] = {
        "level": logging.DEBUG if debug > 0 else logging.INFO
    }
    # rich is optional: use its log handler when importable, otherwise fall
    # back to the plain basicConfig defaults.
    try:
        from rich.logging import RichHandler

        rich_config = {
            "show_time": False,
        }
        logging_config["handlers"] = [RichHandler(**rich_config)]
        logging_config["format"] = "%(message)s"
    except ImportError:
        pass

    # The configuration should be converted to use dictConfig,
    # but this keeps mypy happy for now
    logging.basicConfig(**logging_config)  # type: ignore

    # The discover sub-command needs no device, so nothing else is set up.
    if ctx.invoked_subcommand == "discover":
        return

    if alias is not None and host is not None:
        raise click.BadOptionUsage("alias", "Use either --alias or --host, not both.")

    # Resolve the alias to a host through discovery.
    if alias is not None and host is None:
        echo(f"Alias is given, using discovery to find host {alias}")
        host = await find_host_from_alias(alias=alias, target=target)
        if host:
            echo(f"Found hostname is {host}")
        else:
            echo(f"No device with name {alias} found")
            return

    # Exactly one of username/password being set is a usage error.
    if bool(password) != bool(username):
        raise click.BadOptionUsage(
            "username", "Using authentication requires both --username and --password"
        )

    if username:
        credentials = Credentials(username=username, password=password)
    else:
        credentials = None

    # Without a host (and without an alias) fall back to the discover command.
    if host is None:
        echo("No host name given, trying discovery..")
        return await ctx.invoke(discover)

    # Three ways to obtain the device, in priority order:
    # 1. --type: instantiate the mapped class directly with the host only.
    # 2. --device-family together with --encrypt-type: connect using an
    #    explicit DeviceConfig.
    # 3. otherwise: discover the single host.
    if type is not None:
        dev = TYPE_TO_CLASS[type](host)
    elif device_family and encrypt_type:
        ctype = ConnectionType(
            DeviceFamilyType(device_family),
            EncryptType(encrypt_type),
            login_version,
        )
        config = DeviceConfig(
            host=host,
            port_override=port,
            credentials=credentials,
            credentials_hash=credentials_hash,
            timeout=timeout,
            connection_type=ctype,
        )
        dev = await Device.connect(config=config)
    else:
        echo("No --type or --device-family and --encrypt-type defined, discovering..")
        dev = await Discover.discover_single(
            host,
            port=port,
            credentials=credentials,
        )

    # Skip update on specific commands, or if device factory,
    # that performs an update was used for the device.
    if ctx.invoked_subcommand not in SKIP_UPDATE_COMMANDS and not device_family:
        await dev.update()

    # Sub-commands receive this object through the pass_dev decorator.
    ctx.obj = dev

    if ctx.invoked_subcommand is None:
        return await ctx.invoke(state)
|
|
|
|
|
|
@cli.group()
@pass_dev
def wifi(dev):
    """Commands to control wifi settings."""
    # Group callback with no logic of its own: it only provides the `wifi`
    # namespace that the sub-commands are registered on with @wifi.command().
|
|
|
|
|
|
@wifi.command()
@pass_dev
async def scan(dev):
    """Scan for available wifi networks."""
    # Asks the device for a wifi scan, echoes one line per result and returns
    # the collection of results as received from the device.
    echo("Scanning for wifi networks, wait a second..")
    networks = await dev.wifi_scan()
    echo(f"Found {len(networks)} wifi networks!")
    for network in networks:
        echo(f"\t {network}")

    return networks
|
|
|
|
|
|
@wifi.command()
@click.argument("ssid")
@click.option("--keytype", prompt=True)
@click.option("--password", prompt=True, hide_input=True)
@pass_dev
async def join(dev: Device, ssid: str, password: str, keytype: str):
    """Join the given wifi network."""
    # Forwards the credentials to the device and returns whatever the device
    # answered; the answer is also echoed to the user.
    echo(f"Asking the device to connect to {ssid}..")
    response = await dev.wifi_join(ssid, password, keytype=keytype)
    notice = (
        f"Response: {response} - if the device is not able to join the network, "
        "it will revert back to its previous state."
    )
    echo(notice)

    return response
|
|
|
|
|
|
@cli.command()
@click.pass_context
async def discover(ctx):
    """Discover devices in the network."""
    # All connection options are defined on the parent (top-level) command.
    parent_params = ctx.parent.params
    target = parent_params["target"]
    username = parent_params["username"]
    password = parent_params["password"]
    discovery_timeout = parent_params["discovery_timeout"]
    timeout = parent_params["timeout"]
    port = parent_params["port"]

    credentials = None
    if username and password:
        credentials = Credentials(username, password)

    # Both callbacks do their printing while holding this semaphore.
    sem = asyncio.Semaphore()
    discovered = {}
    unsupported = []
    auth_failed = []

    async def print_unsupported(unsupported_exception: UnsupportedDeviceException):
        """Record an unsupported device and print what is known about it."""
        unsupported.append(unsupported_exception)
        async with sem:
            discovery_result = unsupported_exception.discovery_result
            echo("== Unsupported device ==")
            if discovery_result:
                _echo_discovery_info(discovery_result)
            else:
                echo(f"\t{unsupported_exception}")
            echo()

    echo(f"Discovering devices on {target} for {discovery_timeout} seconds")

    async def print_discovered(dev: Device):
        """Update a discovered device and print its state or the auth failure."""
        async with sem:
            try:
                await dev.update()
            except AuthenticationException:
                auth_failed.append(dev._discovery_info)
                echo("== Authentication failed for device ==")
                _echo_discovery_info(dev._discovery_info)
                echo()
            else:
                # The state command reads the device from the parent context.
                ctx.parent.obj = dev
                await ctx.parent.invoke(state)
                discovered[dev.host] = dev.internal_state
            echo()

    discovered_devices = await Discover.discover(
        target=target,
        discovery_timeout=discovery_timeout,
        on_discovered=print_discovered,
        on_unsupported=print_unsupported,
        port=port,
        timeout=timeout,
        credentials=credentials,
    )

    for found in discovered_devices.values():
        await found.protocol.close()

    # Summary: devices that updated successfully, then the two failure kinds.
    echo(f"Found {len(discovered)} devices")
    if unsupported:
        echo(f"Found {len(unsupported)} unsupported devices")
    if auth_failed:
        echo(f"Found {len(auth_failed)} devices that failed to authenticate")

    return discovered
|
|
|
|
|
|
def _echo_dictionary(discovery_info: dict):
    """Echo every key/value pair of *discovery_info* under a heading.

    Keys are split on underscores and each part is capitalized; an empty part
    (from a leading, trailing or doubled underscore) is shown as ``_``. The
    resulting label plus colon is left-aligned in a 15 character column.
    """
    echo("\t[bold]== Discovery information ==[/bold]")
    for raw_key, value in discovery_info.items():
        words = [part.capitalize() or "_" for part in raw_key.split("_")]
        label = " ".join(words) + ":"
        echo(f"\t{label:<15}{value}")
|
|
|
|
|
|
def _echo_discovery_info(discovery_info):
    """Echo discovery information in the most specific format available.

    * ``None`` prints nothing.
    * A payload with ``system`` / ``get_sysinfo`` prints that sysinfo mapping.
    * A payload that validates as ``DiscoveryResult`` prints its fields.
    * Anything else is printed as a plain key/value listing.
    """
    # We don't have discovery info when all connection params are passed manually
    if discovery_info is None:
        return

    if "system" in discovery_info:
        system_section = discovery_info["system"]
        if "get_sysinfo" in system_section:
            _echo_dictionary(system_section["get_sysinfo"])
            return

    try:
        dr = DiscoveryResult(**discovery_info)
    except ValidationError:
        _echo_dictionary(discovery_info)
        return

    def _report_lines():
        """Yield the report lazily, so each field is read just before printing."""
        yield "\t[bold]== Discovery Result ==[/bold]"
        yield f"\tDevice Type: {dr.device_type}"
        yield f"\tDevice Model: {dr.device_model}"
        yield f"\tIP: {dr.ip}"
        yield f"\tMAC: {dr.mac}"
        yield f"\tDevice Id (hash): {dr.device_id}"
        yield f"\tOwner (hash): {dr.owner}"
        yield f"\tHW Ver: {dr.hw_ver}"
        yield f"\tSupports IOT Cloud: {dr.is_support_iot_cloud}"
        yield f"\tOBD Src: {dr.obd_src}"
        yield f"\tFactory Default: {dr.factory_default}"
        yield f"\tEncrypt Type: {dr.mgt_encrypt_schm.encrypt_type}"
        yield f"\tSupports HTTPS: {dr.mgt_encrypt_schm.is_support_https}"
        yield f"\tHTTP Port: {dr.mgt_encrypt_schm.http_port}"
        yield f"\tLV (Login Level): {dr.mgt_encrypt_schm.lv}"

    for line in _report_lines():
        echo(line)
|
|
|
|
|
|
async def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3):
    """Discover a device identified by its alias.

    Runs discovery against *target* up to *attempts* times and compares the
    alias of every device found with *alias*, ignoring case.

    :param alias: Alias to look for.
    :param target: Address handed to ``Discover.discover`` as ``target``.
    :param timeout: Value handed to ``Discover.discover`` as ``timeout``.
    :param attempts: Maximum number of discovery rounds.
    :return: The ``host`` of the first matching device, or ``None`` when no
        round produced a match.
    """
    wanted = alias.lower()
    # range(attempts) performs exactly `attempts` rounds; starting the range
    # at 1 would run one round fewer (and none at all for attempts=1).
    for _attempt in range(attempts):
        found_devs = await Discover.discover(target=target, timeout=timeout)
        for dev in found_devs.values():
            dev_alias = dev.alias
            # NOTE(review): presumably a device can report no alias (None);
            # such a device can never match, so skip it instead of failing.
            if dev_alias is not None and dev_alias.lower() == wanted:
                return dev.host

    return None
|
|
|
|
|
|
@cli.command()
@pass_dev
async def sysinfo(dev):
    """Print out full system information."""
    # Pretty-prints the device's sys_info and returns it to the caller.
    echo("== System info ==")
    info = dev.sys_info
    echo(pf(info))
    return info
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.pass_context
async def state(ctx, dev: Device):
    """Print out device state and versions."""
    # Prints a multi-section report about `dev` and returns dev.internal_state.
    # The verbose section is only shown when the parent command was invoked
    # with --verbose; without a parent context verbose output is off.
    verbose = ctx.parent.params.get("verbose", False) if ctx.parent else False

    echo(f"[bold]== {dev.alias} - {dev.model} ==[/bold]")
    echo(f"\tHost: {dev.host}")
    echo(f"\tPort: {dev.port}")
    echo(f"\tDevice state: {dev.is_on}")
    if dev.is_strip:
        echo("\t[bold]== Plugs ==[/bold]")
        for plug in dev.children:  # type: ignore
            echo(f"\t* Socket '{plug.alias}' state: {plug.is_on} since {plug.on_since}")
        echo()

    echo("\t[bold]== Generic information ==[/bold]")
    # The closing parenthesis after the timezone keeps the line balanced.
    echo(f"\tTime: {dev.time} (tz: {dev.timezone})")
    echo(f"\tHardware: {dev.hw_info['hw_ver']}")
    echo(f"\tSoftware: {dev.hw_info['sw_ver']}")
    echo(f"\tMAC (rssi): {dev.mac} ({dev.rssi})")
    echo(f"\tLocation: {dev.location}")

    echo("\n\t[bold]== Device specific information ==[/bold]")
    for info_name, info_data in dev.state_information.items():
        if isinstance(info_data, list):
            # List values get one indented line per element.
            echo(f"\t{info_name}:")
            for item in info_data:
                echo(f"\t\t{item}")
        else:
            echo(f"\t{info_name}: {info_data}")

    echo("\n\t[bold]== Descriptors == [/bold]")
    for id_, descriptor in dev.descriptors.items():
        echo(f"\t{descriptor.name} ({id_}): {descriptor.value}")

    if dev.has_emeter:
        echo("\n\t[bold]== Current State ==[/bold]")
        emeter_status = dev.emeter_realtime
        echo(f"\t{emeter_status}")

    echo("\n\t[bold]== Modules ==[/bold]")
    for module in dev.modules.values():
        if module.is_supported:
            echo(f"\t[green]+ {module}[/green]")
        else:
            echo(f"\t[red]- {module}[/red]")

    if verbose:
        echo("\n\t[bold]== Verbose information ==[/bold]")
        echo(f"\tCredentials hash: {dev.credentials_hash}")
        echo(f"\tDevice ID: {dev.device_id}")
        for feature in dev.features:
            echo(f"\tFeature: {feature}")
        echo()
        _echo_discovery_info(dev._discovery_info)
    return dev.internal_state
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.argument("new_alias", required=False, default=None)
@click.option("--index", type=int)
async def alias(dev, new_alias, index):
    """Get or set the device (or plug) alias."""
    # `target` is either the device itself or, with --index, one of its plugs.
    target = dev
    if index is not None:
        if not target.is_strip:
            echo("Index can only used for power strips!")
            return
        target = target.get_plug_by_index(index)

    # Read mode: show the alias (and the child aliases of a strip).
    if new_alias is None:
        echo(f"Alias: {target.alias}")
        if target.is_strip:
            for child in target.children:
                echo(f" * {child.alias}")

        return target.alias

    # Write mode: the result of the set call is returned as-is.
    echo(f"Setting alias to {new_alias}")
    return await target.set_alias(new_alias)
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.pass_context
@click.argument("module")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
async def raw_command(ctx, dev: Device, module, command, parameters):
    """Run a raw command on the device."""
    # Deprecated alias: warn, then hand the same parameters to `command`.
    logging.warning("Deprecated, use 'kasa command --module %s %s'", module, command)
    forwarded_result = await ctx.forward(cmd_command)
    return forwarded_result
|
|
|
|
|
|
@cli.command(name="command")
@pass_dev
@click.option("--module", required=False, help="Module for IOT protocol.")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
async def cmd_command(dev: Device, module, command, parameters):
    """Run a raw command on the device."""
    # `parameters` arrives as text and is parsed as a Python literal
    # (ast.literal_eval accepts literals only, it does not execute code).
    if parameters is not None:
        parameters = ast.literal_eval(parameters)

    # IotDevice queries are addressed by module and command, SmartDevice
    # queries by command only.
    if isinstance(dev, IotDevice):
        res = await dev._query_helper(module, command, parameters)
    elif isinstance(dev, SmartDevice):
        res = await dev._query_helper(command, parameters)
    else:
        # Exceptions do not apply %-style arguments, so the message is
        # formatted here to actually include the device.
        raise SmartDeviceException(f"Unexpected device type {dev}.")
    echo(json.dumps(res))
    return res
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
async def emeter(dev: Device, index: int, name: str, year, month, erase):
    """Query emeter for historical consumption.

    Daily and monthly data provided in CSV format.
    """
    # --index / --name select a single plug of a power strip.
    child_requested = index is not None or name is not None
    if child_requested:
        if not dev.is_strip:
            echo("Index and name are only for power strips!")
            return

        if index is not None:
            dev = dev.get_plug_by_index(index)
        elif name:
            dev = dev.get_plug_by_name(name)

    echo("[bold]== Emeter ==[/bold]")
    if not dev.has_emeter:
        echo("Device has no emeter")
        return

    # Historical statistics (and erasing them) are only offered for IotDevice.
    wants_history = year or month or erase
    if wants_history and not isinstance(dev, IotDevice):
        echo("Device has no historical statistics")
        return
    dev = cast(IotDevice, dev)

    if erase:
        echo("Erasing emeter statistics..")
        return await dev.erase_emeter_stats()

    if not year and not month:
        # Call with no argument outputs summary data and returns
        if child_requested:
            emeter_status = await dev.get_emeter_realtime()
        else:
            emeter_status = dev.emeter_realtime

        echo("Current: %s A" % emeter_status["current"])
        echo("Voltage: %s V" % emeter_status["voltage"])
        echo("Power: %s W" % emeter_status["power"])
        echo("Total consumption: %s kWh" % emeter_status["total"])

        echo("Today: %s kWh" % dev.emeter_today)
        echo("This month: %s kWh" % dev.emeter_this_month)

        return emeter_status

    # --year takes precedence over --month when both are given.
    if year:
        echo(f"== For year {year.year} ==")
        echo("Month, usage (kWh)")
        usage_data = await dev.get_emeter_monthly(year=year.year)
    else:
        echo(f"== For month {month.month} of {month.year} ==")
        echo("Day, usage (kWh)")
        usage_data = await dev.get_emeter_daily(year=month.year, month=month.month)

    # output any detailed usage data
    for period, consumed in usage_data.items():
        echo(f"{period}, {consumed}")

    return usage_data
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
async def usage(dev: Device, year, month, erase):
    """Query usage for historical consumption.

    Daily and monthly data provided in CSV format.
    """
    echo("[bold]== Usage ==[/bold]")
    usage_module = dev.modules["usage"]

    if erase:
        echo("Erasing usage statistics..")
        return await usage_module.erase_stats()

    # --year takes precedence over --month when both are given.
    if year:
        echo(f"== For year {year.year} ==")
        echo("Month, usage (minutes)")
        usage_data = await usage_module.get_monthstat(year=year.year)
    elif month:
        echo(f"== For month {month.month} of {month.year} ==")
        echo("Day, usage (minutes)")
        usage_data = await usage_module.get_daystat(
            year=month.year, month=month.month
        )
    else:
        # Call with no argument outputs summary data and returns
        echo("Today: %s minutes" % usage_module.usage_today)
        echo("This month: %s minutes" % usage_module.usage_this_month)

        # The summary path returns the usage module object itself.
        return usage_module

    # output any detailed usage data
    for period, minutes in usage_data.items():
        echo(f"{period}, {minutes}")

    return usage_data
|
|
|
|
|
|
@cli.command()
@click.argument("brightness", type=click.IntRange(0, 100), default=None, required=False)
@click.option("--transition", type=int, required=False)
@pass_dev
async def brightness(dev: Bulb, brightness: int, transition: int):
    """Get or set brightness."""
    if not dev.is_dimmable:
        echo("This device does not support brightness.")
        return

    # With a value: set it and return the device's answer.
    if brightness is not None:
        echo(f"Setting brightness to {brightness}")
        return await dev.set_brightness(brightness, transition=transition)

    # Without a value: report the current brightness.
    echo(f"Brightness: {dev.brightness}")
    return dev.brightness
|
|
|
|
|
|
@cli.command()
@click.argument(
    "temperature", type=click.IntRange(2500, 9000), default=None, required=False
)
@click.option("--transition", type=int, required=False)
@pass_dev
async def temperature(dev: Bulb, temperature: int, transition: int):
    """Get or set color temperature."""
    if not dev.is_variable_color_temp:
        echo("Device does not support color temperature")
        return

    # With a value: set it and return the device's answer.
    if temperature is not None:
        echo(f"Setting color temperature to {temperature}")
        return await dev.set_color_temp(temperature, transition=transition)

    # Without a value: report the current temperature and the supported range.
    echo(f"Color temperature: {dev.color_temp}")
    supported_range = dev.valid_temperature_range
    if supported_range == (0, 0):
        # (0, 0) is treated as "range not known for this model".
        echo(
            "Temperature range unknown, please open a github issue"
            f" or a pull request for model '{dev.model}'"
        )
    else:
        echo("(min: {}, max: {})".format(*supported_range))
    return dev.valid_temperature_range
|
|
|
|
|
|
@cli.command()
@click.argument("effect", type=click.STRING, default=None, required=False)
@click.pass_context
@pass_dev
async def effect(dev, ctx, effect):
    """Set an effect."""
    if not dev.has_effects:
        echo("Device does not support effects")
        return

    # Both usage errors list the effects the device offers.
    available = dev.effect_list
    if effect is None:
        raise click.BadArgumentUsage(
            f"Setting an effect requires a named built-in effect: {available}",
            ctx,
        )
    if effect not in available:
        raise click.BadArgumentUsage(f"Effect must be one of: {available}", ctx)

    echo(f"Setting Effect: {effect}")
    return await dev.set_effect(effect)
|
|
|
|
|
|
@cli.command()
@click.argument("h", type=click.IntRange(0, 360), default=None, required=False)
@click.argument("s", type=click.IntRange(0, 100), default=None, required=False)
@click.argument("v", type=click.IntRange(0, 100), default=None, required=False)
@click.option("--transition", type=int, required=False)
@click.pass_context
@pass_dev
async def hsv(dev, ctx, h, s, v, transition):
    """Get or set color in HSV."""
    if not dev.is_color:
        echo("Device does not support colors")
        return

    # No value at all: report the current colour.
    if h is None and s is None and v is None:
        echo(f"Current HSV: {dev.hsv}")
        return dev.hsv

    # Only some of the three values given: this is a usage error. Testing
    # "all missing" first is what makes this branch reachable.
    if h is None or s is None or v is None:
        raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx)

    echo(f"Setting HSV: {h} {s} {v}")
    return await dev.set_hsv(h, s, v, transition=transition)
|
|
|
|
|
|
@cli.command()
@click.argument("state", type=bool, required=False)
@pass_dev
async def led(dev, state):
    """Get or set (Plug's) led state."""
    # Without an argument: report the current LED state.
    if state is None:
        echo(f"LED state: {dev.led}")
        return dev.led

    # With an argument: apply it and return the device's answer.
    echo(f"Turning led to {state}")
    return await dev.set_led(state)
|
|
|
|
|
|
@cli.command()
@pass_dev
async def time(dev):
    """Get the device time."""
    # Reads the device's `time` attribute once, echoes it and returns it.
    current = dev.time
    echo(f"Current time: {current}")
    return current
|
|
|
|
|
|
@cli.command()
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@click.option("--transition", type=int, required=False)
@pass_dev
async def on(dev: Device, index: int, name: str, transition: int):
    """Turn the device on."""
    # --index / --name redirect the command to one plug of a power strip;
    # --index wins when both are given.
    target = dev
    if not (index is None and name is None):
        if not target.is_strip:
            echo("Index and name are only for power strips!")
            return

        if index is not None:
            target = target.get_plug_by_index(index)
        elif name:
            target = target.get_plug_by_name(name)

    echo(f"Turning on {target.alias}")
    return await target.turn_on(transition=transition)
|
|
|
|
|
|
@cli.command()
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@click.option("--transition", type=int, required=False)
@pass_dev
async def off(dev: Device, index: int, name: str, transition: int):
    """Turn the device off."""
    # --index / --name redirect the command to one plug of a power strip;
    # --index wins when both are given.
    target = dev
    if not (index is None and name is None):
        if not target.is_strip:
            echo("Index and name are only for power strips!")
            return

        if index is not None:
            target = target.get_plug_by_index(index)
        elif name:
            target = target.get_plug_by_name(name)

    echo(f"Turning off {target.alias}")
    return await target.turn_off(transition=transition)
|
|
|
|
|
|
@cli.command()
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@click.option("--transition", type=int, required=False)
@pass_dev
async def toggle(dev: Device, index: int, name: str, transition: int):
    """Toggle the device on/off."""
    # --index / --name redirect the command to one plug of a power strip;
    # --index wins when both are given.
    target = dev
    if not (index is None and name is None):
        if not target.is_strip:
            echo("Index and name are only for power strips!")
            return

        if index is not None:
            target = target.get_plug_by_index(index)
        elif name:
            target = target.get_plug_by_name(name)

    # Pick the opposite of the current state, then announce and perform it.
    if target.is_on:
        direction, switch = "off", target.turn_off
    else:
        direction, switch = "on", target.turn_on

    echo(f"Turning {direction} {target.alias}")
    return await switch(transition=transition)
|
|
|
|
|
|
@cli.command()
@click.option("--delay", default=1)
@pass_dev
async def reboot(plug, delay):
    """Reboot the device."""
    # `delay` is passed straight through to the device's reboot call.
    echo("Rebooting the device..")
    outcome = await plug.reboot(delay)
    return outcome
|
|
|
|
|
|
@cli.group()
@pass_dev
async def schedule(dev):
    """Scheduling commands."""
    # Group callback with no logic of its own: it only provides the `schedule`
    # namespace that the sub-commands are registered on with
    # @schedule.command().
|
|
|
|
|
|
@schedule.command(name="list")
@pass_dev
@click.argument("type", default="schedule")
def _schedule_list(dev, type):
    """Return the list of schedule actions for the given type."""
    # `type` names the device module whose rules are listed.
    sched = dev.modules[type]
    rules = sched.rules

    # Report the empty case explicitly. A for/else would run its else branch
    # after every loop that ends without `break`, i.e. also after printing
    # rules.
    if not rules:
        echo(f"No rules of type {type}")

    for rule in rules:
        print(rule)

    return rules
|
|
|
|
|
|
@schedule.command(name="delete")
@pass_dev
@click.option("--id", type=str, required=True)
async def delete_rule(dev, id):
    """Delete rule from device."""
    schedule_module = dev.modules["schedule"]

    # First rule whose id equals the requested one, or None.
    candidates = (rule for rule in schedule_module.rules if rule.id == id)
    rule_to_delete = next(candidates, None)

    if not rule_to_delete:
        echo(f"No rule with id {id} was found")
        return None

    echo(f"Deleting rule id {id}")
    return await schedule_module.delete_rule(rule_to_delete)
|
|
|
|
|
|
@cli.group(invoke_without_command=True)
@click.pass_context
async def presets(ctx):
    """List and modify bulb setting presets."""
    # A sub-command was given: nothing to do at group level.
    if ctx.invoked_subcommand is not None:
        return None

    # Bare `presets` behaves like `presets list`.
    return await ctx.invoke(presets_list)
|
|
|
|
|
|
@presets.command(name="list")
@pass_dev
def presets_list(dev: IotBulb):
    """List presets."""
    # Presets are only handled for devices that are both bulbs and IotBulb.
    if not (dev.is_bulb and isinstance(dev, IotBulb)):
        echo("Presets only supported on iot bulbs")
        return

    available = dev.presets
    for entry in available:
        echo(entry)

    return available
|
|
|
|
|
|
@presets.command(name="modify")
@click.argument("index", type=int)
@click.option("--brightness", type=int)
@click.option("--hue", type=int)
@click.option("--saturation", type=int)
@click.option("--temperature", type=int)
@pass_dev
async def presets_modify(dev: IotBulb, index, brightness, hue, saturation, temperature):
    """Modify a preset."""
    # Locate the preset whose index matches the requested one.
    matches = (candidate for candidate in dev.presets if candidate.index == index)
    preset = next(matches, None)
    if preset is None:
        echo(f"No preset found for index {index}")
        return

    # Only options that were actually given overwrite the preset's fields;
    # --temperature is stored in the preset's color_temp attribute.
    requested = (
        ("brightness", brightness),
        ("hue", hue),
        ("saturation", saturation),
        ("color_temp", temperature),
    )
    for field_name, new_value in requested:
        if new_value is not None:
            setattr(preset, field_name, new_value)

    echo(f"Going to save preset: {preset}")

    return await dev.save_preset(preset)
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.option("--type", type=click.Choice(["soft", "hard"], case_sensitive=False))
@click.option("--last", is_flag=True)
@click.option("--preset", type=int)
async def turn_on_behavior(dev: IotBulb, type, last, preset):
    """Modify bulb turn-on behavior."""
    if not dev.is_bulb or not isinstance(dev, IotBulb):
        echo("Presets only supported on iot bulbs")
        return
    settings = await dev.get_turn_on_behavior()
    echo(f"Current turn on behavior: {settings}")

    # Return if we are not setting the value. `preset` is compared with None
    # because 0 is an integer the user may pass and must count as "given".
    if type is None and not last and preset is None:
        return settings

    # If we are setting the value, the type has to be specified
    if type is None:
        echo("To set the behavior, you need to define --type")
        return

    # `type` is "soft" or "hard" and names the attribute of the settings
    # object that is modified.
    behavior = getattr(settings, type)

    if last:
        echo(f"Going to set {type} to last")
        behavior.preset = None
    elif preset is not None:
        echo(f"Going to set {type} to preset {preset}")
        behavior.preset = preset

    return await dev.set_turn_on_behavior(settings)
|
|
|
|
|
|
@cli.command()
@pass_dev
@click.option(
    "--username", required=True, prompt=True, help="New username to set on the device"
)
@click.option(
    "--password",
    required=True,
    prompt=True,
    # Do not show the password on the terminal while it is typed, matching
    # the password prompt of the `wifi join` command.
    hide_input=True,
    help="New password to set on the device",
)
async def update_credentials(dev, username, password):
    """Update device credentials for authenticated devices."""
    # Only SmartDevice instances are accepted here.
    if not isinstance(dev, SmartDevice):
        raise NotImplementedError(
            "Credentials can only be updated on authenticated devices."
        )

    # abort=True makes click abort the command when the user declines.
    click.confirm("Do you really want to replace the existing credentials?", abort=True)

    return await dev.update_credentials(username, password)
|
|
|
|
|
|
@cli.command()
@pass_dev
async def shell(dev: Device):
    """Open interactive shell."""
    echo("Opening shell for %s" % dev)
    # Imported here rather than at module level, so ptpython is only needed
    # when this command is actually used.
    from ptpython.repl import embed

    logging.getLogger("parso").setLevel(logging.WARNING)  # prompt parsing
    logging.getLogger("asyncio").setLevel(logging.WARNING)
    loop = asyncio.get_event_loop()
    try:
        # globals() and locals() are handed to the REPL, so every local name
        # of this function (dev, embed, loop) is visible inside the shell.
        # Renaming or adding locals here changes what the shell exposes.
        await embed(
            globals=globals(),
            locals=locals(),
            return_asyncio_coroutine=True,
            patch_stdout=True,
        )
    except EOFError:
        # EOFError raised out of embed() stops the event loop.
        loop.stop()
|
|
|
|
|
|
@cli.command(name="descriptor")
@click.argument("name", required=False)
@click.argument("value", required=False)
@pass_dev
async def descriptor(dev, name: str, value):
    """Access and modify descriptor values.

    If no *name* is given, lists available descriptors and their values.
    If only *name* is given, the value of named descriptor is returned.
    If both *name* and *value* are set, the described setting is changed.
    """
    # Listing mode.
    if not name:
        echo("[bold]== Descriptors ==[/bold]")
        for identifier, entry in dev.descriptors.items():
            echo(f"{entry.name} ({identifier}): {entry.value}")
        return

    if name not in dev.descriptors:
        echo(f"No descriptor by name {name}")
        return

    desc = dev.descriptors[name]

    # Read mode.
    if value is None:
        echo(f"{desc.name} ({name}): {desc.value}")
        return desc.value

    # Write mode: the textual value is parsed as a Python literal first.
    echo(f"Setting {name} to {value}")
    parsed_value = ast.literal_eval(value)
    return await dev.descriptors[name].set_value(parsed_value)
|
|
|
|
|
|
# Run the command line interface when this module is executed as a script.
if __name__ == "__main__":
    cli()
|