Enable newer encrypted discovery protocol (#1168)

This commit is contained in:
Steven B.
2024-10-16 15:28:27 +01:00
committed by GitHub
parent 7fd8c14c1f
commit 380fbb93c3
7 changed files with 257 additions and 69 deletions

View File

@@ -99,8 +99,8 @@ async def test_handshake_with_keys(mocker):
assert transport._state is TransportState.HANDSHAKE_REQUIRED
await transport.perform_handshake()
assert transport._key_pair.get_private_key() == test_keys["private"]
assert transport._key_pair.get_public_key() == test_keys["public"]
assert transport._key_pair.private_key_der_b64 == test_keys["private"]
assert transport._key_pair.public_key_der_b64 == test_keys["public"]
@status_parameters

View File

@@ -2,6 +2,7 @@ import json
import os
import re
from datetime import datetime
from unittest.mock import ANY
import asyncclick as click
import pytest
@@ -17,7 +18,6 @@ from kasa import (
EmeterStatus,
KasaException,
Module,
UnsupportedDeviceError,
)
from kasa.cli.device import (
alias,
@@ -613,6 +613,7 @@ async def test_without_device_type(dev, mocker, runner):
credentials=Credentials("foo", "bar"),
timeout=5,
discovery_timeout=7,
on_unsupported=ANY,
)
@@ -735,7 +736,7 @@ async def test_host_unsupported(unsupported_device_info, runner):
)
assert res.exit_code != 0
assert isinstance(res.exception, UnsupportedDeviceError)
assert "== Unsupported device ==" in res.output
@new_discovery

View File

@@ -2,6 +2,8 @@
# ruff: noqa: S106
import asyncio
import base64
import json
import logging
import re
import socket
@@ -10,6 +12,8 @@ from unittest.mock import MagicMock
import aiohttp
import pytest # type: ignore # https://github.com/pytest-dev/pytest/issues/3342
from async_timeout import timeout as asyncio_timeout
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding as asymmetric_padding
from kasa import (
Credentials,
@@ -18,11 +22,17 @@ from kasa import (
Discover,
KasaException,
)
from kasa.aestransport import AesEncyptionSession
from kasa.deviceconfig import (
DeviceConfig,
DeviceConnectionParameters,
)
from kasa.discover import DiscoveryResult, _DiscoverProtocol, json_dumps
from kasa.discover import (
DiscoveryResult,
_AesDiscoveryQuery,
_DiscoverProtocol,
json_dumps,
)
from kasa.exceptions import AuthenticationError, UnsupportedDeviceError
from kasa.iot import IotDevice
from kasa.xortransport import XorEncryption
@@ -278,7 +288,7 @@ async def test_discover_send(mocker):
assert proto.target_1 == ("255.255.255.255", 9999)
transport = mocker.patch.object(proto, "transport")
await proto.do_discover()
assert transport.sendto.call_count == proto.discovery_packets * 2
assert transport.sendto.call_count == proto.discovery_packets * 3
async def test_discover_datagram_received(mocker, discovery_data):
@@ -485,13 +495,14 @@ async def test_do_discover_drop_packets(mocker, port, do_not_reply_count):
discovery_timeout=discovery_timeout,
discovery_packets=5,
)
ft = FakeDatagramTransport(dp, port, do_not_reply_count)
expected_send = 1 if port == 9999 else 2
ft = FakeDatagramTransport(dp, port, do_not_reply_count * expected_send)
dp.connection_made(ft)
await dp.wait_for_discovery_to_complete()
await asyncio.sleep(0)
assert ft.send_count == do_not_reply_count + 1
assert ft.send_count == do_not_reply_count * expected_send + expected_send
assert dp.discover_task.done()
assert dp.discover_task.cancelled()
@@ -603,3 +614,36 @@ async def test_discovery_redaction(discovery_mock, caplog: pytest.LogCaptureFixt
await Discover.discover()
assert mac not in caplog.text
assert "12:34:56:00:00:00" in caplog.text
async def test_discovery_decryption():
"""Test discovery decryption."""
key = b"8\x89\x02\xfa\xf5Xs\x1c\xa1 H\x9a\x82\xc7\xd9\t"
iv = b"9=\xf8\x1bS\xcd0\xb5\x89i\xba\xfd^9\x9f\xfa"
key_iv = key + iv
_AesDiscoveryQuery.generate_query()
keypair = _AesDiscoveryQuery.keypair
padding = asymmetric_padding.OAEP(
mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA1()), # noqa: S303
algorithm=hashes.SHA1(), # noqa: S303
label=None,
)
encrypted_key_iv = keypair.public_key.encrypt(key_iv, padding)
encrypted_key_iv_b4 = base64.b64encode(encrypted_key_iv)
encryption_session = AesEncyptionSession(key_iv[:16], key_iv[16:])
data_dict = {"foo": 1, "bar": 2}
data = json.dumps(data_dict)
encypted_data = encryption_session.encrypt(data.encode())
encrypt_info = {
"data": encypted_data.decode(),
"key": encrypted_key_iv_b4.decode(),
"sym_schm": "AES",
}
info = {**UNSUPPORTED["result"], "encrypt_info": encrypt_info}
dr = DiscoveryResult(**info)
Discover._decrypt_discovery_data(dr)
assert dr.decrypted_data == data_dict