Add support for TP-Link smartbulbs (#30)

* Add support for new-style protocol

Newer devices (including my LB130) seem to include the request length in
the previously empty message header, and ignore requests that lack it. They
also don't send an empty packet as the final part of a response, which can
lead to hangs. Add support for this, with luck not breaking existing devices
in the process.

* Fix tests

We now include the request length in the encrypted packet header, so strip
the header rather than assuming that it's just zeroes.

* Create a SmartDevice parent class

Add a generic SmartDevice class that SmartPlug can inherit from, in
preparation for adding support for other device types.

* Add support for TP-Link smartbulbs

These bulbs use the same protocol as the smart plugs, but have additional
commands for controlling bulb-specific features. In addition, the bulbs
have their emeter under a different target and return responses that
include the energy unit in the key names.

* Add tests for bulbs

Not entirely comprehensive, but has pretty much the same level of testing
as plugs
This commit is contained in:
Matthew Garrett
2017-01-17 05:38:23 -08:00
committed by GadgetReactor
parent 04185706f8
commit 2d6376b597
7 changed files with 819 additions and 269 deletions

View File

@@ -25,6 +25,26 @@ emeter_support = {"get_realtime": get_realtime,
"get_monthstat": get_monthstat,
"get_daystat": get_daystat,}
def get_realtime_units(obj, x):
return {"power_mw": 10800}
def get_monthstat_units(obj, x):
if x["year"] < 2016:
return {"month_list":[]}
return {"month_list": [{"year": 2016, "month": 11, "energy_wh": 32}, {"year": 2016, "month": 12, "energy_wh": 16}]}
def get_daystat_units(obj, x):
if x["year"] < 2016:
return {"day_list":[]}
return {"day_list": [{"year": 2016, "month": 11, "day": 24, "energy_wh": 20},
{"year": 2016, "month": 11, "day": 25, "energy_wh": 32}]}
emeter_units_support = {"get_realtime": get_realtime_units,
"get_monthstat": get_monthstat_units,
"get_daystat": get_daystat_units,}
sysinfo_hs110 = {'system': {'get_sysinfo':
{'active_mode': 'schedule',
'alias': 'Mobile Plug',
@@ -76,6 +96,63 @@ sysinfo_hs200 = {'system': {'get_sysinfo': {'active_mode': 'schedule',
'updating': 0}}
}
sysinfo_lb130 = {'system': {'get_sysinfo':
{'active_mode': 'none',
'alias': 'Living Room Side Table',
'ctrl_protocols': {'name': 'Linkie', 'version': '1.0'},
'description': 'Smart Wi-Fi LED Bulb with Color Changing',
'dev_state': 'normal',
'deviceId': '80123C4640E9FC33A9019A0F3FD8BF5C17B7D9A8',
'disco_ver': '1.0',
'heapsize': 347000,
'hwId': '111E35908497A05512E259BB76801E10',
'hw_ver': '1.0',
'is_color': 1,
'is_dimmable': 1,
'is_factory': False,
'is_variable_color_temp': 1,
'light_state': {'brightness': 100,
'color_temp': 3700,
'hue': 0,
'mode': 'normal',
'on_off': 1,
'saturation': 0},
'mic_mac': '50C7BF104865',
'mic_type': 'IOT.SMARTBULB',
'model': 'LB130(US)',
'oemId': '05BF7B3BE1675C5A6867B7A7E4C9F6F7',
'preferred_state': [{'brightness': 50,
'color_temp': 2700,
'hue': 0,
'index': 0,
'saturation': 0},
{'brightness': 100,
'color_temp': 0,
'hue': 0,
'index': 1,
'saturation': 75},
{'brightness': 100,
'color_temp': 0,
'hue': 120,
'index': 2,
'saturation': 75},
{'brightness': 100,
'color_temp': 0,
'hue': 240,
'index': 3,
'saturation': 75}],
'rssi': -55,
'sw_ver': '1.1.2 Build 160927 Rel.111100'}},
'smartlife.iot.smartbulb.lightingservice': {'get_light_state':
{'on_off':1,
'mode':'normal',
'hue': 0,
'saturation': 0,
'color_temp': 3700,
'brightness': 100,
'err_code': 0}},
'smartlife.iot.common.emeter': emeter_units_support,
}
def error(cls, target, cmd="no-command", msg="default msg"):
return {target: {cmd: {"err_code": -1323, "msg": msg}}}
@@ -90,13 +167,13 @@ def success(target, cmd, res):
class FakeTransportProtocol(TPLinkSmartHomeProtocol):
def __init__(self, invalid=False):
def __init__(self, sysinfo, invalid=False):
""" invalid is set only for testing
to force query() to throw the exception for non-connected """
proto = FakeTransportProtocol.baseproto
for target in sysinfo_hs110:
for cmd in sysinfo_hs110[target]:
proto[target][cmd] = sysinfo_hs110[target][cmd]
for target in sysinfo:
for cmd in sysinfo[target]:
proto[target][cmd] = sysinfo[target][cmd]
self.proto = proto
self.invalid = invalid
@@ -116,6 +193,11 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol):
_LOGGER.debug("Setting mac to %s", x)
self.proto["system"]["get_sysinfo"][""]
def transition_light_state(self, x):
_LOGGER.debug("Setting light state to %s", x)
for key in x:
self.proto["smartlife.iot.smartbulb.lightingservice"]["get_light_state"][key]=x[key]
baseproto = {
"system": { "set_relay_state": set_relay_state,
"set_dev_alias": set_alias,
@@ -129,6 +211,14 @@ class FakeTransportProtocol(TPLinkSmartHomeProtocol):
"get_monthstat": None,
"erase_emeter_state": None
},
"smartlife.iot.common.emeter": { "get_realtime": None,
"get_daystat": None,
"get_monthstat": None,
"erase_emeter_state": None
},
"smartlife.iot.smartbulb.lightingservice": { "get_light_state": None,
"transition_light_state": transition_light_state,
},
"time": { "get_time": { "year": 2017, "month": 1, "mday": 2, "hour": 3, "min": 4, "sec": 5 },
"get_timezone": {'zone_str': "test", 'dst_offset': -1, 'index': 12, 'tz_str': "test2" },
"set_timezone": None,

198
pyHS100/tests/test_bulb.py Normal file
View File

@@ -0,0 +1,198 @@
from __future__ import absolute_import
from __future__ import unicode_literals
from unittest import TestCase, skip, skipIf
from voluptuous import Schema, Invalid, All, Range
from functools import partial
from pyHS100 import SmartBulb, SmartPlugException
from pyHS100.tests.fakes import FakeTransportProtocol, sysinfo_lb130
BULB_IP = '192.168.250.186'
SKIP_STATE_TESTS = False
# python2 compatibility
try:
basestring
except NameError:
basestring = str
def check_int_bool(x):
if x != 0 and x != 1:
raise Invalid(x)
return x
def check_mode(x):
if x in ['schedule', 'none']:
return x
raise Invalid("invalid mode {}".format(x))
class TestSmartBulb(TestCase):
# these schemas should go to the mainlib as
# they can be useful when adding support for new features/devices
# as well as to check that faked devices are operating properly.
sysinfo_schema = Schema({
'active_mode': check_mode,
'alias': basestring,
'ctrl_protocols': {
'name': basestring,
'version': basestring,
},
'description': basestring,
'dev_state': basestring,
'deviceId': basestring,
'disco_ver': basestring,
'heapsize': int,
'hwId': basestring,
'hw_ver': basestring,
'is_color': check_int_bool,
'is_dimmable': check_int_bool,
'is_factory': bool,
'is_variable_color_temp': check_int_bool,
'light_state': {
'brightness': All(int, Range(min=0, max=100)),
'color_temp': int,
'hue': All(int, Range(min=0, max=255)),
'mode': basestring,
'on_off': check_int_bool,
'saturation': All(int, Range(min=0, max=255)),
},
'mic_mac': basestring,
'mic_type': basestring,
'model': basestring,
'oemId': basestring,
'preferred_state': [{
'brightness': All(int, Range(min=0, max=100)),
'color_temp': int,
'hue': All(int, Range(min=0, max=255)),
'index': int,
'saturation': All(int, Range(min=0, max=255)),
}],
'rssi': All(int, Range(max=0)),
'sw_ver': basestring,
})
current_consumption_schema = Schema({
'power_mw': int,
})
tz_schema = Schema({
'zone_str': basestring,
'dst_offset': int,
'index': All(int, Range(min=0)),
'tz_str': basestring,
})
def setUp(self):
self.bulb = SmartBulb(BULB_IP,
protocol=FakeTransportProtocol(sysinfo_lb130))
def tearDown(self):
self.bulb = None
def test_initialize(self):
self.assertIsNotNone(self.bulb.sys_info)
self.sysinfo_schema(self.bulb.sys_info)
def test_initialize_invalid_connection(self):
bulb = SmartBulb('127.0.0.1',
protocol=FakeTransportProtocol(sysinfo_lb130,
invalid=True))
with self.assertRaises(SmartPlugException):
bulb.sys_info['model']
def test_query_helper(self):
with self.assertRaises(SmartPlugException):
self.bulb._query_helper("test", "testcmd", {})
# TODO check for unwrapping?
@skipIf(SKIP_STATE_TESTS, "SKIP_STATE_TESTS is True, skipping")
def test_state(self):
def set_invalid(x):
self.bulb.state = x
set_invalid_int = partial(set_invalid, 1234)
self.assertRaises(ValueError, set_invalid_int)
set_invalid_str = partial(set_invalid, "1234")
self.assertRaises(ValueError, set_invalid_str)
set_invalid_bool = partial(set_invalid, True)
self.assertRaises(ValueError, set_invalid_bool)
orig_state = self.bulb.state
if orig_state == SmartBulb.BULB_STATE_OFF:
self.bulb.state = SmartBulb.BULB_STATE_ON
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_ON)
self.bulb.state = SmartBulb.BULB_STATE_OFF
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_OFF)
elif orig_state == SmartBulb.BULB_STATE_ON:
self.bulb.state = SmartBulb.BULB_STATE_OFF
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_OFF)
self.bulb.state = SmartBulb.BULB_STATE_ON
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_ON)
def test_get_sysinfo(self):
# initialize checks for this already, but just to be sure
self.sysinfo_schema(self.bulb.get_sysinfo())
@skipIf(SKIP_STATE_TESTS, "SKIP_STATE_TESTS is True, skipping")
def test_turns_and_isses(self):
orig_state = self.bulb.state
if orig_state == SmartBulb.BULB_STATE_ON:
self.bulb.state = SmartBulb.BULB_STATE_OFF
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_OFF)
self.bulb.state = SmartBulb.BULB_STATE_ON
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_ON)
else:
self.bulb.state = SmartBulb.BULB_STATE_ON
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_ON)
self.bulb.state = SmartBulb.BULB_STATE_OFF
self.assertTrue(self.bulb.state == SmartBulb.BULB_STATE_OFF)
def test_get_emeter_realtime(self):
self.current_consumption_schema((self.bulb.get_emeter_realtime()))
def test_get_emeter_daily(self):
self.assertEqual(self.bulb.get_emeter_daily(year=1900, month=1), {})
k, v = self.bulb.get_emeter_daily().popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, int))
def test_get_emeter_monthly(self):
self.assertEqual(self.bulb.get_emeter_monthly(year=1900), {})
d = self.bulb.get_emeter_monthly()
k, v = d.popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, int))
@skip("not clearing your stats..")
def test_erase_emeter_stats(self):
self.fail()
def test_current_consumption(self):
x = self.bulb.current_consumption()
self.assertTrue(isinstance(x, int))
self.assertTrue(x >= 0.0)
def test_alias(self):
test_alias = "TEST1234"
original = self.bulb.alias
self.assertTrue(isinstance(original, basestring))
self.bulb.alias = test_alias
self.assertEqual(self.bulb.alias, test_alias)
self.bulb.alias = original
self.assertEqual(self.bulb.alias, original)
def test_icon(self):
self.assertEqual(set(self.bulb.icon.keys()), {'icon', 'hash'})
def test_rssi(self):
self.sysinfo_schema({'rssi': self.bulb.rssi}) # wrapping for vol

