Merge branch 'master' into feat/smartcam_passthrough

This commit is contained in:
Steven B.
2024-12-11 14:48:04 +00:00
committed by GitHub
155 changed files with 2092 additions and 1491 deletions

View File

@@ -14,9 +14,17 @@ from kasa import (
Discover,
UnsupportedDeviceError,
)
from kasa.discover import ConnectAttempt, DiscoveryResult
from kasa.discover import (
NEW_DISCOVERY_REDACTORS,
ConnectAttempt,
DiscoveredRaw,
DiscoveryResult,
)
from kasa.iot.iotdevice import _extract_sys_info
from kasa.protocols.iotprotocol import REDACTORS as IOT_REDACTORS
from kasa.protocols.protocol import redact_data
from ..json import dumps as json_dumps
from .common import echo, error
@@ -64,7 +72,9 @@ async def detail(ctx):
await ctx.parent.invoke(state)
echo()
discovered = await _discover(ctx, print_discovered, print_unsupported)
discovered = await _discover(
ctx, print_discovered=print_discovered, print_unsupported=print_unsupported
)
if ctx.parent.parent.params["host"]:
return discovered
@@ -77,6 +87,33 @@ async def detail(ctx):
return discovered
@discover.command()
@click.option(
"--redact/--no-redact",
default=False,
is_flag=True,
type=bool,
help="Set flag to redact sensitive data from raw output.",
)
@click.pass_context
async def raw(ctx, redact: bool):
"""Return raw discovery data returned from devices."""
def print_raw(discovered: DiscoveredRaw):
if redact:
redactors = (
NEW_DISCOVERY_REDACTORS
if discovered["meta"]["port"] == Discover.DISCOVERY_PORT_2
else IOT_REDACTORS
)
discovered["discovery_response"] = redact_data(
discovered["discovery_response"], redactors
)
echo(json_dumps(discovered, indent=True))
return await _discover(ctx, print_raw=print_raw, do_echo=False)
@discover.command()
@click.pass_context
async def list(ctx):
@@ -102,10 +139,17 @@ async def list(ctx):
echo(f"{host:<15} UNSUPPORTED DEVICE")
echo(f"{'HOST':<15} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} {'ALIAS'}")
return await _discover(ctx, print_discovered, print_unsupported, do_echo=False)
return await _discover(
ctx,
print_discovered=print_discovered,
print_unsupported=print_unsupported,
do_echo=False,
)
async def _discover(ctx, print_discovered, print_unsupported, *, do_echo=True):
async def _discover(
ctx, *, print_discovered=None, print_unsupported=None, print_raw=None, do_echo=True
):
params = ctx.parent.parent.params
target = params["target"]
username = params["username"]
@@ -126,6 +170,7 @@ async def _discover(ctx, print_discovered, print_unsupported, *, do_echo=True):
timeout=timeout,
discovery_timeout=discovery_timeout,
on_unsupported=print_unsupported,
on_discovered_raw=print_raw,
)
if do_echo:
echo(f"Discovering devices on {target} for {discovery_timeout} seconds")
@@ -137,6 +182,7 @@ async def _discover(ctx, print_discovered, print_unsupported, *, do_echo=True):
port=port,
timeout=timeout,
credentials=credentials,
on_discovered_raw=print_raw,
)
for device in discovered_devices.values():

View File

