diff --git a/.flake8 b/.flake8 deleted file mode 100644 index d9ad0b40..00000000 --- a/.flake8 +++ /dev/null @@ -1,5 +0,0 @@ -[flake8] -ignore = E203, E266, E501, W503, F403, F401 -max-line-length = 79 -max-complexity = 18 -select = B,C,E,F,W,T4,B9 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4186ac89..d573211e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,24 @@ repos: -- repo: https://github.com/python/black - rev: stable - hooks: - - id: black - language_version: python3.7 +- repo: https://github.com/python/black + rev: stable + hooks: + - id: black + language_version: python3.7 + +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.3.0 + hooks: + - id: flake8 + additional_dependencies: [flake8-docstrings] + + +- repo: https://github.com/pre-commit/mirrors-isort + rev: v4.3.21 + hooks: + - id: isort + +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.740 + hooks: + - id: mypy +# args: [--no-strict-optional, --ignore-missing-imports] diff --git a/azure-pipelines.yml b/azure-pipelines.yml new file mode 100644 index 00000000..806fd1c8 --- /dev/null +++ b/azure-pipelines.yml @@ -0,0 +1,47 @@ +trigger: +- master +pr: +- master + +pool: + vmImage: 'ubuntu-latest' +strategy: + matrix: + Python36: + python.version: '3.6' + Python37: + python.version: '3.7' +# Python38: +# python.version: '3.8' + +steps: +- task: UsePythonVersion@0 + inputs: + versionSpec: '$(python.version)' + displayName: 'Use Python $(python.version)' + +- script: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install pytest pytest-azurepipelines pytest-cov + displayName: 'Install dependencies' + +- script: | + pre-commit run black --all-files + displayName: 'Code formating (black)' + +- script: | + pre-commit run flake8 --all-files + displayName: 'Code formating (flake8)' + +- script: | + pre-commit run mypy --all-files + displayName: 'Typing checks (mypy)' + +- script: | + pre-commit run isort --all-files + displayName: 'Order of imports (isort)' + +- script: | + pytest --cov pyHS100 --cov-report html + displayName: 'Tests' diff --git a/pyHS100/__init__.py b/pyHS100/__init__.py index a1160082..a64b360d 100755 --- a/pyHS100/__init__.py +++ b/pyHS100/__init__.py @@ -1,13 +1,12 @@ -""" -This module provides a way to interface with TP-Link's smart home devices, -such as smart plugs (HS1xx), wall switches (HS2xx), and light bulbs (LB1xx). +"""Python interface for TP-Link's smart home devices. All common, shared functionalities are available through `SmartDevice` class:: x = SmartDevice("192.168.1.1") print(x.sys_info) -For device type specific actions `SmartBulb` or `SmartPlug` must be used instead. +For device type specific actions `SmartBulb`, `SmartPlug`, or `SmartStrip` + should be used instead. Module-specific errors are raised as `SmartDeviceException` and are expected to be handled by the user of the library. @@ -22,7 +21,7 @@ from pyHS100.smartdevice import ( SmartDeviceException, ) from pyHS100.smartplug import SmartPlug -from pyHS100.smartstrip import SmartStrip, SmartStripException +from pyHS100.smartstrip import SmartStrip __all__ = [ "Discover", @@ -34,5 +33,4 @@ __all__ = [ "SmartDeviceException", "SmartPlug", "SmartStrip", - "SmartStripException", ] diff --git a/pyHS100/cli.py b/pyHS100/cli.py index 4fd58952..5c71c971 100755 --- a/pyHS100/cli.py +++ b/pyHS100/cli.py @@ -48,9 +48,10 @@ pass_dev = click.make_pass_decorator(SmartDevice) @click.option("--bulb", default=False, is_flag=True) @click.option("--plug", default=False, is_flag=True) @click.option("--strip", default=False, is_flag=True) +@click.version_option() @click.pass_context def cli(ctx, ip, host, alias, target, debug, bulb, plug, strip): - """A cli tool for controlling TP-Link smart home plugs.""" + """A cli tool for controlling TP-Link smart home plugs.""" # noqa if debug: logging.basicConfig(level=logging.DEBUG) else: @@ -66,9 +67,9 @@ def cli(ctx, ip, host, alias, target, debug, bulb, plug, strip): click.echo("Alias is given, using discovery to find host %s" % alias) host = find_host_from_alias(alias=alias, target=target) if host: - click.echo("Found hostname is {}".format(host)) + click.echo(f"Found hostname is {host}") else: - click.echo("No device with name {} found".format(alias)) + click.echo(f"No device with name {alias} found") return if host is None: @@ -98,11 +99,15 @@ def cli(ctx, ip, host, alias, target, debug, bulb, plug, strip): @click.option("--save") @click.pass_context def dump_discover(ctx, save): + """Dump discovery information. + + Useful for dumping into a file with `--save` to be added to the test suite. + """ target = ctx.parent.params["target"] for dev in Discover.discover(target=target, return_raw=True).values(): model = dev["system"]["get_sysinfo"]["model"] hw_version = dev["system"]["get_sysinfo"]["hw_ver"] - save_to = "%s_%s.json" % (model, hw_version) + save_to = f"{model}_{hw_version}.json" click.echo("Saving info to %s" % save_to) with open(save_to, "w") as f: import json @@ -142,7 +147,7 @@ def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3) % (alias, attempts, timeout) ) for attempt in range(1, attempts): - click.echo("Attempt %s of %s" % (attempt, attempts)) + click.echo(f"Attempt {attempt} of {attempts}") found_devs = Discover.discover(target=target, timeout=timeout).items() for ip, dev in found_devs: if dev.sync.get_alias().lower() == alias.lower(): @@ -162,11 +167,12 @@ def sysinfo(dev): @cli.command() @pass_dev @click.pass_context -def state(ctx, dev): +def state(ctx, dev: SmartDevice): """Print out device state and versions.""" click.echo( click.style( - "== %s - %s ==" % (dev.sync.get_alias(), dev.sync.get_model()), bold=True + "== {} - {} ==".format(dev.sync.get_alias(), dev.sync.get_model()), + bold=True, ) ) @@ -178,24 +184,25 @@ def state(ctx, dev): ) if dev.num_children > 0: is_on = dev.sync.get_is_on() - aliases = dev.sync.get_alias() - for child in range(dev.num_children): + for plug in range(dev.plugs): + alias = plug.sync.get_alias() click.echo( click.style( - " * %s state: %s" - % (aliases[child], ("ON" if is_on[child] else "OFF")), - fg="green" if is_on[child] else "red", + " * {} state: {}".format( + alias, ("ON" if is_on else "OFF") + ), + fg="green" if is_on else "red", ) ) click.echo("Host/IP: %s" % dev.host) for k, v in dev.sync.get_state_information().items(): - click.echo("%s: %s" % (k, v)) + click.echo(f"{k}: {v}") click.echo(click.style("== Generic information ==", bold=True)) click.echo("Time: %s" % dev.sync.get_time()) click.echo("Hardware: %s" % dev.sync.get_hw_info()["hw_ver"]) click.echo("Software: %s" % dev.sync.get_hw_info()["sw_ver"]) - click.echo("MAC (rssi): %s (%s)" % (dev.sync.get_mac(), dev.sync.get_rssi())) + click.echo("MAC (rssi): {} ({})".format(dev.sync.get_mac(), dev.sync.get_rssi())) click.echo("Location: %s" % dev.sync.get_location()) ctx.invoke(emeter) @@ -249,7 +256,7 @@ def emeter(dev, year, month, erase): click.echo("== For year %s ==" % year.year) emeter_status = dev.sync.get_emeter_monthly(year.year) elif month: - click.echo("== For month %s of %s ==" % (month.month, month.year)) + click.echo(f"== For month {month.month} of {month.year} ==") emeter_status = dev.sync.get_emeter_daily(year=month.year, month=month.month) else: emeter_status = dev.sync.get_emeter_realtime() @@ -306,13 +313,13 @@ def temperature(dev: SmartBulb, temperature): @click.pass_context @pass_dev def hsv(dev, ctx, h, s, v): - """Get or set color in HSV. (Bulb only)""" + """Get or set color in HSV. (Bulb only).""" if h is None or s is None or v is None: click.echo("Current HSV: %s %s %s" % dev.sync.get_hsv()) elif s is None or v is None: raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx) else: - click.echo("Setting HSV: %s %s %s" % (h, s, v)) + click.echo(f"Setting HSV: {h} {s} {v}") dev.sync.set_hsv(h, s, v) diff --git a/pyHS100/discover.py b/pyHS100/discover.py index bf42926c..867f7e60 100755 --- a/pyHS100/discover.py +++ b/pyHS100/discover.py @@ -1,3 +1,4 @@ +"""Discovery module for TP-Link Smart Home devices.""" import json import logging import socket @@ -25,8 +26,6 @@ class Discover: you can initialize the corresponding device class directly without this. The protocol uses UDP broadcast datagrams on port 9999 for discovery. - - """ DISCOVERY_QUERY = { @@ -46,8 +45,8 @@ class Discover: discovery_packets=3, return_raw=False, ) -> Dict[str, SmartDevice]: + """Discover devices. - """ Sends discovery message to 255.255.255.255:9999 in order to detect available supported devices in the local network, and waits for given timeout for answers from devices. diff --git a/pyHS100/protocol.py b/pyHS100/protocol.py index b2df8e04..f29b4bed 100755 --- a/pyHS100/protocol.py +++ b/pyHS100/protocol.py @@ -1,3 +1,14 @@ +"""Implementation of the TP-Link Smart Home Protocol. + +Encryption/Decryption methods based on the works of +Lubomir Stroetmann and Tobias Esser + +https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/ +https://github.com/softScheck/tplink-smartplug/ + +which are licensed under the Apache License, Version 2.0 +http://www.apache.org/licenses/LICENSE-2.0 +""" import asyncio import json import logging @@ -8,17 +19,7 @@ _LOGGER = logging.getLogger(__name__) class TPLinkSmartHomeProtocol: - """Implementation of the TP-Link Smart Home Protocol. - - Encryption/Decryption methods based on the works of - Lubomir Stroetmann and Tobias Esser - - https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/ - https://github.com/softScheck/tplink-smartplug/ - - which are licensed under the Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0 - """ + """Implementation of the TP-Link Smart Home protocol.""" INITIALIZATION_VECTOR = 171 DEFAULT_PORT = 9999 @@ -71,7 +72,7 @@ class TPLinkSmartHomeProtocol: return json.loads(response) @staticmethod - def encrypt(request: str) -> bytearray: + def encrypt(request: str) -> bytes: """ Encrypt a request for a TP-Link Smart Home Device. diff --git a/pyHS100/smartbulb.py b/pyHS100/smartbulb.py index 69edd8b4..ee1c207f 100644 --- a/pyHS100/smartbulb.py +++ b/pyHS100/smartbulb.py @@ -1,3 +1,4 @@ +"""Module for bulbs.""" import re from typing import Any, Dict, Tuple @@ -10,8 +11,8 @@ TPLINK_KELVIN = { "LB230": (2500, 9000), "KB130": (2500, 9000), "KL130": (2500, 9000), - "KL120\(EU\)": (2700, 6500), - "KL120\(US\)": (2700, 5000), + r"KL120\(EU\)": (2700, 6500), + r"KL120\(US\)": (2700, 5000), } @@ -162,7 +163,9 @@ class SmartBulb(SmartDevice): async def set_hsv(self, hue: int, saturation: int, value: int): """Set new HSV. - :param tuple state: hue, saturation and value (degrees, %, %) + :param int hue: hue in degrees + :param int saturation: saturation in percentage [0,100] + :param int value: value in percentage [0, 100] """ if not await self.is_color(): raise SmartDeviceException("Bulb does not support color.") @@ -222,7 +225,7 @@ class SmartBulb(SmartDevice): await self.set_light_state(light_state) async def get_brightness(self) -> int: - """Current brightness of the device. + """Return the current brightness. :return: brightness in percent :rtype: int @@ -237,7 +240,7 @@ class SmartBulb(SmartDevice): return int(light_state["brightness"]) async def set_brightness(self, brightness: int) -> None: - """Set the current brightness of the device. + """Set the brightness. :param int brightness: brightness in percent """ @@ -255,7 +258,7 @@ class SmartBulb(SmartDevice): :return: Bulb information dict, keys in user-presentable form. :rtype: dict """ - info = { + info: Dict[str, Any] = { "Brightness": await self.get_brightness(), "Is dimmable": await self.is_dimmable(), } # type: Dict[str, Any] @@ -281,4 +284,5 @@ class SmartBulb(SmartDevice): await self.set_light_state({"on_off": 1}) async def get_has_emeter(self) -> bool: + """Return that the bulb has an emeter.""" return True diff --git a/pyHS100/smartdevice.py b/pyHS100/smartdevice.py index 8b770a08..53738221 100755 --- a/pyHS100/smartdevice.py +++ b/pyHS100/smartdevice.py @@ -1,6 +1,4 @@ -""" -pyHS100 -Python library supporting TP-Link Smart Plugs/Switches (HS100/HS110/Hs200). +"""Python library supporting TP-Link Smart Home devices. The communication protocol was reverse engineered by Lubomir Stroetmann and Tobias Esser in 'Reverse Engineering the TP-Link HS110': @@ -105,7 +103,7 @@ class SmartDevice: if protocol is None: # pragma: no cover protocol = TPLinkSmartHomeProtocol() self.protocol = protocol - self.emeter_type = "emeter" # type: str + self.emeter_type = "emeter" self.context = context self.num_children = 0 self.cache_ttl = timedelta(seconds=cache_ttl) @@ -115,13 +113,14 @@ class SmartDevice: self.context, self.cache_ttl, ) - self.cache = defaultdict(lambda: defaultdict(lambda: None)) + self.cache = defaultdict(lambda: defaultdict(lambda: None)) # type: ignore self._device_type = DeviceType.Unknown self.ioloop = ioloop or asyncio.get_event_loop() self.sync = SyncSmartDevice(self, ioloop=self.ioloop) def _result_from_cache(self, target, cmd) -> Optional[Dict]: """Return query result from cache if still fresh. + Only results from commands starting with `get_` are considered cacheable. :param target: Target system @@ -146,7 +145,7 @@ class SmartDevice: return None def _insert_to_cache(self, target: str, cmd: str, response: Dict) -> None: - """Internal function to add response to cache. + """Add response for a given command to the cache. :param target: Target system :param cmd: Command @@ -167,9 +166,8 @@ class SmartDevice: :rtype: dict :raises SmartDeviceException: if command was not executed correctly """ - if self.context is None: - request = {target: {cmd: arg}} - else: + request: Dict[str, Any] = {target: {cmd: arg}} + if self.context is not None: request = {"context": {"child_ids": [self.context]}, target: {cmd: arg}} try: @@ -179,24 +177,20 @@ class SmartDevice: response = await self.protocol.query(host=self.host, request=request) self._insert_to_cache(target, cmd, response) except Exception as ex: - raise SmartDeviceException( - "Communication error on %s:%s" % (target, cmd) - ) from ex + raise SmartDeviceException(f"Communication error on {target}:{cmd}") from ex if target not in response: - raise SmartDeviceException( - "No required {} in response: {}".format(target, response) - ) + raise SmartDeviceException(f"No required {target} in response: {response}") result = response[target] if "err_code" in result and result["err_code"] != 0: - raise SmartDeviceException("Error on {}.{}: {}".format(target, cmd, result)) + raise SmartDeviceException(f"Error on {target}.{cmd}: {result}") if cmd not in result: - raise SmartDeviceException("No command in response: {}".format(response)) + raise SmartDeviceException(f"No command in response: {response}") result = result[cmd] if "err_code" in result and result["err_code"] != 0: - raise SmartDeviceException("Error on {} {}: {}".format(target, cmd, result)) + raise SmartDeviceException(f"Error on {target} {cmd}: {result}") if "err_code" in result: del result["err_code"] @@ -211,7 +205,7 @@ class SmartDevice: """ raise NotImplementedError() - async def get_sys_info(self) -> Dict: + async def get_sys_info(self) -> Dict[str, Any]: """Retrieve system information. :return: sysinfo @@ -415,11 +409,10 @@ class SmartDevice: await self._query_helper("system", "set_mac_addr", {"mac": mac}) async def get_emeter_realtime(self) -> EmeterStatus: - """Retrive current energy readings. + """Retrieve current energy readings. :returns: current readings or False :rtype: dict, None - None if device has no energy meter or error occurred :raises SmartDeviceException: on error """ if not await self.get_has_emeter(): @@ -437,7 +430,6 @@ class SmartDevice: month) :param kwh: return usage in kWh (default: True) :return: mapping of day of month to value - None if device has no energy meter or error occurred :rtype: dict :raises SmartDeviceException: on error """ @@ -468,7 +460,6 @@ class SmartDevice: :param year: year for which to retrieve statistics (default: this year) :param kwh: return usage in kWh (default: True) :return: dict: mapping of month to value - None if device has no energy meter :rtype: dict :raises SmartDeviceException: on error """ @@ -493,8 +484,6 @@ class SmartDevice: """Erase energy meter statistics. :return: True if statistics were deleted - False if device has no energy meter. - :rtype: bool :raises SmartDeviceException: on error """ if not await self.get_has_emeter(): @@ -502,15 +491,10 @@ class SmartDevice: await self._query_helper(self.emeter_type, "erase_emeter_stat", None) - # As query_helper raises exception in case of failure, we have - # succeeded when we are this far. - return True - async def current_consumption(self) -> Optional[float]: """Get the current power consumption in Watt. :return: the current power consumption in Watts. - None if device has no energy meter. :raises SmartDeviceException: on error """ if not await self.get_has_emeter(): @@ -570,25 +554,30 @@ class SmartDevice: @property def is_bulb(self) -> bool: + """Return True if the device is a bulb.""" return self._device_type == DeviceType.Bulb @property def is_plug(self) -> bool: + """Return True if the device is a plug.""" return self._device_type == DeviceType.Plug @property def is_strip(self) -> bool: + """Return True if the device is a strip.""" return self._device_type == DeviceType.Strip async def is_dimmable(self): + """Return True if the device is dimmable.""" return False @property def is_variable_color_temp(self) -> bool: + """Return True if the device supports color temperature.""" return False def __repr__(self): - return "<%s model %s at %s (%s), is_on: %s - dev specific: %s>" % ( + return "<{} model {} at {} ({}), is_on: {} - dev specific: {}>".format( self.__class__.__name__, self.sync.get_model(), self.host, diff --git a/pyHS100/smartplug.py b/pyHS100/smartplug.py index 60705f6b..25bf3c4a 100644 --- a/pyHS100/smartplug.py +++ b/pyHS100/smartplug.py @@ -1,3 +1,4 @@ +"""Module for plugs.""" import datetime import logging from typing import Any, Dict diff --git a/pyHS100/smartstrip.py b/pyHS100/smartstrip.py index e8299c2c..bd858d65 100755 --- a/pyHS100/smartstrip.py +++ b/pyHS100/smartstrip.py @@ -1,36 +1,38 @@ -import asyncio +"""Module for multi-socket devices (HS300, HS107). + +.. todo:: describe how this interfaces with single plugs. +""" import datetime import logging -from typing import Any, Dict, Optional, Union +from collections import defaultdict +from typing import Any, DefaultDict, Dict, List from pyHS100.protocol import TPLinkSmartHomeProtocol -from pyHS100.smartdevice import DeviceType, EmeterStatus, SmartDeviceException +from pyHS100.smartdevice import DeviceType from pyHS100.smartplug import SmartPlug _LOGGER = logging.getLogger(__name__) -class SmartStripException(SmartDeviceException): - """SmartStripException gets raised for errors specific to the smart strip.""" - - pass - - class SmartStrip(SmartPlug): """Representation of a TP-Link Smart Power Strip. Usage example when used as library: p = SmartStrip("192.168.1.105") + # query the state of the strip + print(p.sync.is_on()) + # change state of all outlets - p.turn_on() - p.turn_off() + p.sync.turn_on() + p.sync.turn_off() + + # individual outlets are accessible through plugs variable + for plug in p.plugs: + print(f"{p}: {p.sync.is_on()}") # change state of a single outlet - p.turn_on(index=1) - - # query and print current state of all outlets - print(p.get_state()) + p.plugs[0].sync.turn_on() Errors reported by the device are raised as SmartDeviceExceptions, and should be handled by the user of the library. @@ -41,362 +43,133 @@ class SmartStrip(SmartPlug): host: str, protocol: TPLinkSmartHomeProtocol = None, cache_ttl: int = 3, - *, - ioloop=None + ioloop=None, ) -> None: - SmartPlug.__init__( - self, host=host, protocol=protocol, cache_ttl=cache_ttl, ioloop=ioloop - ) + SmartPlug.__init__(self, host=host, protocol=protocol, cache_ttl=cache_ttl) self.emeter_type = "emeter" self._device_type = DeviceType.Strip - self.plugs = {} - - sys_info = self.sync.get_sys_info() - children = sys_info["children"] + self.plugs: List[SmartPlug] = [] + children = self.sync.get_sys_info()["children"] self.num_children = len(children) for plug in range(self.num_children): - self.plugs[plug] = SmartPlug( - host, - protocol, - context=children[plug]["id"], - cache_ttl=cache_ttl, - ioloop=ioloop, + self.plugs.append( + SmartPlug( + host, + protocol, + context=children[plug]["id"], + cache_ttl=cache_ttl, + ioloop=ioloop, + ) ) - def raise_for_index(self, index: int): - """ - Raises SmartStripException if the plug index is out of bounds - - :param index: plug index to check - :raises SmartStripException: index out of bounds - """ - if index not in range(self.num_children): - raise SmartStripException("plug index of %d " "is out of bounds" % index) - - async def get_state(self, *, index=-1) -> Dict[int, str]: - """Retrieve the switch state - - :returns: list with the state of each child plug - STATE_ON - STATE_OFF - :rtype: dict - """ - - def _state_for_bool(_bool): - return SmartPlug.STATE_ON if _bool else SmartPlug.STATE_OFF - - is_on = await self.get_is_on(index=index) - if isinstance(is_on, bool): - return _state_for_bool(is_on) - - return {k: _state_for_bool(v) for k, v in is_on.items()} - - def set_state(self, value: str, *, index: int = -1): - """Sets the state of a plug on the strip - - :param value: one of - STATE_ON - STATE_OFF - :param index: plug index (-1 for all) - :raises ValueError: on invalid state - :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds - """ - if index < 0: - self.state = value - else: - self.raise_for_index(index) - self.plugs[index].state = value - async def is_on(self) -> bool: - """Return if any of the outlets are on""" - states = await self.get_state() - return any(state == "ON" for state in states.values()) + """Return if any of the outlets are on.""" + for plug in self.plugs: + is_on = await plug.is_on() + if is_on: + return True + return False - async def get_is_on(self, *, index: int = -1) -> Any: - """ - Returns whether device is on. + async def turn_on(self): + """Turn the strip on. - :param index: plug index (-1 for all) - :return: True if device is on, False otherwise, Dict without index - :rtype: bool if index is provided - Dict[int, bool] if no index provided - :raises SmartStripException: index out of bounds - """ - sys_info = await self.get_sys_info() - children = sys_info["children"] - if index < 0: - is_on = {} - for plug in range(self.num_children): - is_on[plug] = bool(children[plug]["state"]) - return is_on - else: - self.raise_for_index(index) - return bool(children[index]["state"]) - - async def get_is_off(self, *, index: int = -1) -> Any: - is_on = await self.get_is_on(index=index) - if isinstance(is_on, bool): - return not is_on - else: - return {k: not v for k, v in is_on} - - async def turn_on(self, *, index: int = -1): - """ - Turns outlets on - - :param index: plug index (-1 for all) :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - if index < 0: - await self._query_helper("system", "set_relay_state", {"state": 1}) - else: - self.raise_for_index(index) - await self.plugs[index].turn_on() + await self._query_helper("system", "set_relay_state", {"state": 1}) - async def turn_off(self, *, index: int = -1): - """ - Turns outlets off + async def turn_off(self): + """Turn the strip off. - :param index: plug index (-1 for all) :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - if index < 0: - await self._query_helper("system", "set_relay_state", {"state": 0}) - else: - self.raise_for_index(index) - await self.plugs[index].turn_off() + await self._query_helper("system", "set_relay_state", {"state": 0}) - async def get_max_on_since(self) -> datetime: - """Returns the maximum on-time of all outlets.""" - on_since = await self.get_on_since(index=-1) - return max(v for v in on_since.values()) + async def get_on_since(self) -> datetime.datetime: + """Return the maximum on-time of all outlets.""" + return max([await plug.get_on_since() for plug in self.plugs]) - async def get_on_since(self, *, index: Optional[int] = None) -> Any: - """ - Returns pretty-printed on-time - - :param index: plug index (-1 for all) - :return: datetime for on since - :rtype: datetime with index - Dict[int, str] without index - :raises SmartStripException: index out of bounds - """ - if index is None: - return await self.get_max_on_since() - - if index < 0: - on_since = {} - sys_info = await self.get_sys_info() - children = sys_info["children"] - - for plug in range(self.num_children): - child_ontime = children[plug]["on_time"] - on_since[plug] = datetime.datetime.now() - datetime.timedelta( - seconds=child_ontime - ) - return on_since - else: - self.raise_for_index(index) - return await self.plugs[index].get_on_since() - - async def get_state_information(self) -> Dict[str, Any]: - """ - Returns strip-specific state information. + async def state_information(self) -> Dict[str, Any]: + """Return strip-specific state information. :return: Strip information dict, keys in user-presentable form. :rtype: dict """ - state = {"LED state": await self.get_led()} # XXX: from where? - is_on = await self.get_is_on() - on_since = await self.get_on_since(index=-1) - for plug_index in range(self.num_children): - plug_number = plug_index + 1 - if is_on[plug_index]: - state["Plug %d on since" % plug_number] = on_since[plug_index] + state: Dict[str, Any] = {"LED state": self.led} + for plug in self.plugs: + if await plug.is_on(): + state["Plug %s on since" % str(plug)] = await plug.get_on_since() return state - async def get_emeter_realtime(self, *, index: int = -1) -> Optional[Any]: - """ - Retrieve current energy readings from device + async def current_consumption(self) -> float: + """Get the current power consumption in watts. - :param index: plug index (-1 for all) - :returns: list of current readings or None - :rtype: Dict, Dict[int, Dict], None - Dict if index is provided - Dict[int, Dict] if no index provided - None if device has no energy meter or error occurred + :return: the current power consumption in watts. + :rtype: float :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - if not await self.get_has_emeter(): # pragma: no cover - raise SmartStripException("Device has no emeter") + consumption = sum([await plug.current_consumption() for plug in self.plugs]) - if index < 0: - emeter_status = {} - for plug in range(self.num_children): - emeter_status[plug] = await self.plugs[plug].get_emeter_realtime() - return emeter_status - else: - self.raise_for_index(index) - return await self.plugs[index].get_emeter_realtime() + return consumption - async def current_consumption(self, *, index: int = -1) -> Optional[Any]: - """ - Get the current power consumption in Watts. + async def icon(self) -> Dict: + """Icon for the device. - :param index: plug index (-1 for all) - :return: the current power consumption in Watts. - None if device has no energy meter. - :rtype: Dict, Dict[int, Dict], None - Dict if index is provided - Dict[int, Dict] if no index provided - None if device has no energy meter or error occurred - :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds - """ - if not await self.get_has_emeter(): # pragma: no cover - raise SmartStripException("Device has no emeter") - - if index < 0: - consumption = {} - emeter_reading = await self.get_emeter_realtime() - for plug in range(self.num_children): - response = EmeterStatus(emeter_reading[plug]) - consumption[plug] = response["power"] - return consumption - else: - self.raise_for_index(index) - response = EmeterStatus(await self.get_emeter_realtime(index=index)) - return response["power"] - - async def get_icon(self): - """Override for base class icon property, SmartStrip and children do not - have icons so we return dummy strings. + Overriden to keep the API, as the SmartStrip and children do not + have icons, we just return dummy strings. """ return {"icon": "SMARTSTRIP-DUMMY", "hash": "SMARTSTRIP-DUMMY"} - async def get_alias(self, *, index: Optional[int] = None) -> Union[str, Dict[int, str]]: - """Gets the alias for a plug. + async def set_alias(self, alias: str) -> None: + """Set the alias for the strip. - :param index: plug index (-1 for all) - :return: the current power consumption in Watts. - None if device has no energy meter. - :rtype: str if index is provided - Dict[int, str] if no index provided - :raises SmartStripException: index out of bounds - """ - if index is None: - return await super().get_alias() - - sys_info = await self.get_sys_info() - children = sys_info["children"] - - if index < 0: - alias = {} - for plug in range(self.num_children): - alias[plug] = children[plug]["alias"] - return alias - else: - self.raise_for_index(index) - return children[index]["alias"] - - async def set_alias(self, alias: str, *, index: Optional[int] = None): - """Sets the alias for a plug - - :param index: plug index :param alias: new alias :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - # Renaming the whole strip - if index is None: - return await super().set_alias(alias) - - self.raise_for_index(index) - await self.plugs[index].set_alias(alias) + return await super().set_alias(alias) async def get_emeter_daily( - self, year: int = None, month: int = None, kwh: bool = True, *, index: int = -1 + self, year: int = None, month: int = None, kwh: bool = True ) -> Dict: - """Retrieve daily statistics for a given month + """Retrieve daily statistics for a given month. :param year: year for which to retrieve statistics (default: this year) :param month: month for which to retrieve statistics (default: this month) :param kwh: return usage in kWh (default: True) :return: mapping of day of month to value - None if device has no energy meter or error occurred :rtype: dict :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - if not await self.get_has_emeter(): # pragma: no cover - raise SmartStripException("Device has no emeter") - - emeter_daily = {} - if index < 0: - for plug in range(self.num_children): - emeter_daily = await self.plugs[plug].get_emeter_daily( - year=year, month=month, kwh=kwh - ) - return emeter_daily - else: - self.raise_for_index(index) - return await self.plugs[index].get_emeter_daily( + emeter_daily: DefaultDict[int, float] = defaultdict(lambda: 0.0) + for plug in self.plugs: + plug_emeter_daily = await plug.get_emeter_daily( year=year, month=month, kwh=kwh ) + for day, value in plug_emeter_daily.items(): + emeter_daily[day] += value + return emeter_daily - async def get_emeter_monthly( - self, year: int = None, kwh: bool = True, *, index: int = -1 - ) -> Dict: + async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict: """Retrieve monthly statistics for a given year. :param year: year for which to retrieve statistics (default: this year) :param kwh: return usage in kWh (default: True) :return: dict: mapping of month to value - None if device has no energy meter :rtype: dict :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - if not await self.get_has_emeter(): # pragma: no cover - raise SmartStripException("Device has no emeter") + emeter_monthly: DefaultDict[int, float] = defaultdict(lambda: 0.0) + for plug in self.plugs: + plug_emeter_monthly = await plug.get_emeter_monthly(year=year, kwh=kwh) + for month, value in plug_emeter_monthly: + emeter_monthly[month] += value + return emeter_monthly - emeter_monthly = {} - if index < 0: - for plug in range(self.num_children): - emeter_monthly[plug] = await self.plugs[plug].get_emeter_monthly( - year=year, kwh=kwh - ) - return emeter_monthly - else: - self.raise_for_index(index) - return await self.plugs[index].get_emeter_monthly(year=year, kwh=kwh) + async def erase_emeter_stats(self): + """Erase energy meter statistics for all plugs. - async def erase_emeter_stats(self, *, index: int = -1) -> bool: - """Erase energy meter statistics - - :param index: plug index (-1 for all) - :return: True if statistics were deleted - False if device has no energy meter. - :rtype: bool :raises SmartDeviceException: on error - :raises SmartStripException: index out of bounds """ - if not await self.get_has_emeter(): # pragma: no cover - raise SmartStripException("Device has no emeter") - - if index < 0: - for plug in range(self.num_children): - await self.plugs[plug].erase_emeter_stats() - else: - self.raise_for_index(index) - await self.plugs[index].erase_emeter_stats() - - # As query_helper raises exception in case of failure, we have - # succeeded when we are this far. - return True + for plug in self.plugs: + await plug.erase_emeter_stats() diff --git a/pyHS100/tests/newfakes.py b/pyHS100/tests/newfakes.py index 7948b9f7..0636b1ec 100644 --- a/pyHS100/tests/newfakes.py +++ b/pyHS100/tests/newfakes.py @@ -24,14 +24,14 @@ def check_mode(x): if x in ["schedule", "none", "count_down"]: return x - raise Invalid("invalid mode {}".format(x)) + raise Invalid(f"invalid mode {x}") def lb_dev_state(x): if x in ["normal"]: return x - raise Invalid("Invalid dev_state {}".format(x)) + raise Invalid(f"Invalid dev_state {x}") TZ_SCHEMA = Schema( @@ -409,9 +409,7 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol): return error(target, cmd, msg="command not found") params = request[target][cmd] - _LOGGER.debug( - "Going to execute {}.{} (params: {}).. ".format(target, cmd, params) - ) + _LOGGER.debug(f"Going to execute {target}.{cmd} (params: {params}).. ") if callable(proto[target][cmd]): res = proto[target][cmd](self, params, child_ids) @@ -424,4 +422,4 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol): _LOGGER.debug("[static] %s.%s: %s", target, cmd, res) return success(target, cmd, res) else: - raise NotImplementedError("target {} cmd {}".format(target, cmd)) + raise NotImplementedError(f"target {target} cmd {cmd}") diff --git a/pyHS100/tests/test_fixtures.py b/pyHS100/tests/test_fixtures.py index 7613bc04..d20fcb00 100644 --- a/pyHS100/tests/test_fixtures.py +++ b/pyHS100/tests/test_fixtures.py @@ -4,7 +4,7 @@ from unittest.mock import patch import pytest -from pyHS100 import DeviceType, SmartDeviceException, SmartStripException, SmartStrip +from pyHS100 import DeviceType, SmartDeviceException, SmartStrip from .conftest import ( bulb, @@ -380,67 +380,44 @@ def test_non_variable_temp(dev): dev.sync.set_color_temp(2700) -@strip -def test_children_is_on(dev): - is_on = dev.sync.get_is_on() - for i in range(dev.num_children): - assert is_on[i] == dev.sync.get_is_on(index=i) - - @strip @turn_on def test_children_change_state(dev, turn_on): handle_turn_on(dev, turn_on) - for i in range(dev.num_children): - orig_state = dev.sync.get_is_on(index=i) + for plug in dev.plugs: + orig_state = plug.sync.is_on() if orig_state: - dev.sync.turn_off(index=i) - assert not dev.sync.get_is_on(index=i) - assert dev.sync.get_is_off(index=i) + plug.turn_off() + assert not plug.sync.is_on() + assert plug.sync.is_off() - dev.sync.turn_on(index=i) - assert dev.sync.get_is_on(index=i) - assert not dev.sync.get_is_off(index=i) + plug.sync.turn_on() + assert plug.sync.is_on() + assert not plug.sync.is_off() else: - dev.sync.turn_on(index=i) - assert dev.sync.get_is_on(index=i) - assert not dev.sync.get_is_off(index=i) - dev.sync.turn_off(index=i) - assert not dev.sync.get_is_on(index=i) - assert dev.sync.get_is_off(index=i) - - -@strip -def test_children_bounds(dev): - out_of_bounds = dev.num_children + 100 - - with pytest.raises(SmartDeviceException): - dev.sync.turn_off(index=out_of_bounds) - with pytest.raises(SmartDeviceException): - dev.sync.turn_on(index=out_of_bounds) - with pytest.raises(SmartDeviceException): - dev.sync.get_is_on(index=out_of_bounds) - with pytest.raises(SmartDeviceException): - dev.sync.get_alias(index=out_of_bounds) - with pytest.raises(SmartDeviceException): - dev.sync.get_on_since(index=out_of_bounds) + plug.sync.turn_on() + assert plug.sync.is_on() + assert not plug.sync.is_off() + plug.sync.turn_off() + assert not plug.sync.is_on() + assert plug.sync.is_off() @strip def test_children_alias(dev): - original = dev.sync.get_alias() test_alias = "TEST1234" - for idx in range(dev.num_children): - dev.sync.set_alias(alias=test_alias, index=idx) - assert dev.sync.get_alias(index=idx) == test_alias - dev.sync.set_alias(alias=original[idx], index=idx) - assert dev.sync.get_alias(index=idx) == original[idx] + for plug in dev.plugs: + original = plug.sync.get_alias() + plug.sync.set_alias(alias=test_alias) + assert plug.sync.get_alias() == test_alias + plug.sync.set_alias(alias=original) + assert plug.sync.get_alias() == original @strip def test_children_on_since(dev): - for idx in range(dev.num_children): - assert dev.sync.get_on_since(index=idx) + for plug in dev.plugs: + assert plug.sync.get_on_since() @pytest.mark.skip("this test will wear out your relays") @@ -492,80 +469,61 @@ def test_all_binary_states(dev): def test_children_get_emeter_realtime(dev): assert dev.sync.get_has_emeter() # test with index - for plug_index in range(dev.num_children): - emeter = dev.sync.get_emeter_realtime(index=plug_index) + for plug in dev.plugs: + emeter = plug.sync.get_emeter_realtime() CURRENT_CONSUMPTION_SCHEMA(emeter) # test without index - for index, emeter in dev.sync.get_emeter_realtime().items(): - CURRENT_CONSUMPTION_SCHEMA(emeter) + # TODO test that sum matches the sum of individiaul plugs. - # out of bounds - with pytest.raises(SmartStripException): - dev.sync.get_emeter_realtime(index=dev.num_children + 100) + # for index, emeter in dev.sync.get_emeter_realtime().items(): + # CURRENT_CONSUMPTION_SCHEMA(emeter) @strip def test_children_get_emeter_daily(dev): assert dev.sync.get_has_emeter() - # test with index - for plug_index in range(dev.num_children): - emeter = dev.sync.get_emeter_daily(year=1900, month=1, index=plug_index) + # test individual emeters + for plug in dev.plugs: + emeter = plug.sync.get_emeter_daily(year=1900, month=1) assert emeter == {} - emeter = dev.sync.get_emeter_daily(index=plug_index) + emeter = plug.sync.get_emeter_daily() assert len(emeter) > 0 k, v = emeter.popitem() assert isinstance(k, int) assert isinstance(v, float) - # test without index - all_emeters = dev.sync.get_emeter_daily(year=1900, month=1) - for plug_index, emeter in all_emeters.items(): - assert emeter == {} + # test sum of emeters + all_emeter = dev.sync.get_emeter_daily(year=1900, month=1) - emeter = dev.sync.get_emeter_daily() - - k, v = emeter.popitem() - assert isinstance(k, int) - assert isinstance(v, float) - - # out of bounds - with pytest.raises(SmartStripException): - dev.sync.get_emeter_daily(year=1900, month=1, index=dev.num_children + 100) + k, v = all_emeter.popitem() + assert isinstance(k, int) + assert isinstance(v, float) @strip def test_children_get_emeter_monthly(dev): assert dev.sync.get_has_emeter() - # test with index - for plug_index in range(dev.num_children): - emeter = dev.sync.get_emeter_monthly(year=1900, index=plug_index) + # test individual emeters + for plug in dev.plugs: + emeter = plug.sync.get_emeter_monthly(year=1900) assert emeter == {} - emeter = dev.sync.get_emeter_monthly(index=plug_index) + emeter = plug.sync.get_emeter_monthly() assert len(emeter) > 0 k, v = emeter.popitem() assert isinstance(k, int) assert isinstance(v, float) - # test without index - all_emeters = dev.sync.get_emeter_monthly(year=1900) - for index, emeter in all_emeters.items(): - assert emeter == {} + # test sum of emeters + all_emeter = dev.sync.get_emeter_monthly(year=1900) - emeter = dev.sync.get_emeter_daily() - assert len(emeter) > 0 - - k, v = emeter.popitem() - assert isinstance(k, int) - assert isinstance(v, float) - - # out of bounds - with pytest.raises(SmartStripException): - dev.sync.get_emeter_monthly(year=1900, index=dev.num_children + 100) + k, v = all_emeter.popitem() + assert isinstance(k, int) + assert isinstance(v, float) # def test_cache(dev): diff --git a/pyHS100/version.py b/pyHS100/version.py new file mode 100644 index 00000000..9bc7a520 --- /dev/null +++ b/pyHS100/version.py @@ -0,0 +1,2 @@ +# flake8: noqa +__version__ = "0.4.0.dev0" diff --git a/requirements.txt b/requirements.txt index 4983b2e1..1ccbaada 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ click click-datetime pre-commit +voluptuous diff --git a/setup.py b/setup.py index a5da4333..0f4fb94e 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,12 @@ from setuptools import setup +with open("pyHS100/version.py") as f: + exec(f.read()) + setup( name="pyHS100", - version="0.3.5", - description="Interface for TPLink HS1xx plugs, HS2xx wall switches & LB1xx bulbs", + version=__version__, # type: ignore # noqa: F821 + description="Python interface for TPLink KASA-enabled smart home devices", url="https://github.com/GadgetReactor/pyHS100", author="Sean Seah (GadgetReactor)", author_email="sean@gadgetreactor.com", @@ -11,6 +14,6 @@ setup( packages=["pyHS100"], install_requires=["click", "deprecation"], python_requires=">=3.6", - entry_points={"console_scripts": ["pyhs100=pyHS100.cli:cli",],}, + entry_points={"console_scripts": ["pyhs100=pyHS100.cli:cli"]}, zip_safe=False, ) diff --git a/tox.ini b/tox.ini index bcbc19d1..6e010f45 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist=py35,py36,py37,flake8 +envlist=py35,py36,py37,flake8,linting,typing skip_missing_interpreters = True [tox:travis] @@ -20,16 +20,31 @@ commands= py.test --cov --cov-config=tox.ini pyHS100 [testenv:flake8] -deps=flake8 +deps= + flake8 + flake8-docstrings commands=flake8 pyHS100 -max-line-length=88 [testenv:typing] +skip_install=true deps=mypy -commands=mypy --silent-imports pyHS100 +commands=mypy --ignore-missing-imports pyHS100 [flake8] -exclude = .git,.tox,__pycache__,pyHS100/tests/fakes.py +exclude = .git,.tox,__pycache__,pyHS100/tests/newfakes.py,pyHS100/tests/test_fixtures.py +max-line-length = 88 +per-file-ignores = + pyHS100/tests/*.py:D100,D101,D102,D103,D104 + setup.py:D100 +ignore = D105, D107, E203, E501, W503 +#ignore = E203, E266, E501, W503, F403, F401 +#max-complexity = 18 +#select = B,C,E,F,W,T4,B9 + +[testenv:lint] +deps = pre-commit +skip_install = true +commands = pre-commit run --all-files [coverage:run] source = pyHS100 @@ -43,3 +58,12 @@ exclude_lines = # ignore abstract methods raise NotImplementedError def __repr__ + +[isort] +multi_line_output=3 +include_trailing_comma=True +force_grid_wrap=0 +use_parentheses=True +line_length=88 +known_first_party=pyHS100 +known_third_party=click,deprecation,pytest,setuptools,voluptuous