From acd0202cabe4bbd87d5ec0c4ad19bdc830933b15 Mon Sep 17 00:00:00 2001 From: "Steven B." <51370195+sdb9696@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:06:22 +0100 Subject: [PATCH] Update dump_devinfo for smart camera protocol (#1169) Introduces the child camera protocol wrapper, required to get the child device info with the new protocol. --- devtools/dump_devinfo.py | 452 ++++++++++--- devtools/helpers/smartrequests.py | 4 +- kasa/experimental/smartcameraprotocol.py | 100 ++- kasa/experimental/sslaestransport.py | 2 +- kasa/smartprotocol.py | 9 +- .../experimental/C210(EU)_2.0_1.4.2.json | 606 ++++++++++++++++++ 6 files changed, 1076 insertions(+), 97 deletions(-) create mode 100644 kasa/tests/fixtures/experimental/C210(EU)_2.0_1.4.2.json diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index 8ca39d03..12e4c3cb 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -28,14 +28,22 @@ from kasa import ( AuthenticationError, Credentials, Device, + DeviceConfig, + DeviceConnectionParameters, Discover, KasaException, TimeoutError, ) +from kasa.device_factory import get_protocol +from kasa.deviceconfig import DeviceEncryptionType, DeviceFamily from kasa.discover import DiscoveryResult from kasa.exceptions import SmartErrorCode -from kasa.smart import SmartDevice -from kasa.smartprotocol import _ChildProtocolWrapper +from kasa.experimental.smartcameraprotocol import ( + SmartCameraProtocol, + _ChildCameraProtocolWrapper, +) +from kasa.smart import SmartChildDevice +from kasa.smartprotocol import SmartProtocol, _ChildProtocolWrapper Call = namedtuple("Call", "module method") SmartCall = namedtuple("SmartCall", "module request should_succeed child_device_id") @@ -45,6 +53,8 @@ SMART_FOLDER = "kasa/tests/fixtures/smart/" SMART_CHILD_FOLDER = "kasa/tests/fixtures/smart/child/" IOT_FOLDER = "kasa/tests/fixtures/" +ENCRYPT_TYPES = [encrypt_type.value for encrypt_type in DeviceEncryptionType] + _LOGGER = logging.getLogger(__name__) @@ -82,11 +92,23 @@ def scrub(res): "mfi_setup_id", "mfi_token_token", "mfi_token_uuid", + "dev_id", + "device_name", + "device_alias", + "connect_ssid", + "encrypt_info", + "local_ip", ] for k, v in res.items(): if isinstance(v, collections.abc.Mapping): - res[k] = scrub(res.get(k)) + if k == "encrypt_info": + if "data" in v: + v["data"] = "" + if "key" in v: + v["key"] = "" + else: + res[k] = scrub(res.get(k)) elif ( isinstance(v, list) and len(v) > 0 @@ -107,20 +129,20 @@ def scrub(res): v = f"{v[:8]}{delim}{rest}" elif k in ["latitude", "latitude_i", "longitude", "longitude_i"]: v = 0 - elif k in ["ip"]: + elif k in ["ip", "local_ip"]: v = "127.0.0.123" elif k in ["ssid"]: # Need a valid base64 value here v = base64.b64encode(b"#MASKED_SSID#").decode() elif k in ["nickname"]: v = base64.b64encode(b"#MASKED_NAME#").decode() - elif k in ["alias"]: + elif k in ["alias", "device_alias"]: v = "#MASKED_NAME#" elif isinstance(res[k], int): v = 0 - elif k == "device_id" and "SCRUBBED" in v: + elif k in ["device_id", "dev_id"] and "SCRUBBED" in v: pass # already scrubbed - elif k == "device_id" and len(v) > 40: + elif k == ["device_id", "dev_id"] and len(v) > 40: # retain the last two chars when scrubbing child ids end = v[-2:] v = re.sub(r"\w", "0", v) @@ -142,14 +164,18 @@ def default_to_regular(d): return d -async def handle_device(basedir, autosave, device: Device, batch_size: int): +async def handle_device( + basedir, autosave, protocol, *, discovery_info=None, batch_size: int +): """Create a fixture for a single device instance.""" - if isinstance(device, SmartDevice): + if isinstance(protocol, SmartProtocol): fixture_results: list[FixtureResult] = await get_smart_fixtures( - device, batch_size + protocol, discovery_info=discovery_info, batch_size=batch_size ) else: - fixture_results = [await get_legacy_fixture(device)] + fixture_results = [ + await get_legacy_fixture(protocol, discovery_info=discovery_info) + ] for fixture_result in fixture_results: save_filename = Path(basedir) / fixture_result.folder / fixture_result.filename @@ -207,6 +233,44 @@ async def handle_device(basedir, autosave, device: Device, batch_size: int): + " Do not use this flag unless you are sure you know what it means." ), ) +@click.option( + "--discovery-timeout", + envvar="KASA_DISCOVERY_TIMEOUT", + default=10, + required=False, + show_default=True, + help="Timeout for discovery.", +) +@click.option( + "-e", + "--encrypt-type", + envvar="KASA_ENCRYPT_TYPE", + default=None, + type=click.Choice(ENCRYPT_TYPES, case_sensitive=False), +) +@click.option( + "-df", + "--device-family", + envvar="KASA_DEVICE_FAMILY", + default="SMART.TAPOPLUG", + help="Device family type, e.g. `SMART.KASASWITCH`.", +) +@click.option( + "-lv", + "--login-version", + envvar="KASA_LOGIN_VERSION", + default=2, + type=int, + help="The login version for device authentication. Defaults to 2", +) +@click.option( + "--https/--no-https", + envvar="KASA_HTTPS", + default=False, + is_flag=True, + type=bool, + help="Set flag if the device encryption uses https.", +) @click.option("--port", help="Port override", type=int) async def cli( host, @@ -215,9 +279,14 @@ async def cli( autosave, debug, username, + discovery_timeout, password, batch_size, discovery_info, + encrypt_type, + https, + device_family, + login_version, port, ): """Generate devinfo files for devices. @@ -227,11 +296,14 @@ async def cli( if debug: logging.basicConfig(level=logging.DEBUG) + from kasa.experimental.enabled import Enabled + + Enabled.set(True) + credentials = Credentials(username=username, password=password) if host is not None: if discovery_info: click.echo("Host and discovery info given, trying connect on %s." % host) - from kasa import DeviceConfig, DeviceConnectionParameters di = json.loads(discovery_info) dr = DiscoveryResult(**di) @@ -247,25 +319,68 @@ async def cli( credentials=credentials, ) device = await Device.connect(config=dc) - device.update_from_discover_info(dr.get_dict()) + await handle_device( + basedir, + autosave, + device.protocol, + discovery_info=dr.get_dict(), + batch_size=batch_size, + ) + elif device_family and encrypt_type: + ctype = DeviceConnectionParameters( + DeviceFamily(device_family), + DeviceEncryptionType(encrypt_type), + login_version, + https, + ) + config = DeviceConfig( + host=host, + port_override=port, + credentials=credentials, + connection_type=ctype, + ) + if protocol := get_protocol(config): + await handle_device(basedir, autosave, protocol, batch_size=batch_size) + else: + raise KasaException( + "Could not find a protocol for the given parameters. " + + "Maybe you need to enable --experimental." + ) else: click.echo("Host given, performing discovery on %s." % host) device = await Discover.discover_single( - host, credentials=credentials, port=port + host, + credentials=credentials, + port=port, + discovery_timeout=discovery_timeout, + ) + await handle_device( + basedir, + autosave, + device.protocol, + discovery_info=device._discovery_info, + batch_size=batch_size, ) - await handle_device(basedir, autosave, device, batch_size) else: click.echo( "No --host given, performing discovery on %s. Use --target to override." % target ) - devices = await Discover.discover(target=target, credentials=credentials) + devices = await Discover.discover( + target=target, credentials=credentials, discovery_timeout=discovery_timeout + ) click.echo("Detected %s devices" % len(devices)) for dev in devices.values(): - await handle_device(basedir, autosave, dev, batch_size) + await handle_device( + basedir, + autosave, + dev.protocol, + discovery_info=dev._discovery_info, + batch_size=batch_size, + ) -async def get_legacy_fixture(device): +async def get_legacy_fixture(protocol, *, discovery_info): """Get fixture for legacy IOT style protocol.""" items = [ Call(module="system", method="get_sysinfo"), @@ -284,9 +399,7 @@ async def get_legacy_fixture(device): for test_call in items: try: click.echo(f"Testing {test_call}..", nl=False) - info = await device.protocol.query( - {test_call.module: {test_call.method: {}}} - ) + info = await protocol.query({test_call.module: {test_call.method: {}}}) resp = info[test_call.module] except Exception as ex: click.echo(click.style(f"FAIL {ex}", fg="red")) @@ -297,7 +410,7 @@ async def get_legacy_fixture(device): click.echo(click.style("OK", fg="green")) successes.append((test_call, info)) finally: - await device.protocol.close() + await protocol.close() final_query = defaultdict(defaultdict) final = defaultdict(defaultdict) @@ -308,15 +421,15 @@ async def get_legacy_fixture(device): final = default_to_regular(final) try: - final = await device.protocol.query(final_query) + final = await protocol.query(final_query) except Exception as ex: _echo_error(f"Unable to query all successes at once: {ex}", bold=True, fg="red") finally: - await device.protocol.close() - if device._discovery_info and not device._discovery_info.get("system"): + await protocol.close() + if discovery_info and not discovery_info.get("system"): # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. - dr = DiscoveryResult(**device._discovery_info) + dr = DiscoveryResult(**protocol._discovery_info) final["discovery_result"] = dr.dict( by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True ) @@ -365,29 +478,29 @@ def format_exception(e): async def _make_requests_or_exit( - device: SmartDevice, - requests: list[SmartRequest], + protocol: SmartProtocol, + requests: dict, name: str, batch_size: int, *, child_device_id: str, ) -> dict[str, dict]: final = {} - protocol = ( - device.protocol - if child_device_id == "" - else _ChildProtocolWrapper(child_device_id, device.protocol) - ) + # Calling close on child protocol wrappers is a noop + protocol_to_close = protocol + if child_device_id: + if isinstance(protocol, SmartCameraProtocol): + protocol = _ChildCameraProtocolWrapper(child_device_id, protocol) + else: + protocol = _ChildProtocolWrapper(child_device_id, protocol) try: end = len(requests) step = batch_size # Break the requests down as there seems to be a size limit + keys = [key for key in requests] for i in range(0, end, step): x = i - requests_step = requests[x : x + step] - request: list[SmartRequest] | SmartRequest = ( - requests_step[0] if len(requests_step) == 1 else requests_step - ) - responses = await protocol.query(SmartRequest._create_request_dict(request)) + requests_step = {key: requests[key] for key in keys[x : x + step]} + responses = await protocol.query(requests_step) for method, result in responses.items(): final[method] = result return final @@ -413,10 +526,155 @@ async def _make_requests_or_exit( _echo_error(format_exception(ex)) exit(1) finally: - await device.protocol.close() + await protocol_to_close.close() -async def get_smart_test_calls(device: SmartDevice): +async def get_smart_camera_test_calls(protocol: SmartProtocol): + """Get the list of test calls to make.""" + test_calls: list[SmartCall] = [] + successes: list[SmartCall] = [] + + requests = { + "getAlertTypeList": {"msg_alarm": {"name": "alert_type"}}, + "getNightVisionCapability": {"image_capability": {"name": ["supplement_lamp"]}}, + "getDeviceInfo": {"device_info": {"name": ["basic_info"]}}, + "getDetectionConfig": {"motion_detection": {"name": ["motion_det"]}}, + "getPersonDetectionConfig": {"people_detection": {"name": ["detection"]}}, + "getVehicleDetectionConfig": {"vehicle_detection": {"name": ["detection"]}}, + "getBCDConfig": {"sound_detection": {"name": ["bcd"]}}, + "getPetDetectionConfig": {"pet_detection": {"name": ["detection"]}}, + "getBarkDetectionConfig": {"bark_detection": {"name": ["detection"]}}, + "getMeowDetectionConfig": {"meow_detection": {"name": ["detection"]}}, + "getGlassDetectionConfig": {"glass_detection": {"name": ["detection"]}}, + "getTamperDetectionConfig": {"tamper_detection": {"name": "tamper_det"}}, + "getLensMaskConfig": {"lens_mask": {"name": ["lens_mask_info"]}}, + "getLdc": {"image": {"name": ["switch", "common"]}}, + "getLastAlarmInfo": {"msg_alarm": {"name": ["chn1_msg_alarm_info"]}}, + "getLedStatus": {"led": {"name": ["config"]}}, + "getTargetTrackConfig": {"target_track": {"name": ["target_track_info"]}}, + "getPresetConfig": {"preset": {"name": ["preset"]}}, + "getFirmwareUpdateStatus": {"cloud_config": {"name": "upgrade_status"}}, + "getMediaEncrypt": {"cet": {"name": ["media_encrypt"]}}, + "getConnectionType": {"network": {"get_connection_type": []}}, + "getAlarmConfig": {"msg_alarm": {}}, + "getAlarmPlan": {"msg_alarm_plan": {}}, + "getSirenTypeList": {"siren": {}}, + "getSirenConfig": {"siren": {}}, + "getAlertConfig": { + "msg_alarm": { + "name": ["chn1_msg_alarm_info", "capability"], + "table": ["usr_def_audio"], + } + }, + "getLightTypeList": {"msg_alarm": {}}, + "getSirenStatus": {"siren": {}}, + "getLightFrequencyInfo": {"image": {"name": "common"}}, + "getLightFrequencyCapability": {"image": {"name": "common"}}, + "getRotationStatus": {"image": {"name": ["switch"]}}, + "getNightVisionModeConfig": {"image": {"name": "switch"}}, + "getWhitelampStatus": {"image": {"get_wtl_status": ["null"]}}, + "getWhitelampConfig": {"image": {"name": "switch"}}, + "getMsgPushConfig": {"msg_push": {"name": ["chn1_msg_push_info"]}}, + "getSdCardStatus": {"harddisk_manage": {"table": ["hd_info"]}}, + "getCircularRecordingConfig": {"harddisk_manage": {"name": "harddisk"}}, + "getRecordPlan": {"record_plan": {"name": ["chn1_channel"]}}, + "getAudioConfig": {"audio_config": {"name": ["speaker", "microphone"]}}, + "getFirmwareAutoUpgradeConfig": {"auto_upgrade": {"name": ["common"]}}, + "getVideoQualities": {"video": {"name": ["main"]}}, + "getVideoCapability": {"video_capability": {"name": "main"}}, + } + test_calls = [] + for method, params in requests.items(): + test_calls.append( + SmartCall( + module=method, + request={method: params}, + should_succeed=True, + child_device_id="", + ) + ) + + # Now get the child device requests + try: + child_request = {"getChildDeviceList": {"childControl": {"start_index": 0}}} + child_response = await protocol.query(child_request) + except Exception: + _LOGGER.debug("Device does not have any children.") + else: + successes.append( + SmartCall( + module="getChildDeviceList", + request=child_request, + should_succeed=True, + child_device_id="", + ) + ) + child_list = child_response["getChildDeviceList"]["child_device_list"] + for child in child_list: + child_id = child.get("device_id") or child.get("dev_id") + if not child_id: + _LOGGER.error("Could not find child device id in %s", child) + # If category is in the child device map the protocol is smart. + if ( + category := child.get("category") + ) and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP: + child_protocol = _ChildCameraProtocolWrapper(child_id, protocol) + try: + nego_response = await child_protocol.query({"component_nego": None}) + except Exception as ex: + _LOGGER.error("Error calling component_nego: %s", ex) + continue + if "component_nego" not in nego_response: + _LOGGER.error( + "Could not find component_nego in device response: %s", + nego_response, + ) + continue + successes.append( + SmartCall( + module="component_nego", + request={"component_nego": None}, + should_succeed=True, + child_device_id=child_id, + ) + ) + child_components = { + item["id"]: item["ver_code"] + for item in nego_response["component_nego"]["component_list"] + } + for component_id, ver_code in child_components.items(): + if ( + requests := get_component_requests(component_id, ver_code) + ) is not None: + component_test_calls = [ + SmartCall( + module=component_id, + request={key: val}, + should_succeed=True, + child_device_id=child_id, + ) + for key, val in requests.items() + ] + test_calls.extend(component_test_calls) + else: + click.echo(f"Skipping {component_id}..", nl=False) + click.echo(click.style("UNSUPPORTED", fg="yellow")) + else: # Not a smart protocol device so assume camera protocol + for method, params in requests.items(): + test_calls.append( + SmartCall( + module=method, + request={method: params}, + should_succeed=True, + child_device_id=child_id, + ) + ) + finally: + await protocol.close() + return test_calls, successes + + +async def get_smart_test_calls(protocol: SmartProtocol): """Get the list of test calls to make.""" test_calls = [] successes = [] @@ -425,7 +683,7 @@ async def get_smart_test_calls(device: SmartDevice): extra_test_calls = [ SmartCall( module="temp_humidity_records", - request=SmartRequest.get_raw_request("get_temp_humidity_records"), + request=SmartRequest.get_raw_request("get_temp_humidity_records").to_dict(), should_succeed=False, child_device_id="", ), @@ -433,7 +691,7 @@ async def get_smart_test_calls(device: SmartDevice): module="trigger_logs", request=SmartRequest.get_raw_request( "get_trigger_logs", SmartRequest.GetTriggerLogsParams() - ), + ).to_dict(), should_succeed=False, child_device_id="", ), @@ -441,8 +699,8 @@ async def get_smart_test_calls(device: SmartDevice): click.echo("Testing component_nego call ..", nl=False) responses = await _make_requests_or_exit( - device, - [SmartRequest.component_nego()], + protocol, + SmartRequest.component_nego().to_dict(), "component_nego call", batch_size=1, child_device_id="", @@ -452,7 +710,7 @@ async def get_smart_test_calls(device: SmartDevice): successes.append( SmartCall( module="component_nego", - request=SmartRequest("component_nego"), + request=SmartRequest("component_nego").to_dict(), should_succeed=True, child_device_id="", ) @@ -464,8 +722,8 @@ async def get_smart_test_calls(device: SmartDevice): if "child_device" in components: child_components = await _make_requests_or_exit( - device, - [SmartRequest.get_child_device_component_list()], + protocol, + SmartRequest.get_child_device_component_list().to_dict(), "child device component list", batch_size=1, child_device_id="", @@ -473,7 +731,7 @@ async def get_smart_test_calls(device: SmartDevice): successes.append( SmartCall( module="child_component_list", - request=SmartRequest.get_child_device_component_list(), + request=SmartRequest.get_child_device_component_list().to_dict(), should_succeed=True, child_device_id="", ) @@ -481,7 +739,7 @@ async def get_smart_test_calls(device: SmartDevice): test_calls.append( SmartCall( module="child_device_list", - request=SmartRequest.get_child_device_list(), + request=SmartRequest.get_child_device_list().to_dict(), should_succeed=True, child_device_id="", ) @@ -506,11 +764,11 @@ async def get_smart_test_calls(device: SmartDevice): component_test_calls = [ SmartCall( module=component_id, - request=request, + request={key: val}, should_succeed=True, child_device_id="", ) - for request in requests + for key, val in requests.items() ] test_calls.extend(component_test_calls) else: @@ -524,7 +782,7 @@ async def get_smart_test_calls(device: SmartDevice): test_calls.append( SmartCall( module="component_nego", - request=SmartRequest("component_nego"), + request=SmartRequest("component_nego").to_dict(), should_succeed=True, child_device_id=child_device_id, ) @@ -534,11 +792,11 @@ async def get_smart_test_calls(device: SmartDevice): component_test_calls = [ SmartCall( module=component_id, - request=request, + request={key: val}, should_succeed=True, child_device_id=child_device_id, ) - for request in requests + for key, val in requests.items() ] test_calls.extend(component_test_calls) else: @@ -568,23 +826,28 @@ def get_smart_child_fixture(response): ) -async def get_smart_fixtures(device: SmartDevice, batch_size: int): +async def get_smart_fixtures( + protocol: SmartProtocol, *, discovery_info=None, batch_size: int +): """Get fixture for new TAPO style protocol.""" - test_calls, successes = await get_smart_test_calls(device) + if isinstance(protocol, SmartCameraProtocol): + test_calls, successes = await get_smart_camera_test_calls(protocol) + child_wrapper: type[_ChildProtocolWrapper | _ChildCameraProtocolWrapper] = ( + _ChildCameraProtocolWrapper + ) + else: + test_calls, successes = await get_smart_test_calls(protocol) + child_wrapper = _ChildProtocolWrapper for test_call in test_calls: click.echo(f"Testing {test_call.module}..", nl=False) try: click.echo(f"Testing {test_call}..", nl=False) if test_call.child_device_id == "": - response = await device.protocol.query( - SmartRequest._create_request_dict(test_call.request) - ) + response = await protocol.query(test_call.request) else: - cp = _ChildProtocolWrapper(test_call.child_device_id, device.protocol) - response = await cp.query( - SmartRequest._create_request_dict(test_call.request) - ) + cp = child_wrapper(test_call.child_device_id, protocol) + response = await cp.query(test_call.request) except AuthenticationError as ex: _echo_error( f"Unable to query the device due to an authentication error: {ex}", @@ -614,12 +877,12 @@ async def get_smart_fixtures(device: SmartDevice, batch_size: int): click.echo(click.style("OK", fg="green")) successes.append(test_call) finally: - await device.protocol.close() + await protocol.close() - device_requests: dict[str, list[SmartRequest]] = {} + device_requests: dict[str, dict] = {} for success in successes: - device_request = device_requests.setdefault(success.child_device_id, []) - device_request.append(success.request) + device_request = device_requests.setdefault(success.child_device_id, {}) + device_request.update(success.request) scrubbed_device_ids = { device_id: f"SCRUBBED_CHILD_DEVICE_ID_{index}" @@ -628,7 +891,7 @@ async def get_smart_fixtures(device: SmartDevice, batch_size: int): } final = await _make_requests_or_exit( - device, + protocol, device_requests[""], "all successes at once", batch_size, @@ -639,7 +902,7 @@ async def get_smart_fixtures(device: SmartDevice, batch_size: int): if child_device_id == "": continue response = await _make_requests_or_exit( - device, + protocol, requests, "all child successes at once", batch_size, @@ -649,18 +912,26 @@ async def get_smart_fixtures(device: SmartDevice, batch_size: int): if "get_device_info" in response and "device_id" in response["get_device_info"]: response["get_device_info"]["device_id"] = scrubbed # If the child is a different model to the parent create a seperate fixture + if "get_device_info" in final: + parent_model = final["get_device_info"]["model"] + elif "getDeviceInfo" in final: + parent_model = final["getDeviceInfo"]["device_info"]["basic_info"][ + "device_model" + ] + else: + raise KasaException("Cannot determine parent device model.") if ( "component_nego" in response and "get_device_info" in response and (child_model := response["get_device_info"].get("model")) - and child_model != final["get_device_info"]["model"] + and child_model != parent_model ): fixture_results.append(get_smart_child_fixture(response)) else: cd = final.setdefault("child_devices", {}) cd[scrubbed] = response - # Scrub the device ids in the parent + # Scrub the device ids in the parent for smart protocol if gc := final.get("get_child_device_component_list"): for child in gc["child_component_list"]: device_id = child["device_id"] @@ -669,20 +940,43 @@ async def get_smart_fixtures(device: SmartDevice, batch_size: int): device_id = child["device_id"] child["device_id"] = scrubbed_device_ids[device_id] + # Scrub the device ids in the parent for the smart camera protocol + if gc := final.get("getChildDeviceList"): + for child in gc["child_device_list"]: + if device_id := child.get("device_id"): + child["device_id"] = scrubbed_device_ids[device_id] + continue + if device_id := child.get("dev_id"): + child["dev_id"] = scrubbed_device_ids[device_id] + continue + _LOGGER.error("Could not find a device for the child device: %s", child) + # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. - dr = DiscoveryResult(**device._discovery_info) # type: ignore - final["discovery_result"] = dr.dict( - by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True - ) + if discovery_info: + dr = DiscoveryResult(**discovery_info) # type: ignore + final["discovery_result"] = dr.dict( + by_alias=False, exclude_unset=True, exclude_none=True, exclude_defaults=True + ) click.echo("Got %s successes" % len(successes)) click.echo(click.style("## device info file ##", bold=True)) - hw_version = final["get_device_info"]["hw_ver"] - sw_version = final["get_device_info"]["fw_ver"] - model = final["discovery_result"]["device_model"] - sw_version = sw_version.split(" ", maxsplit=1)[0] + if "get_device_info" in final: + hw_version = final["get_device_info"]["hw_ver"] + sw_version = final["get_device_info"]["fw_ver"] + if discovery_info: + model = discovery_info["device_model"] + else: + model = final["get_device_info"]["model"] + "(XX)" + sw_version = sw_version.split(" ", maxsplit=1)[0] + else: + hw_version = final["getDeviceInfo"]["device_info"]["basic_info"]["hw_version"] + sw_version = final["getDeviceInfo"]["device_info"]["basic_info"]["sw_version"] + model = final["getDeviceInfo"]["device_info"]["basic_info"]["device_model"] + region = final["getDeviceInfo"]["device_info"]["basic_info"]["region"] + sw_version = sw_version.split(" ", maxsplit=1)[0] + model = f"{model}({region})" save_filename = f"{model}_{hw_version}_{sw_version}.json" copy_folder = SMART_FOLDER diff --git a/devtools/helpers/smartrequests.py b/devtools/helpers/smartrequests.py index 4db1f7a1..104ccb64 100644 --- a/devtools/helpers/smartrequests.py +++ b/devtools/helpers/smartrequests.py @@ -356,8 +356,8 @@ def get_component_requests(component_id, ver_code): if (cr := COMPONENT_REQUESTS.get(component_id)) is None: return None if callable(cr): - return cr(ver_code) - return cr + return SmartRequest._create_request_dict(cr(ver_code)) + return SmartRequest._create_request_dict(cr) COMPONENT_REQUESTS = { diff --git a/kasa/experimental/smartcameraprotocol.py b/kasa/experimental/smartcameraprotocol.py index 384b76e9..78579616 100644 --- a/kasa/experimental/smartcameraprotocol.py +++ b/kasa/experimental/smartcameraprotocol.py @@ -6,7 +6,12 @@ import logging from pprint import pformat as pf from typing import Any -from ..exceptions import AuthenticationError, DeviceError, _RetryableError +from ..exceptions import ( + AuthenticationError, + DeviceError, + KasaException, + _RetryableError, +) from ..json import dumps as json_dumps from ..smartprotocol import SmartProtocol from .sslaestransport import ( @@ -65,22 +70,28 @@ class SmartCameraProtocol(SmartProtocol): if isinstance(request, dict): if len(request) == 1: - multi_method = next(iter(request)) - module = next(iter(request[multi_method])) - req = { - "method": multi_method[:3], - module: request[multi_method][module], - } + method = next(iter(request)) + if method == "multipleRequest": + params = request["multipleRequest"] + req = {"method": "multipleRequest", "params": params} + elif method[:3] == "set": + params = next(iter(request[method])) + req = { + "method": method[:3], + params: request[method][params], + } + else: + return await self._execute_multiple_query(request, retry_count) else: return await self._execute_multiple_query(request, retry_count) else: # If method like getSomeThing then module will be some_thing - multi_method = request + method = request snake_name = "".join( - ["_" + i.lower() if i.isupper() else i for i in multi_method] + ["_" + i.lower() if i.isupper() else i for i in method] ).lstrip("_") - module = snake_name[4:] - req = {"method": snake_name[:3], module: {}} + params = snake_name[4:] + req = {"method": snake_name[:3], params: {}} smart_request = json_dumps(req) if debug_enabled: @@ -100,10 +111,71 @@ class SmartCameraProtocol(SmartProtocol): if "error_code" in response_data: # H200 does not return an error code - self._handle_response_error_code(response_data, multi_method) + self._handle_response_error_code(response_data, method) # TODO need to update handle response lists - if multi_method[:3] == "set": + if method[:3] == "set": return {} - return {multi_method: {module: response_data[module]}} + if method == "multipleRequest": + return {method: response_data["result"]} + return {method: {params: response_data[params]}} + + +class _ChildCameraProtocolWrapper(SmartProtocol): + """Protocol wrapper for controlling child devices. + + This is an internal class used to communicate with child devices, + and should not be used directly. + + This class overrides query() method of the protocol to modify all + outgoing queries to use ``controlChild`` command, and unwraps the + device responses before returning to the caller. + """ + + def __init__(self, device_id: str, base_protocol: SmartProtocol): + self._device_id = device_id + self._protocol = base_protocol + self._transport = base_protocol._transport + + async def query(self, request: str | dict, retry_count: int = 3) -> dict: + """Wrap request inside controlChild envelope.""" + return await self._query(request, retry_count) + + async def _query(self, request: str | dict, retry_count: int = 3) -> dict: + """Wrap request inside controlChild envelope.""" + if not isinstance(request, dict): + raise KasaException("Child requests must be dictionaries.") + requests = [] + methods = [] + for key, val in request.items(): + request = { + "method": "controlChild", + "params": { + "childControl": { + "device_id": self._device_id, + "request_data": {"method": key, "params": val}, + } + }, + } + methods.append(key) + requests.append(request) + + multipleRequest = {"multipleRequest": {"requests": requests}} + + response = await self._protocol.query(multipleRequest, retry_count) + + responses = response["multipleRequest"]["responses"] + response_dict = {} + for index_id, response in enumerate(responses): + response_data = response["result"]["response_data"] + method = methods[index_id] + self._handle_response_error_code( + response_data, method, raise_on_error=False + ) + response_dict[method] = response_data.get("result") + + return response_dict + + async def close(self) -> None: + """Do nothing as the parent owns the protocol.""" diff --git a/kasa/experimental/sslaestransport.py b/kasa/experimental/sslaestransport.py index 151cd568..fa3e6920 100644 --- a/kasa/experimental/sslaestransport.py +++ b/kasa/experimental/sslaestransport.py @@ -507,5 +507,5 @@ SMART_RETRYABLE_ERRORS = [ ] SMART_AUTHENTICATION_ERRORS = [ - SmartErrorCode.INVALID_ARGUMENTS, + SmartErrorCode.HOMEKIT_LOGIN_FAIL, ] diff --git a/kasa/smartprotocol.py b/kasa/smartprotocol.py index 21179694..0c2a2bba 100644 --- a/kasa/smartprotocol.py +++ b/kasa/smartprotocol.py @@ -163,6 +163,7 @@ class SmartProtocol(BaseProtocol): ] end = len(multi_requests) + # Break the requests down as there can be a size limit step = self._multi_request_batch_size if step == 1: @@ -175,6 +176,10 @@ class SmartProtocol(BaseProtocol): multi_result[method] = resp["result"] return multi_result + # The SmartCameraProtocol sends requests with a length 1 as a + # multipleRequest. The SmartProtocol doesn't so will never + # raise_on_error + raise_on_error = end == 1 for batch_num, i in enumerate(range(0, end, step)): requests_step = multi_requests[i : i + step] @@ -222,7 +227,9 @@ class SmartProtocol(BaseProtocol): responses = response_step["result"]["responses"] for response in responses: method = response["method"] - self._handle_response_error_code(response, method, raise_on_error=False) + self._handle_response_error_code( + response, method, raise_on_error=raise_on_error + ) result = response.get("result", None) await self._handle_response_lists( result, method, retry_count=retry_count diff --git a/kasa/tests/fixtures/experimental/C210(EU)_2.0_1.4.2.json b/kasa/tests/fixtures/experimental/C210(EU)_2.0_1.4.2.json new file mode 100644 index 00000000..304a1e12 --- /dev/null +++ b/kasa/tests/fixtures/experimental/C210(EU)_2.0_1.4.2.json @@ -0,0 +1,606 @@ +{ + "discovery_result": { + "decrypted_data": { + "connect_ssid": "0000000000", + "connect_type": "wireless", + "device_id": "0000000000000000000000000000000000000000", + "http_port": 443, + "last_alarm_time": "0", + "last_alarm_type": "", + "owner": "00000000000000000000000000000000", + "sd_status": "offline" + }, + "device_id": "00000000000000000000000000000000", + "device_model": "C210", + "device_name": "00000 000", + "device_type": "SMART.IPCAMERA", + "encrypt_info": { + "data": "", + "key": "", + "sym_schm": "AES" + }, + "encrypt_type": [ + "3" + ], + "factory_default": false, + "firmware_version": "1.4.2 Build 240829 Rel.54953n", + "hardware_version": "2.0", + "ip": "127.0.0.123", + "is_support_iot_cloud": true, + "mac": "40-AE-30-00-00-00", + "mgt_encrypt_schm": { + "is_support_https": true + } + }, + "getAlertConfig": { + "msg_alarm": { + "capability": { + "alarm_duration_support": "1", + "alarm_volume_support": "1", + "alert_event_type_support": "1", + "usr_def_audio_alarm_max_num": "15", + "usr_def_audio_alarm_support": "1", + "usr_def_audio_max_duration": "15", + "usr_def_audio_type": "0", + "usr_def_start_file_id": "8195" + }, + "chn1_msg_alarm_info": { + "alarm_duration": "0", + "alarm_mode": [ + "sound", + "light" + ], + "alarm_type": "0", + "alarm_volume": "high", + "enabled": "off", + "light_alarm_enabled": "on", + "light_type": "1", + "sound_alarm_enabled": "on" + }, + "usr_def_audio": [] + } + }, + "getAlertTypeList": { + "msg_alarm": { + "alert_type": { + "alert_type_list": [ + "Siren", + "Tone" + ] + } + } + }, + "getAudioConfig": { + "audio_config": { + "microphone": { + "bitrate": "64", + "channels": "1", + "echo_cancelling": "off", + "encode_type": "G711alaw", + "input_device_type": "MicIn", + "mute": "off", + "noise_cancelling": "on", + "sampling_rate": "8", + "volume": "100" + }, + "speaker": { + "mute": "off", + "output_device_type": "SpeakerOut", + "volume": "100" + } + } + }, + "getBCDConfig": { + "sound_detection": { + "bcd": { + "digital_sensitivity": "50", + "enabled": "off", + "sensitivity": "medium" + } + } + }, + "getCircularRecordingConfig": { + "harddisk_manage": { + "harddisk": { + "loop": "on" + } + } + }, + "getConnectionType": { + "link_type": "wifi", + "rssi": "2", + "rssiValue": -64, + "ssid": "I01BU0tFRF9TU0lEIw==" + }, + "getDetectionConfig": { + "motion_detection": { + "motion_det": { + "digital_sensitivity": "50", + "enabled": "on", + "non_vehicle_enabled": "off", + "people_enabled": "off", + "sensitivity": "medium", + "vehicle_enabled": "off" + } + } + }, + "getDeviceInfo": { + "device_info": { + "basic_info": { + "avatar": "Home", + "barcode": "", + "dev_id": "0000000000000000000000000000000000000000", + "device_alias": "#MASKED_NAME#", + "device_info": "C210 2.0 IPC", + "device_model": "C210", + "device_name": "0000 0.0", + "device_type": "SMART.IPCAMERA", + "features": 3, + "ffs": false, + "has_set_location_info": 1, + "hw_desc": "00000000000000000000000000000000", + "hw_id": "00000000000000000000000000000000", + "hw_version": "2.0", + "is_cal": true, + "latitude": 0, + "longitude": 0, + "mac": "40-AE-30-00-00-00", + "manufacturer_name": "TP-LINK", + "mobile_access": "0", + "oem_id": "00000000000000000000000000000000", + "region": "EU", + "sw_version": "1.4.2 Build 240829 Rel.54953n" + } + } + }, + "getFirmwareAutoUpgradeConfig": { + "auto_upgrade": { + "common": { + "enabled": "on", + "random_range": "120", + "time": "03:00" + } + } + }, + "getFirmwareUpdateStatus": { + "cloud_config": { + "upgrade_status": { + "lastUpgradingSuccess": true, + "state": "normal" + } + } + }, + "getLastAlarmInfo": { + "msg_alarm": { + "chn1_msg_alarm_info": { + "alarm_duration": "0", + "alarm_mode": [ + "sound", + "light" + ], + "alarm_type": "0", + "alarm_volume": "high", + "enabled": "off", + "light_alarm_enabled": "on", + "light_type": "1", + "sound_alarm_enabled": "on" + } + } + }, + "getLdc": { + "image": { + "common": { + "area_compensation": "default", + "auto_exp_antiflicker": "off", + "auto_exp_gain_max": "0", + "backlight": "off", + "chroma": "50", + "contrast": "50", + "dehaze": "off", + "eis": "off", + "exp_gain": "0", + "exp_level": "0", + "exp_type": "auto", + "focus_limited": "10", + "focus_type": "manual", + "high_light_compensation": "off", + "inf_delay": "5", + "inf_end_time": "21600", + "inf_sensitivity": "1", + "inf_sensitivity_day2night": "1400", + "inf_sensitivity_night2day": "9100", + "inf_start_time": "64800", + "inf_type": "auto", + "iris_level": "160", + "light_freq_mode": "auto", + "lock_blue_colton": "0", + "lock_blue_gain": "0", + "lock_gb_gain": "0", + "lock_gr_gain": "0", + "lock_green_colton": "0", + "lock_red_colton": "0", + "lock_red_gain": "0", + "lock_source": "local", + "luma": "50", + "saturation": "50", + "sharpness": "50", + "shutter": "1/25", + "smartir": "off", + "smartir_level": "100", + "smartwtl": "auto_wtl", + "smartwtl_digital_level": "100", + "smartwtl_level": "5", + "style": "standard", + "wb_B_gain": "50", + "wb_G_gain": "50", + "wb_R_gain": "50", + "wb_type": "auto", + "wd_gain": "50", + "wide_dynamic": "off", + "wtl_delay": "5", + "wtl_end_time": "21600", + "wtl_sensitivity": "4", + "wtl_sensitivity_day2night": "1400", + "wtl_sensitivity_night2day": "9100", + "wtl_start_time": "64800", + "wtl_type": "auto" + }, + "switch": { + "best_view_distance": "0", + "clear_licence_plate_mode": "off", + "flip_type": "off", + "full_color_min_keep_time": "5", + "full_color_people_enhance": "off", + "image_scene_mode": "normal", + "image_scene_mode_autoday": "normal", + "image_scene_mode_autonight": "normal", + "image_scene_mode_common": "normal", + "image_scene_mode_shedday": "normal", + "image_scene_mode_shednight": "normal", + "ldc": "off", + "night_vision_mode": "inf_night_vision", + "overexposure_people_suppression": "off", + "rotate_type": "off", + "schedule_end_time": "64800", + "schedule_start_time": "21600", + "switch_mode": "common", + "wtl_force_time": "300", + "wtl_intensity_level": "5", + "wtl_manual_start_flag": "off" + } + } + }, + "getLedStatus": { + "led": { + "config": { + "enabled": "on" + } + } + }, + "getLensMaskConfig": { + "lens_mask": { + "lens_mask_info": { + "enabled": "off" + } + } + }, + "getLightFrequencyInfo": { + "image": { + "common": { + "area_compensation": "default", + "auto_exp_antiflicker": "off", + "auto_exp_gain_max": "0", + "backlight": "off", + "chroma": "50", + "contrast": "50", + "dehaze": "off", + "eis": "off", + "exp_gain": "0", + "exp_level": "0", + "exp_type": "auto", + "focus_limited": "10", + "focus_type": "manual", + "high_light_compensation": "off", + "inf_delay": "5", + "inf_end_time": "21600", + "inf_sensitivity": "1", + "inf_sensitivity_day2night": "1400", + "inf_sensitivity_night2day": "9100", + "inf_start_time": "64800", + "inf_type": "auto", + "iris_level": "160", + "light_freq_mode": "auto", + "lock_blue_colton": "0", + "lock_blue_gain": "0", + "lock_gb_gain": "0", + "lock_gr_gain": "0", + "lock_green_colton": "0", + "lock_red_colton": "0", + "lock_red_gain": "0", + "lock_source": "local", + "luma": "50", + "saturation": "50", + "sharpness": "50", + "shutter": "1/25", + "smartir": "off", + "smartir_level": "100", + "smartwtl": "auto_wtl", + "smartwtl_digital_level": "100", + "smartwtl_level": "5", + "style": "standard", + "wb_B_gain": "50", + "wb_G_gain": "50", + "wb_R_gain": "50", + "wb_type": "auto", + "wd_gain": "50", + "wide_dynamic": "off", + "wtl_delay": "5", + "wtl_end_time": "21600", + "wtl_sensitivity": "4", + "wtl_sensitivity_day2night": "1400", + "wtl_sensitivity_night2day": "9100", + "wtl_start_time": "64800", + "wtl_type": "auto" + } + } + }, + "getMediaEncrypt": { + "cet": { + "media_encrypt": { + "enabled": "on" + } + } + }, + "getMsgPushConfig": { + "msg_push": { + "chn1_msg_push_info": { + "notification_enabled": "on", + "rich_notification_enabled": "off" + } + } + }, + "getNightVisionCapability": { + "image_capability": { + "supplement_lamp": { + "night_vision_mode_range": [ + "inf_night_vision" + ], + "supplement_lamp_type": [ + "infrared_lamp" + ] + } + } + }, + "getNightVisionModeConfig": { + "image": { + "switch": { + "best_view_distance": "0", + "clear_licence_plate_mode": "off", + "flip_type": "off", + "full_color_min_keep_time": "5", + "full_color_people_enhance": "off", + "image_scene_mode": "normal", + "image_scene_mode_autoday": "normal", + "image_scene_mode_autonight": "normal", + "image_scene_mode_common": "normal", + "image_scene_mode_shedday": "normal", + "image_scene_mode_shednight": "normal", + "ldc": "off", + "night_vision_mode": "inf_night_vision", + "overexposure_people_suppression": "off", + "rotate_type": "off", + "schedule_end_time": "64800", + "schedule_start_time": "21600", + "switch_mode": "common", + "wtl_force_time": "300", + "wtl_intensity_level": "5", + "wtl_manual_start_flag": "off" + } + } + }, + "getPersonDetectionConfig": { + "people_detection": { + "detection": { + "enabled": "on", + "sensitivity": "50" + } + } + }, + "getPresetConfig": { + "preset": { + "preset": { + "id": [], + "name": [], + "position_pan": [], + "position_tilt": [], + "position_zoom": [], + "read_only": [] + } + } + }, + "getRecordPlan": { + "record_plan": { + "chn1_channel": { + "enabled": "on", + "friday": "[\"0000-2400:2\"]", + "monday": "[\"0000-2400:2\"]", + "saturday": "[\"0000-2400:2\"]", + "sunday": "[\"0000-2400:2\"]", + "thursday": "[\"0000-2400:2\"]", + "tuesday": "[\"0000-2400:2\"]", + "wednesday": "[\"0000-2400:2\"]" + } + } + }, + "getRotationStatus": { + "image": { + "switch": { + "best_view_distance": "0", + "clear_licence_plate_mode": "off", + "flip_type": "off", + "full_color_min_keep_time": "5", + "full_color_people_enhance": "off", + "image_scene_mode": "normal", + "image_scene_mode_autoday": "normal", + "image_scene_mode_autonight": "normal", + "image_scene_mode_common": "normal", + "image_scene_mode_shedday": "normal", + "image_scene_mode_shednight": "normal", + "ldc": "off", + "night_vision_mode": "inf_night_vision", + "overexposure_people_suppression": "off", + "rotate_type": "off", + "schedule_end_time": "64800", + "schedule_start_time": "21600", + "switch_mode": "common", + "wtl_force_time": "300", + "wtl_intensity_level": "5", + "wtl_manual_start_flag": "off" + } + } + }, + "getSdCardStatus": { + "harddisk_manage": { + "hd_info": [ + { + "hd_info_1": { + "crossline_free_space": "0B", + "crossline_free_space_accurate": "0B", + "crossline_total_space": "0B", + "crossline_total_space_accurate": "0B", + "detect_status": "offline", + "disk_name": "1", + "free_space": "0B", + "free_space_accurate": "0B", + "loop_record_status": "0", + "msg_push_free_space": "0B", + "msg_push_free_space_accurate": "0B", + "msg_push_total_space": "0B", + "msg_push_total_space_accurate": "0B", + "percent": "0", + "picture_free_space": "0B", + "picture_free_space_accurate": "0B", + "picture_total_space": "0B", + "picture_total_space_accurate": "0B", + "record_duration": "0", + "record_free_duration": "0", + "record_start_time": "0", + "rw_attr": "r", + "status": "offline", + "total_space": "0B", + "total_space_accurate": "0B", + "type": "local", + "video_free_space": "0B", + "video_free_space_accurate": "0B", + "video_total_space": "0B", + "video_total_space_accurate": "0B", + "write_protect": "0" + } + } + ] + } + }, + "getTamperDetectionConfig": { + "tamper_detection": { + "tamper_det": { + "digital_sensitivity": "50", + "enabled": "off", + "sensitivity": "medium" + } + } + }, + "getTargetTrackConfig": { + "target_track": { + "target_track_info": { + "back_time": "30", + "enabled": "off", + "track_mode": "pantilt", + "track_time": "0" + } + } + }, + "getVideoCapability": { + "video_capability": { + "main": { + "bitrate_types": [ + "cbr", + "vbr" + ], + "bitrates": [ + "256", + "512", + "1024", + "1382", + "2048" + ], + "change_fps_support": "1", + "encode_types": [ + "H264", + "H265" + ], + "frame_rates": [ + "65551", + "65556", + "65561" + ], + "minor_stream_support": "0", + "qualitys": [ + "1", + "3", + "5" + ], + "resolutions": [ + "2304*1296", + "1920*1080", + "1280*720" + ] + } + } + }, + "getVideoQualities": { + "video": { + "main": { + "bitrate": "1382", + "bitrate_type": "vbr", + "default_bitrate": "1382", + "encode_type": "H264", + "frame_rate": "65551", + "name": "VideoEncoder_1", + "quality": "3", + "resolution": "1920*1080", + "smart_codec": "off" + } + } + }, + "getWhitelampConfig": { + "image": { + "switch": { + "best_view_distance": "0", + "clear_licence_plate_mode": "off", + "flip_type": "off", + "full_color_min_keep_time": "5", + "full_color_people_enhance": "off", + "image_scene_mode": "normal", + "image_scene_mode_autoday": "normal", + "image_scene_mode_autonight": "normal", + "image_scene_mode_common": "normal", + "image_scene_mode_shedday": "normal", + "image_scene_mode_shednight": "normal", + "ldc": "off", + "night_vision_mode": "inf_night_vision", + "overexposure_people_suppression": "off", + "rotate_type": "off", + "schedule_end_time": "64800", + "schedule_start_time": "21600", + "switch_mode": "common", + "wtl_force_time": "300", + "wtl_intensity_level": "5", + "wtl_manual_start_flag": "off" + } + } + }, + "getWhitelampStatus": { + "rest_time": 0, + "status": 0 + } +}