mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-05-30 21:51:24 +00:00
coverage
This commit is contained in:
parent
c5bf1ccc4e
commit
12f7f33880
@ -294,6 +294,113 @@ async def test_protocol_handles_cancellation_during_connection(
|
||||
assert response == {"great": "success"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("protocol_class", "transport_class", "encryption_class"),
|
||||
[
|
||||
(
|
||||
_deprecated_TPLinkSmartHomeProtocol,
|
||||
XorTransport,
|
||||
_deprecated_TPLinkSmartHomeProtocol,
|
||||
),
|
||||
(IotProtocol, XorTransport, XorEncryption),
|
||||
],
|
||||
ids=("_deprecated_TPLinkSmartHomeProtocol", "IotProtocol-XorTransport"),
|
||||
)
|
||||
async def test_protocol_handles_timeout_during_write(
|
||||
mocker, protocol_class, transport_class, encryption_class
|
||||
):
|
||||
attempts = 0
|
||||
encrypted = encryption_class.encrypt('{"great":"success"}')[
|
||||
transport_class.BLOCK_SIZE :
|
||||
]
|
||||
|
||||
def _cancel_first_attempt(*_):
|
||||
nonlocal attempts
|
||||
attempts += 1
|
||||
if attempts == 1:
|
||||
raise TimeoutError("Simulated timeout")
|
||||
|
||||
async def _mock_read(byte_count):
|
||||
nonlocal encrypted
|
||||
if byte_count == transport_class.BLOCK_SIZE:
|
||||
return struct.pack(">I", len(encrypted))
|
||||
if byte_count == len(encrypted):
|
||||
return encrypted
|
||||
|
||||
raise ValueError(f"No mock for {byte_count}")
|
||||
|
||||
def aio_mock_writer(_, __):
|
||||
reader = mocker.patch("asyncio.StreamReader")
|
||||
writer = mocker.patch("asyncio.StreamWriter")
|
||||
mocker.patch.object(writer, "write", _cancel_first_attempt)
|
||||
mocker.patch.object(reader, "readexactly", _mock_read)
|
||||
mocker.patch.object(writer, "drain", new_callable=AsyncMock)
|
||||
return reader, writer
|
||||
|
||||
config = DeviceConfig("127.0.0.1")
|
||||
protocol = protocol_class(transport=transport_class(config=config))
|
||||
mocker.patch("asyncio.open_connection", side_effect=aio_mock_writer)
|
||||
await protocol.query({})
|
||||
writer_obj = protocol if hasattr(protocol, "writer") else protocol._transport
|
||||
assert writer_obj.writer is not None
|
||||
response = await protocol.query({})
|
||||
assert response == {"great": "success"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("protocol_class", "transport_class", "encryption_class"),
|
||||
[
|
||||
(
|
||||
_deprecated_TPLinkSmartHomeProtocol,
|
||||
XorTransport,
|
||||
_deprecated_TPLinkSmartHomeProtocol,
|
||||
),
|
||||
(IotProtocol, XorTransport, XorEncryption),
|
||||
],
|
||||
ids=("_deprecated_TPLinkSmartHomeProtocol", "IotProtocol-XorTransport"),
|
||||
)
|
||||
async def test_protocol_handles_timeout_during_connection(
|
||||
mocker, protocol_class, transport_class, encryption_class
|
||||
):
|
||||
attempts = 0
|
||||
encrypted = encryption_class.encrypt('{"great":"success"}')[
|
||||
transport_class.BLOCK_SIZE :
|
||||
]
|
||||
|
||||
async def _mock_read(byte_count):
|
||||
nonlocal encrypted
|
||||
if byte_count == transport_class.BLOCK_SIZE:
|
||||
return struct.pack(">I", len(encrypted))
|
||||
if byte_count == len(encrypted):
|
||||
return encrypted
|
||||
|
||||
raise ValueError(f"No mock for {byte_count}")
|
||||
|
||||
def aio_mock_writer(_, __):
|
||||
nonlocal attempts
|
||||
attempts += 1
|
||||
if attempts == 1:
|
||||
raise TimeoutError("Simulated timeout")
|
||||
reader = mocker.patch("asyncio.StreamReader")
|
||||
writer = mocker.patch("asyncio.StreamWriter")
|
||||
mocker.patch.object(reader, "readexactly", _mock_read)
|
||||
mocker.patch.object(writer, "drain", new_callable=AsyncMock)
|
||||
return reader, writer
|
||||
|
||||
config = DeviceConfig("127.0.0.1")
|
||||
protocol = protocol_class(transport=transport_class(config=config))
|
||||
writer_obj = protocol if hasattr(protocol, "writer") else protocol._transport
|
||||
await writer_obj.close()
|
||||
|
||||
mocker.patch("asyncio.open_connection", side_effect=aio_mock_writer)
|
||||
await protocol.query({"any": "thing"})
|
||||
|
||||
writer_obj = protocol if hasattr(protocol, "writer") else protocol._transport
|
||||
assert writer_obj.writer is not None
|
||||
response = await protocol.query({})
|
||||
assert response == {"great": "success"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("protocol_class", "transport_class", "encryption_class"),
|
||||
[
|
||||
|
Loading…
x
Reference in New Issue
Block a user