mass rename to (python-)kasa (#1)

This commit is contained in:
Teemu R
2019-12-18 09:11:18 +01:00
committed by Bas Nijholt
parent 624c44c27f
commit 3ef5086ffb
33 changed files with 63 additions and 69 deletions

31
kasa/__init__.py Executable file
View File

@@ -0,0 +1,31 @@
"""Python interface for TP-Link's smart home devices.
All common, shared functionalities are available through `SmartDevice` class::
x = SmartDevice("192.168.1.1")
print(x.sys_info)
For device type specific actions `SmartBulb`, `SmartPlug`, or `SmartStrip`
should be used instead.
Module-specific errors are raised as `SmartDeviceException` and are expected
to be handled by the user of the library.
"""
from kasa.discover import Discover
from kasa.protocol import TPLinkSmartHomeProtocol
from kasa.smartbulb import SmartBulb
from kasa.smartdevice import DeviceType, EmeterStatus, SmartDevice, SmartDeviceException
from kasa.smartplug import SmartPlug
from kasa.smartstrip import SmartStrip
__all__ = [
"Discover",
"TPLinkSmartHomeProtocol",
"SmartBulb",
"DeviceType",
"EmeterStatus",
"SmartDevice",
"SmartDeviceException",
"SmartPlug",
"SmartStrip",
]

379
kasa/cli.py Executable file
View File

@@ -0,0 +1,379 @@
"""python-kasa cli tool."""
import asyncio
import logging
import sys
from pprint import pformat as pf
import click
from kasa import Discover, SmartBulb, SmartDevice, SmartStrip
from kasa import SmartPlug # noqa: E402; noqa: E402
if sys.version_info < (3, 6):
print("To use this script you need Python 3.6 or newer! got %s" % sys.version_info)
sys.exit(1)
pass_dev = click.make_pass_decorator(SmartDevice)
@click.group(invoke_without_command=True)
@click.option(
"--ip",
envvar="KASA_IP",
required=False,
help="The IP address of the device to connect to. This option "
"is deprecated and will be removed in the future; use --host "
"instead.",
)
@click.option(
"--host",
envvar="KASA_HOST",
required=False,
help="The host name or IP address of the device to connect to.",
)
@click.option(
"--alias",
envvar="KASA_NAME",
required=False,
help="The device name, or alias, of the device to connect to.",
)
@click.option(
"--target",
default="255.255.255.255",
required=False,
help="The broadcast address to be used for discovery.",
)
@click.option("--debug/--normal", default=False)
@click.option("--bulb", default=False, is_flag=True)
@click.option("--plug", default=False, is_flag=True)
@click.option("--strip", default=False, is_flag=True)
@click.version_option()
@click.pass_context
def cli(ctx, ip, host, alias, target, debug, bulb, plug, strip):
"""A cli tool for controlling TP-Link smart home plugs.""" # noqa
if debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
if ctx.invoked_subcommand == "discover":
return
if ip is not None and host is None:
host = ip
if alias is not None and host is None:
click.echo("Alias is given, using discovery to find host %s" % alias)
host = find_host_from_alias(alias=alias, target=target)
if host:
click.echo(f"Found hostname is {host}")
else:
click.echo(f"No device with name {alias} found")
return
if host is None:
click.echo("No host name given, trying discovery..")
ctx.invoke(discover)
return
else:
if not bulb and not plug and not strip:
click.echo("No --strip nor --bulb nor --plug given, discovering..")
dev = asyncio.run(Discover.discover_single(host))
elif bulb:
dev = SmartBulb(host)
elif plug:
dev = SmartPlug(host)
elif strip:
dev = SmartStrip(host)
else:
click.echo("Unable to detect type, use --strip or --bulb or --plug!")
return
ctx.obj = dev
if ctx.invoked_subcommand is None:
ctx.invoke(state)
@cli.command()
@click.option("--save")
@click.pass_context
def dump_discover(ctx, save):
"""Dump discovery information.
Useful for dumping into a file with `--save` to be added to the test suite.
"""
target = ctx.parent.params["target"]
for dev in Discover.discover(target=target, return_raw=True).values():
model = dev["system"]["get_sysinfo"]["model"]
hw_version = dev["system"]["get_sysinfo"]["hw_ver"]
save_to = f"{model}_{hw_version}.json"
click.echo("Saving info to %s" % save_to)
with open(save_to, "w") as f:
import json
json.dump(dev, f, sort_keys=True, indent=4)
@cli.command()
@click.option("--timeout", default=3, required=False)
@click.option("--discover-only", default=False)
@click.option("--dump-raw", is_flag=True)
@click.pass_context
def discover(ctx, timeout, discover_only, dump_raw):
"""Discover devices in the network."""
target = ctx.parent.params["target"]
click.echo("Discovering devices for %s seconds" % timeout)
found_devs = Discover.discover(
target=target, timeout=timeout, return_raw=dump_raw
).items()
if not discover_only:
for ip, dev in found_devs:
if dump_raw:
click.echo(dev)
continue
ctx.obj = dev
ctx.invoke(state)
print()
return found_devs
def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3):
"""Discover a device identified by its alias."""
host = None
click.echo(
"Trying to discover %s using %s attempts of %s seconds"
% (alias, attempts, timeout)
)
for attempt in range(1, attempts):
click.echo(f"Attempt {attempt} of {attempts}")
found_devs = Discover.discover(target=target, timeout=timeout).items()
for ip, dev in found_devs:
if dev.alias.lower() == alias.lower():
host = dev.host
return host
return None
@cli.command()
@pass_dev
def sysinfo(dev):
"""Print out full system information."""
dev.sync.update()
click.echo(click.style("== System info ==", bold=True))
click.echo(pf(dev.sys_info))
@cli.command()
@pass_dev
@click.pass_context
def state(ctx, dev: SmartDevice):
"""Print out device state and versions."""
dev.sync.update()
click.echo(click.style(f"== {dev.alias} - {dev.model} ==", bold=True))
click.echo(
click.style(
"Device state: {}".format("ON" if dev.is_on else "OFF"),
fg="green" if dev.is_on else "red",
)
)
if dev.is_strip:
for plug in dev.plugs: # type: ignore
plug.sync.update()
is_on = plug.is_on
alias = plug.alias
click.echo(
click.style(
" * {} state: {}".format(alias, ("ON" if is_on else "OFF")),
fg="green" if is_on else "red",
)
)
click.echo(f"Host/IP: {dev.host}")
for k, v in dev.state_information.items():
click.echo(f"{k}: {v}")
click.echo(click.style("== Generic information ==", bold=True))
click.echo("Time: {}".format(dev.sync.get_time()))
click.echo("Hardware: {}".format(dev.hw_info["hw_ver"]))
click.echo("Software: {}".format(dev.hw_info["sw_ver"]))
click.echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
click.echo(f"Location: {dev.location}")
ctx.invoke(emeter)
@cli.command()
@pass_dev
@click.argument("new_alias", required=False, default=None)
def alias(dev, new_alias):
"""Get or set the device alias."""
if new_alias is not None:
click.echo(f"Setting alias to {new_alias}")
dev.sync.set_alias(new_alias)
click.echo(f"Alias: {dev.alias}")
@cli.command()
@pass_dev
@click.argument("module")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
def raw_command(dev: SmartDevice, module, command, parameters):
"""Run a raw command on the device."""
import ast
if parameters is not None:
parameters = ast.literal_eval(parameters)
res = dev.sync._query_helper(module, command, parameters)
dev.sync.update()
click.echo(res)
@cli.command()
@pass_dev
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
def emeter(dev, year, month, erase):
"""Query emeter for historical consumption."""
click.echo(click.style("== Emeter ==", bold=True))
dev.sync.update()
if not dev.has_emeter:
click.echo("Device has no emeter")
return
if erase:
click.echo("Erasing emeter statistics..")
dev.sync.erase_emeter_stats()
return
if year:
click.echo(f"== For year {year.year} ==")
emeter_status = dev.sync.get_emeter_monthly(year.year)
elif month:
click.echo(f"== For month {month.month} of {month.year} ==")
emeter_status = dev.sync.get_emeter_daily(year=month.year, month=month.month)
else:
emeter_status = dev.sync.get_emeter_realtime()
click.echo("== Current State ==")
if isinstance(emeter_status, list):
for plug in emeter_status:
click.echo("Plug %d: %s" % (emeter_status.index(plug) + 1, plug))
else:
click.echo(str(emeter_status))
@cli.command()
@click.argument("brightness", type=click.IntRange(0, 100), default=None, required=False)
@pass_dev
def brightness(dev, brightness):
"""Get or set brightness."""
dev.sync.update()
if not dev.is_dimmable:
click.echo("This device does not support brightness.")
return
if brightness is None:
click.echo("Brightness: %s" % dev.brightness)
else:
click.echo("Setting brightness to %s" % brightness)
dev.sync.set_brightness(brightness)
@cli.command()
@click.argument(
"temperature", type=click.IntRange(2500, 9000), default=None, required=False
)
@pass_dev
def temperature(dev: SmartBulb, temperature):
"""Get or set color temperature."""
if temperature is None:
click.echo(f"Color temperature: {dev.color_temp}")
valid_temperature_range = dev.valid_temperature_range
if valid_temperature_range != (0, 0):
click.echo("(min: {}, max: {})".format(*valid_temperature_range))
else:
click.echo(
"Temperature range unknown, please open a github issue"
f" or a pull request for model '{dev.model}'"
)
else:
click.echo(f"Setting color temperature to {temperature}")
dev.sync.set_color_temp(temperature)
@cli.command()
@click.argument("h", type=click.IntRange(0, 360), default=None, required=False)
@click.argument("s", type=click.IntRange(0, 100), default=None, required=False)
@click.argument("v", type=click.IntRange(0, 100), default=None, required=False)
@click.pass_context
@pass_dev
def hsv(dev, ctx, h, s, v):
"""Get or set color in HSV. (Bulb only)."""
if h is None or s is None or v is None:
click.echo("Current HSV: %s %s %s" % dev.hsv)
elif s is None or v is None:
raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx)
else:
click.echo(f"Setting HSV: {h} {s} {v}")
dev.sync.set_hsv(h, s, v)
@cli.command()
@click.argument("state", type=bool, required=False)
@pass_dev
def led(dev, state):
"""Get or set (Plug's) led state."""
if state is not None:
click.echo("Turning led to %s" % state)
dev.sync.set_led(state)
else:
click.echo("LED state: %s" % dev.led)
@cli.command()
@pass_dev
def time(dev):
"""Get the device time."""
click.echo(dev.sync.get_time())
@cli.command()
@click.argument("index", type=int, required=False)
@pass_dev
def on(plug, index):
"""Turn the device on."""
click.echo("Turning on..")
if index is None:
plug.turn_on()
else:
plug.turn_on(index=(index - 1))
@cli.command()
@click.argument("index", type=int, required=False)
@pass_dev
def off(plug, index):
"""Turn the device off."""
click.echo("Turning off..")
if index is None:
plug.turn_off()
else:
plug.turn_off(index=(index - 1))
@cli.command()
@click.option("--delay", default=1)
@pass_dev
def reboot(plug, delay):
"""Reboot the device."""
click.echo("Rebooting the device..")
plug.reboot(delay)
if __name__ == "__main__":
cli()

139
kasa/discover.py Executable file
View File

