Fix emeter support for newer HS110 firmwares (#107)

* Add support for new-style emeter

This commit adds a straightforward dict-extending container,
which converts between the old and new keys of the get_emeter_realtime()
Furthermore the unit tests are converted to base on HS100
instead of HS110.

This is the first step to fix #103, other emeter-using functionality
has not yet been converted, only getting the current consumption.

* fix a couple of linting issues

* Convert new-style emeter values also for get_emeter_daily() and get_emeter_monthly()

* Adds a new 'kwh' parameter for those calls, which defaults to True
* This changes the behavior of bulbs emeter reporting, use False if you prefer the preciser values
This commit is contained in:
Teemu R 2018-06-16 21:16:35 +02:00 committed by GitHub
parent 11a7042a04
commit ef2e21ff69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 162 additions and 39 deletions

View File

@ -46,7 +46,6 @@ class SmartBulb(SmartDevice):
protocol: 'TPLinkSmartHomeProtocol' = None) -> None: protocol: 'TPLinkSmartHomeProtocol' = None) -> None:
SmartDevice.__init__(self, host, protocol) SmartDevice.__init__(self, host, protocol)
self.emeter_type = "smartlife.iot.common.emeter" self.emeter_type = "smartlife.iot.common.emeter"
self.emeter_units = True
@property @property
def is_color(self) -> bool: def is_color(self) -> bool:

View File

