mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-08-06 18:54:08 +00:00
Add klap protocol (#509)
* Add support for the new encryption protocol This adds support for the new TP-Link discovery and encryption protocols. It is currently incomplete - only devices without username and password are current supported, and single device discovery is not implemented. Discovery should find both old and new devices. When accessing a device by IP the --klap option can be specified on the command line to active the new connection protocol. sdb9696 - This commit also contains 16 later commits from Simon Wilkinson squashed into the original * Update klap changes 2023 to fix encryption, deal with kasa credential switching and work with new discovery changes * Move from aiohttp to httpx * Changes following review comments --------- Co-authored-by: Simon Wilkinson <simon@sxw.org.uk>
This commit is contained in:
@@ -6,8 +6,8 @@ import sys
|
||||
import pytest # type: ignore # https://github.com/pytest-dev/pytest/issues/3342
|
||||
|
||||
from kasa import DeviceType, Discover, SmartDevice, SmartDeviceException, protocol
|
||||
from kasa.discover import _DiscoverProtocol, json_dumps
|
||||
from kasa.exceptions import UnsupportedDeviceException
|
||||
from kasa.discover import DiscoveryResult, _DiscoverProtocol, json_dumps
|
||||
from kasa.exceptions import AuthenticationException, UnsupportedDeviceException
|
||||
|
||||
from .conftest import bulb, dimmer, lightstrip, plug, strip
|
||||
|
||||
@@ -51,7 +51,7 @@ async def test_type_detection_lightstrip(dev: SmartDevice):
|
||||
|
||||
async def test_type_unknown():
|
||||
invalid_info = {"system": {"get_sysinfo": {"type": "nosuchtype"}}}
|
||||
with pytest.raises(SmartDeviceException):
|
||||
with pytest.raises(UnsupportedDeviceException):
|
||||
Discover._get_device_class(invalid_info)
|
||||
|
||||
|
||||
@@ -239,3 +239,73 @@ async def test_discover_invalid_responses(msg, data, mocker):
|
||||
|
||||
proto.datagram_received(data, ("127.0.0.1", 9999))
|
||||
assert len(proto.discovered_devices) == 0
|
||||
|
||||
|
||||
AUTHENTICATION_DATA_KLAP = {
|
||||
"result": {
|
||||
"device_id": "xx",
|
||||
"owner": "xx",
|
||||
"device_type": "IOT.SMARTPLUGSWITCH",
|
||||
"device_model": "HS100(UK)",
|
||||
"ip": "127.0.0.1",
|
||||
"mac": "12-34-56-78-90-AB",
|
||||
"is_support_iot_cloud": True,
|
||||
"obd_src": "tplink",
|
||||
"factory_default": False,
|
||||
"mgt_encrypt_schm": {
|
||||
"is_support_https": False,
|
||||
"encrypt_type": "KLAP",
|
||||
"http_port": 80,
|
||||
},
|
||||
},
|
||||
"error_code": 0,
|
||||
}
|
||||
|
||||
|
||||
async def test_discover_single_authentication(mocker):
|
||||
"""Make sure that discover_single handles authenticating devices correctly."""
|
||||
host = "127.0.0.1"
|
||||
|
||||
def mock_discover(self):
|
||||
if discovery_data:
|
||||
data = (
|
||||
b"\x02\x00\x00\x01\x01[\x00\x00\x00\x00\x00\x00W\xcev\xf8"
|
||||
+ json_dumps(discovery_data).encode()
|
||||
)
|
||||
self.datagram_received(data, (host, 20002))
|
||||
|
||||
mocker.patch.object(_DiscoverProtocol, "do_discover", mock_discover)
|
||||
mocker.patch.object(
|
||||
SmartDevice,
|
||||
"update",
|
||||
side_effect=AuthenticationException("Failed to authenticate"),
|
||||
)
|
||||
|
||||
# Test with a valid unsupported response
|
||||
discovery_data = AUTHENTICATION_DATA_KLAP
|
||||
with pytest.raises(
|
||||
AuthenticationException,
|
||||
match="Failed to authenticate",
|
||||
):
|
||||
await Discover.discover_single(host)
|
||||
|
||||
mocker.patch.object(SmartDevice, "update")
|
||||
device = await Discover.discover_single(host)
|
||||
assert device.device_type == DeviceType.Plug
|
||||
|
||||
|
||||
async def test_device_update_from_new_discovery_info():
|
||||
device = SmartDevice("127.0.0.7")
|
||||
discover_info = DiscoveryResult(**AUTHENTICATION_DATA_KLAP["result"])
|
||||
discover_dump = discover_info.get_dict()
|
||||
device.update_from_discover_info(discover_dump)
|
||||
|
||||
assert device.alias == discover_dump["alias"]
|
||||
assert device.mac == discover_dump["mac"].replace("-", ":")
|
||||
assert device.model == discover_dump["model"]
|
||||
|
||||
with pytest.raises(
|
||||
SmartDeviceException,
|
||||
match=re.escape("You need to await update() to access the data"),
|
||||
):
|
||||
assert device.supported_modules
|
||||
|
306
kasa/tests/test_klapprotocol.py
Normal file
306
kasa/tests/test_klapprotocol.py
Normal file
@@ -0,0 +1,306 @@
|
||||
import errno
|
||||
import json
|
||||
import logging
|
||||
import secrets
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from ..credentials import Credentials
|
||||
from ..exceptions import AuthenticationException, SmartDeviceException
|
||||
from ..klapprotocol import KlapEncryptionSession, TPLinkKlap, _sha256
|
||||
|
||||
|
||||
class _mock_response:
|
||||
def __init__(self, status_code, content: bytes):
|
||||
self.status_code = status_code
|
||||
self.content = content
|
||||
|
||||
|
||||
@pytest.mark.parametrize("retry_count", [1, 3, 5])
|
||||
async def test_protocol_retries(mocker, retry_count):
|
||||
conn = mocker.patch.object(
|
||||
TPLinkKlap, "client_post", side_effect=Exception("dummy exception")
|
||||
)
|
||||
with pytest.raises(SmartDeviceException):
|
||||
await TPLinkKlap("127.0.0.1").query({}, retry_count=retry_count)
|
||||
|
||||
assert conn.call_count == retry_count + 1
|
||||
|
||||
|
||||
async def test_protocol_no_retry_on_connection_error(mocker):
|
||||
conn = mocker.patch.object(
|
||||
TPLinkKlap,
|
||||
"client_post",
|
||||
side_effect=httpx.ConnectError("foo"),
|
||||
)
|
||||
with pytest.raises(SmartDeviceException):
|
||||
await TPLinkKlap("127.0.0.1").query({}, retry_count=5)
|
||||
|
||||
assert conn.call_count == 1
|
||||
|
||||
|
||||
async def test_protocol_retry_recoverable_error(mocker):
|
||||
conn = mocker.patch.object(
|
||||
TPLinkKlap,
|
||||
"client_post",
|
||||
side_effect=httpx.CloseError("foo"),
|
||||
)
|
||||
with pytest.raises(SmartDeviceException):
|
||||
await TPLinkKlap("127.0.0.1").query({}, retry_count=5)
|
||||
|
||||
assert conn.call_count == 6
|
||||
|
||||
|
||||
@pytest.mark.parametrize("retry_count", [1, 3, 5])
|
||||
async def test_protocol_reconnect(mocker, retry_count):
|
||||
remaining = retry_count
|
||||
|
||||
def _fail_one_less_than_retry_count(*_, **__):
|
||||
nonlocal remaining, encryption_session
|
||||
remaining -= 1
|
||||
if remaining:
|
||||
raise Exception("Simulated post failure")
|
||||
# Do the encrypt just before returning the value so the incrementing sequence number is correct
|
||||
encrypted, seq = encryption_session.encrypt('{"great":"success"}')
|
||||
return 200, encrypted
|
||||
|
||||
seed = secrets.token_bytes(16)
|
||||
auth_hash = TPLinkKlap.generate_auth_hash(Credentials("foo", "bar"))
|
||||
encryption_session = KlapEncryptionSession(seed, seed, auth_hash)
|
||||
protocol = TPLinkKlap("127.0.0.1")
|
||||
protocol.handshake_done = True
|
||||
protocol.session_expire_at = time.time() + 86400
|
||||
protocol.encryption_session = encryption_session
|
||||
mocker.patch.object(
|
||||
TPLinkKlap, "client_post", side_effect=_fail_one_less_than_retry_count
|
||||
)
|
||||
|
||||
response = await protocol.query({}, retry_count=retry_count)
|
||||
assert response == {"great": "success"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("log_level", [logging.WARNING, logging.DEBUG])
|
||||
async def test_protocol_logging(mocker, caplog, log_level):
|
||||
caplog.set_level(log_level)
|
||||
logging.getLogger("kasa").setLevel(log_level)
|
||||
|
||||
def _return_encrypted(*_, **__):
|
||||
nonlocal encryption_session
|
||||
# Do the encrypt just before returning the value so the incrementing sequence number is correct
|
||||
encrypted, seq = encryption_session.encrypt('{"great":"success"}')
|
||||
return 200, encrypted
|
||||
|
||||
seed = secrets.token_bytes(16)
|
||||
auth_hash = TPLinkKlap.generate_auth_hash(Credentials("foo", "bar"))
|
||||
encryption_session = KlapEncryptionSession(seed, seed, auth_hash)
|
||||
protocol = TPLinkKlap("127.0.0.1")
|
||||
|
||||
protocol.handshake_done = True
|
||||
protocol.session_expire_at = time.time() + 86400
|
||||
protocol.encryption_session = encryption_session
|
||||
mocker.patch.object(TPLinkKlap, "client_post", side_effect=_return_encrypted)
|
||||
|
||||
response = await protocol.query({})
|
||||
assert response == {"great": "success"}
|
||||
if log_level == logging.DEBUG:
|
||||
assert "success" in caplog.text
|
||||
else:
|
||||
assert "success" not in caplog.text
|
||||
|
||||
|
||||
def test_encrypt():
|
||||
d = json.dumps({"foo": 1, "bar": 2})
|
||||
|
||||
seed = secrets.token_bytes(16)
|
||||
auth_hash = TPLinkKlap.generate_auth_hash(Credentials("foo", "bar"))
|
||||
encryption_session = KlapEncryptionSession(seed, seed, auth_hash)
|
||||
|
||||
encrypted, seq = encryption_session.encrypt(d)
|
||||
|
||||
assert d == encryption_session.decrypt(encrypted)
|
||||
|
||||
|
||||
def test_encrypt_unicode():
|
||||
d = "{'snowman': '\u2603'}"
|
||||
|
||||
seed = secrets.token_bytes(16)
|
||||
auth_hash = TPLinkKlap.generate_auth_hash(Credentials("foo", "bar"))
|
||||
encryption_session = KlapEncryptionSession(seed, seed, auth_hash)
|
||||
|
||||
encrypted, seq = encryption_session.encrypt(d)
|
||||
|
||||
decrypted = encryption_session.decrypt(encrypted)
|
||||
|
||||
assert d == decrypted
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"device_credentials, expectation",
|
||||
[
|
||||
(Credentials("foo", "bar"), does_not_raise()),
|
||||
(Credentials("", ""), does_not_raise()),
|
||||
(
|
||||
Credentials(TPLinkKlap.KASA_SETUP_EMAIL, TPLinkKlap.KASA_SETUP_PASSWORD),
|
||||
does_not_raise(),
|
||||
),
|
||||
(
|
||||
Credentials("shouldfail", "shouldfail"),
|
||||
pytest.raises(AuthenticationException),
|
||||
),
|
||||
],
|
||||
ids=("client", "blank", "kasa_setup", "shouldfail"),
|
||||
)
|
||||
async def test_handshake1(mocker, device_credentials, expectation):
|
||||
async def _return_handshake1_response(url, params=None, data=None, *_, **__):
|
||||
nonlocal client_seed, server_seed, device_auth_hash
|
||||
|
||||
client_seed = data
|
||||
client_seed_auth_hash = _sha256(data + device_auth_hash)
|
||||
|
||||
return _mock_response(200, server_seed + client_seed_auth_hash)
|
||||
|
||||
client_seed = None
|
||||
server_seed = secrets.token_bytes(16)
|
||||
client_credentials = Credentials("foo", "bar")
|
||||
device_auth_hash = TPLinkKlap.generate_auth_hash(device_credentials)
|
||||
|
||||
mocker.patch.object(
|
||||
httpx.AsyncClient, "post", side_effect=_return_handshake1_response
|
||||
)
|
||||
|
||||
protocol = TPLinkKlap("127.0.0.1", credentials=client_credentials)
|
||||
|
||||
protocol.http_client = httpx.AsyncClient()
|
||||
with expectation:
|
||||
(
|
||||
local_seed,
|
||||
device_remote_seed,
|
||||
auth_hash,
|
||||
) = await protocol.perform_handshake1()
|
||||
|
||||
assert local_seed == client_seed
|
||||
assert device_remote_seed == server_seed
|
||||
assert device_auth_hash == auth_hash
|
||||
await protocol.close()
|
||||
|
||||
|
||||
async def test_handshake(mocker):
|
||||
async def _return_handshake_response(url, params=None, data=None, *_, **__):
|
||||
nonlocal response_status, client_seed, server_seed, device_auth_hash
|
||||
|
||||
if url == "http://127.0.0.1/app/handshake1":
|
||||
client_seed = data
|
||||
client_seed_auth_hash = _sha256(data + device_auth_hash)
|
||||
|
||||
return _mock_response(200, server_seed + client_seed_auth_hash)
|
||||
elif url == "http://127.0.0.1/app/handshake2":
|
||||
return _mock_response(response_status, b"")
|
||||
|
||||
client_seed = None
|
||||
server_seed = secrets.token_bytes(16)
|
||||
client_credentials = Credentials("foo", "bar")
|
||||
device_auth_hash = TPLinkKlap.generate_auth_hash(client_credentials)
|
||||
|
||||
mocker.patch.object(
|
||||
httpx.AsyncClient, "post", side_effect=_return_handshake_response
|
||||
)
|
||||
|
||||
protocol = TPLinkKlap("127.0.0.1", credentials=client_credentials)
|
||||
protocol.http_client = httpx.AsyncClient()
|
||||
|
||||
response_status = 200
|
||||
await protocol.perform_handshake()
|
||||
assert protocol.handshake_done is True
|
||||
|
||||
response_status = 403
|
||||
with pytest.raises(AuthenticationException):
|
||||
await protocol.perform_handshake()
|
||||
assert protocol.handshake_done is False
|
||||
await protocol.close()
|
||||
|
||||
|
||||
async def test_query(mocker):
|
||||
async def _return_response(url, params=None, data=None, *_, **__):
|
||||
nonlocal client_seed, server_seed, device_auth_hash, protocol, seq
|
||||
|
||||
if url == "http://127.0.0.1/app/handshake1":
|
||||
client_seed = data
|
||||
client_seed_auth_hash = _sha256(data + device_auth_hash)
|
||||
|
||||
return _mock_response(200, server_seed + client_seed_auth_hash)
|
||||
elif url == "http://127.0.0.1/app/handshake2":
|
||||
return _mock_response(200, b"")
|
||||
elif url == "http://127.0.0.1/app/request":
|
||||
encryption_session = KlapEncryptionSession(
|
||||
protocol.encryption_session.local_seed,
|
||||
protocol.encryption_session.remote_seed,
|
||||
protocol.encryption_session.user_hash,
|
||||
)
|
||||
seq = params.get("seq")
|
||||
encryption_session._seq = seq - 1
|
||||
encrypted, seq = encryption_session.encrypt('{"great": "success"}')
|
||||
seq = seq
|
||||
return _mock_response(200, encrypted)
|
||||
|
||||
client_seed = None
|
||||
last_seq = None
|
||||
seq = None
|
||||
server_seed = secrets.token_bytes(16)
|
||||
client_credentials = Credentials("foo", "bar")
|
||||
device_auth_hash = TPLinkKlap.generate_auth_hash(client_credentials)
|
||||
|
||||
mocker.patch.object(httpx.AsyncClient, "post", side_effect=_return_response)
|
||||
|
||||
protocol = TPLinkKlap("127.0.0.1", credentials=client_credentials)
|
||||
|
||||
for _ in range(10):
|
||||
resp = await protocol.query({})
|
||||
assert resp == {"great": "success"}
|
||||
# Check the protocol is incrementing the sequence number
|
||||
assert last_seq is None or last_seq + 1 == seq
|
||||
last_seq = seq
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"response_status, expectation",
|
||||
[
|
||||
((403, 403, 403), pytest.raises(AuthenticationException)),
|
||||
((200, 403, 403), pytest.raises(AuthenticationException)),
|
||||
((200, 200, 403), pytest.raises(AuthenticationException)),
|
||||
((200, 200, 400), pytest.raises(SmartDeviceException)),
|
||||
],
|
||||
ids=("handshake1", "handshake2", "request", "non_auth_error"),
|
||||
)
|
||||
async def test_authentication_failures(mocker, response_status, expectation):
|
||||
async def _return_response(url, params=None, data=None, *_, **__):
|
||||
nonlocal client_seed, server_seed, device_auth_hash, response_status
|
||||
|
||||
if url == "http://127.0.0.1/app/handshake1":
|
||||
client_seed = data
|
||||
client_seed_auth_hash = _sha256(data + device_auth_hash)
|
||||
|
||||
return _mock_response(
|
||||
response_status[0], server_seed + client_seed_auth_hash
|
||||
)
|
||||
elif url == "http://127.0.0.1/app/handshake2":
|
||||
return _mock_response(response_status[1], b"")
|
||||
elif url == "http://127.0.0.1/app/request":
|
||||
return _mock_response(response_status[2], None)
|
||||
|
||||
client_seed = None
|
||||
|
||||
server_seed = secrets.token_bytes(16)
|
||||
client_credentials = Credentials("foo", "bar")
|
||||
device_auth_hash = TPLinkKlap.generate_auth_hash(client_credentials)
|
||||
|
||||
mocker.patch.object(httpx.AsyncClient, "post", side_effect=_return_response)
|
||||
|
||||
protocol = TPLinkKlap("127.0.0.1", credentials=client_credentials)
|
||||
|
||||
with expectation:
|
||||
await protocol.query({})
|
Reference in New Issue
Block a user