@@ -0,0 +1,139 @@
"""Discovery module for TP-Link Smart Home devices."""
import json
import logging
import socket
from typing import Dict, Optional, Type
from kasa.protocol import TPLinkSmartHomeProtocol
from kasa.smartbulb import SmartBulb
from kasa.smartdevice import SmartDevice, SmartDeviceException
from kasa.smartplug import SmartPlug
from kasa.smartstrip import SmartStrip
_LOGGER = logging.getLogger(__name__)
class Discover:
"""Discover TPLink Smart Home devices.
The main entry point for this library is Discover.discover(),
which returns a dictionary of the found devices. The key is the IP address
of the device and the value contains ready-to-use, SmartDevice-derived
device object.
discover_single() can be used to initialize a single device given its
IP address. If the type of the device and its IP address is already known,
you can initialize the corresponding device class directly without this.
The protocol uses UDP broadcast datagrams on port 9999 for discovery.
"""
DISCOVERY_QUERY = {
"system": {"get_sysinfo": None},
"emeter": {"get_realtime": None},
"smartlife.iot.dimmer": {"get_dimmer_parameters": None},
"smartlife.iot.common.emeter": {"get_realtime": None},
"smartlife.iot.smartbulb.lightingservice": {"get_light_state": None},
}
@staticmethod
def discover(
protocol: TPLinkSmartHomeProtocol = None,
target: str = "255.255.255.255",
port: int = 9999,
timeout: int = 3,
discovery_packets=3,
return_raw=False,
) -> Dict[str, SmartDevice]:
"""Discover devices.
Sends discovery message to 255.255.255.255:9999 in order
to detect available supported devices in the local network,
and waits for given timeout for answers from devices.
:param protocol: Protocol implementation to use
:param target: The target broadcast address (e.g. 192.168.xxx.255).
:param timeout: How long to wait for responses, defaults to 3
:param port: port to send broadcast messages, defaults to 9999.
:rtype: dict
:return: Array of json objects {"ip", "port", "sys_info"}
"""
if protocol is None:
protocol = TPLinkSmartHomeProtocol()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(timeout)
req = json.dumps(Discover.DISCOVERY_QUERY)
_LOGGER.debug("Sending discovery to %s:%s", target, port)
encrypted_req = protocol.encrypt(req)
for i in range(discovery_packets):
sock.sendto(encrypted_req[4:], (target, port))
devices = {}
_LOGGER.debug("Waiting %s seconds for responses...", timeout)
try:
while True:
data, addr = sock.recvfrom(4096)
ip, port = addr
info = json.loads(protocol.decrypt(data))
device_class = Discover._get_device_class(info)
if return_raw:
devices[ip] = info
elif device_class is not None:
devices[ip] = device_class(ip)
except socket.timeout:
_LOGGER.debug("Got socket timeout, which is okay.")
except Exception as ex:
_LOGGER.error("Got exception %s", ex, exc_info=True)
_LOGGER.debug("Found %s devices: %s", len(devices), devices)
return devices
@staticmethod
async def discover_single(
host: str, protocol: TPLinkSmartHomeProtocol = None
) -> Optional[SmartDevice]:
"""Discover a single device by the given IP address.
:param host: Hostname of device to query
:param protocol: Protocol implementation to use
:rtype: SmartDevice
:return: Object for querying/controlling found device.
"""
if protocol is None:
protocol = TPLinkSmartHomeProtocol()
info = await protocol.query(host, Discover.DISCOVERY_QUERY)
device_class = Discover._get_device_class(info)
if device_class is not None:
return device_class(host)
return None
@staticmethod
def _get_device_class(info: dict) -> Optional[Type[SmartDevice]]:
"""Find SmartDevice subclass for device described by passed data."""
if "system" in info and "get_sysinfo" in info["system"]:
sysinfo = info["system"]["get_sysinfo"]
if "type" in sysinfo:
type_ = sysinfo["type"]
elif "mic_type" in sysinfo:
type_ = sysinfo["mic_type"]
else:
raise SmartDeviceException("Unable to find the device type field!")
else:
raise SmartDeviceException("No 'system' nor 'get_sysinfo' in response")
if "smartplug" in type_.lower() and "children" in sysinfo:
return SmartStrip
elif "smartplug" in type_.lower():
return SmartPlug
elif "smartbulb" in type_.lower():
return SmartBulb
return None

112
kasa/protocol.py Executable file
View File

@@ -0,0 +1,112 @@
"""Implementation of the TP-Link Smart Home Protocol.
Encryption/Decryption methods based on the works of
Lubomir Stroetmann and Tobias Esser
https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/
https://github.com/softScheck/tplink-smartplug/
which are licensed under the Apache License, Version 2.0
http://www.apache.org/licenses/LICENSE-2.0
"""
import asyncio
import json
import logging
import struct
from typing import Any, Dict, Union
_LOGGER = logging.getLogger(__name__)
class TPLinkSmartHomeProtocol:
"""Implementation of the TP-Link Smart Home protocol."""
INITIALIZATION_VECTOR = 171
DEFAULT_PORT = 9999
DEFAULT_TIMEOUT = 5
@staticmethod
async def query(
host: str, request: Union[str, Dict], port: int = DEFAULT_PORT
) -> Any:
"""Request information from a TP-Link SmartHome Device.
:param str host: host name or ip address of the device
:param int port: port on the device (default: 9999)
:param request: command to send to the device (can be either dict or
json string)
:return: response dict
"""
if isinstance(request, dict):
request = json.dumps(request)
timeout = TPLinkSmartHomeProtocol.DEFAULT_TIMEOUT
writer = None
try:
task = asyncio.open_connection(host, port)
reader, writer = await asyncio.wait_for(task, timeout=timeout)
_LOGGER.debug("> (%i) %s", len(request), request)
writer.write(TPLinkSmartHomeProtocol.encrypt(request))
await writer.drain()
buffer = bytes()
# Some devices send responses with a length header of 0 and
# terminate with a zero size chunk. Others send the length and
# will hang if we attempt to read more data.
length = -1
while True:
chunk = await reader.read(4096)
if length == -1:
length = struct.unpack(">I", chunk[0:4])[0]
buffer += chunk
if (length > 0 and len(buffer) >= length + 4) or not chunk:
break
finally:
if writer:
writer.close()
await writer.wait_closed()
response = TPLinkSmartHomeProtocol.decrypt(buffer[4:])
_LOGGER.debug("< (%i) %s", len(response), response)
return json.loads(response)
@staticmethod
def encrypt(request: str) -> bytes:
"""
Encrypt a request for a TP-Link Smart Home Device.
:param request: plaintext request data
:return: ciphertext request
"""
key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR
plainbytes = request.encode()
buffer = bytearray(struct.pack(">I", len(plainbytes)))
for plainbyte in plainbytes:
cipherbyte = key ^ plainbyte
key = cipherbyte
buffer.append(cipherbyte)
return bytes(buffer)
@staticmethod
def decrypt(ciphertext: bytes) -> str:
"""
Decrypt a response of a TP-Link Smart Home Device.
:param ciphertext: encrypted response data
:return: plaintext response
"""
key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR
buffer = []
for cipherbyte in ciphertext:
plainbyte = key ^ cipherbyte
key = cipherbyte
buffer.append(plainbyte)
plaintext = bytes(buffer)
return plaintext.decode()

337
kasa/smartbulb.py Normal file
View File

@@ -0,0 +1,337 @@
"""Module for bulbs."""
import re
from typing import Any, Dict, Optional, Tuple
from kasa.protocol import TPLinkSmartHomeProtocol
from kasa.smartdevice import (
DeviceType,
SmartDevice,
SmartDeviceException,
requires_update,
)
TPLINK_KELVIN = {
"LB130": (2500, 9000),
"LB120": (2700, 6500),
"LB230": (2500, 9000),
"KB130": (2500, 9000),
"KL130": (2500, 9000),
r"KL120\(EU\)": (2700, 6500),
r"KL120\(US\)": (2700, 5000),
}
class SmartBulb(SmartDevice):
"""Representation of a TP-Link Smart Bulb.
Usage example when used as library:
```python
p = SmartBulb("192.168.1.105")
# print the devices alias
print(p.sync.alias)
# change state of bulb
p.sync.turn_on()
p.sync.turn_off()
# query and print current state of plug
print(p.sync.state_information())
# check whether the bulb supports color changes
if p.sync.is_color():
# set the color to an HSV tuple
p.sync.set_hsv(180, 100, 100)
# get the current HSV value
print(p.sync.hsv())
# check whether the bulb supports setting color temperature
if p.sync.is_variable_color_temp():
# set the color temperature in Kelvin
p.sync.set_color_temp(3000)
# get the current color temperature
print(p.sync.color_temp)
# check whether the bulb is dimmable
if p.is_dimmable:
# set the bulb to 50% brightness
p.sync.set_brightness(50)
# check the current brightness
print(p.brightness)
```
Omit the `sync` attribute to get coroutines.
Errors reported by the device are raised as SmartDeviceExceptions,
and should be handled by the user of the library.
"""
LIGHT_SERVICE = "smartlife.iot.smartbulb.lightingservice"
def __init__(
self,
host: str,
protocol: TPLinkSmartHomeProtocol = None,
context: str = None,
cache_ttl: int = 3,
*,
ioloop=None,
) -> None:
SmartDevice.__init__(
self,
host=host,
protocol=protocol,
context=context,
cache_ttl=cache_ttl,
ioloop=ioloop,
)
self.emeter_type = "smartlife.iot.common.emeter"
self._device_type = DeviceType.Bulb
self._light_state = None
@property # type: ignore
@requires_update
def is_color(self) -> bool:
"""Whether the bulb supports color changes.
:return: True if the bulb supports color changes, False otherwise
:rtype: bool
"""
sys_info = self.sys_info
return bool(sys_info["is_color"])
@property # type: ignore
@requires_update
def is_dimmable(self) -> bool:
"""Whether the bulb supports brightness changes.
:return: True if the bulb supports brightness changes, False otherwise
:rtype: bool
"""
sys_info = self.sys_info
return bool(sys_info["is_dimmable"])
@property # type: ignore
@requires_update
def is_variable_color_temp(self) -> bool:
"""Whether the bulb supports color temperature changes.
:return: True if the bulb supports color temperature changes, False
otherwise
:rtype: bool
"""
sys_info = self.sys_info
return bool(sys_info["is_variable_color_temp"])
@property # type: ignore
@requires_update
def valid_temperature_range(self) -> Tuple[int, int]:
"""Return the device-specific white temperature range (in Kelvin).
:return: White temperature range in Kelvin (minimun, maximum)
:rtype: tuple
"""
if not self.is_variable_color_temp:
return (0, 0)
for model, temp_range in TPLINK_KELVIN.items():
sys_info = self.sys_info
if re.match(model, sys_info["model"]):
return temp_range
return (0, 0)
async def update(self):
"""Update `sys_info and `light_state`."""
self._sys_info = await self.get_sys_info()
self._light_state = await self.get_light_state()
@property # type: ignore
@requires_update
def light_state(self) -> Optional[Dict[str, Dict]]:
"""Query the light state."""
return self._light_state
async def get_light_state(self) -> Dict[str, Dict]:
"""Query the light state."""
return await self._query_helper(self.LIGHT_SERVICE, "get_light_state")
async def set_light_state(self, state: Dict) -> Dict:
"""Set the light state."""
light_state = await self._query_helper(
self.LIGHT_SERVICE, "transition_light_state", state
)
await self.update()
return light_state
@property # type: ignore
@requires_update
def hsv(self) -> Tuple[int, int, int]:
"""Return the current HSV state of the bulb.
:return: hue, saturation and value (degrees, %, %)
:rtype: tuple
"""
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
light_state = self.light_state
if not self.is_on:
hue = light_state["dft_on_state"]["hue"]
saturation = light_state["dft_on_state"]["saturation"]
value = light_state["dft_on_state"]["brightness"]
else:
hue = light_state["hue"]
saturation = light_state["saturation"]
value = light_state["brightness"]
return hue, saturation, value
def _raise_for_invalid_brightness(self, value):
if not isinstance(value, int) or not (0 <= value <= 100):
raise ValueError(
"Invalid brightness value: {} " "(valid range: 0-100%)".format(value)
)
@requires_update
async def set_hsv(self, hue: int, saturation: int, value: int):
"""Set new HSV.
:param int hue: hue in degrees
:param int saturation: saturation in percentage [0,100]
:param int value: value in percentage [0, 100]
"""
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
if not isinstance(hue, int) or not (0 <= hue <= 360):
raise ValueError(
"Invalid hue value: {} " "(valid range: 0-360)".format(hue)
)
if not isinstance(saturation, int) or not (0 <= saturation <= 100):
raise ValueError(
"Invalid saturation value: {} "
"(valid range: 0-100%)".format(saturation)
)
self._raise_for_invalid_brightness(value)
light_state = {
"hue": hue,
"saturation": saturation,
"brightness": value,
"color_temp": 0,
}
await self.set_light_state(light_state)
@property # type: ignore
@requires_update
def color_temp(self) -> int:
"""Return color temperature of the device.
:return: Color temperature in Kelvin
:rtype: int
"""
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
light_state = self.light_state
if not self.is_on:
return int(light_state["dft_on_state"]["color_temp"])
else:
return int(light_state["color_temp"])
@requires_update
async def set_color_temp(self, temp: int) -> None:
"""Set the color temperature of the device.
:param int temp: The new color temperature, in Kelvin
"""
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
valid_temperature_range = self.valid_temperature_range
if temp < valid_temperature_range[0] or temp > valid_temperature_range[1]:
raise ValueError(
"Temperature should be between {} "
"and {}".format(*valid_temperature_range)
)
light_state = {"color_temp": temp}
await self.set_light_state(light_state)
@property # type: ignore
@requires_update
def brightness(self) -> int:
"""Return the current brightness.
:return: brightness in percent
:rtype: int
"""
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
light_state = self.light_state
if not self.is_on:
return int(light_state["dft_on_state"]["brightness"])
else:
return int(light_state["brightness"])
@requires_update
async def set_brightness(self, brightness: int) -> None:
"""Set the brightness.
:param int brightness: brightness in percent
"""
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
self._raise_for_invalid_brightness(brightness)
light_state = {"brightness": brightness}
await self.set_light_state(light_state)
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return bulb-specific state information.
:return: Bulb information dict, keys in user-presentable form.
:rtype: dict
"""
info: Dict[str, Any] = {
"Brightness": self.brightness,
"Is dimmable": self.is_dimmable,
}
if self.is_variable_color_temp:
info["Color temperature"] = self.color_temp
info["Valid temperature range"] = self.valid_temperature_range
if self.is_color:
info["HSV"] = self.hsv
return info
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return whether the device is on."""
light_state = self.light_state
return bool(light_state["on_off"])
async def turn_off(self) -> None:
"""Turn the bulb off."""
await self.set_light_state({"on_off": 0})
async def turn_on(self) -> None:
"""Turn the bulb on."""
await self.set_light_state({"on_off": 1})
@property # type: ignore
@requires_update
def has_emeter(self) -> bool:
"""Return that the bulb has an emeter."""
return True

