Add core device, child and camera modules to smartcamera (#1193)

Co-authored-by: Teemu R. <tpr@iki.fi>
This commit is contained in:
Steven B.
2024-10-24 17:22:45 +01:00
committed by GitHub
parent 8ee8c17bdc
commit 28361c1727
11 changed files with 427 additions and 47 deletions

View File

@@ -0,0 +1,11 @@
"""Modules for SMARTCAMERA devices."""
from .camera import Camera
from .childdevice import ChildDevice
from .device import DeviceModule
__all__ = [
"Camera",
"ChildDevice",
"DeviceModule",
]

View File

@@ -0,0 +1,45 @@
"""Implementation of device module."""
from __future__ import annotations
from ...device_type import DeviceType
from ...feature import Feature
from ..smartcameramodule import SmartCameraModule
class Camera(SmartCameraModule):
"""Implementation of device module."""
QUERY_GETTER_NAME = "getLensMaskConfig"
QUERY_MODULE_NAME = "lens_mask"
QUERY_SECTION_NAMES = "lens_mask_info"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="state",
name="State",
attribute_getter="is_on",
attribute_setter="set_state",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
)
)
@property
def is_on(self) -> bool:
"""Return the device id."""
return self.data["lens_mask_info"]["enabled"] == "on"
async def set_state(self, on: bool) -> dict:
"""Set the device state."""
params = {"enabled": "on" if on else "off"}
return await self._device._query_setter_helper(
"setLensMaskConfig", self.QUERY_MODULE_NAME, "lens_mask_info", params
)
async def _check_supported(self) -> bool:
"""Additional check to see if the module is supported by the device."""
return self._device.device_type is DeviceType.Camera

View File

@@ -0,0 +1,23 @@
"""Module for child devices."""
from ...device_type import DeviceType
from ..smartcameramodule import SmartCameraModule
class ChildDevice(SmartCameraModule):
"""Implementation for child devices."""
NAME = "childdevice"
QUERY_GETTER_NAME = "getChildDeviceList"
QUERY_MODULE_NAME = "childControl"
def query(self) -> dict:
"""Query to execute during the update cycle.
Default implementation uses the raw query getter w/o parameters.
"""
return {self.QUERY_GETTER_NAME: {self.QUERY_MODULE_NAME: {"start_index": 0}}}
async def _check_supported(self) -> bool:
"""Additional check to see if the module is supported by the device."""
return self._device.device_type is DeviceType.Hub

View File

@@ -0,0 +1,40 @@
"""Implementation of device module."""
from __future__ import annotations
from ...feature import Feature
from ..smartcameramodule import SmartCameraModule
class DeviceModule(SmartCameraModule):
"""Implementation of device module."""
NAME = "devicemodule"
QUERY_GETTER_NAME = "getDeviceInfo"
QUERY_MODULE_NAME = "device_info"
QUERY_SECTION_NAMES = ["basic_info", "info"]
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="device_id",
name="Device ID",
attribute_getter="device_id",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
async def _post_update_hook(self) -> None:
"""Overriden to prevent module disabling.
Overrides the default behaviour to disable a module if the query returns
an error because this module is critical.
"""
@property
def device_id(self) -> str:
"""Return the device id."""
return self.data["basic_info"]["dev_id"]

View File

