Add support for pairing devices with hubs (#859)

Co-authored-by: Steven B. <51370195+sdb9696@users.noreply.github.com>
This commit is contained in:
Teemu R.
2025-01-20 11:36:06 +01:00
committed by GitHub
parent 2d26f91981
commit bca5576425
16 changed files with 412 additions and 15 deletions

96
kasa/cli/hub.py Normal file
View File

@@ -0,0 +1,96 @@
"""Hub-specific commands."""
import asyncio
import asyncclick as click
from kasa import DeviceType, Module, SmartDevice
from kasa.smart import SmartChildDevice
from .common import (
echo,
error,
pass_dev,
)
def pretty_category(cat: str):
"""Return pretty category for paired devices."""
return SmartChildDevice.CHILD_DEVICE_TYPE_MAP.get(cat)
@click.group()
@pass_dev
async def hub(dev: SmartDevice):
"""Commands controlling hub child device pairing."""
if dev.device_type is not DeviceType.Hub:
error(f"{dev} is not a hub.")
if dev.modules.get(Module.ChildSetup) is None:
error(f"{dev} does not have child setup module.")
@hub.command(name="list")
@pass_dev
async def hub_list(dev: SmartDevice):
"""List hub paired child devices."""
for c in dev.children:
echo(f"{c.device_id}: {c}")
@hub.command(name="supported")
@pass_dev
async def hub_supported(dev: SmartDevice):
"""List supported hub child device categories."""
cs = dev.modules[Module.ChildSetup]
cats = [cat["category"] for cat in await cs.get_supported_device_categories()]
for cat in cats:
echo(f"Supports: {cat}")
@hub.command(name="pair")
@click.option("--timeout", default=10)
@pass_dev
async def hub_pair(dev: SmartDevice, timeout: int):
"""Pair all pairable device.
This will pair any child devices currently in pairing mode.
"""
cs = dev.modules[Module.ChildSetup]
echo(f"Finding new devices for {timeout} seconds...")
pair_res = await cs.pair(timeout=timeout)
if not pair_res:
echo("No devices found.")
for child in pair_res:
echo(
f'Paired {child["name"]} ({child["device_model"]}, '
f'{pretty_category(child["category"])}) with id {child["device_id"]}'
)
@hub.command(name="unpair")
@click.argument("device_id")
@pass_dev
async def hub_unpair(dev, device_id: str):
"""Unpair given device."""
cs = dev.modules[Module.ChildSetup]
# Accessing private here, as the property exposes only values
if device_id not in dev._children:
error(f"{dev} does not have children with identifier {device_id}")
res = await cs.unpair(device_id=device_id)
# Give the device some time to update its internal state, just in case.
await asyncio.sleep(1)
await dev.update()
if device_id not in dev._children:
echo(f"Unpaired {device_id}")
else:
error(f"Failed to unpair {device_id}")
return res

View File

@@ -93,6 +93,7 @@ def _legacy_type_to_class(_type: str) -> Any:
"hsv": "light",
"temperature": "light",
"effect": "light",
"hub": "hub",
},
result_callback=json_formatter_cb,
)

View File

@@ -76,6 +76,7 @@ from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from .device import Device
from .module import Module
_LOGGER = logging.getLogger(__name__)
@@ -142,7 +143,7 @@ class Feature:
#: Callable coroutine or name of the method that allows changing the value
attribute_setter: str | Callable[..., Coroutine[Any, Any, Any]] | None = None
#: Container storing the data, this overrides 'device' for getters
container: Any = None
container: Device | Module | None = None
#: Icon suggestion
icon: str | None = None
#: Attribute containing the name of the unit getter property.

View File

@@ -154,6 +154,7 @@ class Module(ABC):
)
ChildLock: Final[ModuleName[smart.ChildLock]] = ModuleName("ChildLock")
TriggerLogs: Final[ModuleName[smart.TriggerLogs]] = ModuleName("TriggerLogs")
ChildSetup: Final[ModuleName[smart.ChildSetup]] = ModuleName("ChildSetup")
HomeKit: Final[ModuleName[smart.HomeKit]] = ModuleName("HomeKit")
Matter: Final[ModuleName[smart.Matter]] = ModuleName("Matter")

View File

@@ -8,6 +8,7 @@ from .brightness import Brightness
from .childdevice import ChildDevice
from .childlock import ChildLock
from .childprotection import ChildProtection
from .childsetup import ChildSetup
from .clean import Clean
from .cloud import Cloud
from .color import Color
@@ -47,6 +48,7 @@ __all__ = [
"DeviceModule",
"ChildDevice",
"ChildLock",
"ChildSetup",
"BatterySensor",
"HumiditySensor",
"TemperatureSensor",

View File

@@ -0,0 +1,84 @@
"""Implementation for child device setup.
This module allows pairing and disconnecting child devices.
"""
from __future__ import annotations
import asyncio
import logging
from ...feature import Feature
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class ChildSetup(SmartModule):
"""Implementation for child device setup."""
REQUIRED_COMPONENT = "child_quick_setup"
QUERY_GETTER_NAME = "get_support_child_device_category"
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="pair",
name="Pair",
container=self,
attribute_setter="pair",
category=Feature.Category.Config,
type=Feature.Type.Action,
)
)
async def get_supported_device_categories(self) -> list[dict]:
"""Get supported device categories."""
categories = await self.call("get_support_child_device_category")
return categories["get_support_child_device_category"]["device_category_list"]
async def pair(self, *, timeout: int = 10) -> list[dict]:
"""Scan for new devices and pair after discovering first new device."""
await self.call("begin_scanning_child_device")
_LOGGER.info("Waiting %s seconds for discovering new devices", timeout)
await asyncio.sleep(timeout)
detected = await self._get_detected_devices()
if not detected["child_device_list"]:
_LOGGER.info("No devices found.")
return []
_LOGGER.info(
"Discovery done, found %s devices: %s",
len(detected["child_device_list"]),
detected,
)
await self._add_devices(detected)
return detected["child_device_list"]
async def unpair(self, device_id: str) -> dict:
"""Remove device from the hub."""
_LOGGER.debug("Going to unpair %s from %s", device_id, self)
payload = {"child_device_list": [{"device_id": device_id}]}
return await self.call("remove_child_device_list", payload)
async def _add_devices(self, devices: dict) -> dict:
"""Add devices based on get_detected_device response.
Pass the output from :ref:_get_detected_devices: as a parameter.
"""
res = await self.call("add_child_device_list", devices)
return res
async def _get_detected_devices(self) -> dict:
"""Return list of devices detected during scanning."""
param = {"scan_list": await self.get_supported_device_categories()}
res = await self.call("get_scan_child_device_list", param)
_LOGGER.debug("Scan status: %s", res)
return res["get_scan_child_device_list"]

View File

@@ -537,6 +537,21 @@ class SmartDevice(Device):
)
)
if self.parent is not None and (
cs := self.parent.modules.get(Module.ChildSetup)
):
self._add_feature(
Feature(
device=self,
id="unpair",
name="Unpair device",
container=cs,
attribute_setter=lambda: cs.unpair(self.device_id),
category=Feature.Category.Debug,
type=Feature.Type.Action,
)
)
for module in self.modules.values():
module._initialize_features()
for feat in module._module_features.values():