685
kasa/smartdevice.py Executable file
View File

@@ -0,0 +1,685 @@
"""Python library supporting TP-Link Smart Home devices.
The communication protocol was reverse engineered by Lubomir Stroetmann and
Tobias Esser in 'Reverse Engineering the TP-Link HS110':
https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/
This library reuses codes and concepts of the TP-Link WiFi SmartPlug Client
at https://github.com/softScheck/tplink-smartplug, developed by Lubomir
Stroetmann which is licensed under the Apache License, Version 2.0.
You may obtain a copy of the license at
http://www.apache.org/licenses/LICENSE-2.0
"""
import asyncio
import functools
import inspect
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Optional
from kasa.protocol import TPLinkSmartHomeProtocol
_LOGGER = logging.getLogger(__name__)
class DeviceType(Enum):
"""Device type enum."""
Plug = 1
Bulb = 2
Strip = 3
Unknown = -1
class SmartDeviceException(Exception):
"""Base exception for device errors."""
class EmeterStatus(dict):
"""Container for converting different representations of emeter data.
Newer FW/HW versions postfix the variable names with the used units,
where-as the olders do not have this feature.
This class automatically converts between these two to allow
backwards and forwards compatibility.
"""
def __getitem__(self, item):
valid_keys = [
"voltage_mv",
"power_mw",
"current_ma",
"energy_wh",
"total_wh",
"voltage",
"power",
"current",
"total",
"energy",
]
# 1. if requested data is available, return it
if item in super().keys():
return super().__getitem__(item)
# otherwise decide how to convert it
else:
if item not in valid_keys:
raise KeyError(item)
if "_" in item: # upscale
return super().__getitem__(item[: item.find("_")]) * 1000
else: # downscale
for i in super().keys():
if i.startswith(item):
return self.__getitem__(i) / 1000
raise SmartDeviceException("Unable to find a value for '%s'" % item)
def requires_update(f):
"""Indicate that `update` should be called before accessing this method.""" # noqa: D202
if inspect.iscoroutinefunction(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
self = args[0]
assert self._sys_info is not None
return await f(*args, **kwargs)
else:
@functools.wraps(f)
def wrapped(*args, **kwargs):
self = args[0]
assert self._sys_info is not None
return f(*args, **kwargs)
f.requires_update = True
return wrapped
class SmartDevice:
"""Base class for all supported device types."""
STATE_ON = "ON"
STATE_OFF = "OFF"
def __init__(
self,
host: str,
protocol: Optional[TPLinkSmartHomeProtocol] = None,
context: str = None,
cache_ttl: int = 3,
*,
ioloop=None,
) -> None:
"""Create a new SmartDevice instance.
:param str host: host name or ip address on which the device listens
:param context: optional child ID for context in a parent device
"""
self.host = host
if protocol is None: # pragma: no cover
protocol = TPLinkSmartHomeProtocol()
self.protocol = protocol
self.emeter_type = "emeter"
self.context = context
self.num_children = 0
self.cache_ttl = timedelta(seconds=cache_ttl)
_LOGGER.debug(
"Initializing %s using context %s and cache ttl %s",
self.host,
self.context,
self.cache_ttl,
)
self.cache = defaultdict(lambda: defaultdict(lambda: None)) # type: ignore
self._device_type = DeviceType.Unknown
self.ioloop = ioloop or asyncio.get_event_loop()
self.sync = SyncSmartDevice(self, ioloop=self.ioloop)
self._sys_info = None
def _result_from_cache(self, target, cmd) -> Optional[Dict]:
"""Return query result from cache if still fresh.
Only results from commands starting with `get_` are considered cacheable.
:param target: Target system
:param cmd: Command
:rtype: query result or None if expired.
"""
_LOGGER.debug("Checking cache for %s %s", target, cmd)
if cmd not in self.cache[target]:
return None
cached = self.cache[target][cmd]
if cached and cached["last_updated"] is not None:
if cached[
"last_updated"
] + self.cache_ttl > datetime.utcnow() and cmd.startswith("get_"):
_LOGGER.debug("Got cached %s %s", target, cmd)
return self.cache[target][cmd]
else:
_LOGGER.debug("Invalidating the cache for %s cmd %s", target, cmd)
for cache_entry in self.cache[target].values():
cache_entry["last_updated"] = datetime.utcfromtimestamp(0)
return None
def _insert_to_cache(self, target: str, cmd: str, response: Dict) -> None:
"""Add response for a given command to the cache.
:param target: Target system
:param cmd: Command
:param response: Response to be cached
"""
self.cache[target][cmd] = response.copy()
self.cache[target][cmd]["last_updated"] = datetime.utcnow()
async def _query_helper(
self, target: str, cmd: str, arg: Optional[Dict] = None
) -> Any:
"""Handle result unwrapping and error handling.
:param target: Target system {system, time, emeter, ..}
:param cmd: Command to execute
:param arg: JSON object passed as parameter to the command
:return: Unwrapped result for the call.
:rtype: dict
:raises SmartDeviceException: if command was not executed correctly
"""
request: Dict[str, Any] = {target: {cmd: arg}}
if self.context is not None:
request = {"context": {"child_ids": [self.context]}, target: {cmd: arg}}
try:
response = self._result_from_cache(target, cmd)
if response is None:
_LOGGER.debug("Got no result from cache, querying the device.")
response = await self.protocol.query(host=self.host, request=request)
self._insert_to_cache(target, cmd, response)
except Exception as ex:
raise SmartDeviceException(f"Communication error on {target}:{cmd}") from ex
if target not in response:
raise SmartDeviceException(f"No required {target} in response: {response}")
result = response[target]
if "err_code" in result and result["err_code"] != 0:
raise SmartDeviceException(f"Error on {target}.{cmd}: {result}")
if cmd not in result:
raise SmartDeviceException(f"No command in response: {response}")
result = result[cmd]
if "err_code" in result and result["err_code"] != 0:
raise SmartDeviceException(f"Error on {target} {cmd}: {result}")
if "err_code" in result:
del result["err_code"]
return result
def has_emeter(self) -> bool:
"""Return if device has an energy meter.
:return: True if energey meter is available
False if energymeter is missing
"""
raise NotImplementedError()
async def get_sys_info(self) -> Dict[str, Any]:
"""Retrieve system information.
:return: sysinfo
:rtype dict
:raises SmartDeviceException: on error
"""
return await self._query_helper("system", "get_sysinfo")
async def update(self):
"""Update some of the attributes.
Needed for methods that are decorated with `requires_update`.
"""
self._sys_info = await self.get_sys_info()
@property # type: ignore
@requires_update
def sys_info(self) -> Dict[str, Any]:
"""Retrieve system information.
:return: sysinfo
:rtype dict
:raises SmartDeviceException: on error
"""
assert self._sys_info is not None
return self._sys_info
@property # type: ignore
@requires_update
def model(self) -> str:
"""Return device model.
:return: device model
:rtype: str
:raises SmartDeviceException: on error
"""
sys_info = self.sys_info
return str(sys_info["model"])
@property # type: ignore
@requires_update
def alias(self) -> str:
"""Return device name (alias).
:return: Device name aka alias.
:rtype: str
"""
sys_info = self.sys_info
return str(sys_info["alias"])
async def set_alias(self, alias: str) -> None:
"""Set the device name (alias).
:param alias: New alias (name)
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_dev_alias", {"alias": alias})
await self.update()
async def get_icon(self) -> Dict:
"""Return device icon.
Note: not working on HS110, but is always empty.
:return: icon and its hash
:rtype: dict
:raises SmartDeviceException: on error
"""
return await self._query_helper("system", "get_dev_icon")
def set_icon(self, icon: str) -> None:
"""Set device icon.
Content for hash and icon are unknown.
:param str icon: Icon path(?)
:raises NotImplementedError: when not implemented
:raises SmartPlugError: on error
"""
raise NotImplementedError()
# here just for the sake of completeness
# await self._query_helper("system",
# "set_dev_icon", {"icon": "", "hash": ""})
# self.initialize()
async def get_time(self) -> Optional[datetime]:
"""Return current time from the device.
:return: datetime for device's time
:rtype: datetime or None when not available
:raises SmartDeviceException: on error
"""
try:
res = await self._query_helper("time", "get_time")
return datetime(
res["year"],
res["month"],
res["mday"],
res["hour"],
res["min"],
res["sec"],
)
except SmartDeviceException:
return None
async def set_time(self, ts: datetime) -> None:
"""Set the device time.
Note: this calls set_timezone() for setting.
:param datetime ts: New date and time
:return: result
:type: dict
:raises NotImplemented: when not implemented.
:raises SmartDeviceException: on error
"""
raise NotImplementedError("Fails with err_code == 0 with HS110.")
"""
here just for the sake of completeness.
if someone figures out why it doesn't work,
please create a PR :-)
ts_obj = {
"index": self.timezone["index"],
"hour": ts.hour,
"min": ts.minute,
"sec": ts.second,
"year": ts.year,
"month": ts.month,
"mday": ts.day,
}
response = await self._query_helper("time", "set_timezone", ts_obj)
self.initialize()
return response
"""
async def get_timezone(self) -> Dict:
"""Return timezone information.
:return: Timezone information
:rtype: dict
:raises SmartDeviceException: on error
"""
return await self._query_helper("time", "get_timezone")
@property # type: ignore
@requires_update
def hw_info(self) -> Dict:
"""Return hardware information.
:return: Information about hardware
:rtype: dict
"""
keys = [
"sw_ver",
"hw_ver",
"mac",
"mic_mac",
"type",
"mic_type",
"hwId",
"fwId",
"oemId",
"dev_name",
]
sys_info = self.sys_info
return {key: sys_info[key] for key in keys if key in sys_info}
@property # type: ignore
@requires_update
def location(self) -> Dict:
"""Return geographical location.
:return: latitude and longitude
:rtype: dict
"""
sys_info = self.sys_info
loc = {"latitude": None, "longitude": None}
if "latitude" in sys_info and "longitude" in sys_info:
loc["latitude"] = sys_info["latitude"]
loc["longitude"] = sys_info["longitude"]
elif "latitude_i" in sys_info and "longitude_i" in sys_info:
loc["latitude"] = sys_info["latitude_i"]
loc["longitude"] = sys_info["longitude_i"]
else:
_LOGGER.warning("Unsupported device location.")
return loc
@property # type: ignore
@requires_update
def rssi(self) -> Optional[int]:
"""Return WiFi signal strenth (rssi).
:return: rssi
:rtype: int
"""
sys_info = self.sys_info
if "rssi" in sys_info:
return int(sys_info["rssi"])
return None
@property # type: ignore
@requires_update
def mac(self) -> str:
"""Return mac address.
:return: mac address in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:rtype: str
"""
sys_info = self.sys_info
if "mac" in sys_info:
return str(sys_info["mac"])
elif "mic_mac" in sys_info:
return ":".join(
format(s, "02x") for s in bytes.fromhex(sys_info["mic_mac"])
)
raise SmartDeviceException(
"Unknown mac, please submit a bug report with sys_info output."
)
async def set_mac(self, mac):
"""Set the mac address.
:param str mac: mac in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_mac_addr", {"mac": mac})
await self.update()
@requires_update
async def get_emeter_realtime(self) -> EmeterStatus:
"""Retrieve current energy readings.
:returns: current readings or False
:rtype: dict, None
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
return EmeterStatus(await self._query_helper(self.emeter_type, "get_realtime"))
@requires_update
async def get_emeter_daily(
self, year: int = None, month: int = None, kwh: bool = True
) -> Dict:
"""Retrieve daily statistics for a given month.
:param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistics (default: this
month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
if year is None:
year = datetime.now().year
if month is None:
month = datetime.now().month
response = await self._query_helper(
self.emeter_type, "get_daystat", {"month": month, "year": year}
)
response = [EmeterStatus(**x) for x in response["day_list"]]
key = "energy_wh"
if kwh:
key = "energy"
data = {entry["day"]: entry[key] for entry in response}
return data
@requires_update
async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict:
"""Retrieve monthly statistics for a given year.
:param year: year for which to retrieve statistics (default: this year)
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
if year is None:
year = datetime.now().year
response = await self._query_helper(
self.emeter_type, "get_monthstat", {"year": year}
)
response = [EmeterStatus(**x) for x in response["month_list"]]
key = "energy_wh"
if kwh:
key = "energy"
return {entry["month"]: entry[key] for entry in response}
@requires_update
async def erase_emeter_stats(self):
"""Erase energy meter statistics.
:return: True if statistics were deleted
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
await self._query_helper(self.emeter_type, "erase_emeter_stat", None)
await self.update()
@requires_update
async def current_consumption(self) -> float:
"""Get the current power consumption in Watt.
:return: the current power consumption in Watts.
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
response = EmeterStatus(await self.get_emeter_realtime())
return response["power"]
async def reboot(self, delay=1) -> None:
"""Reboot the device.
Note that giving a delay of zero causes this to block,
as the device reboots immediately without responding to the call.
:param delay: Delay the reboot for `delay` seconds.
:return: None
"""
await self._query_helper("system", "reboot", {"delay": delay})
async def turn_off(self) -> None:
"""Turn off the device."""
raise NotImplementedError("Device subclass needs to implement this.")
@property # type: ignore
@requires_update
def is_off(self) -> bool:
"""Return True if device is off.
:return: True if device is off, False otherwise.
:rtype: bool
"""
return not self.is_on
async def turn_on(self) -> None:
"""Turn device on."""
raise NotImplementedError("Device subclass needs to implement this.")
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return if the device is on.
:return: True if the device is on, False otherwise.
:rtype: bool
:return:
"""
raise NotImplementedError("Device subclass needs to implement this.")
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return device-type specific, end-user friendly state information.
:return: dict with state information.
:rtype: dict
"""
raise NotImplementedError("Device subclass needs to implement this.")
@property
def device_type(self) -> DeviceType:
"""Return the device type."""
return self._device_type
@property
def is_bulb(self) -> bool:
"""Return True if the device is a bulb."""
return self._device_type == DeviceType.Bulb
@property
def is_plug(self) -> bool:
"""Return True if the device is a plug."""
return self._device_type == DeviceType.Plug
@property
def is_strip(self) -> bool:
"""Return True if the device is a strip."""
return self._device_type == DeviceType.Strip
@property
def is_dimmable(self):
"""Return True if the device is dimmable."""
return False
@property
def is_variable_color_temp(self) -> bool:
"""Return True if the device supports color temperature."""
return False
def __repr__(self):
self.sync.update()
return "<{} model {} at {} ({}), is_on: {} - dev specific: {}>".format(
self.__class__.__name__,
self.model,
self.host,
self.alias,
self.is_on,
self.sync.state_information,
)
class SyncSmartDevice:
"""A synchronous SmartDevice speaker class.
This has the same methods as `SyncSmartDevice`, however, it wraps all async
methods and call them in a blocking way.
Taken from https://github.com/basnijholt/media_player.kef/
"""
def __init__(self, async_device, ioloop):
self.async_device = async_device
self.ioloop = ioloop
def __getattr__(self, attr):
method = getattr(self.async_device, attr)
if method is None:
raise AttributeError(f"'SyncSmartDevice' object has no attribute '{attr}.'")
if inspect.iscoroutinefunction(method):
@functools.wraps(method)
def wrapped(*args, **kwargs):
return self.ioloop.run_until_complete(method(*args, **kwargs))
return wrapped
else:
return method

194
kasa/smartplug.py Normal file
View File

@@ -0,0 +1,194 @@
"""Module for plugs."""
import datetime
import logging
from typing import Any, Dict
from kasa.protocol import TPLinkSmartHomeProtocol
from kasa.smartdevice import (
DeviceType,
SmartDevice,
SmartDeviceException,
requires_update,
)
_LOGGER = logging.getLogger(__name__)
class SmartPlug(SmartDevice):
"""Representation of a TP-Link Smart Switch.
Usage example when used a a synchronous library:
```python
p = SmartPlug("192.168.1.105")
# print the devices alias
print(p.sync.alias)
# change state of plug
p.sync.turn_on()
p.sync.turn_off()
# query and print current state of plug
print(p.sync.state_information)
```
Omit the `sync` attribute to get coroutines.
Errors reported by the device are raised as SmartDeviceExceptions,
and should be handled by the user of the library.
"""
def __init__(
self,
host: str,
protocol: "TPLinkSmartHomeProtocol" = None,
context: str = None,
cache_ttl: int = 3,
*,
ioloop=None,
) -> None:
SmartDevice.__init__(self, host, protocol, context, cache_ttl, ioloop=ioloop)
self.emeter_type = "emeter"
self._device_type = DeviceType.Plug
@property # type: ignore
@requires_update
def brightness(self) -> int:
"""Return current brightness on dimmers.
Will return a range between 0 - 100.
:returns: integer
:rtype: int
"""
if not self.is_dimmable:
raise SmartDeviceException("Device is not dimmable.")
sys_info = self.sys_info
return int(sys_info["brightness"])
@requires_update
async def set_brightness(self, value: int):
"""Set the new dimmer brightness level.
Note:
When setting brightness, if the light is not
already on, it will be turned on automatically.
:param value: integer between 1 and 100
"""
if not self.is_dimmable:
raise SmartDeviceException("Device is not dimmable.")
if not isinstance(value, int):
raise ValueError("Brightness must be integer, " "not of %s.", type(value))
elif 0 < value <= 100:
await self.turn_on()
await self._query_helper(
"smartlife.iot.dimmer", "set_brightness", {"brightness": value}
)
await self.update()
else:
raise ValueError("Brightness value %s is not valid." % value)
@property # type: ignore
@requires_update
def is_dimmable(self):
"""Whether the switch supports brightness changes.
:return: True if switch supports brightness changes, False otherwise
:rtype: bool
"""
sys_info = self.sys_info
return "brightness" in sys_info
@property # type: ignore
@requires_update
def has_emeter(self):
"""Return whether device has an energy meter.
:return: True if energy meter is available
False otherwise
"""
sys_info = self.sys_info
features = sys_info["feature"].split(":")
return "ENE" in features
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return whether device is on.
:return: True if device is on, False otherwise
"""
sys_info = self.sys_info
return bool(sys_info["relay_state"])
async def turn_on(self):
"""Turn the switch on.
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 1})
await self.update()
async def turn_off(self):
"""Turn the switch off.
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 0})
await self.update()
@property # type: ignore
@requires_update
def led(self) -> bool:
"""Return the state of the led.
:return: True if led is on, False otherwise
:rtype: bool
"""
sys_info = self.sys_info
return bool(1 - sys_info["led_off"])
async def set_led(self, state: bool):
"""Set the state of the led (night mode).
:param bool state: True to set led on, False to set led off
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_led_off", {"off": int(not state)})
await self.update()
@property # type: ignore
@requires_update
def on_since(self) -> datetime.datetime:
"""Return pretty-printed on-time.
:return: datetime for on since
:rtype: datetime
"""
sys_info = self.sys_info
if self.context:
for plug in sys_info["children"]:
if plug["id"] == self.context:
on_time = plug["on_time"]
break
else:
on_time = sys_info["on_time"]
return datetime.datetime.now() - datetime.timedelta(seconds=on_time)
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return switch-specific state information.
:return: Switch information dict, keys in user-presentable form.
:rtype: dict
"""
info = {"LED state": self.led, "On since": self.on_since}
if self.is_dimmable:
info["Brightness"] = self.brightness
return info

