From 6115d96c394e4b1c5488166251b252d1ca20be3d Mon Sep 17 00:00:00 2001 From: jimboca Date: Tue, 8 Jan 2019 11:13:25 -0800 Subject: [PATCH] Add support for HS300 power strip (#137) * discover runs, prints on since of device 0 * added preliminary support for HS300 * forgot to add smartdevice to commit * added index to CLI * clean up dirty code * added fake sysinfo_hs300 * changed device alias to match MAC * #131 Move _id_to_index into smartstrip so everyone can pass index * Update pyHS100/discover.py Co-Authored-By: jimboca * refactoring to deduplicate code between smarplug and smartstrip * fixing CI failures for devices without children * incorporating feedback from pull request. * fixing hound violation * changed internal store from list of dicts to dict * changed other methods to dictionary store as well * removed unused optional type from imports * changed plugs to Dict, remove redundant sys_info calls * added more functionality for smart strip, added smart strip tests * updated FakeTransportProtocol for devices with children * corrected hound violations * add click-datetime --- README.md | 2 + pyHS100/__init__.py | 3 +- pyHS100/cli.py | 44 +++- pyHS100/discover.py | 8 +- pyHS100/protocol.py | 0 pyHS100/smartdevice.py | 24 +- pyHS100/smartplug.py | 19 +- pyHS100/smartstrip.py | 397 ++++++++++++++++++++++++++++++ pyHS100/tests/fakes.py | 135 ++++++++++- pyHS100/tests/test_pyHS100.py | 2 +- pyHS100/tests/test_strip.py | 438 ++++++++++++++++++++++++++++++++++ requirements.txt | 2 +- 12 files changed, 1035 insertions(+), 39 deletions(-) mode change 100644 => 100755 pyHS100/__init__.py mode change 100644 => 100755 pyHS100/cli.py mode change 100644 => 100755 pyHS100/discover.py mode change 100644 => 100755 pyHS100/protocol.py mode change 100644 => 100755 pyHS100/smartdevice.py create mode 100755 pyHS100/smartstrip.py create mode 100644 pyHS100/tests/test_strip.py diff --git a/README.md b/README.md index 8a570338..e5849463 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ Python Library to control TPLink smart plugs/switches and smart bulbs. * HS103 * HS105 * HS110 +* Power Strips + * HS300 * Wall switches * HS200 * HS210 diff --git a/pyHS100/__init__.py b/pyHS100/__init__.py old mode 100644 new mode 100755 index 800c221c..9884488d --- a/pyHS100/__init__.py +++ b/pyHS100/__init__.py @@ -13,8 +13,9 @@ Module-specific errors are raised as `SmartDeviceException` and are expected to be handled by the user of the library. """ # flake8: noqa -from .smartdevice import SmartDevice, SmartDeviceException +from .smartdevice import SmartDevice, SmartDeviceException, EmeterStatus from .smartplug import SmartPlug from .smartbulb import SmartBulb +from .smartstrip import SmartStrip, SmartStripException from .protocol import TPLinkSmartHomeProtocol from .discover import Discover diff --git a/pyHS100/cli.py b/pyHS100/cli.py old mode 100644 new mode 100755 index 75d737ba..892331db --- a/pyHS100/cli.py +++ b/pyHS100/cli.py @@ -12,6 +12,7 @@ if sys.version_info < (3, 4): from pyHS100 import (SmartDevice, SmartPlug, SmartBulb, + SmartStrip, Discover) # noqa: E402 pass_dev = click.make_pass_decorator(SmartDevice) @@ -29,8 +30,9 @@ pass_dev = click.make_pass_decorator(SmartDevice) @click.option('--debug/--normal', default=False) @click.option('--bulb', default=False, is_flag=True) @click.option('--plug', default=False, is_flag=True) +@click.option('--strip', default=False, is_flag=True) @click.pass_context -def cli(ctx, ip, host, alias, debug, bulb, plug): +def cli(ctx, ip, host, alias, debug, bulb, plug, strip): """A cli tool for controlling TP-Link smart home plugs.""" if debug: logging.basicConfig(level=logging.DEBUG) @@ -58,15 +60,18 @@ def cli(ctx, ip, host, alias, debug, bulb, plug): ctx.invoke(discover) return else: - if not bulb and not plug: - click.echo("No --bulb nor --plug given, discovering..") + if not bulb and not plug and not strip: + click.echo("No --strip nor --bulb nor --plug given, discovering..") dev = Discover.discover_single(host) elif bulb: dev = SmartBulb(host) elif plug: dev = SmartPlug(host) + elif strip: + dev = SmartStrip(host) else: - click.echo("Unable to detect type, use --bulb or --plug!") + click.echo( + "Unable to detect type, use --strip or --bulb or --plug!") return ctx.obj = dev @@ -168,13 +173,22 @@ def emeter(dev, year, month, erase): dev.erase_emeter_stats() return - click.echo("Current state: %s" % dev.get_emeter_realtime()) if year: click.echo("== For year %s ==" % year.year) - click.echo(dev.get_emeter_monthly(year.year)) + emeter_status = dev.get_emeter_monthly(year.year) elif month: click.echo("== For month %s of %s ==" % (month.month, month.year)) - dev.get_emeter_daily(year=month.year, month=month.month) + emeter_status = dev.get_emeter_daily(year=month.year, + month=month.month) + else: + emeter_status = dev.get_emeter_realtime() + click.echo("== Current State ==") + + if isinstance(emeter_status, list): + for plug in emeter_status: + click.echo("Plug %d: %s" % (emeter_status.index(plug) + 1, plug)) + else: + click.echo("%s" % emeter_status) @cli.command() @@ -245,19 +259,27 @@ def time(dev): @cli.command() +@click.argument('index', type=int, required=False) @pass_dev -def on(plug): +def on(plug, index): """Turn the device on.""" click.echo("Turning on..") - plug.turn_on() + if index is None: + plug.turn_on() + else: + plug.turn_on(index=(index - 1)) @cli.command() +@click.argument('index', type=int, required=False) @pass_dev -def off(plug): +def off(plug, index): """Turn the device off.""" click.echo("Turning off..") - plug.turn_off() + if index is None: + plug.turn_off() + else: + plug.turn_off(index=(index - 1)) @cli.command() diff --git a/pyHS100/discover.py b/pyHS100/discover.py old mode 100644 new mode 100755 index f04c5845..e37d8e05 --- a/pyHS100/discover.py +++ b/pyHS100/discover.py @@ -3,7 +3,8 @@ import logging import json from typing import Dict, Type -from pyHS100 import TPLinkSmartHomeProtocol, SmartDevice, SmartPlug, SmartBulb +from pyHS100 import (TPLinkSmartHomeProtocol, SmartDevice, SmartPlug, + SmartBulb, SmartStrip) _LOGGER = logging.getLogger(__name__) @@ -98,7 +99,10 @@ class Discover: type = "UNKNOWN" else: _LOGGER.error("No 'system' nor 'get_sysinfo' in response") - if "smartplug" in type.lower(): + + if "smartplug" in type.lower() and "children" in sysinfo: + return SmartStrip + elif "smartplug" in type.lower(): return SmartPlug elif "smartbulb" in type.lower(): return SmartBulb diff --git a/pyHS100/protocol.py b/pyHS100/protocol.py old mode 100644 new mode 100755 diff --git a/pyHS100/smartdevice.py b/pyHS100/smartdevice.py old mode 100644 new mode 100755 index 778b8814..90464873 --- a/pyHS100/smartdevice.py +++ b/pyHS100/smartdevice.py @@ -74,17 +74,20 @@ class SmartDevice(object): def __init__(self, host: str, - protocol: Optional[TPLinkSmartHomeProtocol] = None) -> None: + protocol: Optional[TPLinkSmartHomeProtocol] = None, + context: str = None) -> None: """ Create a new SmartDevice instance. :param str host: host name or ip address on which the device listens + :param context: optional child ID for context in a parent device """ self.host = host if not protocol: protocol = TPLinkSmartHomeProtocol() self.protocol = protocol self.emeter_type = "emeter" # type: str + self.context = context def _query_helper(self, target: str, @@ -100,12 +103,17 @@ class SmartDevice(object): :rtype: dict :raises SmartDeviceException: if command was not executed correctly """ + if self.context is None: + request = {target: {cmd: arg}} + else: + request = {"context": {"child_ids": [self.context]}, + target: {cmd: arg}} if arg is None: arg = {} try: response = self.protocol.query( host=self.host, - request={target: {cmd: arg}} + request=request, ) except Exception as ex: raise SmartDeviceException('Communication error') from ex @@ -384,11 +392,11 @@ class SmartDevice(object): def get_emeter_realtime(self) -> Optional[Dict]: """ - Retrive current energy readings from device. + Retrieve current energy readings from device. :returns: current readings or False :rtype: dict, None - None if device has no energy meter or error occured + None if device has no energy meter or error occurred :raises SmartDeviceException: on error """ if not self.has_emeter: @@ -405,11 +413,11 @@ class SmartDevice(object): Retrieve daily statistics for a given month :param year: year for which to retrieve statistics (default: this year) - :param month: month for which to retrieve statistcs (default: this + :param month: month for which to retrieve statistics (default: this month) :param kwh: return usage in kWh (default: True) :return: mapping of day of month to value - None if device has no energy meter or error occured + None if device has no energy meter or error occurred :rtype: dict :raises SmartDeviceException: on error """ @@ -483,9 +491,9 @@ class SmartDevice(object): def current_consumption(self) -> Optional[float]: """ - Get the current power consumption in Watt. + Get the current power consumption in Watts. - :return: the current power consumption in Watt. + :return: the current power consumption in Watts. None if device has no energy meter. :raises SmartDeviceException: on error """ diff --git a/pyHS100/smartplug.py b/pyHS100/smartplug.py index 603dbde8..0f606758 100644 --- a/pyHS100/smartplug.py +++ b/pyHS100/smartplug.py @@ -33,9 +33,10 @@ class SmartPlug(SmartDevice): def __init__(self, host: str, - protocol: 'TPLinkSmartHomeProtocol' = None) -> None: - SmartDevice.__init__(self, host, protocol) - self.emeter_type = "emeter" + protocol: 'TPLinkSmartHomeProtocol' = None, + context: str = None) -> None: + SmartDevice.__init__(self, host, protocol, context) + self._type = "emeter" @property def state(self) -> str: @@ -126,7 +127,6 @@ class SmartPlug(SmartDevice): :return: True if switch supports brightness changes, False otherwise :rtype: bool - """ return "brightness" in self.sys_info @@ -193,8 +193,15 @@ class SmartPlug(SmartDevice): :return: datetime for on since :rtype: datetime """ - return datetime.datetime.now() - \ - datetime.timedelta(seconds=self.sys_info["on_time"]) + if self.context: + for plug in self.sys_info["children"]: + if plug["id"] == self.context: + on_time = plug["on_time"] + break + else: + on_time = self.sys_info["on_time"] + + return datetime.datetime.now() - datetime.timedelta(seconds=on_time) @property def state_information(self) -> Dict[str, Any]: diff --git a/pyHS100/smartstrip.py b/pyHS100/smartstrip.py new file mode 100755 index 00000000..633d4838 --- /dev/null +++ b/pyHS100/smartstrip.py @@ -0,0 +1,397 @@ +import datetime +import logging +from typing import Any, Dict, Optional, Union + +from pyHS100 import SmartPlug, SmartDeviceException, EmeterStatus + +_LOGGER = logging.getLogger(__name__) + + +class SmartStripException(SmartDeviceException): + """ + SmartStripException gets raised for errors specific to the smart strip. + """ + pass + + +class SmartStrip(SmartPlug): + """Representation of a TP-Link Smart Power Strip. + + Usage example when used as library: + p = SmartStrip("192.168.1.105") + # print the devices alias + print(p.alias) + # change state of plug + p.state = "ON" + p.state = "OFF" + # query and print current state of plug + print(p.state) + + Errors reported by the device are raised as SmartDeviceExceptions, + and should be handled by the user of the library. + + Note: + The library references the same structure as defined for the D-Link Switch + """ + + def __init__(self, + host: str, + protocol: 'TPLinkSmartHomeProtocol' = None) -> None: + SmartPlug.__init__(self, host, protocol) + self.emeter_type = "emeter" + self.plugs = {} + children = self.sys_info["children"] + self.num_children = len(children) + for plug in range(self.num_children): + self.plugs[plug] = SmartPlug(host, protocol, + context=children[plug]["id"]) + + def raise_for_index(self, index: int): + """ + Raises SmartStripException if the plug index is out of bounds + + :param index: plug index to check + :raises SmartStripException: index out of bounds + """ + if index not in range(self.num_children): + raise SmartStripException("plug index of %d " + "is out of bounds" % index) + + @property + def state(self) -> Dict[int, str]: + """ + Retrieve the switch state + + :returns: list with the state of each child plug + SWITCH_STATE_ON + SWITCH_STATE_OFF + SWITCH_STATE_UNKNOWN + :rtype: dict + """ + states = {} + children = self.sys_info["children"] + for plug in range(self.num_children): + relay_state = children[plug]["state"] + + if relay_state == 0: + switch_state = SmartPlug.SWITCH_STATE_OFF + elif relay_state == 1: + switch_state = SmartPlug.SWITCH_STATE_ON + else: + _LOGGER.warning("Unknown state %s returned for plug %u.", + relay_state, plug) + switch_state = SmartPlug.SWITCH_STATE_UNKNOWN + + states[plug] = switch_state + + return states + + @state.setter + def state(self, value: str): + """ + Sets the state of all plugs in the strip + + :param value: one of + SWITCH_STATE_ON + SWITCH_STATE_OFF + :raises ValueError: on invalid state + :raises SmartDeviceException: on error + """ + if not isinstance(value, str): + raise ValueError("State must be str, not of %s.", type(value)) + elif value.upper() == SmartPlug.SWITCH_STATE_ON: + self.turn_on() + elif value.upper() == SmartPlug.SWITCH_STATE_OFF: + self.turn_off() + else: + raise ValueError("State %s is not valid.", value) + + def set_state(self, value: str, *, index: int = -1): + """ + Sets the state of a plug on the strip + + :param value: one of + SWITCH_STATE_ON + SWITCH_STATE_OFF + :param index: plug index (-1 for all) + :raises ValueError: on invalid state + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if index < 0: + self.state = value + else: + self.raise_for_index(index) + self.plugs[index].state = value + + def is_on(self, *, index: int = -1) -> Any: + """ + Returns whether device is on. + + :param index: plug index (-1 for all) + :return: True if device is on, False otherwise, Dict without index + :rtype: bool if index is provided + Dict[int, bool] if no index provided + :raises SmartStripException: index out of bounds + """ + children = self.sys_info["children"] + if index < 0: + is_on = {} + for plug in range(self.num_children): + is_on[plug] = bool(children[plug]["state"]) + return is_on + else: + self.raise_for_index(index) + return bool(children[index]["state"]) + + def turn_on(self, *, index: int = -1): + """ + Turns outlets on + + :param index: plug index (-1 for all) + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if index < 0: + self._query_helper("system", "set_relay_state", {"state": 1}) + else: + self.raise_for_index(index) + self.plugs[index].turn_on() + + def turn_off(self, *, index: int = -1): + """ + Turns outlets off + + :param index: plug index (-1 for all) + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if index < 0: + self._query_helper("system", "set_relay_state", {"state": 0}) + else: + self.raise_for_index(index) + self.plugs[index].turn_off() + + def on_since(self, *, index: int = -1) -> Any: + """ + Returns pretty-printed on-time + + :param index: plug index (-1 for all) + :return: datetime for on since + :rtype: datetime with index + Dict[int, str] without index + :raises SmartStripException: index out of bounds + """ + if index < 0: + on_since = {} + children = self.sys_info["children"] + for plug in range(self.num_children): + on_since[plug] = \ + datetime.datetime.now() - \ + datetime.timedelta(seconds=children[plug]["on_time"]) + return on_since + else: + self.raise_for_index(index) + return self.plugs[index].on_since + + @property + def state_information(self) -> Dict[str, Any]: + """ + Returns strip-specific state information. + + :return: Strip information dict, keys in user-presentable form. + :rtype: dict + """ + state = {'LED state': self.led} + on_since = self.on_since() + for plug_index in range(self.num_children): + state['Plug %d on since' % (plug_index + 1)] = on_since[plug_index] + return state + + def get_emeter_realtime(self, *, index: int = -1) -> Optional[Any]: + """ + Retrieve current energy readings from device + + :param index: plug index (-1 for all) + :returns: list of current readings or None + :rtype: Dict, Dict[int, Dict], None + Dict if index is provided + Dict[int, Dict] if no index provided + None if device has no energy meter or error occurred + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if not self.has_emeter: + return None + + if index < 0: + emeter_status = {} + for plug in range(self.num_children): + emeter_status[plug] = self.plugs[plug].get_emeter_realtime() + return emeter_status + else: + self.raise_for_index(index) + return self.plugs[index].get_emeter_realtime() + + def current_consumption(self, *, index: int = -1) -> Optional[Any]: + """ + Get the current power consumption in Watts. + + :param index: plug index (-1 for all) + :return: the current power consumption in Watts. + None if device has no energy meter. + :rtype: Dict, Dict[int, Dict], None + Dict if index is provided + Dict[int, Dict] if no index provided + None if device has no energy meter or error occurred + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if not self.has_emeter: + return None + + if index < 0: + consumption = {} + emeter_reading = self.get_emeter_realtime() + for plug in range(self.num_children): + response = EmeterStatus(emeter_reading[plug]) + consumption[plug] = response["power"] + return consumption + else: + self.raise_for_index(index) + response = EmeterStatus(self.get_emeter_realtime(index=index)) + return response["power"] + + @property + def icon(self): + """ + Override for base class icon property, SmartStrip and children do not + have icons. + + :raises NotImplementedError: always + """ + raise NotImplementedError("no icons for this device") + + def get_alias(self, *, index: int = -1) -> Union[str, Dict[int, str]]: + """ + Gets the alias for a plug. + + :param index: plug index (-1 for all) + :return: the current power consumption in Watts. + None if device has no energy meter. + :rtype: str if index is provided + Dict[int, str] if no index provided + :raises SmartStripException: index out of bounds + """ + children = self.sys_info["children"] + + if index < 0: + alias = {} + for plug in range(self.num_children): + alias[plug] = children[plug]["alias"] + return alias + else: + self.raise_for_index(index) + return children[index]["alias"] + + def set_alias(self, alias: str, index: int): + """ + Sets the alias for a plug + + :param index: plug index + :param alias: new alias + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + self.raise_for_index(index) + self.plugs[index].alias = alias + + def get_emeter_daily(self, + year: int = None, + month: int = None, + kwh: bool = True, + *, + index: int = -1) -> Optional[Dict]: + """ + Retrieve daily statistics for a given month + + :param year: year for which to retrieve statistics (default: this year) + :param month: month for which to retrieve statistics (default: this + month) + :param kwh: return usage in kWh (default: True) + :return: mapping of day of month to value + None if device has no energy meter or error occurred + :rtype: dict + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if not self.has_emeter: + return None + + emeter_daily = {} + if index < 0: + for plug in range(self.num_children): + emeter_daily = self.plugs[plug].get_emeter_daily(year=year, + month=month, + kwh=kwh) + return emeter_daily + else: + self.raise_for_index(index) + return self.plugs[index].get_emeter_daily(year=year, + month=month, + kwh=kwh) + + def get_emeter_monthly(self, + year: int = None, + kwh: bool = True, + *, + index: int = -1) -> Optional[Dict]: + """ + Retrieve monthly statistics for a given year. + + :param year: year for which to retrieve statistics (default: this year) + :param kwh: return usage in kWh (default: True) + :return: dict: mapping of month to value + None if device has no energy meter + :rtype: dict + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if not self.has_emeter: + return None + + emeter_monthly = {} + if index < 0: + for plug in range(self.num_children): + emeter_monthly = self.plugs[plug].get_emeter_monthly(year=year, + kwh=kwh) + return emeter_monthly + else: + self.raise_for_index(index) + return self.plugs[index].get_emeter_monthly(year=year, + kwh=kwh) + + def erase_emeter_stats(self, *, index: int = -1) -> bool: + """ + Erase energy meter statistics + + :param index: plug index (-1 for all) + :return: True if statistics were deleted + False if device has no energy meter. + :rtype: bool + :raises SmartDeviceException: on error + :raises SmartStripException: index out of bounds + """ + if not self.has_emeter: + return False + + if index < 0: + for plug in range(self.num_children): + self.plugs[plug].erase_emeter_stats() + else: + self.raise_for_index(index) + self.plugs[index].erase_emeter_stats() + + # As query_helper raises exception in case of failure, we have + # succeeded when we are this far. + return True diff --git a/pyHS100/tests/fakes.py b/pyHS100/tests/fakes.py index fa172a9f..5fc34409 100644 --- a/pyHS100/tests/fakes.py +++ b/pyHS100/tests/fakes.py @@ -5,35 +5,42 @@ import logging _LOGGER = logging.getLogger(__name__) -def get_realtime(obj, x): + +def get_realtime(obj, x, child_ids=[]): return {"current":0.268587,"voltage":125.836131,"power":33.495623,"total":0.199000} -def get_monthstat(obj, x): + +def get_monthstat(obj, x, child_ids=[]): if x["year"] < 2016: return {"month_list":[]} return {"month_list": [{"year": 2016, "month": 11, "energy": 1.089000}, {"year": 2016, "month": 12, "energy": 1.582000}]} -def get_daystat(obj, x): + +def get_daystat(obj, x, child_ids=[]): if x["year"] < 2016: return {"day_list":[]} return {"day_list": [{"year": 2016, "month": 11, "day": 24, "energy": 0.026000}, {"year": 2016, "month": 11, "day": 25, "energy": 0.109000}]} + emeter_support = {"get_realtime": get_realtime, "get_monthstat": get_monthstat, "get_daystat": get_daystat,} + def get_realtime_units(obj, x): return {"power_mw": 10800} + def get_monthstat_units(obj, x): if x["year"] < 2016: return {"month_list":[]} return {"month_list": [{"year": 2016, "month": 11, "energy_wh": 32}, {"year": 2016, "month": 12, "energy_wh": 16}]} + def get_daystat_units(obj, x): if x["year"] < 2016: return {"day_list":[]} @@ -41,10 +48,91 @@ def get_daystat_units(obj, x): return {"day_list": [{"year": 2016, "month": 11, "day": 24, "energy_wh": 20}, {"year": 2016, "month": 11, "day": 25, "energy_wh": 32}]} + emeter_units_support = {"get_realtime": get_realtime_units, "get_monthstat": get_monthstat_units, "get_daystat": get_daystat_units,} +sysinfo_hs300 = { + 'system': { + 'get_sysinfo': { + 'sw_ver': '1.0.6 Build 180627 Rel.081000', + 'hw_ver': '1.0', + 'model': 'HS300(US)', + 'deviceId': '7003ADE7030B7EFADE747104261A7A70931DADF4', + 'oemId': 'FFF22CFF774A0B89F7624BFC6F50D5DE', + 'hwId': '22603EA5E716DEAEA6642A30BE87AFCB', + 'rssi': -53, + 'longitude_i': -1198698, + 'latitude_i': 352737, + 'alias': 'TP-LINK_Power Strip_2233', + 'mic_type': 'IOT.SMARTPLUGSWITCH', + 'feature': 'TIM:ENE', + 'mac': '50:C7:BF:11:22:33', + 'updating': 0, + 'led_off': 0, + 'children': [ + { + 'id': '7003ADE7030B7EFADE747104261A7A70931DADF400', + 'state': 1, + 'alias': 'my plug 1 device', + 'on_time': 5423, + 'next_action': { + 'type': -1 + } + }, + { + 'id': '7003ADE7030B7EFADE747104261A7A70931DADF401', + 'state': 1, + 'alias': 'my plug 2 device', + 'on_time': 4750, + 'next_action': { + 'type': -1 + } + }, + { + 'id': '7003ADE7030B7EFADE747104261A7A70931DADF402', + 'state': 1, + 'alias': 'my plug 3 device', + 'on_time': 4748, + 'next_action': { + 'type': -1 + } + }, + { + 'id': '7003ADE7030B7EFADE747104261A7A70931DADF403', + 'state': 1, + 'alias': 'my plug 4 device', + 'on_time': 4742, + 'next_action': { + 'type': -1 + } + }, + { + 'id': '7003ADE7030B7EFADE747104261A7A70931DADF404', + 'state': 1, + 'alias': 'my plug 5 device', + 'on_time': 4745, + 'next_action': { + 'type': -1 + } + }, + { + 'id': '7003ADE7030B7EFADE747104261A7A70931DADF405', + 'state': 1, + 'alias': 'my plug 6 device', + 'on_time': 5028, + 'next_action': { + 'type': -1 + } + } + ], + 'child_num': 6, + 'err_code': 0 + } + } +} + sysinfo_hs100 = {'system': {'get_sysinfo': {'active_mode': 'schedule', 'alias': 'My Smart Plug', @@ -484,13 +572,28 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol): self.proto = proto self.invalid = invalid - def set_alias(self, x): + def set_alias(self, x, child_ids=[]): _LOGGER.debug("Setting alias to %s", x["alias"]) - self.proto["system"]["get_sysinfo"]["alias"] = x["alias"] + if child_ids: + for child in self.proto["system"]["get_sysinfo"]["children"]: + if child["id"] in child_ids: + child["alias"] = x["alias"] + else: + self.proto["system"]["get_sysinfo"]["alias"] = x["alias"] - def set_relay_state(self, x): - _LOGGER.debug("Setting relay state to %s", x) - self.proto["system"]["get_sysinfo"]["relay_state"] = x["state"] + def set_relay_state(self, x, child_ids=[]): + _LOGGER.debug("Setting relay state to %s", x["state"]) + + if not child_ids and "children" in self.proto["system"]["get_sysinfo"]: + for child in self.proto["system"]["get_sysinfo"]["children"]: + child_ids.append(child["id"]) + + if child_ids: + for child in self.proto["system"]["get_sysinfo"]["children"]: + if child["id"] in child_ids: + child["state"] = x["state"] + else: + self.proto["system"]["get_sysinfo"]["relay_state"] = x["state"] def set_led_off(self, x): _LOGGER.debug("Setting led off to %s", x) @@ -516,6 +619,7 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol): "get_dev_icon": {"icon": None, "hash": None}, "set_mac_addr": set_mac, "get_sysinfo": None, + "context": None, }, "emeter": { "get_realtime": None, "get_daystat": None, @@ -537,14 +641,24 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol): # HS220 brightness, different setter and getter "smartlife.iot.dimmer": { "set_brightness": set_hs220_brightness, }, + "context": {"child_ids": None}, } def query(self, host, request, port=9999): if self.invalid: raise SmartDeviceException("Invalid connection, can't query!") + _LOGGER.debug("Requesting {} from {}:{}".format(request, host, port)) + proto = self.proto + # collect child ids from context + try: + child_ids = request["context"]["child_ids"] + request.pop("context", None) + except KeyError: + child_ids = [] + target = next(iter(request)) if target not in proto.keys(): return error(target, msg="target not found") @@ -557,7 +671,10 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol): _LOGGER.debug("Going to execute {}.{} (params: {}).. ".format(target, cmd, params)) if callable(proto[target][cmd]): - res = proto[target][cmd](self, params) + if child_ids: + res = proto[target][cmd](self, params, child_ids) + else: + res = proto[target][cmd](self, params) # verify that change didn't break schema, requires refactoring.. #TestSmartPlug.sysinfo_schema(self.proto["system"]["get_sysinfo"]) return success(target, cmd, res) diff --git a/pyHS100/tests/test_pyHS100.py b/pyHS100/tests/test_pyHS100.py index e9aa4763..cb377980 100644 --- a/pyHS100/tests/test_pyHS100.py +++ b/pyHS100/tests/test_pyHS100.py @@ -84,7 +84,7 @@ class TestSmartPlugHS100(TestCase): 'total': Any(Coerce(float, Range(min=0)), None), 'current': Any(All(float, Range(min=0)), None), - 'voltage_mw': Any(All(float, Range(min=0, max=300000)), None), + 'voltage_mv': Any(All(float, Range(min=0, max=300000)), None), 'power_mw': Any(Coerce(float, Range(min=0)), None), 'total_wh': Any(Coerce(float, Range(min=0)), None), 'current_ma': Any(All(float, Range(min=0)), None), diff --git a/pyHS100/tests/test_strip.py b/pyHS100/tests/test_strip.py new file mode 100644 index 00000000..d84ef7fc --- /dev/null +++ b/pyHS100/tests/test_strip.py @@ -0,0 +1,438 @@ +from unittest import TestCase, skip +from voluptuous import Schema, All, Any, Range, Coerce +import datetime + +from .. import SmartStrip, SmartPlug, SmartStripException, SmartDeviceException +from .fakes import FakeTransportProtocol, sysinfo_hs300 +from .test_pyHS100 import check_mac, check_int_bool + +# Set IP instead of None if you want to run tests on a device. +STRIP_IP = None + + +class TestSmartStripHS300(TestCase): + SYSINFO = sysinfo_hs300 # type: Dict + # these schemas should go to the mainlib as + # they can be useful when adding support for new features/devices + # as well as to check that faked devices are operating properly. + sysinfo_schema = Schema({ + "sw_ver": str, + "hw_ver": str, + "model": str, + "deviceId": str, + "oemId": str, + "hwId": str, + "rssi": Any(int, None), # rssi can also be positive, see #54 + "longitude": Any(All(int, Range(min=-1800000, max=1800000)), None), + "latitude": Any(All(int, Range(min=-900000, max=900000)), None), + "longitude_i": Any(All(int, Range(min=-1800000, max=1800000)), None), + "latitude_i": Any(All(int, Range(min=-900000, max=900000)), None), + "alias": str, + "mic_type": str, + "feature": str, + "mac": check_mac, + "updating": check_int_bool, + "led_off": check_int_bool, + "children": [{ + "id": str, + "state": int, + "alias": str, + "on_time": int, + "next_action": {"type": int}, + }], + "child_num": int, + "err_code": int, + }) + + current_consumption_schema = Schema( + Any( + { + "voltage": Any(All(float, Range(min=0, max=300)), None), + "power": Any(Coerce(float, Range(min=0)), None), + "total": Any(Coerce(float, Range(min=0)), None), + "current": Any(All(float, Range(min=0)), None), + + "voltage_mv": Any(All(int, Range(min=0, max=300000)), None), + "power_mw": Any(Coerce(int, Range(min=0)), None), + "total_wh": Any(Coerce(int, Range(min=0)), None), + "current_ma": Any(All(int, Range(min=0)), None), + }, + None + ) + ) + + tz_schema = Schema({ + "zone_str": str, + "dst_offset": int, + "index": All(int, Range(min=0)), + "tz_str": str, + }) + + def setUp(self): + if STRIP_IP is not None: + self.strip = SmartStrip(STRIP_IP) + else: + self.strip = SmartStrip( + host="127.0.0.1", + protocol=FakeTransportProtocol(self.SYSINFO) + ) + + def tearDown(self): + self.strip = None + + def test_initialize(self): + self.assertIsNotNone(self.strip.sys_info) + self.assertTrue(self.strip.num_children) + self.sysinfo_schema(self.strip.sys_info) + + def test_initialize_invalid_connection(self): + with self.assertRaises(SmartDeviceException): + SmartStrip( + host="127.0.0.1", + protocol=FakeTransportProtocol(self.SYSINFO, invalid=True)) + + def test_query_helper(self): + with self.assertRaises(SmartDeviceException): + self.strip._query_helper("test", "testcmd", {}) + + def test_raise_for_index(self): + with self.assertRaises(SmartStripException): + self.strip.raise_for_index(index=self.strip.num_children + 100) + + def test_state_strip(self): + with self.assertRaises(ValueError): + self.strip.state = 1234 + with self.assertRaises(ValueError): + self.strip.state = "1234" + with self.assertRaises(ValueError): + self.strip.state = True + + orig_state = self.strip.state + if orig_state == SmartPlug.SWITCH_STATE_OFF: + self.strip.state = "ON" + self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_ON) + self.strip.state = "OFF" + self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_OFF) + elif orig_state == SmartPlug.SWITCH_STATE_ON: + self.strip.state = "OFF" + self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_OFF) + self.strip.state = "ON" + self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_ON) + elif orig_state == SmartPlug.SWITCH_STATE_UNKNOWN: + self.fail("can't test for unknown state") + + def test_state_plugs(self): + # value errors + for plug_index in range(self.strip.num_children): + with self.assertRaises(ValueError): + self.strip.set_state(value=1234, index=plug_index) + + with self.assertRaises(ValueError): + self.strip.set_state(value="1234", index=plug_index) + + with self.assertRaises(ValueError): + self.strip.set_state(value=True, index=plug_index) + + # out of bounds error + with self.assertRaises(SmartStripException): + self.strip.set_state( + value=SmartPlug.SWITCH_STATE_ON, + index=self.strip.num_children + 100 + ) + + # on off + for plug_index in range(self.strip.num_children): + orig_state = self.strip.state[plug_index] + if orig_state == SmartPlug.SWITCH_STATE_OFF: + self.strip.set_state(value="ON", index=plug_index) + self.assertTrue( + self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_ON) + self.strip.set_state(value="OFF", index=plug_index) + self.assertTrue( + self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_OFF) + elif orig_state == SmartPlug.SWITCH_STATE_ON: + self.strip.set_state(value="OFF", index=plug_index) + self.assertTrue( + self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_OFF) + self.strip.set_state(value="ON", index=plug_index) + self.assertTrue( + self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_ON) + elif orig_state == SmartPlug.SWITCH_STATE_UNKNOWN: + self.fail("can't test for unknown state") + + def test_turns_and_isses(self): + # all on + self.strip.turn_on() + for index, state in self.strip.is_on().items(): + self.assertTrue(state) + self.assertTrue(self.strip.is_on(index=index) == state) + + # all off + self.strip.turn_off() + for index, state in self.strip.is_on().items(): + self.assertFalse(state) + self.assertTrue(self.strip.is_on(index=index) == state) + + # individual on + for plug_index in range(self.strip.num_children): + original_states = self.strip.is_on() + self.strip.turn_on(index=plug_index) + + # only target outlet should have state changed + for index, state in self.strip.is_on().items(): + if index == plug_index: + self.assertTrue(state != original_states[index]) + else: + self.assertTrue(state == original_states[index]) + + # individual off + for plug_index in range(self.strip.num_children): + original_states = self.strip.is_on() + self.strip.turn_off(index=plug_index) + + # only target outlet should have state changed + for index, state in self.strip.is_on().items(): + if index == plug_index: + self.assertTrue(state != original_states[index]) + else: + self.assertTrue(state == original_states[index]) + + # out of bounds + with self.assertRaises(SmartStripException): + self.strip.turn_off(index=self.strip.num_children + 100) + with self.assertRaises(SmartStripException): + self.strip.turn_on(index=self.strip.num_children + 100) + with self.assertRaises(SmartStripException): + self.strip.is_on(index=self.strip.num_children + 100) + + @skip("this test will wear out your relays") + def test_all_binary_states(self): + # test every binary state + for state in range(2 ** self.strip.num_children): + + # create binary state map + state_map = {} + for plug_index in range(self.strip.num_children): + state_map[plug_index] = bool((state >> plug_index) & 1) + + if state_map[plug_index]: + self.strip.turn_on(index=plug_index) + else: + self.strip.turn_off(index=plug_index) + + # check state map applied + for index, state in self.strip.is_on().items(): + self.assertTrue(state_map[index] == state) + + # toggle each outlet with state map applied + for plug_index in range(self.strip.num_children): + + # toggle state + if state_map[plug_index]: + self.strip.turn_off(index=plug_index) + else: + self.strip.turn_on(index=plug_index) + + # only target outlet should have state changed + for index, state in self.strip.is_on().items(): + if index == plug_index: + self.assertTrue(state != state_map[index]) + else: + self.assertTrue(state == state_map[index]) + + # reset state + if state_map[plug_index]: + self.strip.turn_on(index=plug_index) + else: + self.strip.turn_off(index=plug_index) + + # original state map should be restored + for index, state in self.strip.is_on().items(): + self.assertTrue(state == state_map[index]) + + def test_has_emeter(self): + # a not so nice way for checking for emeter availability.. + if "HS300" in self.strip.sys_info["model"]: + self.assertTrue(self.strip.has_emeter) + else: + self.assertFalse(self.strip.has_emeter) + + def test_get_emeter_realtime(self): + if self.strip.has_emeter: + # test with index + for plug_index in range(self.strip.num_children): + emeter = self.strip.get_emeter_realtime(index=plug_index) + self.current_consumption_schema(emeter) + + # test without index + for index, emeter in self.strip.get_emeter_realtime().items(): + self.current_consumption_schema(emeter) + + # out of bounds + with self.assertRaises(SmartStripException): + self.strip.get_emeter_realtime( + index=self.strip.num_children + 100 + ) + else: + self.assertEqual(self.strip.get_emeter_realtime(), None) + + def test_get_emeter_daily(self): + if self.strip.has_emeter: + # test with index + for plug_index in range(self.strip.num_children): + emeter = self.strip.get_emeter_daily(year=1900, month=1, + index=plug_index) + self.assertEqual(emeter, {}) + if len(emeter) < 1: + print("no emeter daily information, skipping..") + return + k, v = emeter.popitem() + self.assertTrue(isinstance(k, int)) + self.assertTrue(isinstance(v, float)) + + # test without index + all_emeter = self.strip.get_emeter_daily(year=1900, month=1) + for index, emeter in all_emeter.items(): + self.assertEqual(emeter, {}) + if len(emeter) < 1: + print("no emeter daily information, skipping..") + return + k, v = emeter.popitem() + self.assertTrue(isinstance(k, int)) + self.assertTrue(isinstance(v, float)) + + # out of bounds + with self.assertRaises(SmartStripException): + self.strip.get_emeter_daily( + year=1900, + month=1, + index=self.strip.num_children + 100 + ) + else: + self.assertEqual( + self.strip.get_emeter_daily(year=1900, month=1), None) + + def test_get_emeter_monthly(self): + if self.strip.has_emeter: + # test with index + for plug_index in range(self.strip.num_children): + emeter = self.strip.get_emeter_monthly(year=1900, + index=plug_index) + self.assertEqual(emeter, {}) + if len(emeter) < 1: + print("no emeter daily information, skipping..") + return + k, v = emeter.popitem() + self.assertTrue(isinstance(k, int)) + self.assertTrue(isinstance(v, float)) + + # test without index + all_emeter = self.strip.get_emeter_monthly(year=1900) + for index, emeter in all_emeter.items(): + self.assertEqual(emeter, {}) + if len(emeter) < 1: + print("no emeter daily information, skipping..") + return + k, v = emeter.popitem() + self.assertTrue(isinstance(k, int)) + self.assertTrue(isinstance(v, float)) + + # out of bounds + with self.assertRaises(SmartStripException): + self.strip.get_emeter_monthly( + year=1900, + index=self.strip.num_children + 100 + ) + else: + self.assertEqual(self.strip.get_emeter_monthly(year=1900), None) + + @skip("not clearing your stats..") + def test_erase_emeter_stats(self): + self.fail() + + def test_current_consumption(self): + if self.strip.has_emeter: + # test with index + for plug_index in range(self.strip.num_children): + emeter = self.strip.current_consumption(index=plug_index) + self.assertTrue(isinstance(emeter, float)) + self.assertTrue(emeter >= 0.0) + + # test without index + for index, emeter in self.strip.current_consumption().items(): + self.assertTrue(isinstance(emeter, float)) + self.assertTrue(emeter >= 0.0) + + # out of bounds + with self.assertRaises(SmartStripException): + self.strip.current_consumption( + index=self.strip.num_children + 100 + ) + else: + self.assertEqual(self.strip.current_consumption(), None) + + def test_alias(self): + test_alias = "TEST1234" + + # strip alias + original = self.strip.alias + self.assertTrue(isinstance(original, str)) + self.strip.alias = test_alias + self.assertEqual(self.strip.alias, test_alias) + self.strip.alias = original + self.assertEqual(self.strip.alias, original) + + # plug alias + original = self.strip.get_alias() + for plug in range(self.strip.num_children): + self.strip.set_alias(alias=test_alias, index=plug) + self.assertEqual(self.strip.get_alias(index=plug), test_alias) + self.strip.set_alias(alias=original[plug], index=plug) + self.assertEqual(self.strip.get_alias(index=plug), original[plug]) + + def test_led(self): + original = self.strip.led + + self.strip.led = False + self.assertFalse(self.strip.led) + self.strip.led = True + self.assertTrue(self.strip.led) + self.strip.led = original + + def test_icon(self): + with self.assertRaises(NotImplementedError): + self.strip.icon + + def test_time(self): + self.assertTrue(isinstance(self.strip.time, datetime.datetime)) + # TODO check setting? + + def test_timezone(self): + self.tz_schema(self.strip.timezone) + + def test_hw_info(self): + self.sysinfo_schema(self.strip.hw_info) + + def test_on_since(self): + # out of bounds + with self.assertRaises(SmartStripException): + self.strip.on_since(index=self.strip.num_children + 1) + + # individual on_since + for plug_index in range(self.strip.num_children): + self.assertTrue(isinstance( + self.strip.on_since(index=plug_index), datetime.datetime)) + + # all on_since + for index, plug_on_since in self.strip.on_since().items(): + self.assertTrue(isinstance(plug_on_since, datetime.datetime)) + + def test_location(self): + print(self.strip.location) + self.sysinfo_schema(self.strip.location) + + def test_rssi(self): + self.sysinfo_schema({'rssi': self.strip.rssi}) # wrapping for vol + + def test_mac(self): + self.sysinfo_schema({'mac': self.strip.mac}) # wrapping for vol diff --git a/requirements.txt b/requirements.txt index 09e8c8ac..a9e2a2bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ click -click-datetime \ No newline at end of file +click-datetime