import json import socket import struct import logging _LOGGER = logging.getLogger(__name__) class TPLinkSmartHomeProtocol: """ Implementation of the TP-Link Smart Home Protocol Encryption/Decryption methods based on the works of Lubomir Stroetmann and Tobias Esser https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/ https://github.com/softScheck/tplink-smartplug/ which are licensed under the Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0 """ INITIALIZATION_VECTOR = 171 DEFAULT_PORT = 9999 DEFAULT_TIMEOUT = 5 @staticmethod def query(host, request, port=DEFAULT_PORT): """ Request information from a TP-Link SmartHome Device and return the response. :param str host: ip address of the device :param int port: port on the device (default: 9999) :param request: command to send to the device (can be either dict or json string) :return: """ if isinstance(request, dict): request = json.dumps(request) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5.0) try: sock.connect((host, port)) _LOGGER.debug("> (%i) %s", len(request), request) sock.send(TPLinkSmartHomeProtocol.encrypt(request)) buffer = bytes() # Some devices send responses with a length header of 0 and # terminate with a zero size chunk. Others send the length and # will hang if we attempt to read more data. length = -1 while True: chunk = sock.recv(4096) if length == -1: length = struct.unpack(">I", chunk[0:4])[0] buffer += chunk if (length > 0 and len(buffer) >= length + 4) or not chunk: break finally: try: sock.shutdown(socket.SHUT_RDWR) sock.close() except OSError: # OSX raises OSError when shutdown() gets called on a closed # socket. We ignore it here as the data has already been read # into the buffer at this point. pass response = TPLinkSmartHomeProtocol.decrypt(buffer[4:]) _LOGGER.debug("< (%i) %s", len(response), response) return json.loads(response) @staticmethod def discover(timeout=DEFAULT_TIMEOUT, port=DEFAULT_PORT): """ Sends discovery message to 255.255.255.255:9999 in order to detect available supported devices in the local network, and waits for given timeout for answers from devices. :param timeout: How long to wait for responses, defaults to 5 :param port: port to send broadcast messages, defaults to 9999. :rtype: list[dict] :return: Array of json objects {"ip", "port", "sys_info"} """ discovery_query = {"system": {"get_sysinfo": None}, "emeter": {"get_realtime": None}} target = "255.255.255.255" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(timeout) req = json.dumps(discovery_query) _LOGGER.debug("Sending discovery to %s:%s", target, port) encrypted_req = TPLinkSmartHomeProtocol.encrypt(req) sock.sendto(encrypted_req[4:], (target, port)) devices = [] _LOGGER.debug("Waiting %s seconds for responses...", timeout) try: while True: data, addr = sock.recvfrom(4096) ip, port = addr info = json.loads(TPLinkSmartHomeProtocol.decrypt(data)) devices.append({"ip": ip, "port": port, "sys_info": info}) except socket.timeout: _LOGGER.debug("Got socket timeout, which is okay.") except Exception as ex: _LOGGER.error("Got exception %s", ex, exc_info=True) return devices @staticmethod def encrypt(request): """ Encrypt a request for a TP-Link Smart Home Device. :param request: plaintext request data :return: ciphertext request """ key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR buffer = bytearray(struct.pack(">I", len(request))) for char in request: cipher = key ^ ord(char) key = cipher buffer.append(cipher) return buffer @staticmethod def decrypt(ciphertext): """ Decrypt a response of a TP-Link Smart Home Device. :param ciphertext: encrypted response data :return: plaintext response """ key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR buffer = [] ciphertext = ciphertext.decode('latin-1') for char in ciphertext: plain = key ^ ord(char) key = ord(char) buffer.append(chr(plain)) plaintext = ''.join(buffer) return plaintext