199
kasa/smartstrip.py Executable file
View File

@@ -0,0 +1,199 @@
"""Module for multi-socket devices (HS300, HS107).
.. todo:: describe how this interfaces with single plugs.
"""
import datetime
import logging
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List
from kasa.protocol import TPLinkSmartHomeProtocol
from kasa.smartdevice import DeviceType, requires_update
from kasa.smartplug import SmartPlug
_LOGGER = logging.getLogger(__name__)
class SmartStrip(SmartPlug):
"""Representation of a TP-Link Smart Power Strip.
Usage example when used as library:
```python
p = SmartStrip("192.168.1.105")
# query the state of the strip
print(p.is_on)
# change state of all outlets
p.sync.turn_on()
p.sync.turn_off()
# individual outlets are accessible through plugs variable
for plug in p.plugs:
print(f"{p}: {p.is_on}")
# change state of a single outlet
p.plugs[0].sync.turn_on()
```
Omit the `sync` attribute to get coroutines.
Errors reported by the device are raised as SmartDeviceExceptions,
and should be handled by the user of the library.
"""
def __init__(
self,
host: str,
protocol: TPLinkSmartHomeProtocol = None,
cache_ttl: int = 3,
ioloop=None,
) -> None:
SmartPlug.__init__(self, host=host, protocol=protocol, cache_ttl=cache_ttl)
self.emeter_type = "emeter"
self._device_type = DeviceType.Strip
self.plugs: List[SmartPlug] = []
children = self.sync.get_sys_info()["children"]
self.num_children = len(children)
for child in children:
self.plugs.append(
SmartPlug(
host,
protocol,
context=child["id"],
cache_ttl=cache_ttl,
ioloop=ioloop,
)
)
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return if any of the outlets are on."""
for plug in self.plugs:
is_on = plug.is_on
if is_on:
return True
return False
async def update(self):
"""Update some of the attributes.
Needed for methods that are decorated with `requires_update`.
"""
await super().update()
for plug in self.plugs:
await plug.update()
async def turn_on(self):
"""Turn the strip on.
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 1})
await self.update()
async def turn_off(self):
"""Turn the strip off.
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 0})
await self.update()
@property # type: ignore
@requires_update
def on_since(self) -> datetime.datetime:
"""Return the maximum on-time of all outlets."""
return max(plug.on_since for plug in self.plugs)
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return strip-specific state information.
:return: Strip information dict, keys in user-presentable form.
:rtype: dict
"""
state: Dict[str, Any] = {"LED state": self.led}
for plug in self.plugs:
if plug.is_on:
state["Plug %s on since" % str(plug)] = self.on_since
return state
async def current_consumption(self) -> float:
"""Get the current power consumption in watts.
:return: the current power consumption in watts.
:rtype: float
:raises SmartDeviceException: on error
"""
consumption = sum([await plug.current_consumption() for plug in self.plugs])
return consumption
async def get_icon(self) -> Dict:
"""Icon for the device.
Overriden to keep the API, as the SmartStrip and children do not
have icons, we just return dummy strings.
"""
return {"icon": "SMARTSTRIP-DUMMY", "hash": "SMARTSTRIP-DUMMY"}
async def set_alias(self, alias: str) -> None:
"""Set the alias for the strip.
:param alias: new alias
:raises SmartDeviceException: on error
"""
return await super().set_alias(alias)
@requires_update
async def get_emeter_daily(
self, year: int = None, month: int = None, kwh: bool = True
) -> Dict:
"""Retrieve daily statistics for a given month.
:param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistics (default: this
month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
emeter_daily: DefaultDict[int, float] = defaultdict(lambda: 0.0)
for plug in self.plugs:
plug_emeter_daily = await plug.get_emeter_daily(
year=year, month=month, kwh=kwh
)
for day, value in plug_emeter_daily.items():
emeter_daily[day] += value
return emeter_daily
@requires_update
async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict:
"""Retrieve monthly statistics for a given year.
:param year: year for which to retrieve statistics (default: this year)
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
emeter_monthly: DefaultDict[int, float] = defaultdict(lambda: 0.0)
for plug in self.plugs:
plug_emeter_monthly = await plug.get_emeter_monthly(year=year, kwh=kwh)
for month, value in plug_emeter_monthly:
emeter_monthly[month] += value
return emeter_monthly
@requires_update
async def erase_emeter_stats(self):
"""Erase energy meter statistics for all plugs.
:raises SmartDeviceException: on error
"""
for plug in self.plugs:
await plug.erase_emeter_stats()

0
kasa/tests/__init__.py Normal file
View File

124
kasa/tests/conftest.py Normal file
View File

