mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-08-09 20:24:02 +00:00
Sleep between discovery packets (#656)
* Sleep between discovery packets * Add tests
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
# type: ignore
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import aiohttp
|
||||
import pytest # type: ignore # https://github.com/pytest-dev/pytest/issues/3342
|
||||
from async_timeout import timeout as asyncio_timeout
|
||||
|
||||
from kasa import (
|
||||
Credentials,
|
||||
@@ -12,6 +15,7 @@ from kasa import (
|
||||
Discover,
|
||||
SmartDevice,
|
||||
SmartDeviceException,
|
||||
TPLinkSmartHomeProtocol,
|
||||
protocol,
|
||||
)
|
||||
from kasa.deviceconfig import (
|
||||
@@ -198,9 +202,9 @@ async def test_discover_send(mocker):
|
||||
"""Test discovery parameters."""
|
||||
proto = _DiscoverProtocol()
|
||||
assert proto.discovery_packets == 3
|
||||
assert proto.target == ("255.255.255.255", 9999)
|
||||
assert proto.target_1 == ("255.255.255.255", 9999)
|
||||
transport = mocker.patch.object(proto, "transport")
|
||||
proto.do_discover()
|
||||
await proto.do_discover()
|
||||
assert transport.sendto.call_count == proto.discovery_packets * 2
|
||||
|
||||
|
||||
@@ -341,3 +345,116 @@ async def test_discover_http_client(discovery_mock, mocker):
|
||||
assert x.protocol._transport._http_client.client != http_client
|
||||
x.config.http_client = http_client
|
||||
assert x.protocol._transport._http_client.client == http_client
|
||||
|
||||
|
||||
LEGACY_DISCOVER_DATA = {
|
||||
"system": {
|
||||
"get_sysinfo": {
|
||||
"alias": "#MASKED_NAME#",
|
||||
"dev_name": "Smart Wi-Fi Plug",
|
||||
"deviceId": "0000000000000000000000000000000000000000",
|
||||
"err_code": 0,
|
||||
"hwId": "00000000000000000000000000000000",
|
||||
"hw_ver": "0.0",
|
||||
"mac": "00:00:00:00:00:00",
|
||||
"mic_type": "IOT.SMARTPLUGSWITCH",
|
||||
"model": "HS100(UK)",
|
||||
"sw_ver": "1.1.0 Build 201016 Rel.175121",
|
||||
"updating": 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class FakeDatagramTransport(asyncio.DatagramTransport):
|
||||
GHOST_PORT = 8888
|
||||
|
||||
def __init__(self, dp, port, do_not_reply_count, unsupported=False):
|
||||
self.dp = dp
|
||||
self.port = port
|
||||
self.do_not_reply_count = do_not_reply_count
|
||||
self.send_count = 0
|
||||
if port == 9999:
|
||||
self.datagram = TPLinkSmartHomeProtocol.encrypt(
|
||||
json_dumps(LEGACY_DISCOVER_DATA)
|
||||
)[4:]
|
||||
elif port == 20002:
|
||||
discovery_data = UNSUPPORTED if unsupported else AUTHENTICATION_DATA_KLAP
|
||||
self.datagram = (
|
||||
b"\x02\x00\x00\x01\x01[\x00\x00\x00\x00\x00\x00W\xcev\xf8"
|
||||
+ json_dumps(discovery_data).encode()
|
||||
)
|
||||
else:
|
||||
self.datagram = {"foo": "bar"}
|
||||
|
||||
def get_extra_info(self, name, default=None):
|
||||
return MagicMock()
|
||||
|
||||
def sendto(self, data, addr=None):
|
||||
ip, port = addr
|
||||
if port == self.port or self.port == self.GHOST_PORT:
|
||||
self.send_count += 1
|
||||
if self.send_count > self.do_not_reply_count:
|
||||
self.dp.datagram_received(self.datagram, (ip, self.port))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("port", [9999, 20002])
|
||||
@pytest.mark.parametrize("do_not_reply_count", [0, 1, 2, 3, 4])
|
||||
async def test_do_discover_drop_packets(mocker, port, do_not_reply_count):
|
||||
"""Make sure that discover_single handles authenticating devices correctly."""
|
||||
host = "127.0.0.1"
|
||||
discovery_timeout = 1
|
||||
|
||||
event = asyncio.Event()
|
||||
dp = _DiscoverProtocol(
|
||||
target=host,
|
||||
discovery_timeout=discovery_timeout,
|
||||
discovery_packets=5,
|
||||
discovered_event=event,
|
||||
)
|
||||
ft = FakeDatagramTransport(dp, port, do_not_reply_count)
|
||||
dp.connection_made(ft)
|
||||
|
||||
timed_out = False
|
||||
try:
|
||||
async with asyncio_timeout(discovery_timeout):
|
||||
await event.wait()
|
||||
except asyncio.TimeoutError:
|
||||
timed_out = True
|
||||
|
||||
await asyncio.sleep(0)
|
||||
assert ft.send_count == do_not_reply_count + 1
|
||||
assert dp.discover_task.done()
|
||||
assert timed_out is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"port, will_timeout",
|
||||
[(FakeDatagramTransport.GHOST_PORT, True), (20002, False)],
|
||||
ids=["unknownport", "unsupporteddevice"],
|
||||
)
|
||||
async def test_do_discover_invalid(mocker, port, will_timeout):
|
||||
"""Make sure that discover_single handles authenticating devices correctly."""
|
||||
host = "127.0.0.1"
|
||||
discovery_timeout = 1
|
||||
|
||||
event = asyncio.Event()
|
||||
dp = _DiscoverProtocol(
|
||||
target=host,
|
||||
discovery_timeout=discovery_timeout,
|
||||
discovery_packets=5,
|
||||
discovered_event=event,
|
||||
)
|
||||
ft = FakeDatagramTransport(dp, port, 0, unsupported=True)
|
||||
dp.connection_made(ft)
|
||||
|
||||
timed_out = False
|
||||
try:
|
||||
async with asyncio_timeout(15):
|
||||
await event.wait()
|
||||
except asyncio.TimeoutError:
|
||||
timed_out = True
|
||||
|
||||
await asyncio.sleep(0)
|
||||
assert dp.discover_task.done()
|
||||
assert timed_out is will_timeout
|
||||
|
Reference in New Issue
Block a user