mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-24 13:47:05 +00:00
Add common childsetup interface (#1470)
Add a common interface for the `childsetup` module across `smart` and `smartcam` hubs. Co-authored-by: Teemu R. <tpr@iki.fi>
This commit is contained in:
parent
b701441215
commit
09fce3f426
@ -8,3 +8,10 @@
|
||||
.. automodule:: kasa.smart.modules.childdevice
|
||||
:noindex:
|
||||
```
|
||||
|
||||
## Pairing and unpairing
|
||||
|
||||
```{eval-rst}
|
||||
.. automodule:: kasa.interfaces.childsetup
|
||||
:noindex:
|
||||
```
|
||||
|
@ -13,6 +13,7 @@
|
||||
127.0.0.3
|
||||
127.0.0.4
|
||||
127.0.0.5
|
||||
127.0.0.6
|
||||
|
||||
:meth:`~kasa.Discover.discover_single` returns a single device by hostname:
|
||||
|
||||
|
@ -44,8 +44,7 @@ async def hub_supported(dev: SmartDevice):
|
||||
"""List supported hub child device categories."""
|
||||
cs = dev.modules[Module.ChildSetup]
|
||||
|
||||
cats = [cat["category"] for cat in await cs.get_supported_device_categories()]
|
||||
for cat in cats:
|
||||
for cat in cs.supported_categories:
|
||||
echo(f"Supports: {cat}")
|
||||
|
||||
|
||||
|
@ -22,7 +22,7 @@ Discovery returns a dict of {ip: discovered devices}:
|
||||
>>>
|
||||
>>> found_devices = await Discover.discover()
|
||||
>>> [dev.model for dev in found_devices.values()]
|
||||
['KP303', 'HS110', 'L530E', 'KL430', 'HS220']
|
||||
['KP303', 'HS110', 'L530E', 'KL430', 'HS220', 'H200']
|
||||
|
||||
You can pass username and password for devices requiring authentication
|
||||
|
||||
@ -31,21 +31,21 @@ You can pass username and password for devices requiring authentication
|
||||
>>> password="great_password",
|
||||
>>> )
|
||||
>>> print(len(devices))
|
||||
5
|
||||
6
|
||||
|
||||
You can also pass a :class:`kasa.Credentials`
|
||||
|
||||
>>> creds = Credentials("user@example.com", "great_password")
|
||||
>>> devices = await Discover.discover(credentials=creds)
|
||||
>>> print(len(devices))
|
||||
5
|
||||
6
|
||||
|
||||
Discovery can also be targeted to a specific broadcast address instead of
|
||||
the default 255.255.255.255:
|
||||
|
||||
>>> found_devices = await Discover.discover(target="127.0.0.255", credentials=creds)
|
||||
>>> print(len(found_devices))
|
||||
5
|
||||
6
|
||||
|
||||
Basic information is available on the device from the discovery broadcast response
|
||||
but it is important to call device.update() after discovery if you want to access
|
||||
@ -70,6 +70,7 @@ Discovered Bedroom Lamp Plug (model: HS110)
|
||||
Discovered Living Room Bulb (model: L530)
|
||||
Discovered Bedroom Lightstrip (model: KL430)
|
||||
Discovered Living Room Dimmer Switch (model: HS220)
|
||||
Discovered Tapo Hub (model: H200)
|
||||
|
||||
Discovering a single device returns a kasa.Device object.
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""Package for interfaces."""
|
||||
|
||||
from .childsetup import ChildSetup
|
||||
from .energy import Energy
|
||||
from .fan import Fan
|
||||
from .led import Led
|
||||
@ -10,6 +11,7 @@ from .thermostat import Thermostat, ThermostatState
|
||||
from .time import Time
|
||||
|
||||
__all__ = [
|
||||
"ChildSetup",
|
||||
"Fan",
|
||||
"Energy",
|
||||
"Led",
|
||||
|
70
kasa/interfaces/childsetup.py
Normal file
70
kasa/interfaces/childsetup.py
Normal file
@ -0,0 +1,70 @@
|
||||
"""Module for childsetup interface.
|
||||
|
||||
The childsetup module allows pairing and unpairing of supported child device types to
|
||||
hubs.
|
||||
|
||||
>>> from kasa import Discover, Module, LightState
|
||||
>>>
|
||||
>>> dev = await Discover.discover_single(
|
||||
>>> "127.0.0.6",
|
||||
>>> username="user@example.com",
|
||||
>>> password="great_password"
|
||||
>>> )
|
||||
>>> await dev.update()
|
||||
>>> print(dev.alias)
|
||||
Tapo Hub
|
||||
|
||||
>>> childsetup = dev.modules[Module.ChildSetup]
|
||||
>>> childsetup.supported_categories
|
||||
['camera', 'subg.trv', 'subg.trigger', 'subg.plugswitch']
|
||||
|
||||
Put child devices in pairing mode.
|
||||
The hub will pair with all supported devices in pairing mode:
|
||||
|
||||
>>> added = await childsetup.pair()
|
||||
>>> added
|
||||
[{'device_id': 'SCRUBBED_CHILD_DEVICE_ID_5', 'category': 'subg.trigger.button', \
|
||||
'device_model': 'S200B', 'name': 'I01BU0tFRF9OQU1FIw===='}]
|
||||
|
||||
>>> for child in dev.children:
|
||||
>>> print(f"{child.device_id} - {child.model}")
|
||||
SCRUBBED_CHILD_DEVICE_ID_1 - T310
|
||||
SCRUBBED_CHILD_DEVICE_ID_2 - T315
|
||||
SCRUBBED_CHILD_DEVICE_ID_3 - T110
|
||||
SCRUBBED_CHILD_DEVICE_ID_4 - S200B
|
||||
SCRUBBED_CHILD_DEVICE_ID_5 - S200B
|
||||
|
||||
Unpair with the child `device_id`:
|
||||
|
||||
>>> await childsetup.unpair("SCRUBBED_CHILD_DEVICE_ID_4")
|
||||
>>> for child in dev.children:
|
||||
>>> print(f"{child.device_id} - {child.model}")
|
||||
SCRUBBED_CHILD_DEVICE_ID_1 - T310
|
||||
SCRUBBED_CHILD_DEVICE_ID_2 - T315
|
||||
SCRUBBED_CHILD_DEVICE_ID_3 - T110
|
||||
SCRUBBED_CHILD_DEVICE_ID_5 - S200B
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from ..module import Module
|
||||
|
||||
|
||||
class ChildSetup(Module, ABC):
|
||||
"""Interface for child setup on hubs."""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def supported_categories(self) -> list[str]:
|
||||
"""Supported child device categories."""
|
||||
|
||||
@abstractmethod
|
||||
async def pair(self, *, timeout: int = 10) -> list[dict]:
|
||||
"""Scan for new devices and pair them."""
|
||||
|
||||
@abstractmethod
|
||||
async def unpair(self, device_id: str) -> dict:
|
||||
"""Remove device from the hub."""
|
@ -93,6 +93,7 @@ class Module(ABC):
|
||||
"""
|
||||
|
||||
# Common Modules
|
||||
ChildSetup: Final[ModuleName[interfaces.ChildSetup]] = ModuleName("ChildSetup")
|
||||
Energy: Final[ModuleName[interfaces.Energy]] = ModuleName("Energy")
|
||||
Fan: Final[ModuleName[interfaces.Fan]] = ModuleName("Fan")
|
||||
LightEffect: Final[ModuleName[interfaces.LightEffect]] = ModuleName("LightEffect")
|
||||
@ -154,7 +155,6 @@ class Module(ABC):
|
||||
)
|
||||
ChildLock: Final[ModuleName[smart.ChildLock]] = ModuleName("ChildLock")
|
||||
TriggerLogs: Final[ModuleName[smart.TriggerLogs]] = ModuleName("TriggerLogs")
|
||||
ChildSetup: Final[ModuleName[smart.ChildSetup]] = ModuleName("ChildSetup")
|
||||
|
||||
HomeKit: Final[ModuleName[smart.HomeKit]] = ModuleName("HomeKit")
|
||||
Matter: Final[ModuleName[smart.Matter]] = ModuleName("Matter")
|
||||
|
@ -9,16 +9,21 @@ import asyncio
|
||||
import logging
|
||||
|
||||
from ...feature import Feature
|
||||
from ...interfaces.childsetup import ChildSetup as ChildSetupInterface
|
||||
from ..smartmodule import SmartModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChildSetup(SmartModule):
|
||||
class ChildSetup(SmartModule, ChildSetupInterface):
|
||||
"""Implementation for child device setup."""
|
||||
|
||||
REQUIRED_COMPONENT = "child_quick_setup"
|
||||
QUERY_GETTER_NAME = "get_support_child_device_category"
|
||||
_categories: list[str] = []
|
||||
|
||||
# Supported child device categories will hardly ever change
|
||||
MINIMUM_UPDATE_INTERVAL_SECS = 60 * 60 * 24
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features."""
|
||||
@ -34,13 +39,18 @@ class ChildSetup(SmartModule):
|
||||
)
|
||||
)
|
||||
|
||||
async def get_supported_device_categories(self) -> list[dict]:
|
||||
"""Get supported device categories."""
|
||||
categories = await self.call("get_support_child_device_category")
|
||||
return categories["get_support_child_device_category"]["device_category_list"]
|
||||
async def _post_update_hook(self) -> None:
|
||||
self._categories = [
|
||||
cat["category"] for cat in self.data["device_category_list"]
|
||||
]
|
||||
|
||||
@property
|
||||
def supported_categories(self) -> list[str]:
|
||||
"""Supported child device categories."""
|
||||
return self._categories
|
||||
|
||||
async def pair(self, *, timeout: int = 10) -> list[dict]:
|
||||
"""Scan for new devices and pair after discovering first new device."""
|
||||
"""Scan for new devices and pair them."""
|
||||
await self.call("begin_scanning_child_device")
|
||||
|
||||
_LOGGER.info("Waiting %s seconds for discovering new devices", timeout)
|
||||
@ -60,28 +70,43 @@ class ChildSetup(SmartModule):
|
||||
detected,
|
||||
)
|
||||
|
||||
await self._add_devices(detected)
|
||||
|
||||
return detected["child_device_list"]
|
||||
return await self._add_devices(detected)
|
||||
|
||||
async def unpair(self, device_id: str) -> dict:
|
||||
"""Remove device from the hub."""
|
||||
_LOGGER.info("Going to unpair %s from %s", device_id, self)
|
||||
|
||||
payload = {"child_device_list": [{"device_id": device_id}]}
|
||||
return await self.call("remove_child_device_list", payload)
|
||||
res = await self.call("remove_child_device_list", payload)
|
||||
await self._device.update()
|
||||
return res
|
||||
|
||||
async def _add_devices(self, devices: dict) -> dict:
|
||||
async def _add_devices(self, devices: dict) -> list[dict]:
|
||||
"""Add devices based on get_detected_device response.
|
||||
|
||||
Pass the output from :ref:_get_detected_devices: as a parameter.
|
||||
"""
|
||||
res = await self.call("add_child_device_list", devices)
|
||||
return res
|
||||
await self.call("add_child_device_list", devices)
|
||||
|
||||
await self._device.update()
|
||||
|
||||
successes = []
|
||||
for detected in devices["child_device_list"]:
|
||||
device_id = detected["device_id"]
|
||||
|
||||
result = "not added"
|
||||
if device_id in self._device._children:
|
||||
result = "added"
|
||||
successes.append(detected)
|
||||
|
||||
msg = f"{detected['device_model']} - {device_id} - {result}"
|
||||
_LOGGER.info("Added child to %s: %s", self._device.host, msg)
|
||||
|
||||
return successes
|
||||
|
||||
async def _get_detected_devices(self) -> dict:
|
||||
"""Return list of devices detected during scanning."""
|
||||
param = {"scan_list": await self.get_supported_device_categories()}
|
||||
param = {"scan_list": self.data["device_category_list"]}
|
||||
res = await self.call("get_scan_child_device_list", param)
|
||||
_LOGGER.debug("Scan status: %s", res)
|
||||
return res["get_scan_child_device_list"]
|
||||
|
@ -9,12 +9,13 @@ import asyncio
|
||||
import logging
|
||||
|
||||
from ...feature import Feature
|
||||
from ...interfaces.childsetup import ChildSetup as ChildSetupInterface
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChildSetup(SmartCamModule):
|
||||
class ChildSetup(SmartCamModule, ChildSetupInterface):
|
||||
"""Implementation for child device setup."""
|
||||
|
||||
REQUIRED_COMPONENT = "childQuickSetup"
|
||||
@ -22,6 +23,9 @@ class ChildSetup(SmartCamModule):
|
||||
QUERY_MODULE_NAME = "childControl"
|
||||
_categories: list[str] = []
|
||||
|
||||
# Supported child device categories will hardly ever change
|
||||
MINIMUM_UPDATE_INTERVAL_SECS = 60 * 60 * 24
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features."""
|
||||
self._add_feature(
|
||||
@ -37,19 +41,18 @@ class ChildSetup(SmartCamModule):
|
||||
)
|
||||
|
||||
async def _post_update_hook(self) -> None:
|
||||
if not self._categories:
|
||||
self._categories = [
|
||||
cat["category"].replace("ipcamera", "camera")
|
||||
for cat in self.data["device_category_list"]
|
||||
]
|
||||
self._categories = [
|
||||
cat["category"].replace("ipcamera", "camera")
|
||||
for cat in self.data["device_category_list"]
|
||||
]
|
||||
|
||||
@property
|
||||
def supported_child_device_categories(self) -> list[str]:
|
||||
def supported_categories(self) -> list[str]:
|
||||
"""Supported child device categories."""
|
||||
return self._categories
|
||||
|
||||
async def pair(self, *, timeout: int = 10) -> list[dict]:
|
||||
"""Scan for new devices and pair after discovering first new device."""
|
||||
"""Scan for new devices and pair them."""
|
||||
await self.call(
|
||||
"startScanChildDevice", {"childControl": {"category": self._categories}}
|
||||
)
|
||||
@ -76,7 +79,7 @@ class ChildSetup(SmartCamModule):
|
||||
)
|
||||
return await self._add_devices(detected_list)
|
||||
|
||||
async def _add_devices(self, detected_list: list[dict]) -> list:
|
||||
async def _add_devices(self, detected_list: list[dict]) -> list[dict]:
|
||||
"""Add devices based on getScanChildDeviceList response."""
|
||||
await self.call(
|
||||
"addScanChildDeviceList",
|
||||
@ -104,4 +107,6 @@ class ChildSetup(SmartCamModule):
|
||||
_LOGGER.info("Going to unpair %s from %s", device_id, self)
|
||||
|
||||
payload = {"childControl": {"child_device_list": [{"device_id": device_id}]}}
|
||||
return await self.call("removeChildDeviceList", payload)
|
||||
res = await self.call("removeChildDeviceList", payload)
|
||||
await self._device.update()
|
||||
return res
|
||||
|
@ -4,10 +4,10 @@ from pytest_mock import MockerFixture
|
||||
from kasa import DeviceType, Module
|
||||
from kasa.cli.hub import hub
|
||||
|
||||
from ..device_fixtures import HUBS_SMART, hubs_smart, parametrize, plug_iot
|
||||
from ..device_fixtures import hubs, plug_iot
|
||||
|
||||
|
||||
@hubs_smart
|
||||
@hubs
|
||||
async def test_hub_pair(dev, mocker: MockerFixture, runner, caplog):
|
||||
"""Test that pair calls the expected methods."""
|
||||
cs = dev.modules.get(Module.ChildSetup)
|
||||
@ -25,7 +25,7 @@ async def test_hub_pair(dev, mocker: MockerFixture, runner, caplog):
|
||||
assert res.exit_code == 0
|
||||
|
||||
|
||||
@parametrize("hubs smart", model_filter=HUBS_SMART, protocol_filter={"SMART"})
|
||||
@hubs
|
||||
async def test_hub_unpair(dev, mocker: MockerFixture, runner):
|
||||
"""Test that unpair calls the expected method."""
|
||||
if not dev.children:
|
||||
|
@ -346,6 +346,7 @@ hub_smartcam = parametrize(
|
||||
device_type_filter=[DeviceType.Hub],
|
||||
protocol_filter={"SMARTCAM"},
|
||||
)
|
||||
hubs = parametrize_combine([hubs_smart, hub_smartcam])
|
||||
doobell_smartcam = parametrize(
|
||||
"doorbell smartcam",
|
||||
device_type_filter=[DeviceType.Doorbell],
|
||||
|
@ -176,10 +176,19 @@ class FakeSmartTransport(BaseTransport):
|
||||
"child_quick_setup",
|
||||
{"device_category_list": [{"category": "subg.trv"}]},
|
||||
),
|
||||
# no devices found
|
||||
"get_scan_child_device_list": (
|
||||
"child_quick_setup",
|
||||
{"child_device_list": [{"dummy": "response"}], "scan_status": "idle"},
|
||||
{
|
||||
"child_device_list": [
|
||||
{
|
||||
"device_id": "0000000000000000000000000000000000000000",
|
||||
"category": "subg.trigger.button",
|
||||
"device_model": "S200B",
|
||||
"name": "I01BU0tFRF9OQU1FIw==",
|
||||
}
|
||||
],
|
||||
"scan_status": "idle",
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,6 @@ async def test_childsetup_pair(
|
||||
mock_query_helper.assert_has_awaits(
|
||||
[
|
||||
mocker.call("begin_scanning_child_device", None),
|
||||
mocker.call("get_support_child_device_category", None),
|
||||
mocker.call("get_scan_child_device_list", params=mocker.ANY),
|
||||
mocker.call("add_child_device_list", params=mocker.ANY),
|
||||
]
|
||||
|
@ -41,29 +41,11 @@ async def test_childsetup_pair(
|
||||
[
|
||||
mocker.call(
|
||||
"startScanChildDevice",
|
||||
params={
|
||||
"childControl": {
|
||||
"category": [
|
||||
"camera",
|
||||
"subg.trv",
|
||||
"subg.trigger",
|
||||
"subg.plugswitch",
|
||||
]
|
||||
}
|
||||
},
|
||||
params={"childControl": {"category": cs.supported_categories}},
|
||||
),
|
||||
mocker.call(
|
||||
"getScanChildDeviceList",
|
||||
{
|
||||
"childControl": {
|
||||
"category": [
|
||||
"camera",
|
||||
"subg.trv",
|
||||
"subg.trigger",
|
||||
"subg.plugswitch",
|
||||
]
|
||||
}
|
||||
},
|
||||
{"childControl": {"category": cs.supported_categories}},
|
||||
),
|
||||
mocker.call(
|
||||
"addScanChildDeviceList",
|
||||
@ -71,10 +53,10 @@ async def test_childsetup_pair(
|
||||
"childControl": {
|
||||
"child_device_list": [
|
||||
{
|
||||
"device_id": "0000000000000000000000000000000000000000",
|
||||
"category": "subg.trigger.button",
|
||||
"device_model": "S200B",
|
||||
"name": "I01BU0tFRF9OQU1FIw====",
|
||||
"device_id": mocker.ANY,
|
||||
"category": mocker.ANY,
|
||||
"device_model": mocker.ANY,
|
||||
"name": mocker.ANY,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -148,6 +148,25 @@ def test_tutorial_examples(readmes_mock):
|
||||
assert not res["failed"]
|
||||
|
||||
|
||||
def test_childsetup_examples(readmes_mock, mocker):
|
||||
"""Test device examples."""
|
||||
pair_resp = [
|
||||
{
|
||||
"device_id": "SCRUBBED_CHILD_DEVICE_ID_5",
|
||||
"category": "subg.trigger.button",
|
||||
"device_model": "S200B",
|
||||
"name": "I01BU0tFRF9OQU1FIw====",
|
||||
}
|
||||
]
|
||||
mocker.patch(
|
||||
"kasa.smartcam.modules.childsetup.ChildSetup.pair", return_value=pair_resp
|
||||
)
|
||||
res = xdoctest.doctest_module("kasa.interfaces.childsetup", "all")
|
||||
assert res["n_passed"] > 0
|
||||
assert res["n_warned"] == 0
|
||||
assert not res["failed"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def readmes_mock(mocker):
|
||||
fixture_infos = {
|
||||
@ -156,6 +175,7 @@ async def readmes_mock(mocker):
|
||||
"127.0.0.3": get_fixture_info("L530E(EU)_3.0_1.1.6.json", "SMART"), # Bulb
|
||||
"127.0.0.4": get_fixture_info("KL430(US)_1.0_1.0.10.json", "IOT"), # Lightstrip
|
||||
"127.0.0.5": get_fixture_info("HS220(US)_1.0_1.5.7.json", "IOT"), # Dimmer
|
||||
"127.0.0.6": get_fixture_info("H200(US)_1.0_1.3.6.json", "SMARTCAM"), # Hub
|
||||
}
|
||||
fixture_infos["127.0.0.1"].data["system"]["get_sysinfo"]["alias"] = (
|
||||
"Bedroom Power Strip"
|
||||
@ -176,4 +196,7 @@ async def readmes_mock(mocker):
|
||||
fixture_infos["127.0.0.5"].data["system"]["get_sysinfo"]["alias"] = (
|
||||
"Living Room Dimmer Switch"
|
||||
)
|
||||
fixture_infos["127.0.0.6"].data["getDeviceInfo"]["device_info"]["basic_info"][
|
||||
"device_alias"
|
||||
] = "Tapo Hub"
|
||||
return patch_discovery(fixture_infos, mocker)
|
||||
|
Loading…
Reference in New Issue
Block a user