General cleanups all around (janitoring) (#63)

* Move tests to device-type specific test files to make improvements more approachable

* protocol: remove the port parameter from query, as there are no other known ports, fix docstrings

* Revise docstrings, remove superfluous information and remove unused methods ({set,get_icon} and set_time)

* cli: indent device output to make it more easily readable when having multiple devices

* remove adjust flake8 ignores (we have no setup.py anymore)

* pyproject: include cli tool to coverage, add config for interrogate (docstring coverage)

* bulb: raise exception on color_temp error cases instead of returning zero values

* improve bulb tests, simplify conftest

* strip: rename plugs property to children and move it to smartdevice
This commit is contained in:
Teemu R 2020-05-27 16:55:18 +02:00 committed by GitHub
parent 836f1701b9
commit 644a10a0d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 762 additions and 969 deletions

View File

@ -3,6 +3,5 @@ exclude = .git,.tox,__pycache__,kasa/tests/newfakes.py,kasa/tests/test_fixtures.
max-line-length = 88
per-file-ignores =
kasa/tests/*.py:D100,D101,D102,D103,D104
setup.py:D100
ignore = D105, D107, E203, E501, W503
max-complexity = 18

View File

@ -42,7 +42,7 @@ pass_dev = click.make_pass_decorator(SmartDevice)
@click.version_option()
@click.pass_context
async def cli(ctx, host, alias, target, debug, bulb, plug, strip):
"""A cli tool for controlling TP-Link smart home plugs.""" # noqa
"""A tool for controlling TP-Link smart home devices.""" # noqa
if debug:
logging.basicConfig(level=logging.DEBUG)
else:
@ -214,37 +214,44 @@ async def state(ctx, dev: SmartDevice):
"""Print out device state and versions."""
await dev.update()
click.echo(click.style(f"== {dev.alias} - {dev.model} ==", bold=True))
click.echo(f"\tHost: {dev.host}")
click.echo(
click.style(
"Device state: {}".format("ON" if dev.is_on else "OFF"),
"\tDevice state: {}\n".format("ON" if dev.is_on else "OFF"),
fg="green" if dev.is_on else "red",
)
)
if dev.is_strip:
for plug in dev.plugs: # type: ignore
click.echo(click.style("\t== Plugs ==", bold=True))
for plug in dev.children: # type: ignore
is_on = plug.is_on
alias = plug.alias
click.echo(
click.style(
" * Socket '{}' state: {} on_since: {}".format(
"\t* Socket '{}' state: {} on_since: {}".format(
alias, ("ON" if is_on else "OFF"), plug.on_since
),
fg="green" if is_on else "red",
)
)
click.echo()
click.echo(f"Host/IP: {dev.host}")
click.echo(click.style("\t== Generic information ==", bold=True))
click.echo(f"\tTime: {await dev.get_time()}")
click.echo(f"\tHardware: {dev.hw_info['hw_ver']}")
click.echo(f"\tSoftware: {dev.hw_info['sw_ver']}")
click.echo(f"\tMAC (rssi): {dev.mac} ({dev.rssi})")
click.echo(f"\tLocation: {dev.location}")
click.echo(click.style("\n\t== Device specific information ==", bold=True))
for k, v in dev.state_information.items():
click.echo(f"{k}: {v}")
click.echo(click.style("== Generic information ==", bold=True))
click.echo(f"Time: {await dev.get_time()}")
click.echo(f"Hardware: {dev.hw_info['hw_ver']}")
click.echo(f"Software: {dev.hw_info['sw_ver']}")
click.echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
click.echo(f"Location: {dev.location}")
click.echo(f"\t{k}: {v}")
click.echo()
await ctx.invoke(emeter)
if dev.has_emeter:
click.echo(click.style("\n\t== Current State ==", bold=True))
emeter_status = dev.emeter_realtime
click.echo(f"\t{emeter_status}")
@cli.command()
@ -267,7 +274,7 @@ async def alias(dev, new_alias, index):
click.echo(f"Alias: {dev.alias}")
if dev.is_strip:
for plug in dev.plugs:
for plug in dev.children:
click.echo(f" * {plug.alias}")

View File

@ -62,6 +62,7 @@ class _DiscoverProtocol(asyncio.DatagramProtocol):
self.transport.sendto(encrypted_req[4:], self.target) # type: ignore
def datagram_received(self, data, addr) -> None:
"""Handle discovery responses."""
ip, port = addr
if ip in self.discovered_devices:
return
@ -82,10 +83,11 @@ class _DiscoverProtocol(asyncio.DatagramProtocol):
_LOGGER.error("Received invalid response: %s", info)
def error_received(self, ex):
"""Handle asyncio.Protocol errors."""
_LOGGER.error("Got error: %s", ex)
def connection_lost(self, ex):
pass
"""NOP implementation of connection lost."""
class Discover:

View File

@ -14,7 +14,7 @@ import json
import logging
import struct
from pprint import pformat as pf
from typing import Any, Dict, Union
from typing import Dict, Union
_LOGGER = logging.getLogger(__name__)
@ -27,13 +27,10 @@ class TPLinkSmartHomeProtocol:
DEFAULT_TIMEOUT = 5
@staticmethod
async def query(
host: str, request: Union[str, Dict], port: int = DEFAULT_PORT
) -> Any:
async def query(host: str, request: Union[str, Dict]) -> Dict:
"""Request information from a TP-Link SmartHome Device.
:param str host: host name or ip address of the device
:param int port: port on the device (default: 9999)
:param request: command to send to the device (can be either dict or
json string)
:return: response dict
@ -44,7 +41,7 @@ class TPLinkSmartHomeProtocol:
timeout = TPLinkSmartHomeProtocol.DEFAULT_TIMEOUT
writer = None
try:
task = asyncio.open_connection(host, port)
task = asyncio.open_connection(host, TPLinkSmartHomeProtocol.DEFAULT_PORT)
reader, writer = await asyncio.wait_for(task, timeout=timeout)
_LOGGER.debug("> (%i) %s", len(request), request)
writer.write(TPLinkSmartHomeProtocol.encrypt(request))
@ -75,11 +72,10 @@ class TPLinkSmartHomeProtocol:
@staticmethod
def encrypt(request: str) -> bytes:
"""
Encrypt a request for a TP-Link Smart Home Device.
"""Encrypt a request for a TP-Link Smart Home Device.
:param request: plaintext request data
:return: ciphertext request
:return: ciphertext to be send over wire, in bytes
"""
key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR
@ -95,8 +91,7 @@ class TPLinkSmartHomeProtocol:
@staticmethod
def decrypt(ciphertext: bytes) -> str:
"""
Decrypt a response of a TP-Link Smart Home Device.
"""Decrypt a response of a TP-Link Smart Home Device.
:param ciphertext: encrypted response data
:return: plaintext response

View File

@ -1,4 +1,4 @@
"""Module for bulbs."""
"""Module for bulbs (LB*, KL*, KB*)."""
import re
from typing import Any, Dict, Tuple, cast
@ -82,34 +82,21 @@ class SmartBulb(SmartDevice):
@property # type: ignore
@requires_update
def is_color(self) -> bool:
"""Whether the bulb supports color changes.
:return: True if the bulb supports color changes, False otherwise
:rtype: bool
"""
"""Whether the bulb supports color changes."""
sys_info = self.sys_info
return bool(sys_info["is_color"])
@property # type: ignore
@requires_update
def is_dimmable(self) -> bool:
"""Whether the bulb supports brightness changes.
:return: True if the bulb supports brightness changes, False otherwise
:rtype: bool
"""
"""Whether the bulb supports brightness changes."""
sys_info = self.sys_info
return bool(sys_info["is_dimmable"])
@property # type: ignore
@requires_update
def is_variable_color_temp(self) -> bool:
"""Whether the bulb supports color temperature changes.
:return: True if the bulb supports color temperature changes, False
otherwise
:rtype: bool
"""
"""Whether the bulb supports color temperature changes."""
sys_info = self.sys_info
return bool(sys_info["is_variable_color_temp"])
@ -118,16 +105,18 @@ class SmartBulb(SmartDevice):
def valid_temperature_range(self) -> Tuple[int, int]:
"""Return the device-specific white temperature range (in Kelvin).
:return: White temperature range in Kelvin (minimun, maximum)
:rtype: tuple
:return: White temperature range in Kelvin (minimum, maximum)
"""
if not self.is_variable_color_temp:
return (0, 0)
raise SmartDeviceException("Color temperature not supported")
for model, temp_range in TPLINK_KELVIN.items():
sys_info = self.sys_info
if re.match(model, sys_info["model"]):
return temp_range
return (0, 0)
raise SmartDeviceException(
"Unknown color temperature range, please open an issue on github"
)
@property # type: ignore
@requires_update
@ -166,7 +155,6 @@ class SmartBulb(SmartDevice):
"""Return the current HSV state of the bulb.
:return: hue, saturation and value (degrees, %, %)
:rtype: tuple
"""
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
@ -220,11 +208,7 @@ class SmartBulb(SmartDevice):
@property # type: ignore
@requires_update
def color_temp(self) -> int:
"""Return color temperature of the device.
:return: Color temperature in Kelvin
:rtype: int
"""
"""Return color temperature of the device in kelvin."""
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
@ -233,10 +217,7 @@ class SmartBulb(SmartDevice):
@requires_update
async def set_color_temp(self, temp: int) -> None:
"""Set the color temperature of the device.
:param int temp: The new color temperature, in Kelvin
"""
"""Set the color temperature of the device in kelvin."""
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
@ -253,11 +234,7 @@ class SmartBulb(SmartDevice):
@property # type: ignore
@requires_update
def brightness(self) -> int:
"""Return the current brightness.
:return: brightness in percent
:rtype: int
"""
"""Return the current brightness in percentage."""
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
@ -266,10 +243,7 @@ class SmartBulb(SmartDevice):
@requires_update
async def set_brightness(self, brightness: int) -> None:
"""Set the brightness.
:param int brightness: brightness in percent
"""
"""Set the brightness in percentage."""
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
@ -281,11 +255,7 @@ class SmartBulb(SmartDevice):
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return bulb-specific state information.
:return: Bulb information dict, keys in user-presentable form.
:rtype: dict
"""
"""Return bulb-specific state information."""
info: Dict[str, Any] = {
"Brightness": self.brightness,
"Is dimmable": self.is_dimmable,

View File

@ -127,7 +127,6 @@ class SmartDevice:
"""Create a new SmartDevice instance.
:param str host: host name or ip address on which the device listens
:param child_id: optional child ID for context in a parent device
"""
self.host = host
@ -141,6 +140,8 @@ class SmartDevice:
self._last_update: Any = None
self._sys_info: Any = None # TODO: this is here to avoid changing tests
self.children: List["SmartDevice"] = []
def _create_request(
self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None
):
@ -153,15 +154,13 @@ class SmartDevice:
async def _query_helper(
self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None
) -> Any:
"""Handle result unwrapping and error handling.
"""Query device, return results or raise an exception.
:param target: Target system {system, time, emeter, ..}
:param cmd: Command to execute
:param arg: JSON object passed as parameter to the command
:param arg: payload dict to be send to the device
:param child_ids: ids of child devices
:return: Unwrapped result for the call.
:rtype: dict
:raises SmartDeviceException: if command was not executed correctly
"""
request = self._create_request(target, cmd, arg, child_ids)
@ -191,22 +190,13 @@ class SmartDevice:
@property # type: ignore
@requires_update
def has_emeter(self) -> bool:
"""Return whether device has an energy meter.
:return: True if energy meter is available
False otherwise
"""
"""Return True if device has an energy meter."""
sys_info = self.sys_info
features = sys_info["feature"].split(":")
return "ENE" in features
async def get_sys_info(self) -> Dict[str, Any]:
"""Retrieve system information.
:return: sysinfo
:rtype dict
:raises SmartDeviceException: on error
"""
"""Retrieve system information."""
return await self._query_helper("system", "get_sysinfo")
async def update(self):
@ -227,78 +217,29 @@ class SmartDevice:
@property # type: ignore
@requires_update
def sys_info(self) -> Dict[str, Any]:
"""Retrieve system information.
:return: sysinfo
:rtype dict
:raises SmartDeviceException: on error
"""
"""Return system information."""
return self._sys_info # type: ignore
@property # type: ignore
@requires_update
def model(self) -> str:
"""Return device model.
:return: device model
:rtype: str
:raises SmartDeviceException: on error
"""
"""Return device model."""
sys_info = self.sys_info
return str(sys_info["model"])
@property # type: ignore
@requires_update
def alias(self) -> str:
"""Return device name (alias).
:return: Device name aka alias.
:rtype: str
"""
"""Return device name (alias)."""
sys_info = self.sys_info
return str(sys_info["alias"])
async def set_alias(self, alias: str) -> None:
"""Set the device name (alias).
:param alias: New alias (name)
:raises SmartDeviceException: on error
"""
"""Set the device name (alias)."""
return await self._query_helper("system", "set_dev_alias", {"alias": alias})
async def get_icon(self) -> Dict:
"""Return device icon.
Note: not working on HS110, but is always empty.
:return: icon and its hash
:rtype: dict
:raises SmartDeviceException: on error
"""
return await self._query_helper("system", "get_dev_icon")
def set_icon(self, icon: str) -> None:
"""Set device icon.
Content for hash and icon are unknown.
:param str icon: Icon path(?)
:raises NotImplementedError: when not implemented
:raises SmartPlugError: on error
"""
raise NotImplementedError()
# here just for the sake of completeness
# await self._query_helper("system",
# "set_dev_icon", {"icon": "", "hash": ""})
# self.initialize()
async def get_time(self) -> Optional[datetime]:
"""Return current time from the device.
:return: datetime for device's time
:rtype: datetime or None when not available
:raises SmartDeviceException: on error
"""
"""Return current time from the device, if available."""
try:
res = await self._query_helper("time", "get_time")
return datetime(
@ -312,46 +253,8 @@ class SmartDevice:
except SmartDeviceException:
return None
async def set_time(self, ts: datetime) -> None:
"""Set the device time.
Note: this calls set_timezone() for setting.
:param datetime ts: New date and time
:return: result
:type: dict
:raises NotImplemented: when not implemented.
:raises SmartDeviceException: on error
"""
raise NotImplementedError("Fails with err_code == 0 with HS110.")
"""
here just for the sake of completeness.
if someone figures out why it doesn't work,
please create a PR :-)
ts_obj = {
"index": self.timezone["index"],
"hour": ts.hour,
"min": ts.minute,
"sec": ts.second,
"year": ts.year,
"month": ts.month,
"mday": ts.day,
}
response = await self._query_helper("time", "set_timezone", ts_obj)
self.initialize()
return response
"""
async def get_timezone(self) -> Dict:
"""Return timezone information.
:return: Timezone information
:rtype: dict
:raises SmartDeviceException: on error
"""
"""Return timezone information."""
return await self._query_helper("time", "get_timezone")
@property # type: ignore
@ -359,8 +262,7 @@ class SmartDevice:
def hw_info(self) -> Dict:
"""Return hardware information.
:return: Information about hardware
:rtype: dict
This returns just a selection of sysinfo keys that are related to hardware.
"""
keys = [
"sw_ver",
@ -380,11 +282,7 @@ class SmartDevice:
@property # type: ignore
@requires_update
def location(self) -> Dict:
"""Return geographical location.
:return: latitude and longitude
:rtype: dict
"""
"""Return geographical location."""
sys_info = self.sys_info
loc = {"latitude": None, "longitude": None}
@ -402,11 +300,7 @@ class SmartDevice:
@property # type: ignore
@requires_update
def rssi(self) -> Optional[int]:
"""Return WiFi signal strenth (rssi).
:return: rssi
:rtype: int
"""
"""Return WiFi signal strenth (rssi)."""
sys_info = self.sys_info
if "rssi" in sys_info:
return int(sys_info["rssi"])
@ -418,7 +312,6 @@ class SmartDevice:
"""Return mac address.
:return: mac address in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:rtype: str
"""
sys_info = self.sys_info
@ -437,26 +330,20 @@ class SmartDevice:
"""Set the mac address.
:param str mac: mac in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:raises SmartDeviceException: on error
"""
return await self._query_helper("system", "set_mac_addr", {"mac": mac})
@property # type: ignore
@requires_update
def emeter_realtime(self) -> EmeterStatus:
"""Return current emeter status."""
"""Return current energy readings."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
return EmeterStatus(self._last_update[self.emeter_type]["get_realtime"])
async def get_emeter_realtime(self) -> EmeterStatus:
"""Retrieve current energy readings.
:returns: current readings or False
:rtype: dict, None
:raises SmartDeviceException: on error
"""
"""Retrieve current energy readings."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
@ -549,8 +436,6 @@ class SmartDevice:
month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
@ -573,8 +458,6 @@ class SmartDevice:
:param year: year for which to retrieve statistics (default: this year)
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
@ -589,12 +472,8 @@ class SmartDevice:
return self._emeter_convert_emeter_data(response["month_list"], kwh)
@requires_update
async def erase_emeter_stats(self):
"""Erase energy meter statistics.
:return: True if statistics were deleted
:raises SmartDeviceException: on error
"""
async def erase_emeter_stats(self) -> Dict:
"""Erase energy meter statistics."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
@ -602,11 +481,7 @@ class SmartDevice:
@requires_update
async def current_consumption(self) -> float:
"""Get the current power consumption in Watt.
:return: the current power consumption in Watts.
:raises SmartDeviceException: on error
"""
"""Get the current power consumption in Watt."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
@ -618,9 +493,6 @@ class SmartDevice:
Note that giving a delay of zero causes this to block,
as the device reboots immediately without responding to the call.
:param delay: Delay the reboot for `delay` seconds.
:return: None
"""
await self._query_helper("system", "reboot", {"delay": delay})
@ -631,11 +503,7 @@ class SmartDevice:
@property # type: ignore
@requires_update
def is_off(self) -> bool:
"""Return True if device is off.
:return: True if device is off, False otherwise.
:rtype: bool
"""
"""Return True if device is off."""
return not self.is_on
async def turn_on(self) -> None:
@ -645,21 +513,13 @@ class SmartDevice:
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return if the device is on.
:return: True if the device is on, False otherwise.
:rtype: bool
:return:
"""
"""Return True if the device is on."""
raise NotImplementedError("Device subclass needs to implement this.")
@property # type: ignore
@requires_update
def on_since(self) -> Optional[datetime]:
"""Return pretty-printed on-time, if available.
Returns None if the device is turned off or does not report it.
"""
"""Return pretty-printed on-time, or None if not available."""
if "on_time" not in self.sys_info:
return None
@ -673,11 +533,7 @@ class SmartDevice:
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return device-type specific, end-user friendly state information.
:return: dict with state information.
:rtype: dict
"""
"""Return device-type specific, end-user friendly state information."""
raise NotImplementedError("Device subclass needs to implement this.")
@property # type: ignore
@ -726,6 +582,22 @@ class SmartDevice:
)
return await _join("smartlife.iot.common.softaponboarding", payload)
def get_plug_by_name(self, name: str) -> "SmartDevice":
"""Return child device for the given name."""
for p in self.children:
if p.alias == name:
return p
raise SmartDeviceException(f"Device has no child with {name}")
def get_plug_by_index(self, index: int) -> "SmartDevice":
"""Return child device for the given index."""
if index + 1 > len(self.children) or index < 0:
raise SmartDeviceException(
f"Invalid index {index}, device has {len(self.children)} plugs"
)
return self.children[index]
@property
def device_type(self) -> DeviceType:
"""Return the device type."""

View File

@ -33,9 +33,6 @@ class SmartDimmer(SmartPlug):
"""Return current brightness on dimmers.
Will return a range between 0 - 100.
:returns: integer
:rtype: int
"""
if not self.is_dimmable:
raise SmartDeviceException("Device is not dimmable.")
@ -45,15 +42,7 @@ class SmartDimmer(SmartPlug):
@requires_update
async def set_brightness(self, value: int):
"""Set the new dimmer brightness level.
Note:
When setting brightness, if the light is not
already on, it will be turned on automatically.
:param value: integer between 0 and 100
"""
"""Set the new dimmer brightness level in percentage."""
if not self.is_dimmable:
raise SmartDeviceException("Device is not dimmable.")
@ -68,23 +57,15 @@ class SmartDimmer(SmartPlug):
@property # type: ignore
@requires_update
def is_dimmable(self):
"""Whether the switch supports brightness changes.
:return: True if switch supports brightness changes, False otherwise
:rtype: bool
"""
def is_dimmable(self) -> bool:
"""Whether the switch supports brightness changes."""
sys_info = self.sys_info
return "brightness" in sys_info
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return switch-specific state information.
:return: Switch information dict, keys in user-presentable form.
:rtype: dict
"""
"""Return switch-specific state information."""
info = super().state_information
info["Brightness"] = self.brightness

View File

@ -1,4 +1,4 @@
"""Module for plugs."""
"""Module for smart plugs (HS100, HS110, ..)."""
import logging
from typing import Any, Dict
@ -38,44 +38,27 @@ class SmartPlug(SmartDevice):
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return whether device is on.
:return: True if device is on, False otherwise
"""
"""Return whether device is on."""
sys_info = self.sys_info
return bool(sys_info["relay_state"])
async def turn_on(self):
"""Turn the switch on.
:raises SmartDeviceException: on error
"""
"""Turn the switch on."""
return await self._query_helper("system", "set_relay_state", {"state": 1})
async def turn_off(self):
"""Turn the switch off.
:raises SmartDeviceException: on error
"""
"""Turn the switch off."""
return await self._query_helper("system", "set_relay_state", {"state": 0})
@property # type: ignore
@requires_update
def led(self) -> bool:
"""Return the state of the led.
:return: True if led is on, False otherwise
:rtype: bool
"""
"""Return the state of the led."""
sys_info = self.sys_info
return bool(1 - sys_info["led_off"])
async def set_led(self, state: bool):
"""Set the state of the led (night mode).
:param bool state: True to set led on, False to set led off
:raises SmartDeviceException: on error
"""
"""Set the state of the led (night mode)."""
return await self._query_helper(
"system", "set_led_off", {"off": int(not state)}
)
@ -83,10 +66,6 @@ class SmartPlug(SmartDevice):
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return switch-specific state information.
:return: Switch information dict, keys in user-presentable form.
:rtype: dict
"""
"""Return switch-specific state information."""
info = {"LED state": self.led, "On since": self.on_since}
return info

View File

@ -1,11 +1,8 @@
"""Module for multi-socket devices (HS300, HS107).
.. todo:: describe how this interfaces with single plugs.
"""
"""Module for multi-socket devices (HS300, HS107, KP303, ..)."""
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, DefaultDict, Dict, List, Optional
from typing import Any, DefaultDict, Dict, Optional
from kasa.smartdevice import (
DeviceType,
@ -49,13 +46,12 @@ class SmartStrip(SmartDevice):
super().__init__(host=host)
self.emeter_type = "emeter"
self._device_type = DeviceType.Strip
self.plugs: List[SmartStripPlug] = []
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return if any of the outlets are on."""
for plug in self.plugs:
for plug in self.children:
is_on = plug.is_on
if is_on:
return True
@ -69,46 +65,24 @@ class SmartStrip(SmartDevice):
await super().update()
# Initialize the child devices during the first update.
if not self.plugs:
if not self.children:
children = self.sys_info["children"]
_LOGGER.debug("Initializing %s child sockets", len(children))
for child in children:
self.plugs.append(
self.children.append(
SmartStripPlug(self.host, parent=self, child_id=child["id"])
)
async def turn_on(self):
"""Turn the strip on.
:raises SmartDeviceException: on error
"""
"""Turn the strip on."""
await self._query_helper("system", "set_relay_state", {"state": 1})
await self.update()
async def turn_off(self):
"""Turn the strip off.
:raises SmartDeviceException: on error
"""
"""Turn the strip off."""
await self._query_helper("system", "set_relay_state", {"state": 0})
await self.update()
def get_plug_by_name(self, name: str) -> "SmartStripPlug":
"""Return child plug for given name."""
for p in self.plugs:
if p.alias == name:
return p
raise SmartDeviceException(f"Device has no child with {name}")
def get_plug_by_index(self, index: int) -> "SmartStripPlug":
"""Return child plug for given index."""
if index + 1 > len(self.plugs) or index < 0:
raise SmartDeviceException(
f"Invalid index {index}, device has {len(self.plugs)} plugs"
)
return self.plugs[index]
@property # type: ignore
@requires_update
def on_since(self) -> Optional[datetime]:
@ -116,25 +90,17 @@ class SmartStrip(SmartDevice):
if self.is_off:
return None
return max(plug.on_since for plug in self.plugs if plug.on_since is not None)
return max(plug.on_since for plug in self.children if plug.on_since is not None)
@property # type: ignore
@requires_update
def led(self) -> bool:
"""Return the state of the led.
:return: True if led is on, False otherwise
:rtype: bool
"""
"""Return the state of the led."""
sys_info = self.sys_info
return bool(1 - sys_info["led_off"])
async def set_led(self, state: bool):
"""Set the state of the led (night mode).
:param bool state: True to set led on, False to set led off
:raises SmartDeviceException: on error
"""
"""Set the state of the led (night mode)."""
await self._query_helper("system", "set_led_off", {"off": int(not state)})
await self.update()
@ -144,38 +110,23 @@ class SmartStrip(SmartDevice):
"""Return strip-specific state information.
:return: Strip information dict, keys in user-presentable form.
:rtype: dict
"""
return {
"LED state": self.led,
"Childs count": len(self.plugs),
"Childs count": len(self.children),
"On since": self.on_since,
}
async def current_consumption(self) -> float:
"""Get the current power consumption in watts.
:return: the current power consumption in watts.
:rtype: float
:raises SmartDeviceException: on error
"""
consumption = sum([await plug.current_consumption() for plug in self.plugs])
"""Get the current power consumption in watts."""
consumption = sum([await plug.current_consumption() for plug in self.children])
return consumption
async def get_icon(self) -> Dict:
"""Icon for the device.
Overriden to keep the API, as the SmartStrip and children do not
have icons, we just return dummy strings.
"""
return {"icon": "SMARTSTRIP-DUMMY", "hash": "SMARTSTRIP-DUMMY"}
async def set_alias(self, alias: str) -> None:
"""Set the alias for the strip.
:param alias: new alias
:raises SmartDeviceException: on error
"""
return await super().set_alias(alias)
@ -190,11 +141,9 @@ class SmartStrip(SmartDevice):
month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
emeter_daily: DefaultDict[int, float] = defaultdict(lambda: 0.0)
for plug in self.plugs:
for plug in self.children:
plug_emeter_daily = await plug.get_emeter_daily(
year=year, month=month, kwh=kwh
)
@ -208,12 +157,9 @@ class SmartStrip(SmartDevice):
:param year: year for which to retrieve statistics (default: this year)
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value
:rtype: dict
:raises SmartDeviceException: on error
"""
emeter_monthly: DefaultDict[int, float] = defaultdict(lambda: 0.0)
for plug in self.plugs:
for plug in self.children:
plug_emeter_monthly = await plug.get_emeter_monthly(year=year, kwh=kwh)
for month, value in plug_emeter_monthly:
emeter_monthly[month] += value
@ -222,11 +168,8 @@ class SmartStrip(SmartDevice):
@requires_update
async def erase_emeter_stats(self):
"""Erase energy meter statistics for all plugs.
:raises SmartDeviceException: on error
"""
for plug in self.plugs:
"""Erase energy meter statistics for all plugs."""
for plug in self.children:
await plug.erase_emeter_stats()
@ -267,10 +210,7 @@ class SmartStripPlug(SmartPlug):
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return whether device is on.
:return: True if device is on, False otherwise
"""
"""Return whether device is on."""
info = self._get_child_info()
return info["state"]
@ -280,9 +220,6 @@ class SmartStripPlug(SmartPlug):
"""Return the state of the led.
This is always false for subdevices.
:return: True if led is on, False otherwise
:rtype: bool
"""
return False
@ -304,11 +241,7 @@ class SmartStripPlug(SmartPlug):
@property # type: ignore
@requires_update
def alias(self) -> str:
"""Return device name (alias).
:return: Device name aka alias.
:rtype: str
"""
"""Return device name (alias)."""
info = self._get_child_info()
return info["alias"]
@ -322,11 +255,7 @@ class SmartStripPlug(SmartPlug):
@property # type: ignore
@requires_update
def on_since(self) -> Optional[datetime]:
"""Return pretty-printed on-time.
:return: datetime for on since
:rtype: datetime
"""
"""Return on-time, if available."""
if self.is_off:
return None
@ -338,21 +267,14 @@ class SmartStripPlug(SmartPlug):
@property # type: ignore
@requires_update
def model(self) -> str:
"""Return device model for a child socket.
:return: device model
:rtype: str
:raises SmartDeviceException: on error
"""
"""Return device model for a child socket."""
sys_info = self.parent.sys_info
return f"Socket for {sys_info['model']}"
def _get_child_info(self) -> Dict:
"""Return the subdevice information for this device.
:raises SmartDeviceException: if the information is not found.
"""
"""Return the subdevice information for this device."""
for plug in self.parent.sys_info["children"]:
if plug["id"] == self.child_id:
return plug
raise SmartDeviceException(f"Unable to find children {self.child_id}")

View File

@ -17,12 +17,14 @@ SUPPORTED_DEVICES = glob.glob(
BULBS = {"KL60", "LB100", "LB120", "LB130", "KL120", "KL130"}
VARIABLE_TEMP = {"LB120", "LB130", "KL120", "KL130"}
COLOR_BULBS = {"LB130", "KL130"}
PLUGS = {"HS100", "HS103", "HS105", "HS110", "HS200", "HS210"}
STRIPS = {"HS107", "HS300", "KP303", "KP400"}
DIMMERS = {"HS220"}
COLOR_BULBS = {"LB130", "KL130"}
DIMMABLE = {*BULBS, "HS220"}
EMETER = {"HS110", "HS300", *BULBS, *STRIPS}
DIMMABLE = {*BULBS, *DIMMERS}
WITH_EMETER = {"HS110", "HS300", *BULBS, *STRIPS}
ALL_DEVICES = BULBS.union(PLUGS).union(STRIPS).union(DIMMERS)
@ -39,17 +41,28 @@ def filter_model(desc, filter):
return filtered
has_emeter = pytest.mark.parametrize(
"dev", filter_model("has emeter", EMETER), indirect=True
)
no_emeter = pytest.mark.parametrize(
"dev", filter_model("no emeter", ALL_DEVICES - EMETER), indirect=True
)
def parametrize(desc, devices, ids=None):
# if ids is None:
# ids = ["on", "off"]
return pytest.mark.parametrize(
"dev", filter_model(desc, devices), indirect=True, ids=ids
)
bulb = pytest.mark.parametrize("dev", filter_model("bulbs", BULBS), indirect=True)
plug = pytest.mark.parametrize("dev", filter_model("plugs", PLUGS), indirect=True)
strip = pytest.mark.parametrize("dev", filter_model("strips", STRIPS), indirect=True)
dimmer = pytest.mark.parametrize("dev", filter_model("dimmers", DIMMERS), indirect=True)
has_emeter = parametrize("has emeter", WITH_EMETER)
no_emeter = parametrize("no emeter", ALL_DEVICES - WITH_EMETER)
def name_for_filename(x):
from os.path import basename
return basename(x)
bulb = parametrize("bulbs", BULBS, ids=name_for_filename)
plug = parametrize("plugs", PLUGS, ids=name_for_filename)
strip = parametrize("strips", STRIPS, ids=name_for_filename)
dimmer = parametrize("dimmers", DIMMERS, ids=name_for_filename)
# This ensures that every single file inside fixtures/ is being placed in some category
categorized_fixtures = set(dimmer.args[1] + strip.args[1] + plug.args[1] + bulb.args[1])
@ -62,29 +75,14 @@ if diff:
)
raise Exception("Missing category for %s" % diff)
dimmable = pytest.mark.parametrize(
"dev", filter_model("dimmable", DIMMABLE), indirect=True
)
non_dimmable = pytest.mark.parametrize(
"dev",
filter_model("non-dimmable", ALL_DEVICES - DIMMABLE - STRIPS - PLUGS),
indirect=True,
)
variable_temp = pytest.mark.parametrize(
"dev", filter_model("variable color temp", VARIABLE_TEMP), indirect=True
)
non_variable_temp = pytest.mark.parametrize(
"dev", filter_model("non-variable color temp", BULBS - VARIABLE_TEMP), indirect=True
)
color_bulb = pytest.mark.parametrize(
"dev", filter_model("color bulbs", COLOR_BULBS), indirect=True
)
non_color_bulb = pytest.mark.parametrize(
"dev", filter_model("non-color bulbs", BULBS - COLOR_BULBS), indirect=True
)
# bulb types
dimmable = parametrize("dimmable", DIMMABLE)
non_dimmable = parametrize("non-dimmable", BULBS - DIMMABLE)
variable_temp = parametrize("variable color temp", VARIABLE_TEMP)
non_variable_temp = parametrize("non-variable color temp", BULBS - VARIABLE_TEMP)
color_bulb = parametrize("color bulbs", COLOR_BULBS)
non_color_bulb = parametrize("non-color bulbs", BULBS - COLOR_BULBS)
# Parametrize tests to run with device both on and off
turn_on = pytest.mark.parametrize("turn_on", [True, False])
@ -97,6 +95,10 @@ async def handle_turn_on(dev, turn_on):
await dev.turn_off()
# to avoid adding this for each async function separately
pytestmark = pytest.mark.asyncio
@pytest.fixture(params=SUPPORTED_DEVICES)
def dev(request):
"""Device fixture.
@ -112,7 +114,7 @@ def dev(request):
asyncio.run(d.update())
if d.model in file:
return d
return
raise Exception("Unable to find type for %s" % ip)
def device_for_file(model):
for d in STRIPS:

View File

@ -113,6 +113,27 @@ PLUG_SCHEMA = Schema(
extra=REMOVE_EXTRA,
)
LIGHT_STATE_SCHEMA = Schema(
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": int,
"hue": All(int, Range(min=0, max=255)),
"mode": str,
"on_off": check_int_bool,
"saturation": All(int, Range(min=0, max=255)),
"dft_on_state": Optional(
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": All(int, Range(min=2000, max=9000)),
"hue": All(int, Range(min=0, max=255)),
"mode": str,
"saturation": All(int, Range(min=0, max=255)),
}
),
"err_code": int,
}
)
BULB_SCHEMA = PLUG_SCHEMA.extend(
{
"ctrl_protocols": Optional(dict),
@ -124,24 +145,7 @@ BULB_SCHEMA = PLUG_SCHEMA.extend(
"is_dimmable": check_int_bool,
"is_factory": bool,
"is_variable_color_temp": check_int_bool,
"light_state": {
"brightness": All(int, Range(min=0, max=100)),
"color_temp": int,
"hue": All(int, Range(min=0, max=255)),
"mode": str,
"on_off": check_int_bool,
"saturation": All(int, Range(min=0, max=255)),
"dft_on_state": Optional(
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": All(int, Range(min=2000, max=9000)),
"hue": All(int, Range(min=0, max=255)),
"mode": str,
"saturation": All(int, Range(min=0, max=255)),
}
),
"err_code": int,
},
"light_state": LIGHT_STATE_SCHEMA,
"preferred_state": [
{
"brightness": All(int, Range(min=0, max=100)),

187
kasa/tests/test_bulb.py Normal file
View File

@ -0,0 +1,187 @@
import pytest
from kasa import DeviceType, SmartDeviceException
from .conftest import (
bulb,
color_bulb,
dimmable,
handle_turn_on,
non_color_bulb,
non_dimmable,
non_variable_temp,
turn_on,
variable_temp,
)
from .newfakes import BULB_SCHEMA, LIGHT_STATE_SCHEMA
@bulb
async def test_bulb_sysinfo(dev):
assert dev.sys_info is not None
BULB_SCHEMA(dev.sys_info)
assert dev.model is not None
assert dev.device_type == DeviceType.Bulb
assert dev.is_bulb
@bulb
async def test_state_attributes(dev):
assert "Brightness" in dev.state_information
assert dev.state_information["Brightness"] == dev.brightness
assert "Is dimmable" in dev.state_information
assert dev.state_information["Is dimmable"] == dev.is_dimmable
@bulb
async def test_light_state_without_update(dev, monkeypatch):
with pytest.raises(SmartDeviceException):
monkeypatch.setitem(
dev._last_update["system"]["get_sysinfo"], "light_state", None
)
print(dev.light_state)
@bulb
async def test_get_light_state(dev):
LIGHT_STATE_SCHEMA(await dev.get_light_state())
@color_bulb
@turn_on
async def test_hsv(dev, turn_on):
await handle_turn_on(dev, turn_on)
assert dev.is_color
hue, saturation, brightness = dev.hsv
assert 0 <= hue <= 255
assert 0 <= saturation <= 100
assert 0 <= brightness <= 100
await dev.set_hsv(hue=1, saturation=1, value=1)
hue, saturation, brightness = dev.hsv
assert hue == 1
assert saturation == 1
assert brightness == 1
@color_bulb
@turn_on
async def test_invalid_hsv(dev, turn_on):
await handle_turn_on(dev, turn_on)
assert dev.is_color
for invalid_hue in [-1, 361, 0.5]:
with pytest.raises(ValueError):
await dev.set_hsv(invalid_hue, 0, 0)
for invalid_saturation in [-1, 101, 0.5]:
with pytest.raises(ValueError):
await dev.set_hsv(0, invalid_saturation, 0)
for invalid_brightness in [-1, 101, 0.5]:
with pytest.raises(ValueError):
await dev.set_hsv(0, 0, invalid_brightness)
@color_bulb
async def test_color_state_information(dev):
assert "HSV" in dev.state_information
assert dev.state_information["HSV"] == dev.hsv
@non_color_bulb
async def test_hsv_on_non_color(dev):
assert not dev.is_color
with pytest.raises(SmartDeviceException):
await dev.set_hsv(0, 0, 0)
with pytest.raises(SmartDeviceException):
print(dev.hsv)
@variable_temp
async def test_variable_temp_state_information(dev):
assert "Color temperature" in dev.state_information
assert dev.state_information["Color temperature"] == dev.color_temp
assert "Valid temperature range" in dev.state_information
assert (
dev.state_information["Valid temperature range"] == dev.valid_temperature_range
)
@variable_temp
@turn_on
async def test_try_set_colortemp(dev, turn_on):
await handle_turn_on(dev, turn_on)
await dev.set_color_temp(2700)
assert dev.color_temp == 2700
@variable_temp
async def test_unknown_temp_range(dev, monkeypatch):
with pytest.raises(SmartDeviceException):
monkeypatch.setitem(dev._sys_info, "model", "unknown bulb")
dev.valid_temperature_range()
@variable_temp
async def test_out_of_range_temperature(dev):
with pytest.raises(ValueError):
await dev.set_color_temp(1000)
with pytest.raises(ValueError):
await dev.set_color_temp(10000)
@non_variable_temp
async def test_non_variable_temp(dev):
with pytest.raises(SmartDeviceException):
await dev.set_color_temp(2700)
with pytest.raises(SmartDeviceException):
dev.valid_temperature_range()
with pytest.raises(SmartDeviceException):
print(dev.color_temp)
@dimmable
@turn_on
async def test_dimmable_brightness(dev, turn_on):
await handle_turn_on(dev, turn_on)
assert dev.is_dimmable
await dev.set_brightness(50)
assert dev.brightness == 50
await dev.set_brightness(10)
assert dev.brightness == 10
with pytest.raises(ValueError):
await dev.set_brightness("foo")
@dimmable
async def test_invalid_brightness(dev):
assert dev.is_dimmable
with pytest.raises(ValueError):
await dev.set_brightness(110)
with pytest.raises(ValueError):
await dev.set_brightness(-100)
@non_dimmable
async def test_non_dimmable(dev):
assert not dev.is_dimmable
with pytest.raises(SmartDeviceException):
assert dev.brightness == 0
with pytest.raises(SmartDeviceException):
await dev.set_brightness(100)

View File

@ -28,9 +28,6 @@ async def test_state(dev, turn_on):
else:
assert "Device state: OFF" in res.output
if not dev.has_emeter:
assert "Device has no emeter" in res.output
async def test_alias(dev):
runner = CliRunner()

117
kasa/tests/test_emeter.py Normal file
View File

@ -0,0 +1,117 @@
import pytest
from kasa import SmartDeviceException
from .conftest import has_emeter, no_emeter
from .newfakes import CURRENT_CONSUMPTION_SCHEMA
@no_emeter
async def test_no_emeter(dev):
assert not dev.has_emeter
with pytest.raises(SmartDeviceException):
await dev.get_emeter_realtime()
with pytest.raises(SmartDeviceException):
await dev.get_emeter_daily()
with pytest.raises(SmartDeviceException):
await dev.get_emeter_monthly()
with pytest.raises(SmartDeviceException):
await dev.erase_emeter_stats()
@has_emeter
async def test_get_emeter_realtime(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
current_emeter = await dev.get_emeter_realtime()
CURRENT_CONSUMPTION_SCHEMA(current_emeter)
@has_emeter
async def test_get_emeter_daily(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
assert await dev.get_emeter_daily(year=1900, month=1) == {}
d = await dev.get_emeter_daily()
assert len(d) > 0
k, v = d.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# Test kwh (energy, energy_wh)
d = await dev.get_emeter_daily(kwh=False)
k2, v2 = d.popitem()
assert v * 1000 == v2
@has_emeter
async def test_get_emeter_monthly(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
assert await dev.get_emeter_monthly(year=1900) == {}
d = await dev.get_emeter_monthly()
assert len(d) > 0
k, v = d.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# Test kwh (energy, energy_wh)
d = await dev.get_emeter_monthly(kwh=False)
k2, v2 = d.popitem()
assert v * 1000 == v2
@has_emeter
async def test_emeter_status(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
d = await dev.get_emeter_realtime()
with pytest.raises(KeyError):
assert d["foo"]
assert d["power_mw"] == d["power"] * 1000
# bulbs have only power according to tplink simulator.
if not dev.is_bulb:
assert d["voltage_mv"] == d["voltage"] * 1000
assert d["current_ma"] == d["current"] * 1000
assert d["total_wh"] == d["total"] * 1000
@pytest.mark.skip("not clearing your stats..")
@has_emeter
async def test_erase_emeter_stats(dev):
assert dev.has_emeter
await dev.erase_emeter()
@has_emeter
async def test_current_consumption(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
if dev.has_emeter:
x = await dev.current_consumption()
assert isinstance(x, float)
assert x >= 0.0
else:
assert await dev.current_consumption() is None

View File

@ -1,518 +0,0 @@
from datetime import datetime
from unittest.mock import patch
import pytest # type: ignore # https://github.com/pytest-dev/pytest/issues/3342
from kasa import DeviceType, SmartDeviceException, SmartStrip
from .conftest import (
bulb,
color_bulb,
dimmable,
handle_turn_on,
has_emeter,
no_emeter,
non_color_bulb,
non_dimmable,
non_variable_temp,
plug,
strip,
turn_on,
variable_temp,
)
from .newfakes import (
BULB_SCHEMA,
CURRENT_CONSUMPTION_SCHEMA,
PLUG_SCHEMA,
TZ_SCHEMA,
FakeTransportProtocol,
)
# to avoid adding this for each async function separately
pytestmark = pytest.mark.asyncio
@plug
async def test_plug_sysinfo(dev):
assert dev.sys_info is not None
PLUG_SCHEMA(dev.sys_info)
assert dev.model is not None
assert dev.device_type == DeviceType.Plug or dev.device_type == DeviceType.Strip
assert dev.is_plug or dev.is_strip
@bulb
async def test_bulb_sysinfo(dev):
assert dev.sys_info is not None
BULB_SCHEMA(dev.sys_info)
assert dev.model is not None
assert dev.device_type == DeviceType.Bulb
assert dev.is_bulb
async def test_state_info(dev):
assert isinstance(dev.state_information, dict)
async def test_invalid_connection(dev):
with patch.object(FakeTransportProtocol, "query", side_effect=SmartDeviceException):
with pytest.raises(SmartDeviceException):
await dev.update()
dev.is_on
async def test_query_helper(dev):
with pytest.raises(SmartDeviceException):
await dev._query_helper("test", "testcmd", {})
# TODO check for unwrapping?
@turn_on
async def test_state(dev, turn_on):
await handle_turn_on(dev, turn_on)
orig_state = dev.is_on
if orig_state:
await dev.turn_off()
assert not dev.is_on
assert dev.is_off
await dev.turn_on()
assert dev.is_on
assert not dev.is_off
else:
await dev.turn_on()
assert dev.is_on
assert not dev.is_off
await dev.turn_off()
assert not dev.is_on
assert dev.is_off
@no_emeter
async def test_no_emeter(dev):
assert not dev.has_emeter
with pytest.raises(SmartDeviceException):
await dev.get_emeter_realtime()
with pytest.raises(SmartDeviceException):
await dev.get_emeter_daily()
with pytest.raises(SmartDeviceException):
await dev.get_emeter_monthly()
with pytest.raises(SmartDeviceException):
await dev.erase_emeter_stats()
@has_emeter
async def test_get_emeter_realtime(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
current_emeter = await dev.get_emeter_realtime()
CURRENT_CONSUMPTION_SCHEMA(current_emeter)
@has_emeter
async def test_get_emeter_daily(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
assert await dev.get_emeter_daily(year=1900, month=1) == {}
d = await dev.get_emeter_daily()
assert len(d) > 0
k, v = d.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# Test kwh (energy, energy_wh)
d = await dev.get_emeter_daily(kwh=False)
k2, v2 = d.popitem()
assert v * 1000 == v2
@has_emeter
async def test_get_emeter_monthly(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
assert await dev.get_emeter_monthly(year=1900) == {}
d = await dev.get_emeter_monthly()
assert len(d) > 0
k, v = d.popitem()
assert isinstance(k, int)
assert isinstance(v, float)
# Test kwh (energy, energy_wh)
d = await dev.get_emeter_monthly(kwh=False)
k2, v2 = d.popitem()
assert v * 1000 == v2
@has_emeter
async def test_emeter_status(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
assert dev.has_emeter
d = await dev.get_emeter_realtime()
with pytest.raises(KeyError):
assert d["foo"]
assert d["power_mw"] == d["power"] * 1000
# bulbs have only power according to tplink simulator.
if not dev.is_bulb:
assert d["voltage_mv"] == d["voltage"] * 1000
assert d["current_ma"] == d["current"] * 1000
assert d["total_wh"] == d["total"] * 1000
@pytest.mark.skip("not clearing your stats..")
@has_emeter
async def test_erase_emeter_stats(dev):
assert dev.has_emeter
await dev.erase_emeter()
@has_emeter
async def test_current_consumption(dev):
if dev.is_strip:
pytest.skip("Disabled for strips temporarily")
if dev.has_emeter:
x = await dev.current_consumption()
assert isinstance(x, float)
assert x >= 0.0
else:
assert await dev.current_consumption() is None
async def test_alias(dev):
test_alias = "TEST1234"
original = dev.alias
assert isinstance(original, str)
await dev.set_alias(test_alias)
assert dev.alias == test_alias
await dev.set_alias(original)
assert dev.alias == original
@plug
async def test_led(dev):
original = dev.led
await dev.set_led(False)
assert not dev.led
await dev.set_led(True)
assert dev.led
await dev.set_led(original)
@turn_on
async def test_on_since(dev, turn_on):
await handle_turn_on(dev, turn_on)
orig_state = dev.is_on
if "on_time" not in dev.sys_info and not dev.is_strip:
assert dev.on_since is None
elif orig_state:
assert isinstance(dev.on_since, datetime)
else:
assert dev.on_since is None
async def test_icon(dev):
assert set((await dev.get_icon()).keys()), {"icon", "hash"}
async def test_time(dev):
assert isinstance(await dev.get_time(), datetime)
# TODO check setting?
async def test_timezone(dev):
TZ_SCHEMA(await dev.get_timezone())
async def test_hw_info(dev):
PLUG_SCHEMA(dev.hw_info)
async def test_location(dev):
PLUG_SCHEMA(dev.location)
async def test_rssi(dev):
PLUG_SCHEMA({"rssi": dev.rssi}) # wrapping for vol
async def test_mac(dev):
PLUG_SCHEMA({"mac": dev.mac}) # wrapping for val
# TODO check setting?
@non_variable_temp
async def test_temperature_on_nonsupporting(dev):
assert dev.valid_temperature_range == (0, 0)
# TODO test when device does not support temperature range
with pytest.raises(SmartDeviceException):
await dev.set_color_temp(2700)
with pytest.raises(SmartDeviceException):
print(dev.color_temp)
@variable_temp
async def test_out_of_range_temperature(dev):
with pytest.raises(ValueError):
await dev.set_color_temp(1000)
with pytest.raises(ValueError):
await dev.set_color_temp(10000)
@non_dimmable
async def test_non_dimmable(dev):
assert not dev.is_dimmable
with pytest.raises(SmartDeviceException):
assert dev.brightness == 0
with pytest.raises(SmartDeviceException):
await dev.set_brightness(100)
@dimmable
@turn_on
async def test_dimmable_brightness(dev, turn_on):
await handle_turn_on(dev, turn_on)
assert dev.is_dimmable
await dev.set_brightness(50)
assert dev.brightness == 50
await dev.set_brightness(10)
assert dev.brightness == 10
with pytest.raises(ValueError):
await dev.set_brightness("foo")
@dimmable
async def test_invalid_brightness(dev):
assert dev.is_dimmable
with pytest.raises(ValueError):
await dev.set_brightness(110)
with pytest.raises(ValueError):
await dev.set_brightness(-100)
@color_bulb
@turn_on
async def test_hsv(dev, turn_on):
await handle_turn_on(dev, turn_on)
assert dev.is_color
hue, saturation, brightness = dev.hsv
assert 0 <= hue <= 255
assert 0 <= saturation <= 100
assert 0 <= brightness <= 100
await dev.set_hsv(hue=1, saturation=1, value=1)
hue, saturation, brightness = dev.hsv
assert hue == 1
assert saturation == 1
assert brightness == 1
@color_bulb
@turn_on
async def test_invalid_hsv(dev, turn_on):
await handle_turn_on(dev, turn_on)
assert dev.is_color
for invalid_hue in [-1, 361, 0.5]:
with pytest.raises(ValueError):
await dev.set_hsv(invalid_hue, 0, 0)
for invalid_saturation in [-1, 101, 0.5]:
with pytest.raises(ValueError):
await dev.set_hsv(0, invalid_saturation, 0)
for invalid_brightness in [-1, 101, 0.5]:
with pytest.raises(ValueError):
await dev.set_hsv(0, 0, invalid_brightness)
@non_color_bulb
async def test_hsv_on_non_color(dev):
assert not dev.is_color
with pytest.raises(SmartDeviceException):
await dev.set_hsv(0, 0, 0)
with pytest.raises(SmartDeviceException):
print(dev.hsv)
@variable_temp
@turn_on
async def test_try_set_colortemp(dev, turn_on):
await handle_turn_on(dev, turn_on)
await dev.set_color_temp(2700)
assert dev.color_temp == 2700
@non_variable_temp
async def test_non_variable_temp(dev):
with pytest.raises(SmartDeviceException):
await dev.set_color_temp(2700)
@strip
@turn_on
async def test_children_change_state(dev, turn_on):
await handle_turn_on(dev, turn_on)
for plug in dev.plugs:
orig_state = plug.is_on
if orig_state:
await plug.turn_off()
assert not plug.is_on
assert plug.is_off
await plug.turn_on()
assert plug.is_on
assert not plug.is_off
else:
await plug.turn_on()
assert plug.is_on
assert not plug.is_off
await plug.turn_off()
assert not plug.is_on
assert plug.is_off
@strip
async def test_children_alias(dev):
test_alias = "TEST1234"
for plug in dev.plugs:
original = plug.alias
await plug.set_alias(alias=test_alias)
await dev.update() # TODO: set_alias does not call parent's update()..
assert plug.alias == test_alias
await plug.set_alias(alias=original)
await dev.update() # TODO: set_alias does not call parent's update()..
assert plug.alias == original
@strip
async def test_children_on_since(dev):
on_sinces = []
for plug in dev.plugs:
if plug.is_on:
on_sinces.append(plug.on_since)
assert isinstance(plug.on_since, datetime)
else:
assert plug.on_since is None
if dev.is_off:
assert dev.on_since is None
# TODO: testing this would require some mocking utcnow which is not
# very straightforward.
# else:
# assert dev.on_since == max(on_sinces)
@strip
async def test_get_plug_by_name(dev: SmartStrip):
name = dev.plugs[0].alias
assert dev.get_plug_by_name(name) == dev.plugs[0]
with pytest.raises(SmartDeviceException):
dev.get_plug_by_name("NONEXISTING NAME")
@strip
async def test_get_plug_by_index(dev: SmartStrip):
assert dev.get_plug_by_index(0) == dev.plugs[0]
with pytest.raises(SmartDeviceException):
dev.get_plug_by_index(-1)
with pytest.raises(SmartDeviceException):
dev.get_plug_by_index(len(dev.plugs))
@pytest.mark.skip("this test will wear out your relays")
async def test_all_binary_states(dev):
# test every binary state
# TODO: this needs to be fixed, dev.plugs is not available for each device..
for state in range(2 ** len(dev.plugs)):
# create binary state map
state_map = {}
for plug_index in range(len(dev.plugs)):
state_map[plug_index] = bool((state >> plug_index) & 1)
if state_map[plug_index]:
await dev.turn_on(index=plug_index)
else:
await dev.turn_off(index=plug_index)
# check state map applied
for index, state in dev.is_on.items():
assert state_map[index] == state
# toggle each outlet with state map applied
for plug_index in range(len(dev.plugs)):
# toggle state
if state_map[plug_index]:
await dev.turn_off(index=plug_index)
else:
await dev.turn_on(index=plug_index)
# only target outlet should have state changed
for index, state in dev.is_on.items():
if index == plug_index:
assert state != state_map[index]
else:
assert state == state_map[index]
# reset state
if state_map[plug_index]:
await dev.turn_on(index=plug_index)
else:
await dev.turn_off(index=plug_index)
# original state map should be restored
for index, state in dev.is_on.items():
assert state == state_map[index]
async def test_representation(dev):
import re
pattern = re.compile("<.* model .* at .* (.*), is_on: .* - dev specific: .*>")
assert pattern.match(str(dev))

28
kasa/tests/test_plug.py Normal file
View File

@ -0,0 +1,28 @@
from kasa import DeviceType
from .conftest import plug
from .newfakes import PLUG_SCHEMA
@plug
async def test_plug_sysinfo(dev):
assert dev.sys_info is not None
PLUG_SCHEMA(dev.sys_info)
assert dev.model is not None
assert dev.device_type == DeviceType.Plug or dev.device_type == DeviceType.Strip
assert dev.is_plug or dev.is_strip
@plug
async def test_led(dev):
original = dev.led
await dev.set_led(False)
assert not dev.led
await dev.set_led(True)
assert dev.led
await dev.set_led(original)

View File

@ -0,0 +1,111 @@
from datetime import datetime
from unittest.mock import patch
import pytest # type: ignore # https://github.com/pytest-dev/pytest/issues/3342
from kasa import SmartDeviceException
from .conftest import handle_turn_on, turn_on
from .newfakes import PLUG_SCHEMA, TZ_SCHEMA, FakeTransportProtocol
async def test_state_info(dev):
assert isinstance(dev.state_information, dict)
async def test_invalid_connection(dev):
with patch.object(FakeTransportProtocol, "query", side_effect=SmartDeviceException):
with pytest.raises(SmartDeviceException):
await dev.update()
dev.is_on
async def test_query_helper(dev):
with pytest.raises(SmartDeviceException):
await dev._query_helper("test", "testcmd", {})
# TODO check for unwrapping?
@turn_on
async def test_state(dev, turn_on):
await handle_turn_on(dev, turn_on)
orig_state = dev.is_on
if orig_state:
await dev.turn_off()
assert not dev.is_on
assert dev.is_off
await dev.turn_on()
assert dev.is_on
assert not dev.is_off
else:
await dev.turn_on()
assert dev.is_on
assert not dev.is_off
await dev.turn_off()
assert not dev.is_on
assert dev.is_off
async def test_alias(dev):
test_alias = "TEST1234"
original = dev.alias
assert isinstance(original, str)
await dev.set_alias(test_alias)
assert dev.alias == test_alias
await dev.set_alias(original)
assert dev.alias == original
@turn_on
async def test_on_since(dev, turn_on):
await handle_turn_on(dev, turn_on)
orig_state = dev.is_on
if "on_time" not in dev.sys_info and not dev.is_strip:
assert dev.on_since is None
elif orig_state:
assert isinstance(dev.on_since, datetime)
else:
assert dev.on_since is None
async def test_time(dev):
assert isinstance(await dev.get_time(), datetime)
async def test_timezone(dev):
TZ_SCHEMA(await dev.get_timezone())
async def test_hw_info(dev):
PLUG_SCHEMA(dev.hw_info)
async def test_location(dev):
PLUG_SCHEMA(dev.location)
async def test_rssi(dev):
PLUG_SCHEMA({"rssi": dev.rssi}) # wrapping for vol
async def test_mac(dev):
PLUG_SCHEMA({"mac": dev.mac}) # wrapping for val
async def test_representation(dev):
import re
pattern = re.compile("<.* model .* at .* (.*), is_on: .* - dev specific: .*>")
assert pattern.match(str(dev))
async def test_childrens(dev):
"""Make sure that children property is exposed by every device."""
if dev.is_strip:
assert len(dev.children) > 0
else:
assert len(dev.children) == 0

129
kasa/tests/test_strip.py Normal file
View File

@ -0,0 +1,129 @@
from datetime import datetime
import pytest
from kasa import SmartDeviceException, SmartStrip
from .conftest import handle_turn_on, strip, turn_on
@strip
@turn_on
async def test_children_change_state(dev, turn_on):
await handle_turn_on(dev, turn_on)
for plug in dev.children:
orig_state = plug.is_on
if orig_state:
await plug.turn_off()
assert not plug.is_on
assert plug.is_off
await plug.turn_on()
assert plug.is_on
assert not plug.is_off
else:
await plug.turn_on()
assert plug.is_on
assert not plug.is_off
await plug.turn_off()
assert not plug.is_on
assert plug.is_off
@strip
async def test_children_alias(dev):
test_alias = "TEST1234"
for plug in dev.children:
original = plug.alias
await plug.set_alias(alias=test_alias)
await dev.update() # TODO: set_alias does not call parent's update()..
assert plug.alias == test_alias
await plug.set_alias(alias=original)
await dev.update() # TODO: set_alias does not call parent's update()..
assert plug.alias == original
@strip
async def test_children_on_since(dev):
on_sinces = []
for plug in dev.children:
if plug.is_on:
on_sinces.append(plug.on_since)
assert isinstance(plug.on_since, datetime)
else:
assert plug.on_since is None
if dev.is_off:
assert dev.on_since is None
# TODO: testing this would require some mocking utcnow which is not
# very straightforward.
# else:
# assert dev.on_since == max(on_sinces)
@strip
async def test_get_plug_by_name(dev: SmartStrip):
name = dev.children[0].alias
assert dev.get_plug_by_name(name) == dev.children[0]
with pytest.raises(SmartDeviceException):
dev.get_plug_by_name("NONEXISTING NAME")
@strip
async def test_get_plug_by_index(dev: SmartStrip):
assert dev.get_plug_by_index(0) == dev.children[0]
with pytest.raises(SmartDeviceException):
dev.get_plug_by_index(-1)
with pytest.raises(SmartDeviceException):
dev.get_plug_by_index(len(dev.children))
@pytest.mark.skip("this test will wear out your relays")
async def test_all_binary_states(dev):
# test every binary state
# TODO: this needs to be fixed, dev.plugs is not available for each device..
for state in range(2 ** len(dev.children)):
# create binary state map
state_map = {}
for plug_index in range(len(dev.children)):
state_map[plug_index] = bool((state >> plug_index) & 1)
if state_map[plug_index]:
await dev.turn_on(index=plug_index)
else:
await dev.turn_off(index=plug_index)
# check state map applied
for index, state in dev.is_on.items():
assert state_map[index] == state
# toggle each outlet with state map applied
for plug_index in range(len(dev.children)):
# toggle state
if state_map[plug_index]:
await dev.turn_off(index=plug_index)
else:
await dev.turn_on(index=plug_index)
# only target outlet should have state changed
for index, state in dev.is_on.items():
if index == plug_index:
assert state != state_map[index]
else:
assert state == state_map[index]
# reset state
if state_map[plug_index]:
await dev.turn_on(index=plug_index)
else:
await dev.turn_off(index=plug_index)
# original state map should be restored
for index, state in dev.is_on.items():
assert state == state_map[index]

View File

@ -43,7 +43,7 @@ known_third_party = ["asyncclick", "pytest", "setuptools", "voluptuous"]
[tool.coverage.run]
source = ["kasa"]
branch = true
omit = ["kasa/cli.py", "kasa/tests/*"]
omit = ["kasa/tests/*"]
[tool.coverage.report]
exclude_lines = [
@ -52,6 +52,15 @@ exclude_lines = [
"def __repr__"
]
[tool.interrogate]
ignore-init-method = true
ignore-magic = true
ignore-private = true
ignore-semiprivate = true
fail-under = 100
exclude = ['kasa/tests/*']
verbose = 2
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"