@@ -0,0 +1,124 @@
import asyncio
import glob
import json
import os
from os.path import basename
import pytest
from kasa import Discover, SmartBulb, SmartPlug, SmartStrip
from .newfakes import FakeTransportProtocol
SUPPORTED_DEVICES = glob.glob(
os.path.dirname(os.path.abspath(__file__)) + "/fixtures/*.json"
)
BULBS = {"LB100", "LB120", "LB130", "KL120"}
VARIABLE_TEMP = {"LB120", "LB130", "KL120"}
PLUGS = {"HS100", "HS105", "HS110", "HS200", "HS220", "HS300"}
STRIPS = {"HS300"}
COLOR_BULBS = {"LB130"}
DIMMABLE = {*BULBS, "HS220"}
EMETER = {"HS110", "HS300", *BULBS}
ALL_DEVICES = BULBS.union(PLUGS)
def filter_model(filter):
print(filter)
filtered = list()
for dev in SUPPORTED_DEVICES:
for filt in filter:
if filt in basename(dev):
filtered.append(dev)
return filtered
def get_ioloop():
ioloop = asyncio.new_event_loop()
asyncio.set_event_loop(ioloop)
return ioloop
has_emeter = pytest.mark.parametrize("dev", filter_model(EMETER), indirect=True)
no_emeter = pytest.mark.parametrize(
"dev", filter_model(ALL_DEVICES - EMETER), indirect=True
)
bulb = pytest.mark.parametrize("dev", filter_model(BULBS), indirect=True)
plug = pytest.mark.parametrize("dev", filter_model(PLUGS), indirect=True)
strip = pytest.mark.parametrize("dev", filter_model(STRIPS), indirect=True)
dimmable = pytest.mark.parametrize("dev", filter_model(DIMMABLE), indirect=True)
non_dimmable = pytest.mark.parametrize(
"dev", filter_model(ALL_DEVICES - DIMMABLE), indirect=True
)
variable_temp = pytest.mark.parametrize(
"dev", filter_model(VARIABLE_TEMP), indirect=True
)
non_variable_temp = pytest.mark.parametrize(
"dev", filter_model(BULBS - VARIABLE_TEMP), indirect=True
)
color_bulb = pytest.mark.parametrize("dev", filter_model(COLOR_BULBS), indirect=True)
non_color_bulb = pytest.mark.parametrize(
"dev", filter_model(BULBS - COLOR_BULBS), indirect=True
)
# Parametrize tests to run with device both on and off
turn_on = pytest.mark.parametrize("turn_on", [True, False])
def handle_turn_on(dev, turn_on):
if turn_on:
dev.sync.turn_on()
else:
dev.sync.turn_off()
@pytest.fixture(params=SUPPORTED_DEVICES)
def dev(request):
ioloop = get_ioloop()
file = request.param
ip = request.config.getoption("--ip")
if ip:
d = ioloop.run_until_complete(Discover.discover_single(ip))
print(d.model)
if d.model in file:
return d
return
with open(file) as f:
sysinfo = json.load(f)
model = basename(file)
params = {
"host": "123.123.123.123",
"protocol": FakeTransportProtocol(sysinfo),
"cache_ttl": 0,
}
if "LB" in model or "KL" in model:
p = SmartBulb(**params, ioloop=ioloop)
elif "HS300" in model:
p = SmartStrip(**params, ioloop=ioloop)
elif "HS" in model:
p = SmartPlug(**params, ioloop=ioloop)
else:
raise Exception("No tests for %s" % model)
yield p
def pytest_addoption(parser):
parser.addoption("--ip", action="store", default=None, help="run against device")
def pytest_collection_modifyitems(config, items):
if not config.getoption("--ip"):
print("Testing against fixtures.")
return
else:
print("Running against ip %s" % config.getoption("--ip"))

