python-kasa/pyHS100/pyHS100.py
Teemu R 05a6bbb145 Read all data from the device, disable double-encoding, implement more APIs, refactor querying, update README (#11)
* Read from socket until no data available, disable double string encoding

HS110 sends sometimes datagrams in chunks especially for get_daystat,
this patch makes it to read until there is no more data to be read.

As json.dumps() does JSON encoding already, there's no need to str()
the year or month either.

* Add cli.py, a simple script to query devices for debugging purposes.

* allow easier importing with from pyHS100 import SmartPlug

* move cli.py to examples, add short usage into README.md

* Implement more available APIs, refactor querying code.

This commit adds access to new properties, both read & write,  while keeping the old one (mostly) intact.
Querying is refactored to be done inside _query_helper() method,
which unwraps results automatically and rises SmartPlugException() in case of errors.
Errors are to be handled by clients.

New features:
* Setting device alias (plug.alias = "name")
* led read & write
* icon read (doesn't seem to return anything without cloud support at least), write API is not known, throws an exception currently
* time read (returns datetime), time write implemented, but not working even when no error is returned from the device
* timezone read
* mac read & write, writing is untested for now.

Properties for easier access:
* hw_info: return hw-specific elements from sysinfo
* on_since: pretty-printed from sysinfo
* location: latitude and longitued from sysinfo
* rssi: rssi from sysinfo

* Update README.md with examples of available features.

* Handle comments from mweinelt

* Refactor state handling, use booleans instead of strings

* Fix issues raised during the review.

Following issues are addressed by this commit:
* All API is more or less commented (including return types, exceptions, ..)
* Converted state to use
* Added properties is_on, is_off for those who don't want to check against strings.
* Handled most issues reported by pylint.
* Adjusted _query_helper() to strip off err_code from the result object.
* Fixed broken format() syntax for string formattings.

* Fix ci woes plus one typo.

* Do initialization after changing device properties, fix nits.
2016-12-12 17:13:45 +08:00

612 lines
17 KiB
Python

"""
pyHS100
Python library supporting TP-Link Smart Plugs/Switches (HS100/HS110/Hs200).
The communication protocol was reverse engineered by Lubomir Stroetmann and
Tobias Esser in 'Reverse Engineering the TP-Link HS110':
https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/
This library reuses codes and concepts of the TP-Link WiFi SmartPlug Client
at https://github.com/softScheck/tplink-smartplug, developed by Lubomir
Stroetmann which is licensed under the Apache License, Version 2.0.
You may obtain a copy of the license at
http://www.apache.org/licenses/LICENSE-2.0
"""
import datetime
import json
import logging
import socket
import sys
_LOGGER = logging.getLogger(__name__)
class SmartPlugException(Exception):
"""
SmartPlugException gets raised for errors reported by the plug.
"""
pass
class SmartPlug:
"""Representation of a TP-Link Smart Switch.
Usage example when used as library:
p = SmartPlug("192.168.1.105")
# print the devices alias
print(p.alias)
# change state of plug
p.state = "ON"
p.state = "OFF"
# query and print current state of plug
print(p.state)
Errors reported by the device are raised as SmartPlugExceptions,
and should be handled by the user of the library.
Note:
The library references the same structure as defined for the D-Link Switch
"""
# switch states
SWITCH_STATE_ON = 'ON'
SWITCH_STATE_OFF = 'OFF'
SWITCH_STATE_UNKNOWN = 'UNKNOWN'
# possible device features
FEATURE_ENERGY_METER = 'ENE'
FEATURE_TIMER = 'TIM'
ALL_FEATURES = (FEATURE_ENERGY_METER, FEATURE_TIMER)
def __init__(self, ip_address):
"""
Create a new SmartPlug instance, identified through its IP address.
:param str ip_address: ip address on which the device listens
:raises SmartPlugException: when unable to communicate with the device
"""
socket.inet_pton(socket.AF_INET, ip_address)
self.ip_address = ip_address
self.initialize()
def initialize(self):
"""
(Re-)Initializes the state.
This should be called when the state of the plug is changed anyway.
:raises: SmartPlugException: on error
"""
self.sys_info = self.get_sysinfo()
self._alias, self.model, self.features = self.identify()
def _query_helper(self, target, cmd, arg={}):
"""
Helper returning unwrapped result object and doing error handling.
:param target: Target system {system, time, emeter, ..}
:param cmd: Command to execute
:param arg: JSON object passed as parameter to the command, defaults to {}
:return: Unwrapped result for the call.
:rtype: dict
:raises SmartPlugException: if command was not executed correctly
"""
response = TPLinkSmartHomeProtocol.query(
host=self.ip_address,
request={target: {cmd: arg}}
)
result = response[target][cmd]
if result["err_code"] != 0:
raise SmartPlugException("Error on {}.{}: {}".format(target, cmd, result))
del result["err_code"]
return result
@property
def state(self):
"""
Retrieve the switch state
:returns: one of
SWITCH_STATE_ON
SWITCH_STATE_OFF
SWITCH_STATE_UNKNOWN
:rtype: str
"""
relay_state = self.sys_info['relay_state']
if relay_state == 0:
return SmartPlug.SWITCH_STATE_OFF
elif relay_state == 1:
return SmartPlug.SWITCH_STATE_ON
else:
_LOGGER.warning("Unknown state %s returned.", relay_state)
return SmartPlug.SWITCH_STATE_UNKNOWN
@state.setter
def state(self, value):
"""
Set the new switch state
:param value: one of
SWITCH_STATE_ON
SWITCH_STATE_OFF
:raises ValueError: on invalid state
:raises SmartPlugException: on error
"""
if value.upper() == SmartPlug.SWITCH_STATE_ON:
self.turn_on()
elif value.upper() == SmartPlug.SWITCH_STATE_OFF:
self.turn_off()
else:
raise ValueError("State %s is not valid.", value)
self.initialize()
def get_sysinfo(self):
"""
Retrieve system information.
:return: sysinfo
:rtype dict
:raises SmartPlugException: on error
"""
return self._query_helper("system", "get_sysinfo")
@property
def is_on(self):
"""
Returns whether device is on.
:return: True if device is on, False otherwise
"""
return bool(self.sys_info['relay_state'])
@property
def is_off(self):
"""
Returns whether device is off.
:return: True if device is off, False otherwise.
:rtype: bool
"""
return not self.is_on
def turn_on(self):
"""
Turn the switch on.
:raises SmartPlugException: on error
"""
self._query_helper("system", "set_relay_state", {"state": 1})
self.initialize()
def turn_off(self):
"""
Turn the switch off.
:raises SmartPlugException: on error
"""
self._query_helper("system", "set_relay_state", {"state": 0})
self.initialize()
@property
def has_emeter(self):
"""
Checks feature list for energey meter support.
:return: True if energey meter is available
False if energymeter is missing
"""
return SmartPlug.FEATURE_ENERGY_METER in self.features
def get_emeter_realtime(self):
"""
Retrive current energy readings from device.
:returns: current readings or False
:rtype: dict, False
False if device has no energy meter or error occured
:raises SmartPlugException: on error
"""
if not self.has_emeter:
return False
return self._query_helper("emeter", "get_realtime")
def get_emeter_daily(self, year=None, month=None):
"""
Retrieve daily statistics for a given month
:param year: year for which to retrieve statistics (default: this year)
:param month: month for which to retrieve statistcs (default: this
month)
:return: mapping of day of month to value
False if device has no energy meter or error occured
:rtype: dict
:raises SmartPlugException: on error
"""
if not self.has_emeter:
return False
if year is None:
year = datetime.datetime.now().year
if month is None:
month = datetime.datetime.now().month
response = self._query_helper("emeter", "get_daystat",
{'month': month, 'year': year})
return {entry['day']: entry['energy']
for entry in response['day_list']}
def get_emeter_monthly(self, year=datetime.datetime.now().year):
"""
Retrieve monthly statistics for a given year.
:param year: year for which to retrieve statistics (default: this year)
:return: dict: mapping of month to value
False if device has no energy meter
:rtype: dict
:raises SmartPlugException: on error
"""
if not self.has_emeter:
return False
response = self._query_helper("emeter", "get_monthstat",
{'year': year})
return {entry['month']: entry['energy']
for entry in response['month_list']}
def erase_emeter_stats(self):
"""
Erase energy meter statistics
:return: True if statistics were deleted
False if device has no energy meter.
:rtype: bool
:raises SmartPlugException: on error
"""
if not self.has_emeter:
return False
self._query_helper("emeter", "erase_emeter_stat", None)
self.initialize()
# As query_helper raises exception in case of failure, we have succeeded when we are this far.
return True
def current_consumption(self):
"""
Get the current power consumption in Watt.
:return: the current power consumption in Watt.
False if device has no energy meter.
:raises SmartPlugException: on error
"""
if not self.has_emeter:
return False
response = self.get_emeter_realtime()
return response['power']
def identify(self):
"""
Query device information to identify model and featureset
:return: (alias, model, list of supported features)
:rtype: tuple
"""
alias = self.sys_info['alias']
model = self.sys_info['model']
features = self.sys_info['feature'].split(':')
for feature in features:
if feature not in SmartPlug.ALL_FEATURES:
_LOGGER.warning("Unknown feature %s on device %s.",
feature, model)
return alias, model, features
@property
def alias(self):
"""
Get current device alias (name)
:return: Device name aka alias.
:rtype: str
"""
return self._alias
@alias.setter
def alias(self, alias):
"""
Sets the device name aka alias.
:param alias: New alias (name)
:raises SmartPlugException: on error
"""
self._query_helper("system", "set_dev_alias", {"alias": alias})
self.initialize()
@property
def led(self):
"""
Returns the state of the led.
:return: True if led is on, False otherwise
:rtype: bool
"""
return bool(1 - self.sys_info["led_off"])
@led.setter
def led(self, state):
"""
Sets the state of the led (night mode)
:param bool state: True to set led on, False to set led off
:raises SmartPlugException: on error
"""
self._query_helper("system", "set_led_off", {"off": int(not state)})
self.initialize()
@property
def icon(self):
"""
Returns device icon
Note: not working on HS110, but is always empty.
:return: icon and its hash
:rtype: dict
:raises SmartPlugException: on error
"""
return self._query_helper("system", "get_dev_icon")
@icon.setter
def icon(self, icon):
"""
Content for hash and icon are unknown.
:param str icon: Icon path(?)
:raises NotImplementedError: when not implemented
:raises SmartPlugError: on error
"""
raise NotImplementedError("Values for this call are unknown at this point.")
# here just for the sake of completeness
# self._query_helper("system", "set_dev_icon", {"icon": "", "hash": ""})
# self.initialize()
@property
def time(self):
"""
Returns current time from the device.
:return: datetime for device's time
:rtype: datetime.datetime
:raises SmartPlugException: on error
"""
res = self._query_helper("time", "get_time")
return datetime.datetime(res["year"], res["month"], res["mday"],
res["hour"], res["min"], res["sec"])
@time.setter
def time(self, ts):
"""
Sets time based on datetime object.
Note: this calls set_timezone() for setting.
:param datetime.datetime ts: New date and time
:return: result
:type: dict
:raises NotImplemented: when not implemented.
:raises SmartPlugException: on error
"""
raise NotImplementedError("Fails with err_code == 0 with HS110.")
""" here just for the sake of completeness / if someone figures out why it doesn't work.
ts_obj = {
"index": self.timezone["index"],
"hour": ts.hour,
"min": ts.minute,
"sec": ts.second,
"year": ts.year,
"month": ts.month,
"mday": ts.day,
}
response = self._query_helper("time", "set_timezone", ts_obj)
self.initialize()
return response
"""
@property
def timezone(self):
"""
Returns timezone information
:return: Timezone information
:rtype: dict
:raises SmartPlugException: on error
"""
return self._query_helper("time", "get_timezone")
@property
def hw_info(self):
"""
Returns information about hardware
:return: Information about hardware
:rtype: dict
"""
keys = ["sw_ver", "hw_ver", "mac", "hwId", "fwId", "oemId", "dev_name"]
return {key: self.sys_info[key] for key in keys}
@property
def on_since(self):
"""
Returns pretty-printed on-time
:return: datetime for on since
:rtype: datetime
"""
return datetime.datetime.now() - \
datetime.timedelta(seconds=self.sys_info["on_time"])
@property
def location(self):
"""
Location of the device, as read from sysinfo
:return: latitude and longitude
:rtype: dict
"""
return {"latitude": self.sys_info["latitude"],
"longitude": self.sys_info["longitude"]}
@property
def rssi(self):
"""
Returns WiFi signal strenth (rssi)
:return: rssi
:rtype: int
"""
return self.sys_info["rssi"]
@property
def mac(self):
"""
Returns mac address
:return: mac address in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:rtype: str
"""
return self.sys_info["mac"]
@mac.setter
def mac(self, mac):
"""
Sets new mac address
:param str mac: mac in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:raises SmartPlugException: on error
"""
self._query_helper("system", "set_mac_addr", {"mac": mac})
self.initialize()
class TPLinkSmartHomeProtocol:
"""
Implementation of the TP-Link Smart Home Protocol
Encryption/Decryption methods based on the works of
Lubomir Stroetmann and Tobias Esser
https://www.softscheck.com/en/reverse-engineering-tp-link-hs110/
https://github.com/softScheck/tplink-smartplug/
which are licensed under the Apache License, Version 2.0
http://www.apache.org/licenses/LICENSE-2.0
"""
initialization_vector = 171
@staticmethod
def query(host, request, port=9999):
"""
Request information from a TP-Link SmartHome Device and return the
response.
:param str host: ip address of the device
:param int port: port on the device (default: 9999)
:param request: command to send to the device (can be either dict or
json string)
:return:
"""
if isinstance(request, dict):
request = json.dumps(request)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
_LOGGER.debug("> (%i) %s", len(request), request)
sock.send(TPLinkSmartHomeProtocol.encrypt(request))
buffer = bytes()
while True:
chunk = sock.recv(4096)
buffer += chunk
if not chunk:
break
sock.shutdown(socket.SHUT_RDWR)
sock.close()
response = TPLinkSmartHomeProtocol.decrypt(buffer[4:])
_LOGGER.debug("< (%i) %s", len(response), response)
return json.loads(response)
@staticmethod
def encrypt(request):
"""
Encrypt a request for a TP-Link Smart Home Device.
:param request: plaintext request data
:return: ciphertext request
"""
key = TPLinkSmartHomeProtocol.initialization_vector
buffer = ['\0\0\0\0']
for char in request:
cipher = key ^ ord(char)
key = cipher
buffer.append(chr(cipher))
ciphertext = ''.join(buffer)
if sys.version_info.major > 2:
ciphertext = ciphertext.encode('latin-1')
return ciphertext
@staticmethod
def decrypt(ciphertext):
"""
Decrypt a response of a TP-Link Smart Home Device.
:param ciphertext: encrypted response data
:return: plaintext response
"""
key = TPLinkSmartHomeProtocol.initialization_vector
buffer = []
if sys.version_info.major > 2:
ciphertext = ciphertext.decode('latin-1')
for char in ciphertext:
plain = key ^ ord(char)
key = ord(char)
buffer.append(chr(plain))
plaintext = ''.join(buffer)
return plaintext