From d68966bd6b4187cf4493f13c3c11fb8fbc8bdd8a Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Fri, 29 Nov 2024 18:24:01 +0100 Subject: [PATCH] Add fixtured-powered device simulator --- devtools/simulator.py | 114 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 devtools/simulator.py diff --git a/devtools/simulator.py b/devtools/simulator.py new file mode 100644 index 00000000..cc35fb31 --- /dev/null +++ b/devtools/simulator.py @@ -0,0 +1,114 @@ +"""Minimalistic, fixture-powered device simulator.""" + +import asyncio +import json +import logging +import ssl + +import asyncclick as click + +from kasa.json import dumps as json_dumps +from kasa.json import loads as json_loads + +_LOGGER = logging.getLogger(__name__) + +HDR = b"\x02\x00\x00\x01\x01\xe5\x11\x00" +RESP_HDR = bytearray(16) + + +class DiscoveryProtocol(asyncio.DatagramProtocol): + """Simplified discovery protocol implementation.""" + + def __init__(self, fixture): + self.disco_data = fixture + + def connection_made(self, transport): + """Set transport and log incoming connections.""" + peer = transport.get_extra_info("peername") + _LOGGER.info("[UDP] Connection from %s", peer) + self.transport = transport + + def datagram_received(self, data: bytes, addr): + """Respond to discovery requests.""" + _LOGGER.info("[UDP] %s << %s", addr, data) + if not data.startswith(HDR): + _LOGGER.debug("[UDP] Unexpected datagram from %s: %r", addr, data) + + resp_data = {"error_code": 0, "result": self.disco_data} + resp = json_dumps(resp_data).encode() + _LOGGER.info("[UDP] %s >> %s", addr, resp) + self.transport.sendto(RESP_HDR + resp, addr) + + +class AppProtocol(asyncio.Protocol): + """App protocol implementation.""" + + def connection_made(self, transport): + """Set the transport on incoming connections.""" + peer = transport.get_extra_info("peername") + _LOGGER.info("[APP] Connection from %s", peer) + self.transport = transport + + def data_received(self, data: bytes): + """Handle received requests.""" + message = data + _LOGGER.info("[APP] << %r", message) + + resp = {"error_code": 0} + + _LOGGER.info("[APP] >> %r", resp) + self.transport.write(json.dumps(resp).encode()) + + # TODO: don't close after response? + self.transport.close() + + +async def serve_udp(port, discovery_data): + """Serve discovery protocol.""" + loop = asyncio.get_event_loop() + _LOGGER.info("Serving discovery on port %s", port) + await loop.create_datagram_endpoint( + lambda: DiscoveryProtocol(discovery_data), + local_addr=("0.0.0.0", port), # noqa: S104 + ) + while True: + await asyncio.sleep(5) + + +async def serve_tcp(port): + """Serve app communications protocol.""" + _LOGGER.info("Serving app on port %s", port) + loop = asyncio.get_event_loop() + + # TODO: These settings are not enough to let openssl nor kasa cli to connect + # ssl.SSLError: [SSL: NO_SHARED_CIPHER] no shared cipher (_ssl.c:1000) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_ciphers("ALL:@SECLEVEL=0") + ctx.minimum_version = ssl.TLSVersion.TLSv1 + + server = await loop.create_server(AppProtocol, host="0.0.0.0", port=port, ssl=ctx) # noqa: S104 + async with server: + await server.serve_forever() + + +@click.command() +@click.argument("fixture", type=click.File()) +async def main(fixture): + """Minimalistic, fixture-powered device simulator.""" + fixture_data = json_loads(fixture.read()) + disco_data = fixture_data["discovery_result"] + _LOGGER.info( + "Starting for %s (%s)", disco_data["device_model"], disco_data["device_type"] + ) + + async with asyncio.TaskGroup() as tg: + tg.create_task(serve_udp(discovery_data=disco_data, port=20002)) + tg.create_task(serve_tcp(port=4433)) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main()