Merge pull request #8 from mweinelt/master

Externalize the TP-Link Smart Home Protocol
This commit is contained in:
GadgetReactor 2016-11-22 21:29:19 +08:00 committed by GitHub
commit fadb76c5a0

View File

@ -1,257 +1,372 @@
# Parts of this code reuse code and concepts by Lubomir Stroetmann from softScheck GmbH """
# licensed under the Apache License v 2.0. pyHS100
# Copy of the Apache License can be found at http://www.apache.org/licenses/LICENSE-2.0 Python library supporting TP-Link Smart Plugs/Switches (HS100/HS110/Hs200).
# The code from Lubomir Stroetmann is located at http://github.com/softScheck/tplink-smartplug
The communication protocol was reverse engineered by Lubomir Stroetmann and
Tobias Esser in 'Reverse Engineering the TP-Link HS110':
https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/
This library reuses codes and concepts of the TP-Link WiFi SmartPlug Client
at https://github.com/softScheck/tplink-smartplug, developed by Lubomir
Stroetmann which is licensed under the Apache License, Version 2.0.
You may obtain a copy of the license at
http://www.apache.org/licenses/LICENSE-2.0
"""
import datetime
import json
import logging import logging
import socket import socket
import codecs
import json
import datetime
import sys import sys
import re
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# switch states
SWITCH_STATE_ON = 'on'
SWITCH_STATE_OFF = 'off'
SWITCH_STATE_UNKNOWN = 'unknown'
class SmartPlug(object): # possible device features
"""Class to access TPLink Switch. FEATURE_ENERGY_METER = 'ENE'
FEATURE_TIMER = 'TIM'
ALL_FEATURES = (FEATURE_ENERGY_METER, FEATURE_TIMER)
class SmartPlug:
"""Representation of a TP-Link Smart Switch.
Usage example when used as library: Usage example when used as library:
p = SmartPlug("192.168.1.105") p = SmartPlug("192.168.1.105")
# print the devices alias
print(p.alias)
# change state of plug # change state of plug
p.state = "OFF" p.state = "OFF"
p.state = "ON" p.state = "ON"
# query and print current state of plug # query and print current state of plug
print(p.state) print(p.state)
Note: Note:
The library references the same structure as defined for the D-Link Switch The library references the same structure as defined for the D-Link Switch
""" """
def __init__(self, ip): def __init__(self, ip_address):
"""Create a new SmartPlug instance identified by the IP.""" """
self.ip = ip Create a new SmartPlug instance, identified through its IP address.
self.port = 9999
self._error_report = False :param ip_address: ip address on which the device listens
self.model = self._identify_model() """
socket.inet_pton(socket.AF_INET, ip_address)
self.ip_address = ip_address
self.alias, self.model, self.features = self.identify()
@property @property
def state(self): def state(self):
"""Get the device state (i.e. ON or OFF).""" """
response = self.get_info() Retrieve the switch state
relay_state = response["system"]["get_sysinfo"]["relay_state"]
if relay_state is None: :returns: one of
return 'unknown' SWITCH_STATE_ON
elif relay_state == 0: SWITCH_STATE_OFF
return "OFF" SWITCH_STATE_UNKNOWN
"""
response = self.get_sysinfo()
relay_state = response['relay_state']
if relay_state == 0:
return SWITCH_STATE_OFF
elif relay_state == 1: elif relay_state == 1:
return "ON" return SWITCH_STATE_ON
else: else:
_LOGGER.warning("Unknown state %s returned" % str(relay_state)) _LOGGER.warning("Unknown state %s returned.", relay_state)
return 'unknown' return SWITCH_STATE_UNKNOWN
@state.setter @state.setter
def state(self, value): def state(self, value):
"""Set device state.
:type value: str
:param value: Future state (either ON or OFF)
""" """
if value.upper() == 'ON': Set the new switch state
:param value: one of
SWITCH_STATE_ON
SWITCH_STATE_OFF
:return: True if new state was successfully set
False if an error occured
"""
if value.lower() == SWITCH_STATE_ON:
self.turn_on() self.turn_on()
elif value.lower() == SWITCH_STATE_OFF:
elif value.upper() == 'OFF':
self.turn_off() self.turn_off()
else: else:
raise TypeError("State %s is not valid." % str(value))
def get_info(self): raise ValueError("State %s is not valid.", value)
"""Interrogate the switch"""
return self._send_command('{"system":{"get_sysinfo":{}}}') def get_sysinfo(self):
"""
Retrieve system information.
:return: dict sysinfo
"""
response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={'system': {'get_sysinfo': {}}}
)['system']['get_sysinfo']
if response['err_code'] != 0:
return False
return response
def turn_on(self): def turn_on(self):
"""Turns the switch on
Return values:
True on success
False on failure
""" """
response = self._send_command('{"system":{"set_relay_state":{"state":1}}}') Turn the switch on.
if response["system"]["set_relay_state"]["err_code"] == 0: :return: True on success
return True :raises ProtocolError when device responds with err_code != 0
"""
response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={'system': {'set_relay_state': {'state': 1}}}
)['system']['set_relay_state']
return False if response['err_code'] != 0:
return False
return True
def turn_off(self): def turn_off(self):
"""Turns the switch off
Return values:
True on success
False on failure
""" """
response = self._send_command('{"system":{"set_relay_state":{"state":0}}}') Turn the switch off.
if response["system"]["set_relay_state"]["err_code"] == 0: :return: True on success
return True False on error
"""
response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={'system': {'set_relay_state': {'state': 0}}}
)['system']['set_relay_state']
return False if response['err_code'] != 0:
return False
return True
@property
def has_emeter(self):
"""
Checks feature list for energey meter support.
:return: True if energey meter is available
False if energymeter is missing
"""
return FEATURE_ENERGY_METER in self.features
def get_emeter_realtime(self): def get_emeter_realtime(self):
"""Gets the current energy readings from the switch
Return values:
False if command is not successful or the switch doesn't support energy metering
Dict with the current readings
""" """
if self.model != 110: Retrive current energy readings from device.
return False
response = self._send_command('{"emeter":{"get_realtime":{}}}') :returns: dict with current readings
False if device has no energy meter or error occured
if response["emeter"]["get_realtime"]["err_code"] != 0:
return False
response["emeter"]["get_realtime"].pop('err_code', None)
return response["emeter"]["get_realtime"]
def get_emeter_daily(self, year=datetime.datetime.now().year, month=datetime.datetime.now().month):
"""Gets daily statistics for a given month.
Arguments:
year (optional): The year for which to retrieve statistics, defaults to current year
month (optional): The mont for which to retrieve statistics, defaults to current month
Return values:
False if command is not successful or the switch doesn't support energy metering
Dict where the keys represent the days, and the values are the aggregated statistics
""" """
if self.model != 110: if not self.has_emeter:
return False return False
response = self._send_command('{"emeter":{"get_daystat":{"month":' + str(month) + ',"year":' + str(year) + '}}}') response = TPLinkSmartHomeProtocol.query(
host=self.ip_address, request={'emeter': {'get_realtime': {}}}
)['emeter']['get_realtime']
if response["emeter"]["get_daystat"]["err_code"] != 0: if response['err_code'] != 0:
return False return False
data = dict() del response['err_code']
for i, j in enumerate(response["emeter"]["get_daystat"]["day_list"]): return response
if j["energy"] > 0:
data[j["day"]] = j["energy"]
return data def get_emeter_daily(self, year=None, month=None):
"""
Retrieve daily statistics for a given month
:param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistcs (default: this
month)
:return: dict: mapping of day of month to value
False if device has no energy meter or error occured
"""
if not self.has_emeter:
return False
if year is None:
year = datetime.datetime.now().year
if month is None:
month = datetime.datetime.now().month
response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={'emeter': {'get_daystat': {'month': str(month),
'year': str(year)}}}
)['emeter']['get_daystat']
if response['err_code'] != 0:
return False
return {entry['day']: entry['energy']
for entry in response['day_list']}
def get_emeter_monthly(self, year=datetime.datetime.now().year): def get_emeter_monthly(self, year=datetime.datetime.now().year):
"""Gets monthly statistics for a given year.
Arguments:
year (optional): The year for which to retrieve statistics, defaults to current year
Return values:
False if command is not successful or the switch doesn't support energy metering
Dict - the keys represent the months, the values are the aggregated statistics
""" """
if self.model != 110: Retrieve monthly statistics for a given year.
:param year: year for which to retrieve statistics (default: this year)
:return: dict: mapping of month to value
False if device has no energy meter or error occured
"""
if not self.has_emeter:
return False return False
response = self._send_command('{"emeter":{"get_monthstat":{"year":' + str(year) + '}}}') response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={'emeter': {'get_monthstat': {'year': str(year)}}}
)['emeter']['get_monthstat']
if response["emeter"]["get_monthstat"]["err_code"] != 0: if response['err_code'] != 0:
return False return False
data = dict() return {entry['month']: entry['energy']
for entry in response['month_list']}
for i, j in enumerate(response["emeter"]["get_monthstat"]["month_list"]):
if j["energy"] > 0:
data[j["month"]] = j["energy"]
return data
def erase_emeter_stats(self): def erase_emeter_stats(self):
"""Erases all statistics.
Return values:
True: Success
False: Failure or not supported by switch
""" """
if self.model != 110: Erase energy meter statistics
:return: True if statistics were deleted
False if device has no energy meter or error occured
"""
if not self.has_emeter:
return False return False
response = self._send_command('{"emeter":{"erase_emeter_stat":null}}') response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={'emeter': {'erase_emeter_stat': None}}
)['emeter']['erase_emeter_stat']
if response["emeter"]["erase_emeter_stat"]["err_code"] != 0: return response['err_code'] == 0
return False
else:
return True
def current_consumption(self): def current_consumption(self):
"""Get the current power consumption in Watt.""" """
if self.model != 110: Get the current power consumption in Watt.
:return: the current power consumption in Watt.
False if device has no energy meter of error occured.
"""
if not self.has_emeter:
return False return False
response = self.get_emeter_realtime() response = self.get_emeter_realtime()
return response["power"] return response['power']
def _encrypt(self, string):
"""Encrypts a command."""
def identify(self):
""" """
Taken from https://raw.githubusercontent.com/softScheck/tplink-smartplug/master/tplink-smartplug.py Query device information to identify model and featureset
Changes: the return value is encoded in latin-1 in Python 3 and later
:return: str model, list of supported features
""" """
key = 171 sys_info = self.get_sysinfo()
result = "\0\0\0\0"
for i in string:
a = key ^ ord(i)
key = a
result += chr(a)
if sys.version_info.major > 2: alias = sys_info['alias']
return result.encode('latin-1') model = sys_info['model']
features = sys_info['feature'].split(':')
return result for feature in features:
if feature not in ALL_FEATURES:
_LOGGER.warning("Unknown feature %s on device %s.",
feature, model)
def _decrypt(self, string): return alias, model, features
"""Decrypts a command."""
class TPLinkSmartHomeProtocol:
"""
Implementation of the TP-Link Smart Home Protocol
Encryption/Decryption methods based on the works of
Lubomir Stroetmann and Tobias Esser
https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/
https://github.com/softScheck/tplink-smartplug/
which are licensed under the Apache License, Version 2.0
http://www.apache.org/licenses/LICENSE-2.0
"""
initialization_vector = 171
@staticmethod
def query(host, request, port=9999):
""" """
Taken from https://raw.githubusercontent.com/softScheck/tplink-smartplug/master/tplink-smartplug.py Request information from a TP-Link SmartHome Device and return the
Changes: the string parameter is decoded from latin-1 in Python 3 and later response.
:param host: ip address of the device
:param port: port on the device (default: 9999)
:param request: command to send to the device (can be either dict or
json string)
:return:
""" """
if sys.version_info.major > 2: if isinstance(request, dict):
string = string.decode('latin-1') request = json.dumps(request)
key = 171 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = "" sock.connect((host, port))
for i in string: sock.send(TPLinkSmartHomeProtocol.encrypt(request))
a = key ^ ord(i) buffer = sock.recv(4096)[4:]
key = ord(i) sock.shutdown(socket.SHUT_RDWR)
result += chr(a) sock.close()
return result
def _send_command(self, command):
"""Sends a command to the switch.
Accepts one argument - the command as a string
Return values:
The decrypted JSON
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.ip, self.port))
s.send(self._encrypt(command))
response = self._decrypt(s.recv(4096)[4:])
s.shutdown(1)
s.close()
response = TPLinkSmartHomeProtocol.decrypt(buffer)
return json.loads(response) return json.loads(response)
def _identify_model(self): @staticmethod
"""Query sysinfo and determine model""" def encrypt(request):
sys_info = self.get_info() """
model = re.sub('HS', '', sys_info["system"]["get_sysinfo"]["model"][:5]) Encrypt a request for a TP-Link Smart Home Device.
return model
:param request: plaintext request data
:return: ciphertext request
"""
key = TPLinkSmartHomeProtocol.initialization_vector
buffer = ['\0\0\0\0']
for char in request:
cipher = key ^ ord(char)
key = cipher
buffer.append(chr(cipher))
ciphertext = ''.join(buffer)
if sys.version_info.major > 2:
ciphertext = ciphertext.encode('latin-1')
return ciphertext
@staticmethod
def decrypt(ciphertext):
"""
Decrypt a response of a TP-Link Smart Home Device.
:param ciphertext: encrypted response data
:return: plaintext response
"""
key = TPLinkSmartHomeProtocol.initialization_vector
buffer = []
if sys.version_info.major > 2:
ciphertext = ciphertext.decode('latin-1')
for char in ciphertext:
plain = key ^ ord(char)
key = ord(char)
buffer.append(chr(plain))
plaintext = ''.join(buffer)
return plaintext