@@ -99,6 +99,7 @@ from typing import (
Annotated,
Any,
NamedTuple,
TypedDict,
cast,
)
@@ -147,18 +148,43 @@ class ConnectAttempt(NamedTuple):
device: type
class DiscoveredMeta(TypedDict):
"""Meta info about discovery response."""
ip: str
port: int
class DiscoveredRaw(TypedDict):
"""Try to connect attempt."""
meta: DiscoveredMeta
discovery_response: dict
OnDiscoveredCallable = Callable[[Device], Coroutine]
OnDiscoveredRawCallable = Callable[[DiscoveredRaw], None]
OnUnsupportedCallable = Callable[[UnsupportedDeviceError], Coroutine]
OnConnectAttemptCallable = Callable[[ConnectAttempt, bool], None]
DeviceDict = dict[str, Device]
DECRYPTED_REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"connect_ssid": lambda x: "#MASKED_SSID#" if x else "",
"device_id": lambda x: "REDACTED_" + x[9::],
"owner": lambda x: "REDACTED_" + x[9::],
}
NEW_DISCOVERY_REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"device_id": lambda x: "REDACTED_" + x[9::],
"device_name": lambda x: "#MASKED_NAME#" if x else "",
"owner": lambda x: "REDACTED_" + x[9::],
"mac": mask_mac,
"master_device_id": lambda x: "REDACTED_" + x[9::],
"group_id": lambda x: "REDACTED_" + x[9::],
"group_name": lambda x: "I01BU0tFRF9TU0lEIw==",
"encrypt_info": lambda x: {**x, "key": "", "data": ""},
"ip": lambda x: x, # don't redact but keep listed here for dump_devinfo
"decrypted_data": lambda x: redact_data(x, DECRYPTED_REDACTORS),
}
@@ -216,6 +242,7 @@ class _DiscoverProtocol(asyncio.DatagramProtocol):
self,
*,
on_discovered: OnDiscoveredCallable | None = None,
on_discovered_raw: OnDiscoveredRawCallable | None = None,
target: str = "255.255.255.255",
discovery_packets: int = 3,
discovery_timeout: int = 5,
@@ -240,6 +267,7 @@ class _DiscoverProtocol(asyncio.DatagramProtocol):
self.unsupported_device_exceptions: dict = {}
self.invalid_device_exceptions: dict = {}
self.on_unsupported = on_unsupported
self.on_discovered_raw = on_discovered_raw
self.credentials = credentials
self.timeout = timeout
self.discovery_timeout = discovery_timeout
@@ -329,12 +357,23 @@ class _DiscoverProtocol(asyncio.DatagramProtocol):
config.timeout = self.timeout
try:
if port == self.discovery_port:
device = Discover._get_device_instance_legacy(data, config)
json_func = Discover._get_discovery_json_legacy
device_func = Discover._get_device_instance_legacy
elif port == Discover.DISCOVERY_PORT_2:
config.uses_http = True
device = Discover._get_device_instance(data, config)
json_func = Discover._get_discovery_json
device_func = Discover._get_device_instance
else:
return
info = json_func(data, ip)
if self.on_discovered_raw is not None:
self.on_discovered_raw(
{
"discovery_response": info,
"meta": {"ip": ip, "port": port},
}
)
device = device_func(info, config)
except UnsupportedDeviceError as udex:
_LOGGER.debug("Unsupported device found at %s << %s", ip, udex)
self.unsupported_device_exceptions[ip] = udex
@@ -391,6 +430,7 @@ class Discover:
*,
target: str = "255.255.255.255",
on_discovered: OnDiscoveredCallable | None = None,
on_discovered_raw: OnDiscoveredRawCallable | None = None,
discovery_timeout: int = 5,
discovery_packets: int = 3,
interface: str | None = None,
@@ -421,6 +461,8 @@ class Discover:
:param target: The target address where to send the broadcast discovery
queries if multi-homing (e.g. 192.168.xxx.255).
:param on_discovered: coroutine to execute on discovery
:param on_discovered_raw: Optional callback once discovered json is loaded
before any attempt to deserialize it and create devices
:param discovery_timeout: Seconds to wait for responses, defaults to 5
:param discovery_packets: Number of discovery packets to broadcast
:param interface: Bind to specific interface
@@ -443,6 +485,7 @@ class Discover:
discovery_packets=discovery_packets,
interface=interface,
on_unsupported=on_unsupported,
on_discovered_raw=on_discovered_raw,
credentials=credentials,
timeout=timeout,
discovery_timeout=discovery_timeout,
@@ -476,6 +519,7 @@ class Discover:
credentials: Credentials | None = None,
username: str | None = None,
password: str | None = None,
on_discovered_raw: OnDiscoveredRawCallable | None = None,
on_unsupported: OnUnsupportedCallable | None = None,
) -> Device | None:
"""Discover a single device by the given IP address.
@@ -493,6 +537,9 @@ class Discover:
username and password are ignored if provided.
:param username: Username for devices that require authentication
:param password: Password for devices that require authentication
:param on_discovered_raw: Optional callback once discovered json is loaded
before any attempt to deserialize it and create devices
:param on_unsupported: Optional callback when unsupported devices are discovered
:rtype: SmartDevice
:return: Object for querying/controlling found device.
"""
@@ -529,6 +576,7 @@ class Discover:
credentials=credentials,
timeout=timeout,
discovery_timeout=discovery_timeout,
on_discovered_raw=on_discovered_raw,
),
local_addr=("0.0.0.0", 0), # noqa: S104
)
@@ -666,15 +714,19 @@ class Discover:
return get_device_class_from_sys_info(info)
@staticmethod
def _get_device_instance_legacy(data: bytes, config: DeviceConfig) -> IotDevice:
"""Get SmartDevice from legacy 9999 response."""
def _get_discovery_json_legacy(data: bytes, ip: str) -> dict:
"""Get discovery json from legacy 9999 response."""
try:
info = json_loads(XorEncryption.decrypt(data))
except Exception as ex:
raise KasaException(
f"Unable to read response from device: {config.host}: {ex}"
f"Unable to read response from device: {ip}: {ex}"
) from ex
return info
@staticmethod
def _get_device_instance_legacy(info: dict, config: DeviceConfig) -> Device:
"""Get IotDevice from legacy 9999 response."""
if _LOGGER.isEnabledFor(logging.DEBUG):
data = redact_data(info, IOT_REDACTORS) if Discover._redact_data else info
_LOGGER.debug("[DISCOVERY] %s << %s", config.host, pf(data))
@@ -698,6 +750,7 @@ class Discover:
@staticmethod
def _decrypt_discovery_data(discovery_result: DiscoveryResult) -> None:
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if TYPE_CHECKING:
assert discovery_result.encrypt_info
assert _AesDiscoveryQuery.keypair
@@ -713,22 +766,39 @@ class Discover:
session = AesEncyptionSession(key, iv)
decrypted_data = session.decrypt(encrypted_data)
discovery_result.decrypted_data = json_loads(decrypted_data)
result = json_loads(decrypted_data)
if debug_enabled:
data = (
redact_data(result, DECRYPTED_REDACTORS)
if Discover._redact_data
else result
)
_LOGGER.debug(
"Decrypted encrypt_info for %s: %s",
discovery_result.ip,
pf(data),
)
discovery_result.decrypted_data = result
@staticmethod
def _get_discovery_json(data: bytes, ip: str) -> dict:
"""Get discovery json from the new 20002 response."""
try:
info = json_loads(data[16:])
except Exception as ex:
_LOGGER.debug("Got invalid response from device %s: %s", ip, data)
raise KasaException(
f"Unable to read response from device: {ip}: {ex}"
) from ex
return info
@staticmethod
def _get_device_instance(
data: bytes,
info: dict,
config: DeviceConfig,
) -> Device:
"""Get SmartDevice from the new 20002 response."""
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
try:
info = json_loads(data[16:])
except Exception as ex:
_LOGGER.debug("Got invalid response from device %s: %s", config.host, data)
raise KasaException(
f"Unable to read response from device: {config.host}: {ex}"
) from ex
try:
discovery_result = DiscoveryResult.from_dict(info["result"])
@@ -757,7 +827,9 @@ class Discover:
Discover._decrypt_discovery_data(discovery_result)
except Exception:
_LOGGER.exception(
"Unable to decrypt discovery data %s: %s", config.host, data
"Unable to decrypt discovery data %s: %s",
config.host,
redact_data(info, NEW_DISCOVERY_REDACTORS),
)
type_ = discovery_result.device_type

