diff --git a/pyHS100/pyHS100.py b/pyHS100/pyHS100.py index d837a720..0a811972 100644 --- a/pyHS100/pyHS100.py +++ b/pyHS100/pyHS100.py @@ -14,8 +14,13 @@ You may obtain a copy of the license at http://www.apache.org/licenses/LICENSE-2.0 """ +# python2 compatibility from __future__ import absolute_import from __future__ import unicode_literals +try: + basestring +except NameError: + basestring = str import datetime import logging @@ -33,7 +38,7 @@ class SmartPlugException(Exception): pass -class SmartPlug: +class SmartPlug(object): """Representation of a TP-Link Smart Switch. Usage example when used as library: @@ -63,7 +68,7 @@ class SmartPlug: ALL_FEATURES = (FEATURE_ENERGY_METER, FEATURE_TIMER) - def __init__(self, ip_address, protocol=TPLinkSmartHomeProtocol): + def __init__(self, ip_address, protocol=None): """ Create a new SmartPlug instance, identified through its IP address. @@ -72,6 +77,8 @@ class SmartPlug: """ socket.inet_pton(socket.AF_INET, ip_address) self.ip_address = ip_address + if not protocol: + protocol = TPLinkSmartHomeProtocol() self.protocol = protocol self._sys_info = None @@ -105,6 +112,9 @@ class SmartPlug: except Exception as ex: raise SmartPlugException(ex) from ex + if target not in response: + raise SmartPlugException("No required {} in response: {}".format(target, response)) + result = response[target] if "err_code" in result and result["err_code"] != 0: raise SmartPlugException("Error on {}.{}: {}".format(target, cmd, result)) @@ -154,7 +164,7 @@ class SmartPlug: :raises SmartPlugException: on error """ - if not isinstance(value, str): + if not isinstance(value, basestring): raise ValueError("State must be str, not of %s.", type(value)) elif value.upper() == SmartPlug.SWITCH_STATE_ON: self.turn_on() diff --git a/pyHS100/tests/__init__.py b/pyHS100/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pyHS100/tests/fakes.py b/pyHS100/tests/fakes.py new file mode 100644 index 00000000..b86466d5 --- /dev/null +++ b/pyHS100/tests/fakes.py @@ -0,0 +1,164 @@ +from pyHS100.protocol import TPLinkSmartHomeProtocol +from pyHS100 import SmartPlugException +import logging + + +_LOGGER = logging.getLogger(__name__) + +def get_realtime(obj, x): + return {"current":0.268587,"voltage":125.836131,"power":33.495623,"total":0.199000} + +def get_monthstat(obj, x): + if x["year"] < 2016: + return {"month_list":[]} + + return {"month_list": [{"year": 2016, "month": 11, "energy": 1.089000}, {"year": 2016, "month": 12, "energy": 1.582000}]} + +def get_daystat(obj, x): + if x["year"] < 2016: + return {"day_list":[]} + + return {"day_list": [{"year": 2016, "month": 11, "day": 24, "energy": 0.026000}, + {"year": 2016, "month": 11, "day": 25, "energy": 0.109000}]} + +emeter_support = {"get_realtime": get_realtime, + "get_monthstat": get_monthstat, + "get_daystat": get_daystat,} + +sysinfo_hs110 = {'system': {'get_sysinfo': + {'active_mode': 'schedule', + 'alias': 'Mobile Plug', + 'dev_name': 'Wi-Fi Smart Plug With Energy Monitoring', + 'deviceId': '800654F32938FCBA8F7327887A386476172B5B53', + 'err_code': 0, + 'feature': 'TIM:ENE', + 'fwId': 'E16EB3E95DB6B47B5B72B3FD86FD1438', + 'hwId': '60FF6B258734EA6880E186F8C96DDC61', + 'hw_ver': '1.0', + 'icon_hash': '', + 'latitude': 12.2, + 'led_off': 0, + 'longitude': -12.2, + 'mac': 'AA:BB:CC:11:22:33', + 'model': 'HS110(US)', + 'oemId': 'FFF22CFF774A0B89F7624BFC6F50D5DE', + 'on_time': 9022, + 'relay_state': 1, + 'rssi': -61, + 'sw_ver': '1.0.8 Build 151113 Rel.24658', + 'type': 'IOT.SMARTPLUGSWITCH', + 'updating': 0} + }, + "emeter": emeter_support, +} + +sysinfo_hs200 = {'system': {'get_sysinfo': {'active_mode': 'schedule', + 'alias': 'Christmas Tree Switch', + 'dev_name': 'Wi-Fi Smart Light Switch', + 'deviceId': '8006E0D62C90698C6A3EF72944F56DDC17D0DB80', + 'err_code': 0, + 'feature': 'TIM', + 'fwId': 'DB4F3246CD85AA59CAE738A63E7B9C34', + 'hwId': 'A0E3CC8F5C1166B27A16D56BE262A6D3', + 'hw_ver': '1.0', + 'icon_hash': '', + 'latitude': 12.2, + 'led_off': 0, + 'longitude': -12.2, + 'mac': 'AA:BB:CC:11:22:33', + 'mic_type': 'IOT.SMARTPLUGSWITCH', + 'model': 'HS200(US)', + 'oemId': '4AFE44A41F868FD2340E6D1308D8551D', + 'on_time': 9586, + 'relay_state': 1, + 'rssi': -53, + 'sw_ver': '1.1.0 Build 160521 Rel.085826', + 'updating': 0}} +} + + +def error(cls, target, cmd="no-command", msg="default msg"): + return {target: {cmd: {"err_code": -1323, "msg": msg}}} + + +def success(target, cmd, res): + if res: + res.update({"err_code": 0}) + else: + res = {"err_code": 0} + return {target: {cmd: res}} + + +class FakeTransportProtocol(TPLinkSmartHomeProtocol): + def __init__(self, invalid=False): + """ invalid is set only for testing + to force query() to throw the exception for non-connected """ + proto = FakeTransportProtocol.baseproto + for target in sysinfo_hs110: + for cmd in sysinfo_hs110[target]: + proto[target][cmd] = sysinfo_hs110[target][cmd] + self.proto = proto + self.invalid = invalid + + def set_alias(self, x): + _LOGGER.debug("Setting alias to %s", x["alias"]) + self.proto["system"]["get_sysinfo"]["alias"] = x["alias"] + + def set_relay_state(self, x): + _LOGGER.debug("Setting relay state to %s", x) + self.proto["system"]["get_sysinfo"]["relay_state"] = x["state"] + + def set_led_off(self, x): + _LOGGER.debug("Setting led off to %s", x) + self.proto["system"]["get_sysinfo"]["led_off"] = x["off"] + + def set_mac(self, x): + _LOGGER.debug("Setting mac to %s", x) + self.proto["system"]["get_sysinfo"][""] + + baseproto = { + "system": { "set_relay_state": set_relay_state, + "set_dev_alias": set_alias, + "set_led_off": set_led_off, + "get_dev_icon": {"icon": None, "hash": None}, + "set_mac_addr": set_mac, + "get_sysinfo": None, + }, + "emeter": { "get_realtime": None, + "get_daystat": None, + "get_monthstat": None, + "erase_emeter_state": None + }, + "time": { "get_time": { "year": 2017, "month": 1, "mday": 2, "hour": 3, "min": 4, "sec": 5 }, + "get_timezone": {'zone_str': "test", 'dst_offset': -1, 'index': 12, 'tz_str': "test2" }, + "set_timezone": None, + + } + } + + def query(self, host, request, port=9999): + if self.invalid: + raise SmartPlugException("Invalid connection, can't query!") + + proto = self.proto + + target = next(iter(request)) + if target not in proto.keys(): + return error(target, msg="target not found") + + cmd = next(iter(request[target])) + if cmd not in proto[target].keys(): + return error(target, cmd, msg="command not found") + + params = request[target][cmd] + _LOGGER.debug("Going to execute {}.{} (params: {}).. ".format(target, cmd, params)) + + if callable(proto[target][cmd]): + res = proto[target][cmd](self, params) + # verify that change didn't break schema, requires refactoring.. + #TestSmartPlug.sysinfo_schema(self.proto["system"]["get_sysinfo"]) + return success(target, cmd, res) + elif isinstance(proto[target][cmd], dict): + return success(target, cmd, proto[target][cmd]) + else: + raise NotImplementedError("target {} cmd {}".format(target, cmd)) diff --git a/pyHS100/tests/test_pyHS100.py b/pyHS100/tests/test_pyHS100.py index 521c0fa7..e8d91b4a 100644 --- a/pyHS100/tests/test_pyHS100.py +++ b/pyHS100/tests/test_pyHS100.py @@ -8,9 +8,10 @@ import datetime import re from pyHS100 import SmartPlug, SmartPlugException +from pyHS100.tests.fakes import FakeTransportProtocol PLUG_IP = '192.168.250.186' -SKIP_STATE_TESTS = True +SKIP_STATE_TESTS = False # python2 compatibility try: @@ -39,6 +40,9 @@ def check_mode(x): class TestSmartPlug(TestCase): + # these schemas should go to the mainlib as + # they can be useful when adding support for new features/devices + # as well as to check that faked devices are operating properly. sysinfo_schema = Schema({ 'active_mode': check_mode, 'alias': basestring, @@ -78,7 +82,7 @@ class TestSmartPlug(TestCase): }) def setUp(self): - self.plug = SmartPlug(PLUG_IP) + self.plug = SmartPlug(PLUG_IP, protocol=FakeTransportProtocol()) def tearDown(self): self.plug = None @@ -88,7 +92,7 @@ class TestSmartPlug(TestCase): self.sysinfo_schema(self.plug.sys_info) def test_initialize_invalid_connection(self): - plug = SmartPlug('127.0.0.1') + plug = SmartPlug('127.0.0.1', protocol=FakeTransportProtocol(invalid=True)) with self.assertRaises(SmartPlugException): plug.sys_info['model']