import warnings from json import loads as json_loads from kasa import Credentials, DeviceConfig, KasaException, SmartProtocol from kasa.protocol import BaseTransport class FakeSmartProtocol(SmartProtocol): def __init__(self, info): super().__init__( transport=FakeSmartTransport(info), ) async def query(self, request, retry_count: int = 3): """Implement query here so can still patch SmartProtocol.query.""" resp_dict = await self._query(request, retry_count) return resp_dict class FakeSmartTransport(BaseTransport): def __init__(self, info): super().__init__( config=DeviceConfig( "127.0.0.123", credentials=Credentials( username="dummy_user", password="dummy_password", # noqa: S106 ), ), ) self.info = info self.components = { comp["id"]: comp["ver_code"] for comp in self.info["component_nego"]["component_list"] } @property def default_port(self): """Default port for the transport.""" return 80 @property def credentials_hash(self): """The hashed credentials used by the transport.""" return self._credentials.username + self._credentials.password + "hash" FIXTURE_MISSING_MAP = { "get_wireless_scan_info": ("wireless", {"ap_list": [], "wep_supported": False}), "get_auto_off_config": ("auto_off", {"delay_min": 10, "enable": False}), "get_led_info": ( "led", { "led_rule": "never", "led_status": False, "night_mode": { "end_time": 420, "night_mode_type": "sunrise_sunset", "start_time": 1140, "sunrise_offset": 0, "sunset_offset": 0, }, }, ), "get_connect_cloud_state": ("cloud_connect", {"status": 1}), "get_on_off_gradually_info": ("on_off_gradually", {"enable": True}), "get_latest_fw": ( "firmware", { "fw_size": 0, "fw_ver": "1.0.5 Build 230801 Rel.095702", "hw_id": "", "need_to_upgrade": False, "oem_id": "", "release_date": "", "release_note": "", "type": 0, }, ), "get_auto_update_info": ( "firmware", {"enable": True, "random_range": 120, "time": 180}, ), } async def send(self, request: str): request_dict = json_loads(request) method = request_dict["method"] params = request_dict["params"] if method == "multipleRequest": responses = [] for request in params["requests"]: response = self._send_request(request) # type: ignore[arg-type] response["method"] = request["method"] # type: ignore[index] responses.append(response) return {"result": {"responses": responses}, "error_code": 0} else: return self._send_request(request_dict) def _handle_control_child(self, params: dict): """Handle control_child command.""" device_id = params.get("device_id") request_data = params.get("requestData", {}) child_method = request_data.get("method") child_params = request_data.get("params") info = self.info children = info["get_child_device_list"]["child_device_list"] for child in children: if child["device_id"] == device_id: info = child break # We only support get & set device info for now. if child_method == "get_device_info": return {"result": info, "error_code": 0} elif child_method == "set_device_info": info.update(child_params) return {"error_code": 0} raise NotImplementedError( "Method %s not implemented for children" % child_method ) def _send_request(self, request_dict: dict): method = request_dict["method"] params = request_dict["params"] info = self.info if method == "control_child": return self._handle_control_child(params) elif method == "component_nego" or method[:4] == "get_": if method in info: return {"result": info[method], "error_code": 0} elif ( missing_result := self.FIXTURE_MISSING_MAP.get(method) ) and missing_result[0] in self.components: warnings.warn( UserWarning( f"Fixture missing expected method {method}, try to regenerate" ), stacklevel=1, ) return {"result": missing_result[1], "error_code": 0} else: raise KasaException(f"Fixture doesn't support {method}") elif method == "set_qs_info": return {"error_code": 0} elif method[:4] == "set_": target_method = f"get_{method[4:]}" info[target_method].update(params) return {"error_code": 0} async def close(self) -> None: pass async def reset(self) -> None: pass