From 12f7f33880ecfc29d67cca278e62c7091161c9ba Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 5 Jan 2025 13:52:05 -1000 Subject: [PATCH] coverage --- tests/protocols/test_iotprotocol.py | 107 ++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/tests/protocols/test_iotprotocol.py b/tests/protocols/test_iotprotocol.py index a2feaae3..600a6b51 100644 --- a/tests/protocols/test_iotprotocol.py +++ b/tests/protocols/test_iotprotocol.py @@ -294,6 +294,113 @@ async def test_protocol_handles_cancellation_during_connection( assert response == {"great": "success"} +@pytest.mark.parametrize( + ("protocol_class", "transport_class", "encryption_class"), + [ + ( + _deprecated_TPLinkSmartHomeProtocol, + XorTransport, + _deprecated_TPLinkSmartHomeProtocol, + ), + (IotProtocol, XorTransport, XorEncryption), + ], + ids=("_deprecated_TPLinkSmartHomeProtocol", "IotProtocol-XorTransport"), +) +async def test_protocol_handles_timeout_during_write( + mocker, protocol_class, transport_class, encryption_class +): + attempts = 0 + encrypted = encryption_class.encrypt('{"great":"success"}')[ + transport_class.BLOCK_SIZE : + ] + + def _cancel_first_attempt(*_): + nonlocal attempts + attempts += 1 + if attempts == 1: + raise TimeoutError("Simulated timeout") + + async def _mock_read(byte_count): + nonlocal encrypted + if byte_count == transport_class.BLOCK_SIZE: + return struct.pack(">I", len(encrypted)) + if byte_count == len(encrypted): + return encrypted + + raise ValueError(f"No mock for {byte_count}") + + def aio_mock_writer(_, __): + reader = mocker.patch("asyncio.StreamReader") + writer = mocker.patch("asyncio.StreamWriter") + mocker.patch.object(writer, "write", _cancel_first_attempt) + mocker.patch.object(reader, "readexactly", _mock_read) + mocker.patch.object(writer, "drain", new_callable=AsyncMock) + return reader, writer + + config = DeviceConfig("127.0.0.1") + protocol = protocol_class(transport=transport_class(config=config)) + mocker.patch("asyncio.open_connection", side_effect=aio_mock_writer) + await protocol.query({}) + writer_obj = protocol if hasattr(protocol, "writer") else protocol._transport + assert writer_obj.writer is not None + response = await protocol.query({}) + assert response == {"great": "success"} + + +@pytest.mark.parametrize( + ("protocol_class", "transport_class", "encryption_class"), + [ + ( + _deprecated_TPLinkSmartHomeProtocol, + XorTransport, + _deprecated_TPLinkSmartHomeProtocol, + ), + (IotProtocol, XorTransport, XorEncryption), + ], + ids=("_deprecated_TPLinkSmartHomeProtocol", "IotProtocol-XorTransport"), +) +async def test_protocol_handles_timeout_during_connection( + mocker, protocol_class, transport_class, encryption_class +): + attempts = 0 + encrypted = encryption_class.encrypt('{"great":"success"}')[ + transport_class.BLOCK_SIZE : + ] + + async def _mock_read(byte_count): + nonlocal encrypted + if byte_count == transport_class.BLOCK_SIZE: + return struct.pack(">I", len(encrypted)) + if byte_count == len(encrypted): + return encrypted + + raise ValueError(f"No mock for {byte_count}") + + def aio_mock_writer(_, __): + nonlocal attempts + attempts += 1 + if attempts == 1: + raise TimeoutError("Simulated timeout") + reader = mocker.patch("asyncio.StreamReader") + writer = mocker.patch("asyncio.StreamWriter") + mocker.patch.object(reader, "readexactly", _mock_read) + mocker.patch.object(writer, "drain", new_callable=AsyncMock) + return reader, writer + + config = DeviceConfig("127.0.0.1") + protocol = protocol_class(transport=transport_class(config=config)) + writer_obj = protocol if hasattr(protocol, "writer") else protocol._transport + await writer_obj.close() + + mocker.patch("asyncio.open_connection", side_effect=aio_mock_writer) + await protocol.query({"any": "thing"}) + + writer_obj = protocol if hasattr(protocol, "writer") else protocol._transport + assert writer_obj.writer is not None + response = await protocol.query({}) + assert response == {"great": "success"} + + @pytest.mark.parametrize( ("protocol_class", "transport_class", "encryption_class"), [