@ -15,7 +15,6 @@ http://www.apache.org/licenses/LICENSE-2.0
""" """
import datetime import datetime
import logging import logging
import socket
import warnings import warnings
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional from typing import Any, Dict, List, Tuple, Optional
@ -32,6 +31,40 @@ class SmartDeviceException(Exception):
pass pass
class EmeterStatus(dict):
"""Container for converting different representations of emeter data.
Newer FW/HW versions postfix the variable names with the used units,
where-as the olders do not have this feature.
This class automatically converts between these two to allow
backwards and forwards compatibility.
"""
def __getitem__(self, item):
valid_keys = ['voltage_mv', 'power_mw', 'current_ma',
'energy_wh', 'total_wh',
'voltage', 'power', 'current', 'total',
'energy']
# 1. if requested data is available, return it
if item in super().keys():
return super().__getitem__(item)
# otherwise decide how to convert it
else:
if item not in valid_keys:
raise KeyError(item)
if '_' in item: # upscale
return super().__getitem__(item[:item.find('_')]) * 10**3
else: # downscale
for i in super().keys():
if i.startswith(item):
return self.__getitem__(i) / 10**3
raise SmartDeviceException("Unable to find a value for '%s'" %
item)
class SmartDevice(object): class SmartDevice(object):
# possible device features # possible device features
FEATURE_ENERGY_METER = 'ENE' FEATURE_ENERGY_METER = 'ENE'
@ -52,7 +85,6 @@ class SmartDevice(object):
protocol = TPLinkSmartHomeProtocol() protocol = TPLinkSmartHomeProtocol()
self.protocol = protocol self.protocol = protocol
self.emeter_type = "emeter" # type: str self.emeter_type = "emeter" # type: str
self.emeter_units = False
def _query_helper(self, def _query_helper(self,
target: str, target: str,
@ -380,17 +412,20 @@ class SmartDevice(object):
if not self.has_emeter: if not self.has_emeter:
return None return None
return self._query_helper(self.emeter_type, "get_realtime") return EmeterStatus(self._query_helper(self.emeter_type,
"get_realtime"))
def get_emeter_daily(self, def get_emeter_daily(self,
year: int = None, year: int = None,
month: int = None) -> Optional[Dict]: month: int = None,
kwh: bool = True) -> Optional[Dict]:
""" """
Retrieve daily statistics for a given month Retrieve daily statistics for a given month
:param year: year for which to retrieve statistics (default: this year) :param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistcs (default: this :param month: month for which to retrieve statistcs (default: this
month) month)
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value :return: mapping of day of month to value
None if device has no energy meter or error occured None if device has no energy meter or error occured
:rtype: dict :rtype: dict
@ -406,20 +441,24 @@ class SmartDevice(object):
response = self._query_helper(self.emeter_type, "get_daystat", response = self._query_helper(self.emeter_type, "get_daystat",
{'month': month, 'year': year}) {'month': month, 'year': year})
response = [EmeterStatus(**x) for x in response["day_list"]]
if self.emeter_units:
key = 'energy_wh' key = 'energy_wh'
else: if kwh:
key = 'energy' key = 'energy'
return {entry['day']: entry[key] data = {entry['day']: entry[key]
for entry in response['day_list']} for entry in response}
def get_emeter_monthly(self, year=None) -> Optional[Dict]: return data
def get_emeter_monthly(self, year: int = None,
kwh: bool = True) -> Optional[Dict]:
""" """
Retrieve monthly statistics for a given year. Retrieve monthly statistics for a given year.
:param year: year for which to retrieve statistics (default: this year) :param year: year for which to retrieve statistics (default: this year)
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value :return: dict: mapping of month to value
None if device has no energy meter None if device has no energy meter
:rtype: dict :rtype: dict
@ -433,14 +472,14 @@ class SmartDevice(object):
response = self._query_helper(self.emeter_type, "get_monthstat", response = self._query_helper(self.emeter_type, "get_monthstat",
{'year': year}) {'year': year})
response = [EmeterStatus(**x) for x in response["month_list"]]
if self.emeter_units:
key = 'energy_wh' key = 'energy_wh'
else: if kwh:
key = 'energy' key = 'energy'
return {entry['month']: entry[key] return {entry['month']: entry[key]
for entry in response['month_list']} for entry in response}
def erase_emeter_stats(self) -> bool: def erase_emeter_stats(self) -> bool:
""" """
@ -471,11 +510,8 @@ class SmartDevice(object):
if not self.has_emeter: if not self.has_emeter:
return None return None
response = self.get_emeter_realtime() response = EmeterStatus(self.get_emeter_realtime())
if self.emeter_units: return response['power']
return float(response['power_mw'])
else:
return float(response['power'])
def turn_off(self) -> None: def turn_off(self) -> None:
""" """

View File

@ -36,7 +36,6 @@ class SmartPlug(SmartDevice):
protocol: 'TPLinkSmartHomeProtocol' = None) -> None: protocol: 'TPLinkSmartHomeProtocol' = None) -> None:
SmartDevice.__init__(self, host, protocol) SmartDevice.__init__(self, host, protocol)
self.emeter_type = "emeter" self.emeter_type = "emeter"
self.emeter_units = False
@property @property
def state(self) -> str: def state(self) -> str:

View File