View File

@@ -24,7 +24,6 @@ State (state): True
Signal Level (signal_level): 2
RSSI (rssi): -52
SSID (ssid): #MASKED_SSID#
Overheated (overheated): False
Reboot (reboot): <Action>
Brightness (brightness): 100
Cloud connection (cloud_connection): True
@@ -39,6 +38,7 @@ Light effect (light_effect): Off
Light preset (light_preset): Not set
Smooth transition on (smooth_transition_on): 2
Smooth transition off (smooth_transition_off): 2
Overheated (overheated): False
Device time (device_time): 2024-02-23 02:40:15+01:00
To see whether a device supports a feature, check for the existence of it:

View File

@@ -8,18 +8,24 @@ from typing import Any
try:
import orjson
def dumps(obj: Any, *, default: Callable | None = None) -> str:
def dumps(
obj: Any, *, default: Callable | None = None, indent: bool = False
) -> str:
"""Dump JSON."""
return orjson.dumps(obj).decode()
return orjson.dumps(
obj, option=orjson.OPT_INDENT_2 if indent else None
).decode()
loads = orjson.loads
except ImportError:
import json
def dumps(obj: Any, *, default: Callable | None = None) -> str:
def dumps(
obj: Any, *, default: Callable | None = None, indent: bool = False
) -> str:
"""Dump JSON."""
# Separators specified for consistency with orjson
return json.dumps(obj, separators=(",", ":"))
return json.dumps(obj, separators=(",", ":"), indent=2 if indent else None)
loads = json.loads

