diff --git a/pyHS100/pyHS100.py b/pyHS100/pyHS100.py index 8d388c2a..771d4654 100644 --- a/pyHS100/pyHS100.py +++ b/pyHS100/pyHS100.py @@ -1,21 +1,29 @@ -# Parts of this code reuse code and concepts by Lubomir Stroetmann from softScheck GmbH -# licensed under the Apache License v 2.0. -# Copy of the Apache License can be found at http://www.apache.org/licenses/LICENSE-2.0 -# The code from Lubomir Stroetmann is located at http://github.com/softScheck/tplink-smartplug +# pyHS100 +# Python Library supporting TP-Link Smart Plugs/Switches (HS100/HS110/Hs200) +# +# The communication protocol was reverse engineered by Lubomir Stroetmann and +# Tobias Esser in 'Reverse Engineering the TP-Link HS110' +# https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/ +# +# This library reuses codes and concepts of the TP-Link WiFi SmartPlug Client +# at https://github.com/softScheck/tplink-smartplug, developed by Lubomir +# Stroetmann which is licensed under the Apache License, Version 2.0. +# +# You may obtain a copy of the license at +# http://www.apache.org/licenses/LICENSE-2.0 -import logging -import socket -import codecs -import json import datetime -import sys +import json +import logging import re +import socket +import sys _LOGGER = logging.getLogger(__name__) class SmartPlug(object): - """Class to access TPLink Switch. + """Representation of a TP-Link Smart Switch. Usage example when used as library: p = SmartPlug("192.168.1.105") @@ -68,7 +76,8 @@ class SmartPlug(object): def get_info(self): """Interrogate the switch""" - return self._send_command('{"system":{"get_sysinfo":{}}}') + return TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"system":{"get_sysinfo":{}}}') def turn_on(self): """Turns the switch on @@ -77,7 +86,8 @@ class SmartPlug(object): True on success False on failure """ - response = self._send_command('{"system":{"set_relay_state":{"state":1}}}') + response = TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"system":{"set_relay_state":{"state":1}}}') if response["system"]["set_relay_state"]["err_code"] == 0: return True @@ -91,7 +101,8 @@ class SmartPlug(object): True on success False on failure """ - response = self._send_command('{"system":{"set_relay_state":{"state":0}}}') + response = TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"system":{"set_relay_state":{"state":0}}}') if response["system"]["set_relay_state"]["err_code"] == 0: return True @@ -108,7 +119,8 @@ class SmartPlug(object): if self.model != 110: return False - response = self._send_command('{"emeter":{"get_realtime":{}}}') + response = TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"emeter":{"get_realtime":{}}}') if response["emeter"]["get_realtime"]["err_code"] != 0: return False @@ -130,7 +142,8 @@ class SmartPlug(object): if self.model != 110: return False - response = self._send_command('{"emeter":{"get_daystat":{"month":' + str(month) + ',"year":' + str(year) + '}}}') + response = TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"emeter":{"get_daystat":{"month":' + str(month) + ',"year":' + str(year) + '}}}') if response["emeter"]["get_daystat"]["err_code"] != 0: return False @@ -156,7 +169,8 @@ class SmartPlug(object): if self.model != 110: return False - response = self._send_command('{"emeter":{"get_monthstat":{"year":' + str(year) + '}}}') + response = TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"emeter":{"get_monthstat":{"year":' + str(year) + '}}}') if response["emeter"]["get_monthstat"]["err_code"] != 0: return False @@ -179,7 +193,8 @@ class SmartPlug(object): if self.model != 110: return False - response = self._send_command('{"emeter":{"erase_emeter_stat":null}}') + response = TPLinkSmartHomeProtocol.query( + host=self.ip, request='{"emeter":{"erase_emeter_stat":null}}') if response["emeter"]["erase_emeter_stat"]["err_code"] != 0: return False @@ -195,63 +210,94 @@ class SmartPlug(object): return response["power"] - def _encrypt(self, string): - """Encrypts a command.""" - - """ - Taken from https://raw.githubusercontent.com/softScheck/tplink-smartplug/master/tplink-smartplug.py - Changes: the return value is encoded in latin-1 in Python 3 and later - """ - key = 171 - result = "\0\0\0\0" - for i in string: - a = key ^ ord(i) - key = a - result += chr(a) - - if sys.version_info.major > 2: - return result.encode('latin-1') - - return result - - def _decrypt(self, string): - """Decrypts a command.""" - - """ - Taken from https://raw.githubusercontent.com/softScheck/tplink-smartplug/master/tplink-smartplug.py - Changes: the string parameter is decoded from latin-1 in Python 3 and later - """ - if sys.version_info.major > 2: - string = string.decode('latin-1') - - key = 171 - result = "" - for i in string: - a = key ^ ord(i) - key = ord(i) - result += chr(a) - - return result - - def _send_command(self, command): - """Sends a command to the switch. - - Accepts one argument - the command as a string - - Return values: - The decrypted JSON - """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((self.ip, self.port)) - s.send(self._encrypt(command)) - response = self._decrypt(s.recv(4096)[4:]) - s.shutdown(1) - s.close() - - return json.loads(response) - def _identify_model(self): """Query sysinfo and determine model""" sys_info = self.get_info() model = re.sub('HS', '', sys_info["system"]["get_sysinfo"]["model"][:5]) return model + + +class TPLinkSmartHomeProtocol: + """ + Implementation of the TP-Link Smart Home Protocol + + Encryption/Decryption methods based on the works of + Lubomir Stroetmann and Tobias Esser + + https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/ + https://github.com/softScheck/tplink-smartplug/ + + which are licensed under the Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + """ + IV = 171 + + @staticmethod + def query(host, request, port=9999): + """ + Request information from a TP-Link SmartHome Device and return the + response. + + :param host: ip address of the device + :param port: port on the device (default: 9999) + :param request: command to send to the device (can be either dict or + json string) + :return: + """ + if isinstance(request, dict): + request = json.dumps(request) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.send(TPLinkSmartHomeProtocol.encrypt(request)) + buffer = sock.recv(4096)[4:] + sock.shutdown(socket.SHUT_RDWR) + sock.close() + + response = TPLinkSmartHomeProtocol.decrypt(buffer) + return json.loads(response) + + @staticmethod + def encrypt(request): + """ + Encrypt a request for a TP-Link Smart Home Device. + + :param request: plaintext request data + :return: ciphertext request + """ + key = TPLinkSmartHomeProtocol.IV + buffer = ['\0\0\0\0'] + + for char in request: + cipher = key ^ ord(char) + key = cipher + buffer.append(chr(cipher)) + + ciphertext = ''.join(buffer) + if sys.version_info.major > 2: + ciphertext = ciphertext.encode('latin-1') + + return ciphertext + + @staticmethod + def decrypt(ciphertext): + """ + Decrypt a response of a TP-Link Smart Home Device. + + :param ciphertext: encrypted response data + :return: plaintext response + """ + key = TPLinkSmartHomeProtocol.IV + buffer = [] + + if sys.version_info.major > 2: + ciphertext = ciphertext.decode('latin-1') + + for char in ciphertext: + plain = key ^ ord(char) + key = ord(char) + buffer.append(chr(plain)) + + plaintext = ''.join(buffer) + + return plaintext