@ -113,9 +113,34 @@ sysinfo_hs110 = {'system': {'get_sysinfo':
'type': 'IOT.SMARTPLUGSWITCH', 'type': 'IOT.SMARTPLUGSWITCH',
'updating': 0} 'updating': 0}
}, },
"emeter": emeter_support, 'emeter': emeter_support,
} }
sysinfo_hs110_au_v2 = {'system': {'get_sysinfo':
{'active_mode': 'none',
'alias': 'Tplink Test',
'dev_name': 'Smart Wi-Fi Plug With Energy Monitoring',
'deviceId': '80062952E2F3D9461CFB91FF21B7868F194F627A',
'feature': 'TIM:ENE',
'fwId': '00000000000000000000000000000000',
'hwId': 'A28C8BB92AFCB6CAFB83A8C00145F7E2',
'hw_ver': '2.0',
'icon_hash': '',
'latitude_i': -1.1,
'led_off': 0,
'longitude_i': 2.2,
'mac': '70:4F:57:12:12:12',
'model': 'HS110(AU)',
'oemId': '6480C2101948463DC65D7009CAECDECC',
'on_time': 0,
'relay_state': 0,
'rssi': -70,
'sw_ver': '1.5.2 Build 171201 Rel.084625',
'type': 'IOT.SMARTPLUGSWITCH',
'updating': 0}
},
'emeter': {'voltage_mv': 246520, 'power_mw': 258401, 'current_ma': 3104, 'total_wh': 387}}
sysinfo_hs200 = {'system': {'get_sysinfo': {'active_mode': 'schedule', sysinfo_hs200 = {'system': {'get_sysinfo': {'active_mode': 'schedule',
'alias': 'Christmas Tree Switch', 'alias': 'Christmas Tree Switch',
'dev_name': 'Wi-Fi Smart Light Switch', 'dev_name': 'Wi-Fi Smart Light Switch',

View File

@ -156,18 +156,27 @@ class TestSmartBulb(TestCase):
def test_get_emeter_daily(self): def test_get_emeter_daily(self):
self.assertEqual(self.bulb.get_emeter_daily(year=1900, month=1), {}) self.assertEqual(self.bulb.get_emeter_daily(year=1900, month=1), {})
k, v = self.bulb.get_emeter_daily().popitem() k, v = self.bulb.get_emeter_daily(kwh=False).popitem()
self.assertTrue(isinstance(k, int)) self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, int)) self.assertTrue(isinstance(v, int))
k, v = self.bulb.get_emeter_daily(kwh=True).popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, float))
def test_get_emeter_monthly(self): def test_get_emeter_monthly(self):
self.assertEqual(self.bulb.get_emeter_monthly(year=1900), {}) self.assertEqual(self.bulb.get_emeter_monthly(year=1900), {})
d = self.bulb.get_emeter_monthly() d = self.bulb.get_emeter_monthly(kwh=False)
k, v = d.popitem() k, v = d.popitem()
self.assertTrue(isinstance(k, int)) self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, int)) self.assertTrue(isinstance(v, int))
d = self.bulb.get_emeter_monthly(kwh=True)
k, v = d.popitem()
self.assertTrue(isinstance(k, int))
self.assertTrue(isinstance(v, float))
@skip("not clearing your stats..") @skip("not clearing your stats..")
def test_erase_emeter_stats(self): def test_erase_emeter_stats(self):
self.fail() self.fail()

View File