View File

@@ -25,19 +25,35 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
def _mask_children(children: list[dict[str, Any]]) -> list[dict[str, Any]]:
def mask_child(child: dict[str, Any], index: int) -> dict[str, Any]:
result = {
**child,
"id": f"SCRUBBED_CHILD_DEVICE_ID_{index+1}",
}
# Will leave empty aliases as blank
if child.get("alias"):
result["alias"] = f"#MASKED_NAME# {index + 1}"
return result
return [mask_child(child, index) for index, child in enumerate(children)]
REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"latitude": lambda x: 0,
"longitude": lambda x: 0,
"latitude_i": lambda x: 0,
"longitude_i": lambda x: 0,
"deviceId": lambda x: "REDACTED_" + x[9::],
"id": lambda x: "REDACTED_" + x[9::],
"children": _mask_children,
"alias": lambda x: "#MASKED_NAME#" if x else "",
"mac": mask_mac,
"mic_mac": mask_mac,
"ssid": lambda x: "#MASKED_SSID#" if x else "",
"oemId": lambda x: "REDACTED_" + x[9::],
"username": lambda _: "user@example.com", # cnCloud
"hwId": lambda x: "REDACTED_" + x[9::],
}

View File

@@ -66,6 +66,8 @@ def redact_data(data: _T, redactors: dict[str, Callable[[Any], Any] | None]) ->
def mask_mac(mac: str) -> str:
"""Return mac address with last two octects blanked."""
if len(mac) == 12:
return f"{mac[:6]}000000"
delim = ":" if ":" in mac else "-"
rest = delim.join(format(s, "02x") for s in bytes.fromhex("000000"))
return f"{mac[:8]}{delim}{rest}"

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import asyncio
import base64
import logging
import re
import time
import uuid
from collections.abc import Callable
@@ -45,15 +46,27 @@ REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"original_device_id": lambda x: "REDACTED_" + x[9::], # Strip children
"nickname": lambda x: "I01BU0tFRF9OQU1FIw==" if x else "",
"mac": mask_mac,
"ssid": lambda x: "I01BU0tFRF9TU0lEIw=" if x else "",
"ssid": lambda x: "I01BU0tFRF9TU0lEIw==" if x else "",
"bssid": lambda _: "000000000000",
"channel": lambda _: 0,
"oem_id": lambda x: "REDACTED_" + x[9::],
"setup_code": None, # matter
"setup_payload": None, # matter
"mfi_setup_code": None, # mfi_ for homekit
"mfi_setup_id": None,
"mfi_token_token": None,
"mfi_token_uuid": None,
"setup_code": lambda x: re.sub(r"\w", "0", x), # matter
"setup_payload": lambda x: re.sub(r"\w", "0", x), # matter
"mfi_setup_code": lambda x: re.sub(r"\w", "0", x), # mfi_ for homekit
"mfi_setup_id": lambda x: re.sub(r"\w", "0", x),
"mfi_token_token": lambda x: re.sub(r"\w", "0", x),
"mfi_token_uuid": lambda x: re.sub(r"\w", "0", x),
"ip": lambda x: x, # don't redact but keep listed here for dump_devinfo
# smartcam
"dev_id": lambda x: "REDACTED_" + x[9::],
"device_name": lambda x: "#MASKED_NAME#" if x else "",
"device_alias": lambda x: "#MASKED_NAME#" if x else "",
"local_ip": lambda x: x, # don't redact but keep listed here for dump_devinfo
# robovac
"board_sn": lambda _: "000000000000",
"custom_sn": lambda _: "000000000000",
"location": lambda x: "#MASKED_NAME#" if x else "",
"map_data": lambda x: "#SCRUBBED_MAPDATA#" if x else "",
}

View File

@@ -24,6 +24,7 @@ from .lightpreset import LightPreset
from .lightstripeffect import LightStripEffect
from .lighttransition import LightTransition
from .motionsensor import MotionSensor
from .overheatprotection import OverheatProtection
from .reportmode import ReportMode
from .temperaturecontrol import TemperatureControl
from .temperaturesensor import TemperatureSensor
@@ -64,4 +65,5 @@ __all__ = [
"FrostProtection",
"Thermostat",
"SmartLightEffect",
"OverheatProtection",
]

View File

@@ -10,7 +10,7 @@ class ContactSensor(SmartModule):
"""Implementation of contact sensor module."""
REQUIRED_COMPONENT = None # we depend on availability of key
REQUIRED_KEY_ON_PARENT = "open"
SYSINFO_LOOKUP_KEYS = ["open"]
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""