47
kasa/tests/fixtures/HS100(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,47 @@
{
"emeter": {
"get_realtime": {
"err_code": -1,
"err_msg": "module not support"
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "schedule",
"alias": "Mock hs100",
"dev_name": "Wi-Fi Smart Plug",
"deviceId": "048F069965D3230FD1382F0B78EAE68D42CAA2DE",
"err_code": 0,
"feature": "TIM",
"hwId": "92688D028799C60F926049D1C9EFD9E8",
"hw_ver": "1.0",
"icon_hash": "",
"latitude": 63.5442,
"latitude_i": 63.5442,
"led_off": 0,
"longitude": -148.2817,
"longitude_i": -148.2817,
"mac": "50:c7:bf:a3:71:c0",
"model": "HS100(US)",
"oemId": "149C8A24AA3A1445DE84F00DFB210D60",
"on_time": 0,
"relay_state": 0,
"rssi": -65,
"sw_ver": "1.2.5 Build 171129 Rel.174814",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

47
kasa/tests/fixtures/HS105(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,47 @@
{
"emeter": {
"get_realtime": {
"err_code": -1,
"err_msg": "module not support"
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "schedule",
"alias": "Mock hs105",
"dev_name": "Smart Wi-Fi Plug Mini",
"deviceId": "F0723FAFC1FA27FC755B9F228A2297D921FEBCD1",
"err_code": 0,
"feature": "TIM",
"hwId": "51E17031929D5FEF9147091AD67B954A",
"hw_ver": "1.0",
"icon_hash": "",
"INVALIDlatitude": 79.7779,
"latitude_i": 79.7779,
"led_off": 0,
"INVALIDlongitude": 90.8844,
"longitude_i": 90.8844,
"mac": "50:c7:bf:ac:c0:6a",
"model": "HS105(US)",
"oemId": "990ADB7AEDE871C41D1B7613D1FE7A76",
"on_time": 0,
"relay_state": 0,
"rssi": -65,
"sw_ver": "1.2.9 Build 170808 Rel.145916",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

View File

@@ -0,0 +1,49 @@
{
"emeter": {
"get_realtime": {
"current": 0.015342,
"err_code": 0,
"power": 0.983971,
"total": 32.448,
"voltage": 235.595234
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "schedule",
"alias": "Kitchen",
"dev_name": "Wi-Fi Smart Plug With Energy Monitoring",
"deviceId": "8006588E50AD389303FF31AB6302907A17442F16",
"err_code": 0,
"feature": "TIM:ENE",
"fwId": "00000000000000000000000000000000",
"hwId": "45E29DA8382494D2E82688B52A0B2EB5",
"hw_ver": "1.0",
"icon_hash": "",
"latitude": 51.476938,
"led_off": 1,
"longitude": 7.216849,
"mac": "50:C7:BF:01:F8:CD",
"model": "HS110(EU)",
"oemId": "3D341ECE302C0642C99E31CE2430544B",
"on_time": 512874,
"relay_state": 1,
"rssi": -71,
"sw_ver": "1.2.5 Build 171213 Rel.101523",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

50
kasa/tests/fixtures/HS110(EU)_2.0.json vendored Normal file
View File

@@ -0,0 +1,50 @@
{
"emeter": {
"get_realtime": {
"current_ma": 125,
"err_code": 0,
"power_mw": 3140,
"total_wh": 51493,
"voltage_mv": 122049
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "schedule",
"alias": "Mock hs110v2",
"dev_name": "Smart Wi-Fi Plug With Energy Monitoring",
"deviceId": "A466BCDB5026318939145B7CC7EF18D8C1D3A954",
"err_code": 0,
"feature": "TIM:ENE",
"hwId": "1F7FABB46373CA51E3AFDE5930ECBB36",
"hw_ver": "2.0",
"icon_hash": "",
"INVALIDlatitude": -60.4599,
"latitude_i": -60.4599,
"led_off": 0,
"INVALIDlongitude": 76.1249,
"longitude_i": 76.1249,
"mac": "50:c7:bf:b9:40:08",
"model": "HS110(EU)",
"oemId": "BB668B949FA4559655F1187DD56622BD",
"on_time": 0,
"relay_state": 0,
"rssi": -65,
"sw_ver": "1.5.2 Build 180130 Rel.085820",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

50
kasa/tests/fixtures/HS110(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,50 @@
{
"emeter": {
"get_realtime": {
"current": 0.1256,
"err_code": 0,
"power": 3.14,
"total": 51.493,
"voltage": 122.049119
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "schedule",
"alias": "Mock hs110",
"dev_name": "Wi-Fi Smart Plug With Energy Monitoring",
"deviceId": "11A5FD4A0FA1FCE5468F55D23CE77D1753A93E11",
"err_code": 0,
"feature": "TIM:ENE",
"hwId": "6C56A17315351DD0EDE0BDB1D9EBBD66",
"hw_ver": "1.0",
"icon_hash": "",
"latitude": 82.2866,
"latitude_i": 82.2866,
"led_off": 0,
"longitude": 10.0036,
"longitude_i": 10.0036,
"mac": "50:c7:bf:66:29:29",
"model": "HS110(US)",
"oemId": "F7DFC14D43DA806B55DB66D21F212B60",
"on_time": 0,
"relay_state": 0,
"rssi": -65,
"sw_ver": "1.0.8 Build 151113 Rel.24658",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

47
kasa/tests/fixtures/HS200(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,47 @@
{
"emeter": {
"get_realtime": {
"err_code": -1,
"err_msg": "module not support"
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "schedule",
"alias": "Mock hs200",
"dev_name": "Wi-Fi Smart Light Switch",
"deviceId": "EC565185337CF59A4C9A73442AAD5F11C6E91716",
"err_code": 0,
"feature": "TIM",
"hwId": "4B5DB5E42F13728107D075EF5C3ECFA1",
"hw_ver": "1.0",
"icon_hash": "",
"latitude": 58.7882,
"latitude_i": 58.7882,
"led_off": 0,
"longitude": 107.7225,
"longitude_i": 107.7225,
"mac": "50:c7:bf:95:4b:45",
"model": "HS200(US)",
"oemId": "D2A5D690B25980755216FD684AF8CD88",
"on_time": 0,
"relay_state": 0,
"rssi": -65,
"sw_ver": "1.1.0 Build 160521 Rel.085826",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

75
kasa/tests/fixtures/HS220(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,75 @@
{
"emeter": {
"get_realtime": {
"err_code": -1,
"err_msg": "module not support"
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"get_dimmer_parameters": {
"bulb_type": 1,
"err_code": 0,
"fadeOffTime": 3000,
"fadeOnTime": 3000,
"gentleOffTime": 510000,
"gentleOnTime": 3000,
"minThreshold": 0,
"rampRate": 30
}
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "count_down",
"alias": "Mock hs220",
"brightness": 50,
"dev_name": "Smart Wi-Fi Dimmer",
"deviceId": "98E16F2D5ED204F3094CF472260237133DC0D547",
"err_code": 0,
"feature": "TIM",
"hwId": "231004CCCDB6C0B8FC7A3260C3470257",
"hw_ver": "1.0",
"icon_hash": "",
"INVALIDlatitude": 11.6210,
"latitude_i": 11.6210,
"led_off": 0,
"INVALIDlongitude": 42.2074,
"longitude_i": 42.2074,
"mac": "50:c7:bf:af:75:5d",
"mic_type": "IOT.SMARTPLUGSWITCH",
"model": "HS220(US)",
"oemId": "8FBD0F3CCF7E82836DC7996C524EF772",
"on_time": 0,
"preferred_state": [
{
"brightness": 100,
"index": 0
},
{
"brightness": 75,
"index": 1
},
{
"brightness": 50,
"index": 2
},
{
"brightness": 25,
"index": 3
}
],
"relay_state": 0,
"rssi": -65,
"sw_ver": "1.5.7 Build 180912 Rel.104837",
"type": "IOT.SMARTPLUGSWITCH",
"updating": 0
}
}
}

View File

@@ -0,0 +1,76 @@
{
"emeter": {
"get_realtime": {
"err_code": -1,
"err_msg": "module not support"
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"get_dimmer_parameters": {
"bulb_type": 1,
"err_code": 0,
"fadeOffTime": 1000,
"fadeOnTime": 1000,
"gentleOffTime": 10000,
"gentleOnTime": 3000,
"minThreshold": 0,
"rampRate": 30
}
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"active_mode": "none",
"alias": "Living room left dimmer",
"brightness": 25,
"dev_name": "Smart Wi-Fi Dimmer",
"deviceId": "000000000000000000000000000000000000000",
"err_code": 0,
"feature": "TIM",
"fwId": "00000000000000000000000000000000",
"hwId": "00000000000000000000000000000000",
"hw_ver": "1.0",
"icon_hash": "",
"latitude_i": 11.6210,
"led_off": 0,
"longitude_i": 42.2074,
"mac": "00:00:00:00:00:00",
"mic_type": "IOT.SMARTPLUGSWITCH",
"model": "HS220(US)",
"next_action": {
"type": -1
},
"oemId": "00000000000000000000000000000000",
"on_time": 0,
"preferred_state": [
{
"brightness": 100,
"index": 0
},
{
"brightness": 75,
"index": 1
},
{
"brightness": 50,
"index": 2
},
{
"brightness": 25,
"index": 3
}
],
"relay_state": 0,
"rssi": -35,
"sw_ver": "1.5.7 Build 180912 Rel.104837",
"updating": 0
}
}
}

102
kasa/tests/fixtures/HS300(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,102 @@
{
"emeter": {
"get_realtime": {
"current_ma": 125,
"err_code": 0,
"power_mw": 3140,
"total_wh": 51493,
"voltage_mv": 122049
}
},
"smartlife.iot.common.emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"err_code": -1,
"err_msg": "module not support"
},
"system": {
"get_sysinfo": {
"alias": "Mock hs300",
"child_num": 6,
"children": [
{
"alias": "Mock One",
"id": "00",
"next_action": {
"type": -1
},
"on_time": 123,
"state": 1
},
{
"alias": "Mock Two",
"id": "01",
"next_action": {
"type": -1
},
"on_time": 333,
"state": 1
},
{
"alias": "Mock Three",
"id": "02",
"next_action": {
"type": -1
},
"on_time": 0,
"state": 0
},
{
"alias": "Mock Four",
"id": "03",
"next_action": {
"type": -1
},
"on_time": 0,
"state": 0
},
{
"alias": "Mock Five",
"id": "04",
"next_action": {
"type": -1
},
"on_time": 0,
"state": 0
},
{
"alias": "Mock Six",
"id": "05",
"next_action": {
"type": -1
},
"on_time": 0,
"state": 0
}
],
"deviceId": "4BFC2F2C8678FE623700FD3737EC4E245196F3CF",
"err_code": 0,
"feature": "TIM:ENE",
"hwId": "1B63E5DF21B5AFB52F364DE66BFAAF8A",
"hw_ver": "1.0",
"latitude": -68.9980,
"latitude_i": -68.9980,
"led_off": 0,
"longitude": -109.4400,
"longitude_i": -109.4400,
"mac": "50:c7:bf:c2:75:88",
"mic_type": "IOT.SMARTPLUGSWITCH",
"model": "HS300(US)",
"oemId": "FC71DAAB004326F9369EDEF4353E4FE1",
"rssi": -68,
"sw_ver": "1.0.6 Build 180627 Rel.081000",
"updating": 0
}
}
}

View File

@@ -0,0 +1,93 @@
{
"emeter": {
"err_code": -2001,
"err_msg": "Module not support"
},
"smartlife.iot.common.emeter": {
"get_realtime": {
"err_code": 0,
"power_mw": 1800
}
},
"smartlife.iot.dimmer": {
"err_code": -2001,
"err_msg": "Module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": {
"brightness": 10,
"color_temp": 2700,
"err_code": 0,
"hue": 0,
"mode": "normal",
"on_off": 1,
"saturation": 0
}
},
"system": {
"get_sysinfo": {
"active_mode": "none",
"alias": "",
"ctrl_protocols": {
"name": "Linkie",
"version": "1.0"
},
"description": "Smart Wi-Fi LED Bulb with Tunable White Light",
"dev_state": "normal",
"deviceId": "801200814AD69370AC59DE5501319C051AF409C3",
"disco_ver": "1.0",
"err_code": 0,
"heapsize": 290784,
"hwId": "111E35908497A05512E259BB76801E10",
"hw_ver": "1.0",
"is_color": 0,
"is_dimmable": 1,
"is_factory": false,
"is_variable_color_temp": 1,
"light_state": {
"brightness": 10,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"on_off": 1,
"saturation": 0
},
"mic_mac": "D80D17150474",
"mic_type": "IOT.SMARTBULB",
"model": "KL120(US)",
"oemId": "1210657CD7FBDC72895644388EEFAE8B",
"preferred_state": [
{
"brightness": 100,
"color_temp": 3500,
"hue": 0,
"index": 0,
"saturation": 0
},
{
"brightness": 50,
"color_temp": 5000,
"hue": 0,
"index": 1,
"saturation": 0
},
{
"brightness": 50,
"color_temp": 2700,
"hue": 0,
"index": 2,
"saturation": 0
},
{
"brightness": 1,
"color_temp": 2700,
"hue": 0,
"index": 3,
"saturation": 0
}
],
"rssi": -52,
"sw_ver": "1.8.6 Build 180809 Rel.091659"
}
}
}

103
kasa/tests/fixtures/LB100(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,103 @@
{
"emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.common.emeter": {
"get_realtime": {
"err_code": 0,
"power_mw": 10800
}
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": {
"dft_on_state": {
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"saturation": 0
},
"err_code": 0,
"on_off": 0
}
},
"system": {
"get_sysinfo": {
"active_mode": "none",
"alias": "Mock lb100",
"ctrl_protocols": {
"name": "Linkie",
"version": "1.0"
},
"description": "Smart Wi-Fi LED Bulb with Dimmable Light",
"dev_state": "normal",
"deviceId": "15BD5A6C4B729A7C0D4D46ADDFA7E2600793C56A",
"disco_ver": "1.0",
"err_code": 0,
"heapsize": 302452,
"hwId": "1B0DF0A2EFE6251DBE726D1D2167C78F",
"hw_ver": "1.0",
"is_color": 0,
"is_dimmable": 1,
"is_factory": false,
"is_variable_color_temp": 0,
"latitude": -51.8361,
"latitude_i": -51.8361,
"light_state": {
"dft_on_state": {
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"saturation": 0
},
"err_code": 0,
"on_off": 0
},
"longitude": -34.0697,
"longitude_i": -34.0697,
"mac": "50:c7:bf:51:10:65",
"mic_type": "IOT.SMARTBULB",
"model": "LB100(US)",
"oemId": "C9CF655C9A5AA101E66EBA5B382E40CC",
"on_time": 0,
"preferred_state": [
{
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"index": 0,
"saturation": 0
},
{
"brightness": 75,
"color_temp": 2700,
"hue": 0,
"index": 1,
"saturation": 0
},
{
"brightness": 25,
"color_temp": 2700,
"hue": 0,
"index": 2,
"saturation": 0
},
{
"brightness": 1,
"color_temp": 2700,
"hue": 0,
"index": 3,
"saturation": 0
}
],
"rssi": -65,
"sw_ver": "1.4.3 Build 170504 Rel.144921"
}
}
}

103
kasa/tests/fixtures/LB120(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,103 @@
{
"emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.common.emeter": {
"get_realtime": {
"err_code": 0,
"power_mw": 10800
}
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": {
"dft_on_state": {
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"saturation": 0
},
"err_code": 0,
"on_off": 0
}
},
"system": {
"get_sysinfo": {
"active_mode": "none",
"alias": "Mock lb120",
"ctrl_protocols": {
"name": "Linkie",
"version": "1.0"
},
"description": "Smart Wi-Fi LED Bulb with Tunable White Light",
"dev_state": "normal",
"deviceId": "62FD818E5B66A509D571D07D0F00FA4DD6468494",
"disco_ver": "1.0",
"err_code": 0,
"heapsize": 302452,
"hwId": "CC0588817E251DF996F1848ED331F543",
"hw_ver": "1.0",
"is_color": 0,
"is_dimmable": 1,
"is_factory": false,
"is_variable_color_temp": 1,
"latitude": -76.9197,
"latitude_i": -76.9197,
"light_state": {
"dft_on_state": {
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"saturation": 0
},
"err_code": 0,
"on_off": 0
},
"longitude": 164.7293,
"longitude_i": 164.7293,
"mac": "50:c7:bf:dc:62:13",
"mic_type": "IOT.SMARTBULB",
"model": "LB120(US)",
"oemId": "05D0D97951F565579A7F5A70A57AED0B",
"on_time": 0,
"preferred_state": [
{
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"index": 0,
"saturation": 0
},
{
"brightness": 75,
"color_temp": 2700,
"hue": 0,
"index": 1,
"saturation": 0
},
{
"brightness": 25,
"color_temp": 2700,
"hue": 0,
"index": 2,
"saturation": 0
},
{
"brightness": 1,
"color_temp": 2700,
"hue": 0,
"index": 3,
"saturation": 0
}
],
"rssi": -65,
"sw_ver": "1.1.0 Build 160630 Rel.085319"
}
}
}

104
kasa/tests/fixtures/LB130(US)_1.0.json vendored Normal file
View File

@@ -0,0 +1,104 @@
{
"emeter": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.common.emeter": {
"get_realtime": {
"err_code": 0,
"power_mw": 10800
}
},
"smartlife.iot.dimmer": {
"err_code": -1,
"err_msg": "module not support"
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": {
"dft_on_state": {
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"saturation": 0
},
"err_code": 0,
"on_off": 0
}
},
"system": {
"get_sysinfo": {
"active_mode": "none",
"alias": "Mock lb130",
"ctrl_protocols": {
"name": "Linkie",
"version": "1.0"
},
"description": "Smart Wi-Fi LED Bulb with Color Changing",
"dev_state": "normal",
"deviceId": "50BE9E7B6F26CA75D495C13EAA459C491768F143",
"disco_ver": "1.0",
"err_code": 0,
"heapsize": 302452,
"hwId": "C8AD962B53417C2845CC10CE25C00BB1",
"hw_ver": "1.0",
"is_color": 1,
"is_dimmable": 1,
"is_factory": false,
"is_variable_color_temp": 1,
"latitude": 76.8649,
"latitude_i": 76.8649,
"light_state": {
"dft_on_state": {
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"mode": "normal",
"saturation": 0
},
"err_code": 0,
"on_off": 0
},
"longitude": -40.7284,
"longitude_i": -40.7284,
"INVALIDmac": "50:c7:bf:ac:f6:19",
"mic_mac": "50C7BFACF619",
"mic_type": "IOT.SMARTBULB",
"model": "LB130(US)",
"oemId": "CF78964560AAB75A43F15D2E468B63EF",
"on_time": 0,
"preferred_state": [
{
"brightness": 100,
"color_temp": 2700,
"hue": 0,
"index": 0,
"saturation": 0
},
{
"brightness": 75,
"color_temp": 2700,
"hue": 0,
"index": 1,
"saturation": 0
},
{
"brightness": 25,
"color_temp": 2700,
"hue": 0,
"index": 2,
"saturation": 0
},
{
"brightness": 1,
"color_temp": 2700,
"hue": 0,
"index": 3,
"saturation": 0
}
],
"rssi": -65,
"sw_ver": "1.6.0 Build 170703 Rel.141938"
}
}
}

425
kasa/tests/newfakes.py Normal file
View File

@@ -0,0 +1,425 @@
import logging
import re
from voluptuous import REMOVE_EXTRA, All, Any, Coerce, Invalid, Optional, Range, Schema
from ..protocol import TPLinkSmartHomeProtocol
_LOGGER = logging.getLogger(__name__)
def check_int_bool(x):
if x != 0 and x != 1:
raise Invalid(x)
return x
def check_mac(x):
if re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", x.lower()):
return x
raise Invalid(x)
def check_mode(x):
if x in ["schedule", "none", "count_down"]:
return x
raise Invalid(f"invalid mode {x}")
def lb_dev_state(x):
if x in ["normal"]:
return x
raise Invalid(f"Invalid dev_state {x}")
TZ_SCHEMA = Schema(
{"zone_str": str, "dst_offset": int, "index": All(int, Range(min=0)), "tz_str": str}
)
CURRENT_CONSUMPTION_SCHEMA = Schema(
Any(
{
"voltage": Any(All(float, Range(min=0, max=300)), None),
"power": Any(Coerce(float, Range(min=0)), None),
"total": Any(Coerce(float, Range(min=0)), None),
"current": Any(All(float, Range(min=0)), None),
"voltage_mv": Any(
All(float, Range(min=0, max=300000)), int, None
), # TODO can this be int?
"power_mw": Any(Coerce(float, Range(min=0)), None),
"total_wh": Any(Coerce(float, Range(min=0)), None),
"current_ma": Any(
All(float, Range(min=0)), int, None
), # TODO can this be int?
},
None,
)
)
# these schemas should go to the mainlib as
# they can be useful when adding support for new features/devices
# as well as to check that faked devices are operating properly.
PLUG_SCHEMA = Schema(
{
"active_mode": check_mode,
"alias": str,
"dev_name": str,
"deviceId": str,
"feature": str,
"fwId": str,
"hwId": str,
"hw_ver": str,
"icon_hash": str,
"led_off": check_int_bool,
"latitude": Any(All(float, Range(min=-90, max=90)), None),
"latitude_i": Any(All(float, Range(min=-90, max=90)), None),
"longitude": Any(All(float, Range(min=-180, max=180)), None),
"longitude_i": Any(All(float, Range(min=-180, max=180)), None),
"mac": check_mac,
"model": str,
"oemId": str,
"on_time": int,
"relay_state": int,
"rssi": Any(int, None), # rssi can also be positive, see #54
"sw_ver": str,
"type": str,
"mic_type": str,
"updating": check_int_bool,
# these are available on hs220
"brightness": int,
"preferred_state": [
{"brightness": All(int, Range(min=0, max=100)), "index": int}
],
"next_action": {"type": int},
"child_num": Optional(Any(None, int)), # TODO fix hs300 checks
"children": Optional(list), # TODO fix hs300
# TODO some tplink simulator entries contain invalid (mic_mac, _i variants for lat/lon)
# Therefore we add REMOVE_EXTRA..
# "INVALIDmac": Optional,
# "INVALIDlatitude": Optional,
# "INVALIDlongitude": Optional,
},
extra=REMOVE_EXTRA,
)
BULB_SCHEMA = PLUG_SCHEMA.extend(
{
"ctrl_protocols": Optional(dict),
"description": Optional(str), # TODO: LBxxx similar to dev_name
"dev_state": lb_dev_state,
"disco_ver": str,
"heapsize": int,
"is_color": check_int_bool,
"is_dimmable": check_int_bool,
"is_factory": bool,
"is_variable_color_temp": check_int_bool,
"light_state": {
"brightness": All(int, Range(min=0, max=100)),
"color_temp": int,
"hue": All(int, Range(min=0, max=255)),
"mode": str,
"on_off": check_int_bool,
"saturation": All(int, Range(min=0, max=255)),
"dft_on_state": Optional(
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": All(int, Range(min=2700, max=9000)),
"hue": All(int, Range(min=0, max=255)),
"mode": str,
"saturation": All(int, Range(min=0, max=255)),
}
),
"err_code": int,
},
"preferred_state": [
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": int,
"hue": All(int, Range(min=0, max=255)),
"index": int,
"saturation": All(int, Range(min=0, max=255)),
}
],
}
)
def get_realtime(obj, x, *args):
return {
"current": 0.268587,
"voltage": 125.836131,
"power": 33.495623,
"total": 0.199000,
}
def get_monthstat(obj, x, *args):
if x["year"] < 2016:
return {"month_list": []}
return {
"month_list": [
{"year": 2016, "month": 11, "energy": 1.089000},
{"year": 2016, "month": 12, "energy": 1.582000},
]
}
def get_daystat(obj, x, *args):
if x["year"] < 2016:
return {"day_list": []}
return {
"day_list": [
{"year": 2016, "month": 11, "day": 24, "energy": 0.026000},
{"year": 2016, "month": 11, "day": 25, "energy": 0.109000},
]
}
emeter_support = {
"get_realtime": get_realtime,
"get_monthstat": get_monthstat,
"get_daystat": get_daystat,
}
def get_realtime_units(obj, x, *args):
return {"power_mw": 10800}
def get_monthstat_units(obj, x, *args):
if x["year"] < 2016:
return {"month_list": []}
return {
"month_list": [
{"year": 2016, "month": 11, "energy_wh": 32},
{"year": 2016, "month": 12, "energy_wh": 16},
]
}
def get_daystat_units(obj, x, *args):
if x["year"] < 2016:
return {"day_list": []}
return {
"day_list": [
{"year": 2016, "month": 11, "day": 24, "energy_wh": 20},
{"year": 2016, "month": 11, "day": 25, "energy_wh": 32},
]
}
emeter_units_support = {
"get_realtime": get_realtime_units,
"get_monthstat": get_monthstat_units,
"get_daystat": get_daystat_units,
}
emeter_commands = {
"emeter": emeter_support,
"smartlife.iot.common.emeter": emeter_units_support,
}
def error(target, cmd="no-command", msg="default msg"):
return {target: {cmd: {"err_code": -1323, "msg": msg}}}
def success(target, cmd, res):
if res:
res.update({"err_code": 0})
else:
res = {"err_code": 0}
return {target: {cmd: res}}
class FakeTransportProtocol(TPLinkSmartHomeProtocol):
def __init__(self, info, invalid=False):
# TODO remove invalid when removing the old tests.
proto = FakeTransportProtocol.baseproto
for target in info:
# print("target %s" % target)
for cmd in info[target]:
# print("initializing tgt %s cmd %s" % (target, cmd))
proto[target][cmd] = info[target][cmd]
# if we have emeter support, check for it
for module in ["emeter", "smartlife.iot.common.emeter"]:
if module not in info:
# TODO required for old tests
continue
if "get_realtime" in info[module]:
get_realtime_res = info[module]["get_realtime"]
# TODO remove when removing old tests
if callable(get_realtime_res):
get_realtime_res = get_realtime_res()
if (
"err_code" not in get_realtime_res
or not get_realtime_res["err_code"]
):
proto[module] = emeter_commands[module]
self.proto = proto
def set_alias(self, x, child_ids=[]):
_LOGGER.debug("Setting alias to %s, child_ids: %s", x["alias"], child_ids)
if child_ids:
for child in self.proto["system"]["get_sysinfo"]["children"]:
if child["id"] in child_ids:
child["alias"] = x["alias"]
else:
self.proto["system"]["get_sysinfo"]["alias"] = x["alias"]
def set_relay_state(self, x, child_ids=[]):
_LOGGER.debug("Setting relay state to %s", x["state"])
if not child_ids and "children" in self.proto["system"]["get_sysinfo"]:
for child in self.proto["system"]["get_sysinfo"]["children"]:
child_ids.append(child["id"])
_LOGGER.info("child_ids: %s", child_ids)
if child_ids:
for child in self.proto["system"]["get_sysinfo"]["children"]:
if child["id"] in child_ids:
_LOGGER.info("Found %s, turning to %s", child, x["state"])
child["state"] = x["state"]
else:
self.proto["system"]["get_sysinfo"]["relay_state"] = x["state"]
def set_led_off(self, x, *args):
_LOGGER.debug("Setting led off to %s", x)
self.proto["system"]["get_sysinfo"]["led_off"] = x["off"]
def set_mac(self, x, *args):
_LOGGER.debug("Setting mac to %s", x)
self.proto["system"]["get_sysinfo"]["mac"] = x
def set_hs220_brightness(self, x, *args):
_LOGGER.debug("Setting brightness to %s", x)
self.proto["system"]["get_sysinfo"]["brightness"] = x["brightness"]
def transition_light_state(self, x, *args):
_LOGGER.debug("Setting light state to %s", x)
light_state = self.proto["smartlife.iot.smartbulb.lightingservice"][
"get_light_state"
]
# The required change depends on the light state,
# exception being turning the bulb on and off
if "on_off" in x:
if x["on_off"] and not light_state["on_off"]: # turning on
new_state = light_state["dft_on_state"]
new_state["on_off"] = 1
self.proto["smartlife.iot.smartbulb.lightingservice"][
"get_light_state"
] = new_state
elif not x["on_off"] and light_state["on_off"]:
new_state = {"dft_on_state": light_state, "on_off": 0}
self.proto["smartlife.iot.smartbulb.lightingservice"][
"get_light_state"
] = new_state
return
if not light_state["on_off"] and "on_off" not in x:
light_state = light_state["dft_on_state"]
_LOGGER.debug("Current state: %s", light_state)
for key in x:
light_state[key] = x[key]
def light_state(self, x, *args):
light_state = self.proto["smartlife.iot.smartbulb.lightingservice"][
"get_light_state"
]
# Our tests have light state off, so we simply return the dft_on_state when device is on.
_LOGGER.info("reporting light state: %s", light_state)
if light_state["on_off"]:
return light_state["dft_on_state"]
else:
return light_state
baseproto = {
"system": {
"set_relay_state": set_relay_state,
"set_dev_alias": set_alias,
"set_led_off": set_led_off,
"get_dev_icon": {"icon": None, "hash": None},
"set_mac_addr": set_mac,
"get_sysinfo": None,
},
"emeter": {
"get_realtime": None,
"get_daystat": None,
"get_monthstat": None,
"erase_emeter_state": None,
},
"smartlife.iot.common.emeter": {
"get_realtime": None,
"get_daystat": None,
"get_monthstat": None,
"erase_emeter_state": None,
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": light_state,
"transition_light_state": transition_light_state,
},
"time": {
"get_time": {
"year": 2017,
"month": 1,
"mday": 2,
"hour": 3,
"min": 4,
"sec": 5,
},
"get_timezone": {
"zone_str": "test",
"dst_offset": -1,
"index": 12,
"tz_str": "test2",
},
"set_timezone": None,
},
# HS220 brightness, different setter and getter
"smartlife.iot.dimmer": {"set_brightness": set_hs220_brightness},
}
async def query(self, host, request, port=9999):
proto = self.proto
# collect child ids from context
try:
child_ids = request["context"]["child_ids"]
request.pop("context", None)
except KeyError:
child_ids = []
target = next(iter(request))
if target not in proto.keys():
return error(target, msg="target not found")
cmd = next(iter(request[target]))
if cmd not in proto[target].keys():
return error(target, cmd, msg="command not found")
params = request[target][cmd]
_LOGGER.debug(f"Going to execute {target}.{cmd} (params: {params}).. ")
if callable(proto[target][cmd]):
res = proto[target][cmd](self, params, child_ids)
_LOGGER.debug("[callable] %s.%s: %s", target, cmd, res)
# verify that change didn't break schema, requires refactoring..
# TestSmartPlug.sysinfo_schema(self.proto["system"]["get_sysinfo"])
return success(target, cmd, res)
elif isinstance(proto[target][cmd], dict):
res = proto[target][cmd]
_LOGGER.debug("[static] %s.%s: %s", target, cmd, res)
return success(target, cmd, res)
else:
raise NotImplementedError(f"target {target} cmd {cmd}")

609
kasa/tests/test_fixtures.py Normal file
View File

@@ -0,0 +1,609 @@
import asyncio
import datetime
from unittest.mock import patch
import pytest
from kasa import DeviceType, SmartDeviceException, SmartStrip
from .conftest import (
bulb,
color_bulb,
dimmable,
handle_turn_on,
has_emeter,
no_emeter,
non_color_bulb,
non_dimmable,
non_variable_temp,
plug,
strip,
turn_on,
variable_temp,
)
from .newfakes import (
BULB_SCHEMA,
CURRENT_CONSUMPTION_SCHEMA,
PLUG_SCHEMA,
TZ_SCHEMA,
FakeTransportProtocol,
)
@plug
def test_plug_sysinfo(dev):
dev.sync.update()
assert dev.sys_info is not None
PLUG_SCHEMA(dev.sys_info)
assert dev.model is not None
assert dev.device_type == DeviceType.Plug or dev.device_type == DeviceType.Strip
assert dev.is_plug or dev.is_strip
@bulb
def test_bulb_sysinfo(dev):
dev.sync.update()
assert dev.sys_info is not None
BULB_SCHEMA(dev.sys_info)
assert dev.model is not None
assert dev.device_type == DeviceType.Bulb
assert dev.is_bulb
def test_state_info(dev):
dev.sync.update()
assert isinstance(dev.sync.state_information, dict)
def test_invalid_connection(dev):
with patch.object(FakeTransportProtocol, "query", side_effect=SmartDeviceException):
with pytest.raises(SmartDeviceException):
dev.sync.update()
dev.is_on
def test_query_helper(dev):
with pytest.raises(SmartDeviceException):
dev.sync._query_helper("test", "testcmd", {})
# TODO check for unwrapping?
@turn_on
def test_state(dev, turn_on):
handle_turn_on(dev, turn_on)
dev.sync.update()
orig_state = dev.is_on
if orig_state:
dev.sync.turn_off()
assert not dev.is_on
assert dev.is_off
dev.sync.turn_on()
assert dev.is_on
assert not dev.is_off
else:
dev.sync.turn_on()
assert dev.is_on
assert not dev.is_off
dev.sync.turn_off()
assert not dev.is_on
assert dev.is_off
@no_emeter
def test_no_emeter(dev):
dev.sync.update()
assert not dev.has_emeter
with pytest.raises(SmartDeviceException):
dev.sync.get_emeter_realtime()
with pytest.raises(SmartDeviceException):
dev.sync.get_emeter_daily()
with pytest.raises(SmartDeviceException):
dev.sync.get_emeter_monthly()
with pytest.raises(SmartDeviceException):
dev.sync.erase_emeter_stats()
@has_emeter
def test_get_emeter_realtime(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.has_emeter
current_emeter = dev.sync.get_emeter_realtime()
CURRENT_CONSUMPTION_SCHEMA(current_emeter)
@has_emeter
def test_get_emeter_daily(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.has_emeter
assert dev.sync.get_emeter_daily(year=1900, month=1) == {}
d = dev.sync.get_emeter_daily()
assert len(d) > 0
k, v = d.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# Test kwh (energy, energy_wh)
d = dev.sync.get_emeter_daily(kwh=False)
k2, v2 = d.popitem()
assert v * 1000 == v2
@has_emeter
def test_get_emeter_monthly(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.has_emeter
assert dev.sync.get_emeter_monthly(year=1900) == {}
d = dev.sync.get_emeter_monthly()
assert len(d) > 0
k, v = d.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# Test kwh (energy, energy_wh)
d = dev.sync.get_emeter_monthly(kwh=False)
k2, v2 = d.popitem()
assert v * 1000 == v2
@has_emeter
def test_emeter_status(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.has_emeter
d = dev.sync.get_emeter_realtime()
with pytest.raises(KeyError):
assert d["foo"]
assert d["power_mw"] == d["power"] * 1000
# bulbs have only power according to tplink simulator.
if not dev.is_bulb:
assert d["voltage_mv"] == d["voltage"] * 1000
assert d["current_ma"] == d["current"] * 1000
assert d["total_wh"] == d["total"] * 1000
@pytest.mark.skip("not clearing your stats..")
@has_emeter
def test_erase_emeter_stats(dev):
dev.sync.update()
assert dev.has_emeter
dev.sync.erase_emeter()
@has_emeter
def test_current_consumption(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
if dev.has_emeter:
x = dev.sync.current_consumption()
assert isinstance(x, float)
assert x >= 0.0
else:
assert dev.sync.current_consumption() is None
def test_alias(dev):
dev.sync.update()
test_alias = "TEST1234"
original = dev.sync.alias
assert isinstance(original, str)
dev.sync.set_alias(test_alias)
assert dev.sync.alias == test_alias
dev.sync.set_alias(original)
assert dev.sync.alias == original
@plug
def test_led(dev):
dev.sync.update()
original = dev.led
dev.sync.set_led(False)
assert not dev.led
dev.sync.set_led(True)
assert dev.led
dev.sync.set_led(original)
@plug
def test_on_since(dev):
dev.sync.update()
assert isinstance(dev.on_since, datetime.datetime)
def test_icon(dev):
assert set(dev.sync.get_icon().keys()), {"icon", "hash"}
def test_time(dev):
assert isinstance(dev.sync.get_time(), datetime.datetime)
# TODO check setting?
def test_timezone(dev):
TZ_SCHEMA(dev.sync.get_timezone())
def test_hw_info(dev):
dev.sync.update()
PLUG_SCHEMA(dev.hw_info)
def test_location(dev):
dev.sync.update()
PLUG_SCHEMA(dev.location)
def test_rssi(dev):
dev.sync.update()
PLUG_SCHEMA({"rssi": dev.rssi}) # wrapping for vol
def test_mac(dev):
dev.sync.update()
PLUG_SCHEMA({"mac": dev.mac}) # wrapping for val
# TODO check setting?
@non_variable_temp
def test_temperature_on_nonsupporting(dev):
dev.sync.update()
assert dev.valid_temperature_range == (0, 0)
# TODO test when device does not support temperature range
with pytest.raises(SmartDeviceException):
dev.sync.set_color_temp(2700)
with pytest.raises(SmartDeviceException):
print(dev.sync.color_temp)
@variable_temp
def test_out_of_range_temperature(dev):
dev.sync.update()
with pytest.raises(ValueError):
dev.sync.set_color_temp(1000)
with pytest.raises(ValueError):
dev.sync.set_color_temp(10000)
@non_dimmable
def test_non_dimmable(dev):
dev.sync.update()
assert not dev.is_dimmable
with pytest.raises(SmartDeviceException):
assert dev.brightness == 0
with pytest.raises(SmartDeviceException):
dev.sync.set_brightness(100)
@dimmable
@turn_on
def test_dimmable_brightness(dev, turn_on):
handle_turn_on(dev, turn_on)
dev.sync.update()
assert dev.is_dimmable
dev.sync.set_brightness(50)
assert dev.brightness == 50
dev.sync.set_brightness(10)
assert dev.brightness == 10
with pytest.raises(ValueError):
dev.sync.set_brightness("foo")
@dimmable
def test_invalid_brightness(dev):
dev.sync.update()
assert dev.is_dimmable
with pytest.raises(ValueError):
dev.sync.set_brightness(110)
with pytest.raises(ValueError):
dev.sync.set_brightness(-100)
@color_bulb
@turn_on
def test_hsv(dev, turn_on):
handle_turn_on(dev, turn_on)
dev.sync.update()
assert dev.is_color
hue, saturation, brightness = dev.hsv
assert 0 <= hue <= 255
assert 0 <= saturation <= 100
assert 0 <= brightness <= 100
dev.sync.set_hsv(hue=1, saturation=1, value=1)
hue, saturation, brightness = dev.hsv
assert hue == 1
assert saturation == 1
assert brightness == 1
@color_bulb
@turn_on
def test_invalid_hsv(dev, turn_on):
handle_turn_on(dev, turn_on)
dev.sync.update()
assert dev.is_color
for invalid_hue in [-1, 361, 0.5]:
with pytest.raises(ValueError):
dev.sync.set_hsv(invalid_hue, 0, 0)
for invalid_saturation in [-1, 101, 0.5]:
with pytest.raises(ValueError):
dev.sync.set_hsv(0, invalid_saturation, 0)
for invalid_brightness in [-1, 101, 0.5]:
with pytest.raises(ValueError):
dev.sync.set_hsv(0, 0, invalid_brightness)
@non_color_bulb
def test_hsv_on_non_color(dev):
dev.sync.update()
assert not dev.is_color
with pytest.raises(SmartDeviceException):
dev.sync.set_hsv(0, 0, 0)
with pytest.raises(SmartDeviceException):
print(dev.hsv)
@variable_temp
@turn_on
def test_try_set_colortemp(dev, turn_on):
dev.sync.update()
handle_turn_on(dev, turn_on)
dev.sync.set_color_temp(2700)
assert dev.sync.color_temp == 2700
@non_variable_temp
def test_non_variable_temp(dev):
with pytest.raises(SmartDeviceException):
dev.sync.update()
dev.sync.set_color_temp(2700)
@strip
@turn_on
def test_children_change_state(dev, turn_on):
dev.sync.update()
handle_turn_on(dev, turn_on)
for plug in dev.plugs:
plug.sync.update()
orig_state = plug.is_on
if orig_state:
plug.turn_off()
plug.sync.update()
assert not plug.is_on
assert plug.is_off
plug.sync.turn_on()
plug.sync.update()
assert plug.is_on
assert not plug.is_off
else:
plug.sync.turn_on()
plug.sync.update()
assert plug.is_on
assert not plug.is_off
plug.sync.turn_off()
plug.sync.update()
assert not plug.is_on
assert plug.is_off
@strip
def test_children_alias(dev):
test_alias = "TEST1234"
for plug in dev.plugs:
plug.sync.update()
original = plug.alias
plug.sync.set_alias(alias=test_alias)
plug.sync.update()
assert plug.alias == test_alias
plug.sync.set_alias(alias=original)
plug.sync.update()
assert plug.alias == original
@strip
def test_children_on_since(dev):
for plug in dev.plugs:
plug.sync.update()
assert plug.on_since
@pytest.mark.skip("this test will wear out your relays")
def test_all_binary_states(dev):
# test every binary state
for state in range(2 ** dev.num_children):
# create binary state map
state_map = {}
for plug_index in range(dev.num_children):
state_map[plug_index] = bool((state >> plug_index) & 1)
if state_map[plug_index]:
dev.sync.turn_on(index=plug_index)
else:
dev.sync.turn_off(index=plug_index)
# check state map applied
for index, state in dev.is_on.items():
assert state_map[index] == state
# toggle each outlet with state map applied
for plug_index in range(dev.num_children):
# toggle state
if state_map[plug_index]:
dev.sync.turn_off(index=plug_index)
else:
dev.sync.turn_on(index=plug_index)
# only target outlet should have state changed
for index, state in dev.is_on.items():
if index == plug_index:
assert state != state_map[index]
else:
assert state == state_map[index]
# reset state
if state_map[plug_index]:
dev.sync.turn_on(index=plug_index)
else:
dev.sync.turn_off(index=plug_index)
# original state map should be restored
for index, state in dev.is_on.items():
assert state == state_map[index]
@strip
def test_children_get_emeter_realtime(dev):
dev.sync.update()
assert dev.has_emeter
# test with index
for plug in dev.plugs:
plug.sync.update()
emeter = plug.sync.get_emeter_realtime()
CURRENT_CONSUMPTION_SCHEMA(emeter)
# test without index
# TODO test that sum matches the sum of individiaul plugs.
# for index, emeter in dev.sync.get_emeter_realtime().items():
# CURRENT_CONSUMPTION_SCHEMA(emeter)
@strip
def test_children_get_emeter_daily(dev):
dev.sync.update()
assert dev.has_emeter
# test individual emeters
for plug in dev.plugs:
plug.sync.update()
emeter = plug.sync.get_emeter_daily(year=1900, month=1)
assert emeter == {}
emeter = plug.sync.get_emeter_daily()
assert len(emeter) > 0
k, v = emeter.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# test sum of emeters
all_emeter = dev.sync.get_emeter_daily(year=1900, month=1)
k, v = all_emeter.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
@strip
def test_children_get_emeter_monthly(dev):
dev.sync.update()
assert dev.has_emeter
# test individual emeters
for plug in dev.plugs:
plug.sync.update()
emeter = plug.sync.get_emeter_monthly(year=1900)
assert emeter == {}
emeter = plug.sync.get_emeter_monthly()
assert len(emeter) > 0
k, v = emeter.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# test sum of emeters
all_emeter = dev.sync.get_emeter_monthly(year=1900)
k, v = all_emeter.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# def test_cache(dev):
# from datetime import timedelta
# dev.sync.cache_ttl = timedelta(seconds=3)
# with patch.object(
# FakeTransportProtocol, "query", wraps=dev.protocol.query
# ) as query_mock:
# CHECK_COUNT = 1
# # Smartstrip calls sysinfo in its __init__ to request children, so
# # the even first get call here will get its results from the cache.
# if dev.is_strip:
# CHECK_COUNT = 0
# dev.sys_info
# assert query_mock.call_count == CHECK_COUNT
# dev.sys_info
# assert query_mock.call_count == CHECK_COUNT
# def test_cache_invalidates(dev):
# from datetime import timedelta
# dev.sync.cache_ttl = timedelta(seconds=0)
# with patch.object(
# FakeTransportProtocol, "query", wraps=dev.protocol.query
# ) as query_mock:
# dev.sys_info
# assert query_mock.call_count == 1
# dev.sys_info
# assert query_mock.call_count == 2
# # assert query_mock.called_once()
def test_representation(dev):
import re
pattern = re.compile("<.* model .* at .* (.*), is_on: .* - dev specific: .*>")
assert pattern.match(str(dev))

View File

@@ -0,0 +1,73 @@
import json
from unittest import TestCase
from ..protocol import TPLinkSmartHomeProtocol
class TestTPLinkSmartHomeProtocol(TestCase):
def test_encrypt(self):
d = json.dumps({"foo": 1, "bar": 2})
encrypted = TPLinkSmartHomeProtocol.encrypt(d)
# encrypt adds a 4 byte header
encrypted = encrypted[4:]
self.assertEqual(d, TPLinkSmartHomeProtocol.decrypt(encrypted))
def test_encrypt_unicode(self):
d = "{'snowman': '\u2603'}"
e = bytes(
[
208,
247,
132,
234,
133,
242,
159,
254,
144,
183,
141,
173,
138,
104,
240,
115,
84,
41,
]
)
encrypted = TPLinkSmartHomeProtocol.encrypt(d)
# encrypt adds a 4 byte header
encrypted = encrypted[4:]
self.assertEqual(e, encrypted)
def test_decrypt_unicode(self):
e = bytes(
[
208,
247,
132,
234,
133,
242,
159,
254,
144,
183,
141,
173,
138,
104,
240,
115,
84,
41,
]
)
d = "{'snowman': '\u2603'}"
self.assertEqual(d, TPLinkSmartHomeProtocol.decrypt(e))

2
kasa/version.py Normal file
View File

@@ -0,0 +1,2 @@
# flake8: noqa
__version__ = "0.4.0.dev0"