@@ -2,16 +2,26 @@
from __future__ import annotations
import logging
from typing import Any
from ..device_type import DeviceType
from ..exceptions import SmartErrorCode
from ..smart import SmartDevice
from ..module import Module
from ..smart import SmartChildDevice, SmartDevice
from .modules.childdevice import ChildDevice
from .modules.device import DeviceModule
from .smartcameramodule import SmartCameraModule
from .smartcameraprotocol import _ChildCameraProtocolWrapper
_LOGGER = logging.getLogger(__name__)
class SmartCamera(SmartDevice):
"""Class for smart cameras."""
# Modules that are called as part of the init procedure on first update
FIRST_UPDATE_MODULES = {DeviceModule, ChildDevice}
@staticmethod
def _get_device_type_from_sysinfo(sysinfo: dict[str, Any]) -> DeviceType:
"""Find type to be displayed as a supported device category."""
@@ -20,17 +30,108 @@ class SmartCamera(SmartDevice):
return DeviceType.Hub
return DeviceType.Camera
async def update(self, update_children: bool = False):
"""Update the device."""
def _update_internal_info(self, info_resp: dict) -> None:
"""Update the internal device info."""
info = self._try_get_response(info_resp, "getDeviceInfo")
self._info = self._map_info(info["device_info"])
def _update_children_info(self) -> None:
"""Update the internal child device info from the parent info."""
if child_info := self._try_get_response(
self._last_update, "getChildDeviceList", {}
):
for info in child_info["child_device_list"]:
self._children[info["device_id"]]._update_internal_state(info)
async def _initialize_smart_child(self, info: dict) -> SmartDevice:
"""Initialize a smart child device attached to a smartcamera."""
child_id = info["device_id"]
child_protocol = _ChildCameraProtocolWrapper(child_id, self.protocol)
try:
initial_response = await child_protocol.query(
{"component_nego": None, "get_connect_cloud_state": None}
)
child_components = {
item["id"]: item["ver_code"]
for item in initial_response["component_nego"]["component_list"]
}
except Exception as ex:
_LOGGER.exception("Error initialising child %s: %s", child_id, ex)
return await SmartChildDevice.create(
parent=self,
child_info=info,
child_components=child_components,
protocol=child_protocol,
last_update=initial_response,
)
async def _initialize_children(self) -> None:
"""Initialize children for hubs."""
if not (
child_info := self._try_get_response(
self._last_update, "getChildDeviceList", {}
)
):
return
children = {}
for info in child_info["child_device_list"]:
if (
category := info.get("category")
) and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP:
child_id = info["device_id"]
children[child_id] = await self._initialize_smart_child(info)
else:
_LOGGER.debug("Child device type not supported: %s", info)
self._children = children
async def _initialize_modules(self) -> None:
"""Initialize modules based on component negotiation response."""
for mod in SmartCameraModule.REGISTERED_MODULES.values():
module = mod(self, mod._module_name())
if await module._check_supported():
self._modules[module.name] = module
async def _initialize_features(self) -> None:
"""Initialize device features."""
for module in self.modules.values():
module._initialize_features()
for feat in module._module_features.values():
self._add_feature(feat)
for child in self._children.values():
await child._initialize_features()
async def _query_setter_helper(
self, method: str, module: str, section: str, params: dict | None = None
) -> dict:
res = await self.protocol.query({method: {module: {section: params}}})
return res
async def _query_getter_helper(
self, method: str, module: str, sections: str | list[str]
) -> Any:
res = await self.protocol.query({method: {module: {"name": sections}}})
return res
async def _negotiate(self) -> None:
"""Perform initialization.
We fetch the device info and the available components as early as possible.
If the device reports supporting child devices, they are also initialized.
"""
initial_query = {
"getDeviceInfo": {"device_info": {"name": ["basic_info", "info"]}},
"getLensMaskConfig": {"lens_mask": {"name": ["lens_mask_info"]}},
"getChildDeviceList": {"childControl": {"start_index": 0}},
}
resp = await self.protocol.query(initial_query)
self._last_update.update(resp)
info = self._try_get_response(resp, "getDeviceInfo")
self._info = self._map_info(info["device_info"])
self._last_update = resp
self._update_internal_info(resp)
await self._initialize_children()
def _map_info(self, device_info: dict) -> dict:
basic_info = device_info["basic_info"]
@@ -48,25 +149,17 @@ class SmartCamera(SmartDevice):
@property
def is_on(self) -> bool:
"""Return true if the device is on."""
if isinstance(self._last_update["getLensMaskConfig"], SmartErrorCode):
return True
return (
self._last_update["getLensMaskConfig"]["lens_mask"]["lens_mask_info"][
"enabled"
]
== "on"
)
if (camera := self.modules.get(Module.Camera)) and not camera.disabled:
return camera.is_on
async def set_state(self, on: bool):
return True
async def set_state(self, on: bool) -> dict:
"""Set the device state."""
if isinstance(self._last_update["getLensMaskConfig"], SmartErrorCode):
return
query = {
"setLensMaskConfig": {
"lens_mask": {"lens_mask_info": {"enabled": "on" if on else "off"}}
},
}
return await self.protocol.query(query)
if (camera := self.modules.get(Module.Camera)) and not camera.disabled:
return await camera.set_state(on)
return {}
@property
def device_type(self) -> DeviceType:
@@ -82,6 +175,14 @@ class SmartCamera(SmartDevice):
return self._info.get("alias")
return None
async def set_alias(self, alias: str) -> dict:
"""Set the device name (alias)."""
return await self.protocol.query(
{
"setDeviceAlias": {"system": {"sys": {"dev_alias": alias}}},
}
)
@property
def hw_info(self) -> dict:
"""Return hardware info for the device."""

View File

@@ -0,0 +1,96 @@
"""Base implementation for SMART modules."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from ..exceptions import DeviceError, KasaException, SmartErrorCode
from ..smart.smartmodule import SmartModule
if TYPE_CHECKING:
from .smartcamera import SmartCamera
_LOGGER = logging.getLogger(__name__)
class SmartCameraModule(SmartModule):
"""Base class for SMARTCAMERA modules."""
#: Query to execute during the main update cycle
QUERY_GETTER_NAME: str
#: Module name to be queried
QUERY_MODULE_NAME: str
#: Section name or names to be queried
QUERY_SECTION_NAMES: str | list[str]
REGISTERED_MODULES = {}
_device: SmartCamera
def query(self) -> dict:
"""Query to execute during the update cycle.
Default implementation uses the raw query getter w/o parameters.
"""
return {
self.QUERY_GETTER_NAME: {
self.QUERY_MODULE_NAME: {"name": self.QUERY_SECTION_NAMES}
}
}
async def call(self, method: str, params: dict | None = None) -> dict:
"""Call a method.
Just a helper method.
"""
if params:
module = next(iter(params))
section = next(iter(params[module]))
else:
module = "system"
section = "null"
if method[:3] == "get":
return await self._device._query_getter_helper(method, module, section)
return await self._device._query_setter_helper(method, module, section, params)
@property
def data(self) -> dict:
"""Return response data for the module."""
dev = self._device
q = self.query()
if not q:
return dev.sys_info
if len(q) == 1:
query_resp = dev._last_update.get(self.QUERY_GETTER_NAME, {})
if isinstance(query_resp, SmartErrorCode):
raise DeviceError(
f"Error accessing module data in {self._module}",
error_code=SmartErrorCode,
)
if not query_resp:
raise KasaException(
f"You need to call update() prior accessing module data"
f" for '{self._module}'"
)
return query_resp.get(self.QUERY_MODULE_NAME)
else:
found = {key: val for key, val in dev._last_update.items() if key in q}
for key in q:
if key not in found:
raise KasaException(
f"{key} not found, you need to call update() prior accessing"
f" module data for '{self._module}'"
)
if isinstance(found[key], SmartErrorCode):
raise DeviceError(
f"Error accessing module data {key} in {self._module}",
error_code=SmartErrorCode,
)
return found