View File

@@ -0,0 +1,41 @@
"""Overheat module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class OverheatProtection(SmartModule):
"""Implementation for overheat_protection."""
SYSINFO_LOOKUP_KEYS = ["overheated", "overheat_status"]
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
container=self,
id="overheated",
name="Overheated",
attribute_getter="overheated",
icon="mdi:heat-wave",
type=Feature.Type.BinarySensor,
category=Feature.Category.Info,
)
)
@property
def overheated(self) -> bool:
"""Return True if device reports overheating."""
if (value := self._device.sys_info.get("overheat_status")) is not None:
# Value can be normal, cooldown, or overheated.
# We report all but normal as overheated.
return value != "normal"
return self._device.sys_info["overheated"]
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}

View File

@@ -9,7 +9,7 @@ from typing import Any
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper
from .smartdevice import SmartDevice
from .smartdevice import ComponentsRaw, SmartDevice
from .smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
@@ -37,7 +37,7 @@ class SmartChildDevice(SmartDevice):
self,
parent: SmartDevice,
info: dict,
component_info: dict,
component_info_raw: ComponentsRaw,
*,
config: DeviceConfig | None = None,
protocol: SmartProtocol | None = None,
@@ -47,7 +47,8 @@ class SmartChildDevice(SmartDevice):
super().__init__(parent.host, config=parent.config, protocol=_protocol)
self._parent = parent
self._update_internal_state(info)
self._components = component_info
self._components_raw = component_info_raw
self._components = self._parse_components(self._components_raw)
async def update(self, update_children: bool = True) -> None:
"""Update child module info.
@@ -84,7 +85,7 @@ class SmartChildDevice(SmartDevice):
cls,
parent: SmartDevice,
child_info: dict,
child_components: dict,
child_components_raw: ComponentsRaw,
protocol: SmartProtocol | None = None,
*,
last_update: dict | None = None,
@@ -97,7 +98,7 @@ class SmartChildDevice(SmartDevice):
derived from the parent.
"""
child: SmartChildDevice = cls(
parent, child_info, child_components, protocol=protocol
parent, child_info, child_components_raw, protocol=protocol
)
if last_update:
child._last_update = last_update

View File

@@ -7,7 +7,7 @@ import logging
import time
from collections.abc import Mapping, Sequence
from datetime import UTC, datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, TypeAlias, cast
from ..device import Device, WifiNetwork, _DeviceInfo
from ..device_type import DeviceType
@@ -40,6 +40,8 @@ _LOGGER = logging.getLogger(__name__)
# same issue, homekit perhaps?
NON_HUB_PARENT_ONLY_MODULES = [DeviceModule, Time, Firmware, Cloud]
ComponentsRaw: TypeAlias = dict[str, list[dict[str, int | str]]]
# Device must go last as the other interfaces also inherit Device
# and python needs a consistent method resolution order.
@@ -61,7 +63,7 @@ class SmartDevice(Device):
)
super().__init__(host=host, config=config, protocol=_protocol)
self.protocol: SmartProtocol
self._components_raw: dict[str, Any] | None = None
self._components_raw: ComponentsRaw | None = None
self._components: dict[str, int] = {}
self._state_information: dict[str, Any] = {}
self._modules: dict[str | ModuleName[Module], SmartModule] = {}
@@ -82,10 +84,8 @@ class SmartDevice(Device):
self.internal_state.update(resp)
children = self.internal_state["get_child_device_list"]["child_device_list"]
children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in child["component_list"]
}
children_components_raw = {
child["device_id"]: child
for child in self.internal_state["get_child_device_component_list"][
"child_component_list"
]
@@ -96,7 +96,7 @@ class SmartDevice(Device):
child_info["device_id"]: await SmartChildDevice.create(
parent=self,
child_info=child_info,
child_components=children_components[child_info["device_id"]],
child_components_raw=children_components_raw[child_info["device_id"]],
)
for child_info in children
}
@@ -131,6 +131,13 @@ class SmartDevice(Device):
f"{request} not found in {responses} for device {self.host}"
)
@staticmethod
def _parse_components(components_raw: ComponentsRaw) -> dict[str, int]:
return {
str(comp["id"]): int(comp["ver_code"])
for comp in components_raw["component_list"]
}
async def _negotiate(self) -> None:
"""Perform initialization.
@@ -151,12 +158,9 @@ class SmartDevice(Device):
self._info = self._try_get_response(resp, "get_device_info")
# Create our internal presentation of available components
self._components_raw = cast(dict, resp["component_nego"])
self._components_raw = cast(ComponentsRaw, resp["component_nego"])
self._components = {
comp["id"]: int(comp["ver_code"])
for comp in self._components_raw["component_list"]
}
self._components = self._parse_components(self._components_raw)
if "child_device" in self._components and not self.children:
await self._initialize_children()
@@ -349,9 +353,8 @@ class SmartDevice(Device):
) or mod.__name__ in child_modules_to_skip:
continue
required_component = cast(str, mod.REQUIRED_COMPONENT)
if required_component in self._components or (
mod.REQUIRED_KEY_ON_PARENT
and self.sys_info.get(mod.REQUIRED_KEY_ON_PARENT) is not None
if required_component in self._components or any(
self.sys_info.get(key) is not None for key in mod.SYSINFO_LOOKUP_KEYS
):
_LOGGER.debug(
"Device %s, found required %s, adding %s to modules.",
@@ -440,19 +443,6 @@ class SmartDevice(Device):
)
)
if "overheated" in self._info:
self._add_feature(
Feature(
self,
id="overheated",
name="Overheated",
attribute_getter=lambda x: x._info["overheated"],
icon="mdi:heat-wave",
type=Feature.Type.BinarySensor,
category=Feature.Category.Info,
)
)
# We check for the key available, and not for the property truthiness,
# as the value is falsy when the device is off.
if "on_time" in self._info:

