Add childsetup module to smartcam hubs (#1469)

Add the `childsetup` module for `smartcam` hubs to allow pairing and unpairing child devices.
This commit is contained in:
Steven B. 2025-01-23 09:42:37 +00:00 committed by GitHub
parent bd43e0f7d2
commit 5e57f8bd6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 278 additions and 39 deletions

View File

@ -48,7 +48,10 @@ class ChildSetup(SmartModule):
detected = await self._get_detected_devices()
if not detected["child_device_list"]:
_LOGGER.info("No devices found.")
_LOGGER.warning(
"No devices found, make sure to activate pairing "
"mode on the devices to be added."
)
return []
_LOGGER.info(
@ -63,7 +66,7 @@ class ChildSetup(SmartModule):
async def unpair(self, device_id: str) -> dict:
"""Remove device from the hub."""
_LOGGER.debug("Going to unpair %s from %s", device_id, self)
_LOGGER.info("Going to unpair %s from %s", device_id, self)
payload = {"child_device_list": [{"device_id": device_id}]}
return await self.call("remove_child_device_list", payload)

View File

@ -691,12 +691,8 @@ class SmartDevice(Device):
"""
self._info = info
async def _query_helper(
self, method: str, params: dict | None = None, child_ids: None = None
) -> dict:
res = await self.protocol.query({method: params})
return res
async def _query_helper(self, method: str, params: dict | None = None) -> dict:
return await self.protocol.query({method: params})
@property
def ssid(self) -> str:

View File

@ -5,6 +5,7 @@ from .babycrydetection import BabyCryDetection
from .battery import Battery
from .camera import Camera
from .childdevice import ChildDevice
from .childsetup import ChildSetup
from .device import DeviceModule
from .homekit import HomeKit
from .led import Led
@ -23,6 +24,7 @@ __all__ = [
"Battery",
"Camera",
"ChildDevice",
"ChildSetup",
"DeviceModule",
"Led",
"PanTilt",

View File

@ -0,0 +1,107 @@
"""Implementation for child device setup.
This module allows pairing and disconnecting child devices.
"""
from __future__ import annotations
import asyncio
import logging
from ...feature import Feature
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class ChildSetup(SmartCamModule):
"""Implementation for child device setup."""
REQUIRED_COMPONENT = "childQuickSetup"
QUERY_GETTER_NAME = "getSupportChildDeviceCategory"
QUERY_MODULE_NAME = "childControl"
_categories: list[str] = []
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="pair",
name="Pair",
container=self,
attribute_setter="pair",
category=Feature.Category.Config,
type=Feature.Type.Action,
)
)
async def _post_update_hook(self) -> None:
if not self._categories:
self._categories = [
cat["category"].replace("ipcamera", "camera")
for cat in self.data["device_category_list"]
]
@property
def supported_child_device_categories(self) -> list[str]:
"""Supported child device categories."""
return self._categories
async def pair(self, *, timeout: int = 10) -> list[dict]:
"""Scan for new devices and pair after discovering first new device."""
await self.call(
"startScanChildDevice", {"childControl": {"category": self._categories}}
)
_LOGGER.info("Waiting %s seconds for discovering new devices", timeout)
await asyncio.sleep(timeout)
res = await self.call(
"getScanChildDeviceList", {"childControl": {"category": self._categories}}
)
detected_list = res["getScanChildDeviceList"]["child_device_list"]
if not detected_list:
_LOGGER.warning(
"No devices found, make sure to activate pairing "
"mode on the devices to be added."
)
return []
_LOGGER.info(
"Discovery done, found %s devices: %s",
len(detected_list),
detected_list,
)
return await self._add_devices(detected_list)
async def _add_devices(self, detected_list: list[dict]) -> list:
"""Add devices based on getScanChildDeviceList response."""
await self.call(
"addScanChildDeviceList",
{"childControl": {"child_device_list": detected_list}},
)
await self._device.update()
successes = []
for detected in detected_list:
device_id = detected["device_id"]
result = "not added"
if device_id in self._device._children:
result = "added"
successes.append(detected)
msg = f"{detected['device_model']} - {device_id} - {result}"
_LOGGER.info("Adding child to %s: %s", self._device.host, msg)
return successes
async def unpair(self, device_id: str) -> dict:
"""Remove device from the hub."""
_LOGGER.info("Going to unpair %s from %s", device_id, self)
payload = {"childControl": {"child_device_list": [{"device_id": device_id}]}}
return await self.call("removeChildDeviceList", payload)

View File

@ -188,13 +188,6 @@ class SmartCamDevice(SmartDevice):
return res
async def _query_getter_helper(
self, method: str, module: str, sections: str | list[str]
) -> Any:
res = await self.protocol.query({method: {module: {"name": sections}}})
return res
@staticmethod
def _parse_components(components_raw: ComponentsRaw) -> dict[str, int]:
return {

View File

@ -3,7 +3,7 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Final, cast
from typing import TYPE_CHECKING, Final
from ..exceptions import DeviceError, KasaException, SmartErrorCode
from ..modulemapping import ModuleName
@ -68,21 +68,7 @@ class SmartCamModule(SmartModule):
Just a helper method.
"""
if params:
module = next(iter(params))
section = next(iter(params[module]))
else:
module = "system"
section = "null"
if method[:3] == "get":
return await self._device._query_getter_helper(method, module, section)
if TYPE_CHECKING:
params = cast(dict[str, dict[str, Any]], params)
return await self._device._query_setter_helper(
method, module, section, params[module][section]
)
return await self._device._query_helper(method, params)
@property
def data(self) -> dict:

View File

@ -153,7 +153,33 @@ class FakeSmartCamTransport(BaseTransport):
"setup_code": "00000000000",
"setup_payload": "00:0000000-0000.00.000",
},
)
),
"getSupportChildDeviceCategory": (
"childQuickSetup",
{
"device_category_list": [
{"category": "ipcamera"},
{"category": "subg.trv"},
{"category": "subg.trigger"},
{"category": "subg.plugswitch"},
]
},
),
"getScanChildDeviceList": (
"childQuickSetup",
{
"child_device_list": [
{
"device_id": "0000000000000000000000000000000000000000",
"category": "subg.trigger.button",
"device_model": "S200B",
"name": "I01BU0tFRF9OQU1FIw====",
}
],
"scan_wait_time": 55,
"scan_status": "scanning",
},
),
}
# Setters for when there's not a simple mapping of setters to getters
SETTERS = {
@ -179,6 +205,17 @@ class FakeSmartCamTransport(BaseTransport):
],
}
def _hub_remove_device(self, info, params):
"""Remove hub device."""
items_to_remove = [dev["device_id"] for dev in params["child_device_list"]]
children = info["getChildDeviceList"]["child_device_list"]
new_children = [
dev for dev in children if dev["device_id"] not in items_to_remove
]
info["getChildDeviceList"]["child_device_list"] = new_children
return {"result": {}, "error_code": 0}
@staticmethod
def _get_second_key(request_dict: dict[str, Any]) -> str:
assert (
@ -269,6 +306,14 @@ class FakeSmartCamTransport(BaseTransport):
return {**result, "error_code": 0}
else:
return {"error_code": -1}
elif method == "removeChildDeviceList":
return self._hub_remove_device(info, request_dict["params"]["childControl"])
# actions
elif method in [
"addScanChildDeviceList",
"startScanChildDevice",
]:
return {"result": {}, "error_code": 0}
# smartcam child devices do not make requests for getDeviceInfo as they
# get updated from the parent's query. If this is being called from a

View File

@ -0,0 +1,103 @@
from __future__ import annotations
import logging
import pytest
from pytest_mock import MockerFixture
from kasa import Feature, Module, SmartDevice
from ...device_fixtures import parametrize
childsetup = parametrize(
"supports pairing", component_filter="childQuickSetup", protocol_filter={"SMARTCAM"}
)
@childsetup
async def test_childsetup_features(dev: SmartDevice):
"""Test the exposed features."""
cs = dev.modules[Module.ChildSetup]
assert "pair" in cs._module_features
pair = cs._module_features["pair"]
assert pair.type == Feature.Type.Action
@childsetup
async def test_childsetup_pair(
dev: SmartDevice, mocker: MockerFixture, caplog: pytest.LogCaptureFixture
):
"""Test device pairing."""
caplog.set_level(logging.INFO)
mock_query_helper = mocker.spy(dev, "_query_helper")
mocker.patch("asyncio.sleep")
cs = dev.modules[Module.ChildSetup]
await cs.pair()
mock_query_helper.assert_has_awaits(
[
mocker.call(
"startScanChildDevice",
params={
"childControl": {
"category": [
"camera",
"subg.trv",
"subg.trigger",
"subg.plugswitch",
]
}
},
),
mocker.call(
"getScanChildDeviceList",
{
"childControl": {
"category": [
"camera",
"subg.trv",
"subg.trigger",
"subg.plugswitch",
]
}
},
),
mocker.call(
"addScanChildDeviceList",
{
"childControl": {
"child_device_list": [
{
"device_id": "0000000000000000000000000000000000000000",
"category": "subg.trigger.button",
"device_model": "S200B",
"name": "I01BU0tFRF9OQU1FIw====",
}
]
}
},
),
]
)
assert "Discovery done" in caplog.text
@childsetup
async def test_childsetup_unpair(
dev: SmartDevice, mocker: MockerFixture, caplog: pytest.LogCaptureFixture
):
"""Test unpair."""
mock_query_helper = mocker.spy(dev, "_query_helper")
DUMMY_ID = "dummy_id"
cs = dev.modules[Module.ChildSetup]
await cs.unpair(DUMMY_ID)
mock_query_helper.assert_awaited_with(
"removeChildDeviceList",
params={"childControl": {"child_device_list": [{"device_id": DUMMY_ID}]}},
)

View File

@ -267,7 +267,11 @@ async def test_raw_command(dev, mocker, runner):
from kasa.smart import SmartDevice
if isinstance(dev, SmartCamDevice):
params = ["na", "getDeviceInfo"]
params = [
"na",
"getDeviceInfo",
'{"device_info": {"name": ["basic_info", "info"]}}',
]
elif isinstance(dev, SmartDevice):
params = ["na", "get_device_info"]
else:

View File

@ -191,12 +191,12 @@ async def test_feature_setters(dev: Device, mocker: MockerFixture):
exceptions = []
for feat in dev.features.values():
try:
prot = (
feat.container._device.protocol
if feat.container
else feat.device.protocol
)
with patch.object(prot, "query", name=feat.id) as query:
patch_dev = feat.container._device if feat.container else feat.device
with (
patch.object(patch_dev.protocol, "query", name=feat.id) as query,
# patch update in case feature setter does an update
patch.object(patch_dev, "update"),
):
await _test_feature(feat, query)
# we allow our own exceptions to avoid mocking valid responses
except KasaException: