mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-05-16 19:41:09 +00:00
Add device discovery (#25)
* add (untested) discover mode * Keep discovery and normal communication separate, uppercase magic consts This sepearates the earlier test code for discovering devices, and adds 5 sec timeout for gathering responses from potential devices. This commit also uppercases magic constants. Discovery & communication tested with HS110. * update readme with example how to discover devices, pep8ify
This commit is contained in:
parent
1e01530447
commit
71ac1f043a
@ -59,3 +59,9 @@ There is also a simple tool for testing connectivity in examples, to use:
|
|||||||
```python
|
```python
|
||||||
python -m examples.cli <ip>
|
python -m examples.cli <ip>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# Discovering devices
|
||||||
|
```python
|
||||||
|
python3 -m examples.discover
|
||||||
|
```
|
||||||
|
|
||||||
|
9
examples/discover.py
Normal file
9
examples/discover.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
import logging
|
||||||
|
from pprint import pprint as pp
|
||||||
|
|
||||||
|
from pyHS100 import TPLinkSmartHomeProtocol
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
|
for dev in TPLinkSmartHomeProtocol.discover():
|
||||||
|
print("Found device!")
|
||||||
|
pp(dev)
|
@ -1,3 +1,3 @@
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
from pyHS100.pyHS100 import SmartPlug, SmartPlugException
|
from pyHS100.pyHS100 import SmartPlug, TPLinkSmartHomeProtocol, SmartPlugException
|
||||||
|
@ -21,10 +21,12 @@ class TPLinkSmartHomeProtocol:
|
|||||||
which are licensed under the Apache License, Version 2.0
|
which are licensed under the Apache License, Version 2.0
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
"""
|
"""
|
||||||
initialization_vector = 171
|
INITIALIZATION_VECTOR = 171
|
||||||
|
DEFAULT_PORT = 9999
|
||||||
|
DEFAULT_TIMEOUT = 5
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def query(host, request, port=9999):
|
def query(host, request, port=DEFAULT_PORT):
|
||||||
"""
|
"""
|
||||||
Request information from a TP-Link SmartHome Device and return the
|
Request information from a TP-Link SmartHome Device and return the
|
||||||
response.
|
response.
|
||||||
@ -62,6 +64,48 @@ class TPLinkSmartHomeProtocol:
|
|||||||
|
|
||||||
return json.loads(response)
|
return json.loads(response)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def discover(timeout=DEFAULT_TIMEOUT, port=DEFAULT_PORT):
|
||||||
|
"""
|
||||||
|
Sends discovery message to 255.255.255.255:9999 in order
|
||||||
|
to detect available supported devices in the local network,
|
||||||
|
and waits for given timeout for answers from devices.
|
||||||
|
|
||||||
|
:param timeout: How long to wait for responses, defaults to 5
|
||||||
|
:param port: port to send broadcast messages, defaults to 9999.
|
||||||
|
:rtype: list[dict]
|
||||||
|
:return: Array of json objects {"ip", "port", "sys_info"}
|
||||||
|
"""
|
||||||
|
discovery_query = {"system": {"get_sysinfo": None},
|
||||||
|
"emeter": {"get_realtime": None}}
|
||||||
|
target = "255.255.255.255"
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
sock.settimeout(timeout)
|
||||||
|
|
||||||
|
req = json.dumps(discovery_query)
|
||||||
|
_LOGGER.debug("Sending discovery to %s:%s", target, port)
|
||||||
|
|
||||||
|
encrypted_req = TPLinkSmartHomeProtocol.encrypt(req)
|
||||||
|
sock.sendto(encrypted_req[4:], (target, port))
|
||||||
|
|
||||||
|
devices = []
|
||||||
|
_LOGGER.debug("Waiting %s seconds for responses...", timeout)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data, addr = sock.recvfrom(4096)
|
||||||
|
ip, port = addr
|
||||||
|
info = json.loads(TPLinkSmartHomeProtocol.decrypt(data))
|
||||||
|
|
||||||
|
devices.append({"ip": ip, "port": port, "sys_info": info})
|
||||||
|
except Exception as ex:
|
||||||
|
_LOGGER.error("Got exception %s", ex, exc_info=True)
|
||||||
|
|
||||||
|
return devices
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def encrypt(request):
|
def encrypt(request):
|
||||||
"""
|
"""
|
||||||
@ -70,7 +114,7 @@ class TPLinkSmartHomeProtocol:
|
|||||||
:param request: plaintext request data
|
:param request: plaintext request data
|
||||||
:return: ciphertext request
|
:return: ciphertext request
|
||||||
"""
|
"""
|
||||||
key = TPLinkSmartHomeProtocol.initialization_vector
|
key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR
|
||||||
buffer = bytearray(4) # 4 nullbytes
|
buffer = bytearray(4) # 4 nullbytes
|
||||||
|
|
||||||
for char in request:
|
for char in request:
|
||||||
@ -88,7 +132,7 @@ class TPLinkSmartHomeProtocol:
|
|||||||
:param ciphertext: encrypted response data
|
:param ciphertext: encrypted response data
|
||||||
:return: plaintext response
|
:return: plaintext response
|
||||||
"""
|
"""
|
||||||
key = TPLinkSmartHomeProtocol.initialization_vector
|
key = TPLinkSmartHomeProtocol.INITIALIZATION_VECTOR
|
||||||
buffer = []
|
buffer = []
|
||||||
|
|
||||||
ciphertext = ciphertext.decode('latin-1')
|
ciphertext = ciphertext.decode('latin-1')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user