View File

@@ -54,8 +54,8 @@ class SmartModule(Module):
NAME: str
#: Module is initialized, if the given component is available
REQUIRED_COMPONENT: str | None = None
#: Module is initialized, if the given key available in the main sysinfo
REQUIRED_KEY_ON_PARENT: str | None = None
#: Module is initialized, if any of the given keys exists in the sysinfo
SYSINFO_LOOKUP_KEYS: list[str] = []
#: Query to execute during the main update cycle
QUERY_GETTER_NAME: str

View File

@@ -3,13 +3,14 @@
from __future__ import annotations
import logging
from typing import Any
from typing import Any, cast
from ..device import _DeviceInfo
from ..device_type import DeviceType
from ..module import Module
from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper
from ..smart import SmartChildDevice, SmartDevice
from ..smart.smartdevice import ComponentsRaw
from .modules import ChildDevice, DeviceModule
from .smartcammodule import SmartCamModule
@@ -78,7 +79,7 @@ class SmartCamDevice(SmartDevice):
self._children[child_id]._update_internal_state(info)
async def _initialize_smart_child(
self, info: dict, child_components: dict
self, info: dict, child_components_raw: ComponentsRaw
) -> SmartDevice:
"""Initialize a smart child device attached to a smartcam device."""
child_id = info["device_id"]
@@ -93,7 +94,7 @@ class SmartCamDevice(SmartDevice):
return await SmartChildDevice.create(
parent=self,
child_info=info,
child_components=child_components,
child_components_raw=child_components_raw,
protocol=child_protocol,
last_update=initial_response,
)
@@ -108,17 +109,8 @@ class SmartCamDevice(SmartDevice):
self.internal_state.update(resp)
smart_children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in component_list
}
child["device_id"]: child
for child in resp["getChildDeviceComponentList"]["child_component_list"]
if (component_list := child.get("component_list"))
# Child camera devices will have a different component schema so only
# extract smart values.
and (first_comp := next(iter(component_list), None))
and isinstance(first_comp, dict)
and "id" in first_comp
and "ver_code" in first_comp
}
children = {}
for info in resp["getChildDeviceList"]["child_device_list"]:
@@ -172,6 +164,13 @@ class SmartCamDevice(SmartDevice):
return res
@staticmethod
def _parse_components(components_raw: ComponentsRaw) -> dict[str, int]:
return {
str(comp["name"]): int(comp["version"])
for comp in components_raw["app_component_list"]
}
async def _negotiate(self) -> None:
"""Perform initialization.
@@ -186,12 +185,10 @@ class SmartCamDevice(SmartDevice):
self._last_update.update(resp)
self._update_internal_info(resp)
self._components = {
comp["name"]: int(comp["version"])
for comp in resp["getAppComponentList"]["app_component"][
"app_component_list"
]
}
self._components_raw = cast(
ComponentsRaw, resp["getAppComponentList"]["app_component"]
)
self._components = self._parse_components(self._components_raw)
if "childControl" in self._components and not self.children:
await self._initialize_children()