@ -10,6 +10,7 @@ from .fakes import (FakeTransportProtocol,
sysinfo_hs100, sysinfo_hs100,
sysinfo_hs105, sysinfo_hs105,
sysinfo_hs110, sysinfo_hs110,
sysinfo_hs110_au_v2,
sysinfo_hs200) sysinfo_hs200)
# Set IP instead of None if you want to run tests on a device. # Set IP instead of None if you want to run tests on a device.
@ -35,8 +36,8 @@ def check_mode(x):
raise Invalid("invalid mode {}".format(x)) raise Invalid("invalid mode {}".format(x))
class TestSmartPlugHS110(TestCase): class TestSmartPlugHS100(TestCase):
SYSINFO = sysinfo_hs110 # type: Dict SYSINFO = sysinfo_hs100 # type: Dict
# these schemas should go to the mainlib as # these schemas should go to the mainlib as
# they can be useful when adding support for new features/devices # they can be useful when adding support for new features/devices
# as well as to check that faked devices are operating properly. # as well as to check that faked devices are operating properly.
@ -68,10 +69,15 @@ class TestSmartPlugHS110(TestCase):
}) })
current_consumption_schema = Schema(Any({ current_consumption_schema = Schema(Any({
'voltage': All(float, Range(min=0, max=300)), 'voltage': Any(All(float, Range(min=0, max=300)), None),
'power': Coerce(float, Range(min=0)), 'power': Any(Coerce(float, Range(min=0)), None),
'total': Coerce(float, Range(min=0)), 'total': Any(Coerce(float, Range(min=0)), None),
'current': All(float, Range(min=0)), 'current': Any(All(float, Range(min=0)), None),
'voltage_mw': Any(All(float, Range(min=0, max=300000)), None),
'power_mw': Any(Coerce(float, Range(min=0)), None),
'total_wh': Any(Coerce(float, Range(min=0)), None),
'current_ma': Any(All(float, Range(min=0)), None),
}, None)) }, None))
tz_schema = Schema({ tz_schema = Schema({
@ -210,11 +216,6 @@ class TestSmartPlugHS110(TestCase):
else: else:
self.assertEqual(self.plug.current_consumption(), None) self.assertEqual(self.plug.current_consumption(), None)
def test_identify(self):
ident = self.plug.identify()
self.assertTrue(isinstance(ident, tuple))
self.assertTrue(len(ident) == 3)
def test_alias(self): def test_alias(self):
test_alias = "TEST1234" test_alias = "TEST1234"
original = self.plug.alias original = self.plug.alias
@ -261,15 +262,69 @@ class TestSmartPlugHS110(TestCase):
# TODO check setting? # TODO check setting?
class TestSmartPlugHS100(TestSmartPlugHS110): class TestSmartPlugHS110(TestSmartPlugHS100):
SYSINFO = sysinfo_hs100 SYSINFO = sysinfo_hs110
def test_emeter_upcast(self):
emeter = self.plug.get_emeter_realtime()
self.assertAlmostEqual(emeter["power"] * 10**3, emeter["power_mw"])
self.assertAlmostEqual(emeter["voltage"] * 10**3, emeter["voltage_mv"])
self.assertAlmostEqual(emeter["current"] * 10**3, emeter["current_ma"])
self.assertAlmostEqual(emeter["total"] * 10**3, emeter["total_wh"])
def test_emeter_daily_upcast(self):
emeter = self.plug.get_emeter_daily()
_, v = emeter.popitem()
emeter = self.plug.get_emeter_daily(kwh=False)
_, v2 = emeter.popitem()
self.assertAlmostEqual(v * 10**3, v2)
def test_get_emeter_monthly_upcast(self):
emeter = self.plug.get_emeter_monthly()
_, v = emeter.popitem()
emeter = self.plug.get_emeter_monthly(kwh=False)
_, v2 = emeter.popitem()
self.assertAlmostEqual(v * 10**3, v2)
class TestSmartPlugHS200(TestSmartPlugHS110): class TestSmartPlugHS110_HW2(TestSmartPlugHS100):
SYSINFO = sysinfo_hs110_au_v2
def test_emeter_downcast(self):
emeter = self.plug.get_emeter_realtime()
self.assertAlmostEqual(emeter["power"], emeter["power_mw"] / 10**3)
self.assertAlmostEqual(emeter["voltage"], emeter["voltage_mv"] / 10**3)
self.assertAlmostEqual(emeter["current"], emeter["current_ma"] / 10**3)
self.assertAlmostEqual(emeter["total"], emeter["total_wh"] / 10**3)
def test_emeter_daily_downcast(self):
emeter = self.plug.get_emeter_daily()
_, v = emeter.popitem()
emeter = self.plug.get_emeter_daily(kwh=False)
_, v2 = emeter.popitem()
self.assertAlmostEqual(v * 10**3, v2)
def test_get_emeter_monthly_downcast(self):
emeter = self.plug.get_emeter_monthly()
_, v = emeter.popitem()
emeter = self.plug.get_emeter_monthly(kwh=False)
_, v2 = emeter.popitem()
self.assertAlmostEqual(v * 10**3, v2)
class TestSmartPlugHS200(TestSmartPlugHS100):
SYSINFO = sysinfo_hs200 SYSINFO = sysinfo_hs200
class TestSmartPlugHS105(TestSmartPlugHS110): class TestSmartPlugHS105(TestSmartPlugHS100):
SYSINFO = sysinfo_hs105 SYSINFO = sysinfo_hs105
def test_location_i(self): def test_location_i(self):