Add support for HS300 power strip (#137)

* discover runs, prints on since of device 0

* added preliminary support for HS300

* forgot to add smartdevice to commit

* added index to CLI

* clean up dirty code

* added fake sysinfo_hs300

* changed device alias to match MAC

* #131 Move _id_to_index into smartstrip so everyone can pass index

* Update pyHS100/discover.py

Co-Authored-By: jimboca <jimboca3@gmail.com>

* refactoring to deduplicate code between smarplug and smartstrip

* fixing CI failures for devices without children

* incorporating feedback from pull request.

* fixing hound violation

* changed internal store from list of dicts to dict

* changed other methods to dictionary store as well

* removed unused optional type from imports

* changed plugs to Dict, remove redundant sys_info calls

* added more functionality for smart strip, added smart strip tests

* updated FakeTransportProtocol for devices with children

* corrected hound violations

* add click-datetime
This commit is contained in:
jimboca 2019-01-08 11:13:25 -08:00 committed by Teemu R
parent ae53e8de1e
commit 6115d96c39
12 changed files with 1035 additions and 39 deletions

View File

@ -14,6 +14,8 @@ Python Library to control TPLink smart plugs/switches and smart bulbs.
* HS103
* HS105
* HS110
* Power Strips
* HS300
* Wall switches
* HS200
* HS210

3
pyHS100/__init__.py Normal file → Executable file
View File

@ -13,8 +13,9 @@ Module-specific errors are raised as `SmartDeviceException` and are expected
to be handled by the user of the library.
"""
# flake8: noqa
from .smartdevice import SmartDevice, SmartDeviceException
from .smartdevice import SmartDevice, SmartDeviceException, EmeterStatus
from .smartplug import SmartPlug
from .smartbulb import SmartBulb
from .smartstrip import SmartStrip, SmartStripException
from .protocol import TPLinkSmartHomeProtocol
from .discover import Discover

40
pyHS100/cli.py Normal file → Executable file
View File

@ -12,6 +12,7 @@ if sys.version_info < (3, 4):
from pyHS100 import (SmartDevice,
SmartPlug,
SmartBulb,
SmartStrip,
Discover) # noqa: E402
pass_dev = click.make_pass_decorator(SmartDevice)
@ -29,8 +30,9 @@ pass_dev = click.make_pass_decorator(SmartDevice)
@click.option('--debug/--normal', default=False)
@click.option('--bulb', default=False, is_flag=True)
@click.option('--plug', default=False, is_flag=True)
@click.option('--strip', default=False, is_flag=True)
@click.pass_context
def cli(ctx, ip, host, alias, debug, bulb, plug):
def cli(ctx, ip, host, alias, debug, bulb, plug, strip):
"""A cli tool for controlling TP-Link smart home plugs."""
if debug:
logging.basicConfig(level=logging.DEBUG)
@ -58,15 +60,18 @@ def cli(ctx, ip, host, alias, debug, bulb, plug):
ctx.invoke(discover)
return
else:
if not bulb and not plug:
click.echo("No --bulb nor --plug given, discovering..")
if not bulb and not plug and not strip:
click.echo("No --strip nor --bulb nor --plug given, discovering..")
dev = Discover.discover_single(host)
elif bulb:
dev = SmartBulb(host)
elif plug:
dev = SmartPlug(host)
elif strip:
dev = SmartStrip(host)
else:
click.echo("Unable to detect type, use --bulb or --plug!")
click.echo(
"Unable to detect type, use --strip or --bulb or --plug!")
return
ctx.obj = dev
@ -168,13 +173,22 @@ def emeter(dev, year, month, erase):
dev.erase_emeter_stats()
return
click.echo("Current state: %s" % dev.get_emeter_realtime())
if year:
click.echo("== For year %s ==" % year.year)
click.echo(dev.get_emeter_monthly(year.year))
emeter_status = dev.get_emeter_monthly(year.year)
elif month:
click.echo("== For month %s of %s ==" % (month.month, month.year))
dev.get_emeter_daily(year=month.year, month=month.month)
emeter_status = dev.get_emeter_daily(year=month.year,
month=month.month)
else:
emeter_status = dev.get_emeter_realtime()
click.echo("== Current State ==")
if isinstance(emeter_status, list):
for plug in emeter_status:
click.echo("Plug %d: %s" % (emeter_status.index(plug) + 1, plug))
else:
click.echo("%s" % emeter_status)
@cli.command()
@ -245,19 +259,27 @@ def time(dev):
@cli.command()
@click.argument('index', type=int, required=False)
@pass_dev
def on(plug):
def on(plug, index):
"""Turn the device on."""
click.echo("Turning on..")
if index is None:
plug.turn_on()
else:
plug.turn_on(index=(index - 1))
@cli.command()
@click.argument('index', type=int, required=False)
@pass_dev
def off(plug):
def off(plug, index):
"""Turn the device off."""
click.echo("Turning off..")
if index is None:
plug.turn_off()
else:
plug.turn_off(index=(index - 1))
@cli.command()

8
pyHS100/discover.py Normal file → Executable file
View File

@ -3,7 +3,8 @@ import logging
import json
from typing import Dict, Type
from pyHS100 import TPLinkSmartHomeProtocol, SmartDevice, SmartPlug, SmartBulb
from pyHS100 import (TPLinkSmartHomeProtocol, SmartDevice, SmartPlug,
SmartBulb, SmartStrip)
_LOGGER = logging.getLogger(__name__)
@ -98,7 +99,10 @@ class Discover:
type = "UNKNOWN"
else:
_LOGGER.error("No 'system' nor 'get_sysinfo' in response")
if "smartplug" in type.lower():
if "smartplug" in type.lower() and "children" in sysinfo:
return SmartStrip
elif "smartplug" in type.lower():
return SmartPlug
elif "smartbulb" in type.lower():
return SmartBulb

0
pyHS100/protocol.py Normal file → Executable file
View File

24
pyHS100/smartdevice.py Normal file → Executable file
View File

@ -74,17 +74,20 @@ class SmartDevice(object):
def __init__(self,
host: str,
protocol: Optional[TPLinkSmartHomeProtocol] = None) -> None:
protocol: Optional[TPLinkSmartHomeProtocol] = None,
context: str = None) -> None:
"""
Create a new SmartDevice instance.
:param str host: host name or ip address on which the device listens
:param context: optional child ID for context in a parent device
"""
self.host = host
if not protocol:
protocol = TPLinkSmartHomeProtocol()
self.protocol = protocol
self.emeter_type = "emeter" # type: str
self.context = context
def _query_helper(self,
target: str,
@ -100,12 +103,17 @@ class SmartDevice(object):
:rtype: dict
:raises SmartDeviceException: if command was not executed correctly
"""
if self.context is None:
request = {target: {cmd: arg}}
else:
request = {"context": {"child_ids": [self.context]},
target: {cmd: arg}}
if arg is None:
arg = {}
try:
response = self.protocol.query(
host=self.host,
request={target: {cmd: arg}}
request=request,
)
except Exception as ex:
raise SmartDeviceException('Communication error') from ex
@ -384,11 +392,11 @@ class SmartDevice(object):
def get_emeter_realtime(self) -> Optional[Dict]:
"""
Retrive current energy readings from device.
Retrieve current energy readings from device.
:returns: current readings or False
:rtype: dict, None
None if device has no energy meter or error occured
None if device has no energy meter or error occurred
:raises SmartDeviceException: on error
"""
if not self.has_emeter:
@ -405,11 +413,11 @@ class SmartDevice(object):
Retrieve daily statistics for a given month
:param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistcs (default: this
:param month: month for which to retrieve statistics (default: this
month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
None if device has no energy meter or error occured
None if device has no energy meter or error occurred
:rtype: dict
:raises SmartDeviceException: on error
"""
@ -483,9 +491,9 @@ class SmartDevice(object):
def current_consumption(self) -> Optional[float]:
"""
Get the current power consumption in Watt.
Get the current power consumption in Watts.
:return: the current power consumption in Watt.
:return: the current power consumption in Watts.
None if device has no energy meter.
:raises SmartDeviceException: on error
"""

View File

@ -33,9 +33,10 @@ class SmartPlug(SmartDevice):
def __init__(self,
host: str,
protocol: 'TPLinkSmartHomeProtocol' = None) -> None:
SmartDevice.__init__(self, host, protocol)
self.emeter_type = "emeter"
protocol: 'TPLinkSmartHomeProtocol' = None,
context: str = None) -> None:
SmartDevice.__init__(self, host, protocol, context)
self._type = "emeter"
@property
def state(self) -> str:
@ -126,7 +127,6 @@ class SmartPlug(SmartDevice):
:return: True if switch supports brightness changes, False otherwise
:rtype: bool
"""
return "brightness" in self.sys_info
@ -193,8 +193,15 @@ class SmartPlug(SmartDevice):
:return: datetime for on since
:rtype: datetime
"""
return datetime.datetime.now() - \
datetime.timedelta(seconds=self.sys_info["on_time"])
if self.context:
for plug in self.sys_info["children"]:
if plug["id"] == self.context:
on_time = plug["on_time"]
break
else:
on_time = self.sys_info["on_time"]
return datetime.datetime.now() - datetime.timedelta(seconds=on_time)
@property
def state_information(self) -> Dict[str, Any]:

397
pyHS100/smartstrip.py Executable file
View File

@ -0,0 +1,397 @@
import datetime
import logging
from typing import Any, Dict, Optional, Union
from pyHS100 import SmartPlug, SmartDeviceException, EmeterStatus
_LOGGER = logging.getLogger(__name__)
class SmartStripException(SmartDeviceException):
"""
SmartStripException gets raised for errors specific to the smart strip.
"""
pass
class SmartStrip(SmartPlug):
"""Representation of a TP-Link Smart Power Strip.
Usage example when used as library:
p = SmartStrip("192.168.1.105")
# print the devices alias
print(p.alias)
# change state of plug
p.state = "ON"
p.state = "OFF"
# query and print current state of plug
print(p.state)
Errors reported by the device are raised as SmartDeviceExceptions,
and should be handled by the user of the library.
Note:
The library references the same structure as defined for the D-Link Switch
"""
def __init__(self,
host: str,
protocol: 'TPLinkSmartHomeProtocol' = None) -> None:
SmartPlug.__init__(self, host, protocol)
self.emeter_type = "emeter"
self.plugs = {}
children = self.sys_info["children"]
self.num_children = len(children)
for plug in range(self.num_children):
self.plugs[plug] = SmartPlug(host, protocol,
context=children[plug]["id"])
def raise_for_index(self, index: int):
"""
Raises SmartStripException if the plug index is out of bounds
:param index: plug index to check
:raises SmartStripException: index out of bounds
"""
if index not in range(self.num_children):
raise SmartStripException("plug index of %d "
"is out of bounds" % index)
@property
def state(self) -> Dict[int, str]:
"""
Retrieve the switch state
:returns: list with the state of each child plug
SWITCH_STATE_ON
SWITCH_STATE_OFF
SWITCH_STATE_UNKNOWN
:rtype: dict
"""
states = {}
children = self.sys_info["children"]
for plug in range(self.num_children):
relay_state = children[plug]["state"]
if relay_state == 0:
switch_state = SmartPlug.SWITCH_STATE_OFF
elif relay_state == 1:
switch_state = SmartPlug.SWITCH_STATE_ON
else:
_LOGGER.warning("Unknown state %s returned for plug %u.",
relay_state, plug)
switch_state = SmartPlug.SWITCH_STATE_UNKNOWN
states[plug] = switch_state
return states
@state.setter
def state(self, value: str):
"""
Sets the state of all plugs in the strip
:param value: one of
SWITCH_STATE_ON
SWITCH_STATE_OFF
:raises ValueError: on invalid state
:raises SmartDeviceException: on error
"""
if not isinstance(value, str):
raise ValueError("State must be str, not of %s.", type(value))
elif value.upper() == SmartPlug.SWITCH_STATE_ON:
self.turn_on()
elif value.upper() == SmartPlug.SWITCH_STATE_OFF:
self.turn_off()
else:
raise ValueError("State %s is not valid.", value)
def set_state(self, value: str, *, index: int = -1):
"""
Sets the state of a plug on the strip
:param value: one of
SWITCH_STATE_ON
SWITCH_STATE_OFF
:param index: plug index (-1 for all)
:raises ValueError: on invalid state
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if index < 0:
self.state = value
else:
self.raise_for_index(index)
self.plugs[index].state = value
def is_on(self, *, index: int = -1) -> Any:
"""
Returns whether device is on.
:param index: plug index (-1 for all)
:return: True if device is on, False otherwise, Dict without index
:rtype: bool if index is provided
Dict[int, bool] if no index provided
:raises SmartStripException: index out of bounds
"""
children = self.sys_info["children"]
if index < 0:
is_on = {}
for plug in range(self.num_children):
is_on[plug] = bool(children[plug]["state"])
return is_on
else:
self.raise_for_index(index)
return bool(children[index]["state"])
def turn_on(self, *, index: int = -1):
"""
Turns outlets on
:param index: plug index (-1 for all)
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if index < 0:
self._query_helper("system", "set_relay_state", {"state": 1})
else:
self.raise_for_index(index)
self.plugs[index].turn_on()
def turn_off(self, *, index: int = -1):
"""
Turns outlets off
:param index: plug index (-1 for all)
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if index < 0:
self._query_helper("system", "set_relay_state", {"state": 0})
else:
self.raise_for_index(index)
self.plugs[index].turn_off()
def on_since(self, *, index: int = -1) -> Any:
"""
Returns pretty-printed on-time
:param index: plug index (-1 for all)
:return: datetime for on since
:rtype: datetime with index
Dict[int, str] without index
:raises SmartStripException: index out of bounds
"""
if index < 0:
on_since = {}
children = self.sys_info["children"]
for plug in range(self.num_children):
on_since[plug] = \
datetime.datetime.now() - \
datetime.timedelta(seconds=children[plug]["on_time"])
return on_since
else:
self.raise_for_index(index)
return self.plugs[index].on_since
@property
def state_information(self) -> Dict[str, Any]:
"""
Returns strip-specific state information.
:return: Strip information dict, keys in user-presentable form.
:rtype: dict
"""
state = {'LED state': self.led}
on_since = self.on_since()
for plug_index in range(self.num_children):
state['Plug %d on since' % (plug_index + 1)] = on_since[plug_index]
return state
def get_emeter_realtime(self, *, index: int = -1) -> Optional[Any]:
"""
Retrieve current energy readings from device
:param index: plug index (-1 for all)
:returns: list of current readings or None
:rtype: Dict, Dict[int, Dict], None
Dict if index is provided
Dict[int, Dict] if no index provided
None if device has no energy meter or error occurred
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if not self.has_emeter:
return None
if index < 0:
emeter_status = {}
for plug in range(self.num_children):
emeter_status[plug] = self.plugs[plug].get_emeter_realtime()
return emeter_status
else:
self.raise_for_index(index)
return self.plugs[index].get_emeter_realtime()
def current_consumption(self, *, index: int = -1) -> Optional[Any]:
"""
Get the current power consumption in Watts.
:param index: plug index (-1 for all)
:return: the current power consumption in Watts.
None if device has no energy meter.
:rtype: Dict, Dict[int, Dict], None
Dict if index is provided
Dict[int, Dict] if no index provided
None if device has no energy meter or error occurred
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if not self.has_emeter:
return None
if index < 0:
consumption = {}
emeter_reading = self.get_emeter_realtime()
for plug in range(self.num_children):
response = EmeterStatus(emeter_reading[plug])
consumption[plug] = response["power"]
return consumption
else:
self.raise_for_index(index)
response = EmeterStatus(self.get_emeter_realtime(index=index))
return response["power"]
@property
def icon(self):
"""
Override for base class icon property, SmartStrip and children do not
have icons.
:raises NotImplementedError: always
"""
raise NotImplementedError("no icons for this device")
def get_alias(self, *, index: int = -1) -> Union[str, Dict[int, str]]:
"""
Gets the alias for a plug.
:param index: plug index (-1 for all)
:return: the current power consumption in Watts.
None if device has no energy meter.
:rtype: str if index is provided
Dict[int, str] if no index provided
:raises SmartStripException: index out of bounds
"""
children = self.sys_info["children"]
if index < 0:
alias = {}
for plug in range(self.num_children):
alias[plug] = children[plug]["alias"]
return alias
else:
self.raise_for_index(index)
return children[index]["alias"]
def set_alias(self, alias: str, index: int):
"""
Sets the alias for a plug
:param index: plug index
:param alias: new alias
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
self.raise_for_index(index)
self.plugs[index].alias = alias
def get_emeter_daily(self,
year: int = None,
month: int = None,
kwh: bool = True,
*,
index: int = -1) -> Optional[Dict]:
"""
Retrieve daily statistics for a given month
:param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistics (default: this
month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
None if device has no energy meter or error occurred
:rtype: dict
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if not self.has_emeter:
return None
emeter_daily = {}
if index < 0:
for plug in range(self.num_children):
emeter_daily = self.plugs[plug].get_emeter_daily(year=year,
month=month,
kwh=kwh)
return emeter_daily
else:
self.raise_for_index(index)
return self.plugs[index].get_emeter_daily(year=year,
month=month,
kwh=kwh)
def get_emeter_monthly(self,
year: int = None,
kwh: bool = True,
*,
index: int = -1) -> Optional[Dict]:
"""
Retrieve monthly statistics for a given year.
:param year: year for which to retrieve statistics (default: this year)
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value
None if device has no energy meter
:rtype: dict
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if not self.has_emeter:
return None
emeter_monthly = {}
if index < 0:
for plug in range(self.num_children):
emeter_monthly = self.plugs[plug].get_emeter_monthly(year=year,
kwh=kwh)
return emeter_monthly
else:
self.raise_for_index(index)
return self.plugs[index].get_emeter_monthly(year=year,
kwh=kwh)
def erase_emeter_stats(self, *, index: int = -1) -> bool:
"""
Erase energy meter statistics
:param index: plug index (-1 for all)
:return: True if statistics were deleted
False if device has no energy meter.
:rtype: bool
:raises SmartDeviceException: on error
:raises SmartStripException: index out of bounds
"""
if not self.has_emeter:
return False
if index < 0:
for plug in range(self.num_children):
self.plugs[plug].erase_emeter_stats()
else:
self.raise_for_index(index)
self.plugs[index].erase_emeter_stats()
# As query_helper raises exception in case of failure, we have
# succeeded when we are this far.
return True

View File

@ -5,35 +5,42 @@ import logging
_LOGGER = logging.getLogger(__name__)
def get_realtime(obj, x):
def get_realtime(obj, x, child_ids=[]):
return {"current":0.268587,"voltage":125.836131,"power":33.495623,"total":0.199000}
def get_monthstat(obj, x):
def get_monthstat(obj, x, child_ids=[]):
if x["year"] < 2016:
return {"month_list":[]}
return {"month_list": [{"year": 2016, "month": 11, "energy": 1.089000}, {"year": 2016, "month": 12, "energy": 1.582000}]}
def get_daystat(obj, x):
def get_daystat(obj, x, child_ids=[]):
if x["year"] < 2016:
return {"day_list":[]}
return {"day_list": [{"year": 2016, "month": 11, "day": 24, "energy": 0.026000},
{"year": 2016, "month": 11, "day": 25, "energy": 0.109000}]}
emeter_support = {"get_realtime": get_realtime,
"get_monthstat": get_monthstat,
"get_daystat": get_daystat,}
def get_realtime_units(obj, x):
return {"power_mw": 10800}
def get_monthstat_units(obj, x):
if x["year"] < 2016:
return {"month_list":[]}
return {"month_list": [{"year": 2016, "month": 11, "energy_wh": 32}, {"year": 2016, "month": 12, "energy_wh": 16}]}
def get_daystat_units(obj, x):
if x["year"] < 2016:
return {"day_list":[]}
@ -41,10 +48,91 @@ def get_daystat_units(obj, x):
return {"day_list": [{"year": 2016, "month": 11, "day": 24, "energy_wh": 20},
{"year": 2016, "month": 11, "day": 25, "energy_wh": 32}]}
emeter_units_support = {"get_realtime": get_realtime_units,
"get_monthstat": get_monthstat_units,
"get_daystat": get_daystat_units,}
sysinfo_hs300 = {
'system': {
'get_sysinfo': {
'sw_ver': '1.0.6 Build 180627 Rel.081000',
'hw_ver': '1.0',
'model': 'HS300(US)',
'deviceId': '7003ADE7030B7EFADE747104261A7A70931DADF4',
'oemId': 'FFF22CFF774A0B89F7624BFC6F50D5DE',
'hwId': '22603EA5E716DEAEA6642A30BE87AFCB',
'rssi': -53,
'longitude_i': -1198698,
'latitude_i': 352737,
'alias': 'TP-LINK_Power Strip_2233',
'mic_type': 'IOT.SMARTPLUGSWITCH',
'feature': 'TIM:ENE',
'mac': '50:C7:BF:11:22:33',
'updating': 0,
'led_off': 0,
'children': [
{
'id': '7003ADE7030B7EFADE747104261A7A70931DADF400',
'state': 1,
'alias': 'my plug 1 device',
'on_time': 5423,
'next_action': {
'type': -1
}
},
{
'id': '7003ADE7030B7EFADE747104261A7A70931DADF401',
'state': 1,
'alias': 'my plug 2 device',
'on_time': 4750,
'next_action': {
'type': -1
}
},
{
'id': '7003ADE7030B7EFADE747104261A7A70931DADF402',
'state': 1,
'alias': 'my plug 3 device',
'on_time': 4748,
'next_action': {
'type': -1
}
},
{
'id': '7003ADE7030B7EFADE747104261A7A70931DADF403',
'state': 1,
'alias': 'my plug 4 device',
'on_time': 4742,
'next_action': {
'type': -1
}
},
{
'id': '7003ADE7030B7EFADE747104261A7A70931DADF404',
'state': 1,
'alias': 'my plug 5 device',
'on_time': 4745,
'next_action': {
'type': -1
}
},
{
'id': '7003ADE7030B7EFADE747104261A7A70931DADF405',
'state': 1,
'alias': 'my plug 6 device',
'on_time': 5028,
'next_action': {
'type': -1
}
}
],
'child_num': 6,
'err_code': 0
}
}
}
sysinfo_hs100 = {'system': {'get_sysinfo':
{'active_mode': 'schedule',
'alias': 'My Smart Plug',
@ -484,12 +572,27 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol):
self.proto = proto
self.invalid = invalid
def set_alias(self, x):
def set_alias(self, x, child_ids=[]):
_LOGGER.debug("Setting alias to %s", x["alias"])
if child_ids:
for child in self.proto["system"]["get_sysinfo"]["children"]:
if child["id"] in child_ids:
child["alias"] = x["alias"]
else:
self.proto["system"]["get_sysinfo"]["alias"] = x["alias"]
def set_relay_state(self, x):
_LOGGER.debug("Setting relay state to %s", x)
def set_relay_state(self, x, child_ids=[]):
_LOGGER.debug("Setting relay state to %s", x["state"])
if not child_ids and "children" in self.proto["system"]["get_sysinfo"]:
for child in self.proto["system"]["get_sysinfo"]["children"]:
child_ids.append(child["id"])
if child_ids:
for child in self.proto["system"]["get_sysinfo"]["children"]:
if child["id"] in child_ids:
child["state"] = x["state"]
else:
self.proto["system"]["get_sysinfo"]["relay_state"] = x["state"]
def set_led_off(self, x):
@ -516,6 +619,7 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol):
"get_dev_icon": {"icon": None, "hash": None},
"set_mac_addr": set_mac,
"get_sysinfo": None,
"context": None,
},
"emeter": { "get_realtime": None,
"get_daystat": None,
@ -537,14 +641,24 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol):
# HS220 brightness, different setter and getter
"smartlife.iot.dimmer": { "set_brightness": set_hs220_brightness,
},
"context": {"child_ids": None},
}
def query(self, host, request, port=9999):
if self.invalid:
raise SmartDeviceException("Invalid connection, can't query!")
_LOGGER.debug("Requesting {} from {}:{}".format(request, host, port))
proto = self.proto
# collect child ids from context
try:
child_ids = request["context"]["child_ids"]
request.pop("context", None)
except KeyError:
child_ids = []
target = next(iter(request))
if target not in proto.keys():
return error(target, msg="target not found")
@ -557,6 +671,9 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol):
_LOGGER.debug("Going to execute {}.{} (params: {}).. ".format(target, cmd, params))
if callable(proto[target][cmd]):
if child_ids:
res = proto[target][cmd](self, params, child_ids)
else:
res = proto[target][cmd](self, params)
# verify that change didn't break schema, requires refactoring..
#TestSmartPlug.sysinfo_schema(self.proto["system"]["get_sysinfo"])

View File

@ -84,7 +84,7 @@ class TestSmartPlugHS100(TestCase):
'total': Any(Coerce(float, Range(min=0)), None),
'current': Any(All(float, Range(min=0)), None),
'voltage_mw': Any(All(float, Range(min=0, max=300000)), None),
'voltage_mv': Any(All(float, Range(min=0, max=300000)), None),
'power_mw': Any(Coerce(float, Range(min=0)), None),
'total_wh': Any(Coerce(float, Range(min=0)), None),
'current_ma': Any(All(float, Range(min=0)), None),

438
pyHS100/tests/test_strip.py Normal file
View File

@ -0,0 +1,438 @@
from unittest import TestCase, skip
from voluptuous import Schema, All, Any, Range, Coerce
import datetime
from .. import SmartStrip, SmartPlug, SmartStripException, SmartDeviceException
from .fakes import FakeTransportProtocol, sysinfo_hs300
from .test_pyHS100 import check_mac, check_int_bool
# Set IP instead of None if you want to run tests on a device.
STRIP_IP = None
class TestSmartStripHS300(TestCase):
SYSINFO = sysinfo_hs300 # type: Dict
# these schemas should go to the mainlib as
# they can be useful when adding support for new features/devices
# as well as to check that faked devices are operating properly.
sysinfo_schema = Schema({
"sw_ver": str,
"hw_ver": str,
"model": str,
"deviceId": str,
"oemId": str,
"hwId": str,
"rssi": Any(int, None), # rssi can also be positive, see #54
"longitude": Any(All(int, Range(min=-1800000, max=1800000)), None),
"latitude": Any(All(int, Range(min=-900000, max=900000)), None),
"longitude_i": Any(All(int, Range(min=-1800000, max=1800000)), None),
"latitude_i": Any(All(int, Range(min=-900000, max=900000)), None),
"alias": str,
"mic_type": str,
"feature": str,
"mac": check_mac,
"updating": check_int_bool,
"led_off": check_int_bool,
"children": [{
"id": str,
"state": int,
"alias": str,
"on_time": int,
"next_action": {"type": int},
}],
"child_num": int,
"err_code": int,
})
current_consumption_schema = Schema(
Any(
{
"voltage": Any(All(float, Range(min=0, max=300)), None),
"power": Any(Coerce(float, Range(min=0)), None),
"total": Any(Coerce(float, Range(min=0)), None),
"current": Any(All(float, Range(min=0)), None),
"voltage_mv": Any(All(int, Range(min=0, max=300000)), None),
"power_mw": Any(Coerce(int, Range(min=0)), None),
"total_wh": Any(Coerce(int, Range(min=0)), None),
"current_ma": Any(All(int, Range(min=0)), None),
},
None
)
)
tz_schema = Schema({
"zone_str": str,
"dst_offset": int,
"index": All(int, Range(min=0)),
"tz_str": str,
})
def setUp(self):
if STRIP_IP is not None:
self.strip = SmartStrip(STRIP_IP)
else:
self.strip = SmartStrip(
host="127.0.0.1",
protocol=FakeTransportProtocol(self.SYSINFO)
)
def tearDown(self):
self.strip = None
def test_initialize(self):
self.assertIsNotNone(self.strip.sys_info)
self.assertTrue(self.strip.num_children)
self.sysinfo_schema(self.strip.sys_info)
def test_initialize_invalid_connection(self):
with self.assertRaises(SmartDeviceException):
SmartStrip(
host="127.0.0.1",
protocol=FakeTransportProtocol(self.SYSINFO, invalid=True))
def test_query_helper(self):
with self.assertRaises(SmartDeviceException):
self.strip._query_helper("test", "testcmd", {})
def test_raise_for_index(self):
with self.assertRaises(SmartStripException):
self.strip.raise_for_index(index=self.strip.num_children + 100)
def test_state_strip(self):
with self.assertRaises(ValueError):
self.strip.state = 1234
with self.assertRaises(ValueError):
self.strip.state = "1234"
with self.assertRaises(ValueError):
self.strip.state = True
orig_state = self.strip.state
if orig_state == SmartPlug.SWITCH_STATE_OFF:
self.strip.state = "ON"
self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_ON)
self.strip.state = "OFF"
self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_OFF)
elif orig_state == SmartPlug.SWITCH_STATE_ON:
self.strip.state = "OFF"
self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_OFF)
self.strip.state = "ON"
self.assertTrue(self.strip.state == SmartPlug.SWITCH_STATE_ON)
elif orig_state == SmartPlug.SWITCH_STATE_UNKNOWN:
self.fail("can't test for unknown state")
def test_state_plugs(self):
# value errors
for plug_index in range(self.strip.num_children):
with self.assertRaises(ValueError):
self.strip.set_state(value=1234, index=plug_index)
with self.assertRaises(ValueError):
self.strip.set_state(value="1234", index=plug_index)
with self.assertRaises(ValueError):
self.strip.set_state(value=True, index=plug_index)
# out of bounds error
with self.assertRaises(SmartStripException):
self.strip.set_state(
value=SmartPlug.SWITCH_STATE_ON,
index=self.strip.num_children + 100
)
# on off
for plug_index in range(self.strip.num_children):
orig_state = self.strip.state[plug_index]
if orig_state == SmartPlug.SWITCH_STATE_OFF:
self.strip.set_state(value="ON", index=plug_index)
self.assertTrue(
self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_ON)
self.strip.set_state(value="OFF", index=plug_index)
self.assertTrue(
self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_OFF)
elif orig_state == SmartPlug.SWITCH_STATE_ON:
self.strip.set_state(value="OFF", index=plug_index)
self.assertTrue(
self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_OFF)
self.strip.set_state(value="ON", index=plug_index)
self.assertTrue(
self.strip.state[plug_index] == SmartPlug.SWITCH_STATE_ON)
elif orig_state == SmartPlug.SWITCH_STATE_UNKNOWN:
self.fail("can't test for unknown state")
def test_turns_and_isses(self):
# all on
self.strip.turn_on()
for index, state in self.strip.is_on().items():
self.assertTrue(state)
self.assertTrue(self.strip.is_on(index=index) == state)
# all off
self.strip.turn_off()
for index, state in self.strip.is_on().items():
self.assertFalse(state)
self.assertTrue(self.strip.is_on(index=index) == state)
# individual on
for plug_index in range(self.strip.num_children):
original_states = self.strip.is_on()
self.strip.turn_on(index=plug_index)
# only target outlet should have state changed
for index, state in self.strip.is_on().items():
if index == plug_index:
self.assertTrue(state != original_states[index])
else:
self.assertTrue(state == original_states[index])
# individual off
for plug_index in range(self.strip.num_children):
original_states = self.strip.is_on()
self.strip.turn_off(index=plug_index)
# only target outlet should have state changed
for index, state in self.strip.is_on().items():
if index == plug_index:
self.assertTrue(state != original_states[index])
else:
self.assertTrue(state == original_states[index])
# out of bounds
with self.assertRaises(SmartStripException):
self.strip.turn_off(index=self.strip.num_children + 100)
with self.assertRaises(SmartStripException):
self.strip.turn_on(index=self.strip.num_children + 100)
with self.assertRaises(SmartStripException):
self.strip.is_on(index=self.strip.num_children + 100)
@skip("this test will wear out your relays")
def test_all_binary_states(self):
# test every binary state
for state in range(2 ** self.strip.num_children):
# create binary state map
state_map = {}
for plug_index in range(self.strip.num_children):
state_map[plug_index] = bool((state >> plug_index) & 1)
if state_map[plug_index]:
self.strip.turn_on(index=plug_index)
else:
self.strip.turn_off(index=plug_index)
# check state map applied
for index, state in self.strip.is_on().items():
self.assertTrue(state_map[index] == state)
# toggle each outlet with state map applied
for plug_index in range(self.strip.num_children):
# toggle state
if state_map[plug_index]:
self.strip.turn_off(index=plug_index)
else:
self.strip.turn_on(index=plug_index)
# only target outlet should have state changed
for index, state in self.strip.is_on().items():
if index == plug_index:
self.assertTrue(state != state_map[index])
else:
self.assertTrue(state == state_map[index])
# reset state
if state_map[plug_index]:
self.strip.turn_on(index=plug_index)
else:
self.strip.turn_off(index=plug_index)
# original state map should be restored
for index, state in self.strip.is_on().items():
self.assertTrue(state == state_map[index])
def test_has_emeter(self):
# a not so nice way for checking for emeter availability..
if "HS300" in self.strip.sys_info["model"]:
self.assertTrue(self.strip.has_emeter)
else:
self.assertFalse(self.strip.has_emeter)
def test_get_emeter_realtime(self):
if self.strip.has_emeter:
# test with index
for plug_index in range(self.strip.num_children):
emeter = self.strip.get_emeter_realtime(index=plug_index)
self.current_consumption_schema(emeter)
# test without index
for index, emeter in self.strip.get_emeter_realtime().items():
self.current_consumption_schema(emeter)
# out of bounds
with self.assertRaises(SmartStripException):
self.strip.get_emeter_realtime(
index=self.strip.num_children + 100
)
else:
self.assertEqual(self.strip.get_emeter_realtime(), None)
def test_get_emeter_daily(self):
if self.strip.has_emeter:
# test with index
for plug_index in range(self.strip.num_children):
emeter = self.strip.get_emeter_daily(year=1900, month=1,
index=plug_index)
self.assertEqual(emeter, {})
if len(emeter) < 1:
print("no emeter daily information, skipping..")
return
k, v = emeter.popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, float))
# test without index
all_emeter = self.strip.get_emeter_daily(year=1900, month=1)
for index, emeter in all_emeter.items():
self.assertEqual(emeter, {})
if len(emeter) < 1:
print("no emeter daily information, skipping..")
return
k, v = emeter.popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, float))
# out of bounds
with self.assertRaises(SmartStripException):
self.strip.get_emeter_daily(
year=1900,
month=1,
index=self.strip.num_children + 100
)
else:
self.assertEqual(
self.strip.get_emeter_daily(year=1900, month=1), None)
def test_get_emeter_monthly(self):
if self.strip.has_emeter:
# test with index
for plug_index in range(self.strip.num_children):
emeter = self.strip.get_emeter_monthly(year=1900,
index=plug_index)
self.assertEqual(emeter, {})
if len(emeter) < 1:
print("no emeter daily information, skipping..")
return
k, v = emeter.popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, float))
# test without index
all_emeter = self.strip.get_emeter_monthly(year=1900)
for index, emeter in all_emeter.items():
self.assertEqual(emeter, {})
if len(emeter) < 1:
print("no emeter daily information, skipping..")
return
k, v = emeter.popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, float))
# out of bounds
with self.assertRaises(SmartStripException):
self.strip.get_emeter_monthly(
year=1900,
index=self.strip.num_children + 100
)
else:
self.assertEqual(self.strip.get_emeter_monthly(year=1900), None)
@skip("not clearing your stats..")
def test_erase_emeter_stats(self):
self.fail()
def test_current_consumption(self):
if self.strip.has_emeter:
# test with index
for plug_index in range(self.strip.num_children):
emeter = self.strip.current_consumption(index=plug_index)
self.assertTrue(isinstance(emeter, float))
self.assertTrue(emeter >= 0.0)
# test without index
for index, emeter in self.strip.current_consumption().items():
self.assertTrue(isinstance(emeter, float))
self.assertTrue(emeter >= 0.0)
# out of bounds
with self.assertRaises(SmartStripException):
self.strip.current_consumption(
index=self.strip.num_children + 100
)
else:
self.assertEqual(self.strip.current_consumption(), None)
def test_alias(self):
test_alias = "TEST1234"
# strip alias
original = self.strip.alias
self.assertTrue(isinstance(original, str))
self.strip.alias = test_alias
self.assertEqual(self.strip.alias, test_alias)
self.strip.alias = original
self.assertEqual(self.strip.alias, original)
# plug alias
original = self.strip.get_alias()
for plug in range(self.strip.num_children):
self.strip.set_alias(alias=test_alias, index=plug)
self.assertEqual(self.strip.get_alias(index=plug), test_alias)
self.strip.set_alias(alias=original[plug], index=plug)
self.assertEqual(self.strip.get_alias(index=plug), original[plug])
def test_led(self):
original = self.strip.led
self.strip.led = False
self.assertFalse(self.strip.led)
self.strip.led = True
self.assertTrue(self.strip.led)
self.strip.led = original
def test_icon(self):
with self.assertRaises(NotImplementedError):
self.strip.icon
def test_time(self):
self.assertTrue(isinstance(self.strip.time, datetime.datetime))
# TODO check setting?
def test_timezone(self):
self.tz_schema(self.strip.timezone)
def test_hw_info(self):
self.sysinfo_schema(self.strip.hw_info)
def test_on_since(self):
# out of bounds
with self.assertRaises(SmartStripException):
self.strip.on_since(index=self.strip.num_children + 1)
# individual on_since
for plug_index in range(self.strip.num_children):
self.assertTrue(isinstance(
self.strip.on_since(index=plug_index), datetime.datetime))
# all on_since
for index, plug_on_since in self.strip.on_since().items():
self.assertTrue(isinstance(plug_on_since, datetime.datetime))
def test_location(self):
print(self.strip.location)
self.sysinfo_schema(self.strip.location)
def test_rssi(self):
self.sysinfo_schema({'rssi': self.strip.rssi}) # wrapping for vol
def test_mac(self):
self.sysinfo_schema({'mac': self.strip.mac}) # wrapping for vol