View File

@@ -10,6 +10,6 @@ class TestTPLinkSmartHomeProtocol(TestCase):
def test_encrypt(self):
d = json.dumps({'foo': 1, 'bar': 2})
encrypted = TPLinkSmartHomeProtocol.encrypt(d)
# encrypt appends nullbytes for the protocol sends
encrypted = encrypted.lstrip(b'\0')
# encrypt adds a 4 byte header
encrypted = encrypted[4:]
self.assertEqual(d, TPLinkSmartHomeProtocol.decrypt(encrypted))

View File

@@ -8,7 +8,7 @@ import datetime
import re
from pyHS100 import SmartPlug, SmartPlugException
from pyHS100.tests.fakes import FakeTransportProtocol
from pyHS100.tests.fakes import FakeTransportProtocol, sysinfo_hs110
PLUG_IP = '192.168.250.186'
SKIP_STATE_TESTS = False
@@ -82,7 +82,8 @@ class TestSmartPlug(TestCase):
})
def setUp(self):
self.plug = SmartPlug(PLUG_IP, protocol=FakeTransportProtocol())
self.plug = SmartPlug(PLUG_IP,
protocol=FakeTransportProtocol(sysinfo_hs110))
def tearDown(self):
self.plug = None
@@ -92,7 +93,9 @@ class TestSmartPlug(TestCase):
self.sysinfo_schema(self.plug.sys_info)
def test_initialize_invalid_connection(self):
plug = SmartPlug('127.0.0.1', protocol=FakeTransportProtocol(invalid=True))
plug = SmartPlug('127.0.0.1',
protocol=FakeTransportProtocol(sysinfo_hs110,
invalid=True))
with self.assertRaises(SmartPlugException):
plug.sys_info['model']