Generalize smartdevice child support ()

* Initialize children's modules (and features) using the child component negotiation results
* Set device_type based on the device response
* Print out child features in cli 'state'
* Add --child option to cli 'command' to allow targeting child devices
* Guard "generic" features like rssi, ssid, etc. only to devices which have this information

Note, we do not currently perform queries on child modules so some data may not be available. At the moment, a stop-gap solution to use parent's data is used but this is not always correct; even if the device shares the same clock and cloud connectivity, it may have its own firmware updates.
This commit is contained in:
Teemu R 2024-02-22 20:46:19 +01:00 committed by GitHub
parent f965b14021
commit 2b0721aea9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 198 additions and 99 deletions

@ -582,9 +582,14 @@ async def state(ctx, dev: Device):
echo(f"\tPort: {dev.port}")
echo(f"\tDevice state: {dev.is_on}")
if dev.is_strip:
echo("\t[bold]== Plugs ==[/bold]")
for plug in dev.children: # type: ignore
echo(f"\t* Socket '{plug.alias}' state: {plug.is_on} since {plug.on_since}")
echo("\t[bold]== Children ==[/bold]")
for child in dev.children:
echo(f"\t* {child.alias} ({child.model}, {child.device_type})")
for feat in child.features.values():
try:
echo(f"\t\t{feat.name}: {feat.value}")
except Exception as ex:
echo(f"\t\t{feat.name}: got exception (%s)" % ex)
echo()
echo("\t[bold]== Generic information ==[/bold]")
@ -665,13 +670,22 @@ async def raw_command(ctx, dev: Device, module, command, parameters):
@cli.command(name="command")
@pass_dev
@click.option("--module", required=False, help="Module for IOT protocol.")
@click.option("--child", required=False, help="Child ID for controlling sub-devices")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
async def cmd_command(dev: Device, module, command, parameters):
async def cmd_command(dev: Device, module, child, command, parameters):
"""Run a raw command on the device."""
if parameters is not None:
parameters = ast.literal_eval(parameters)
if child:
# The way child devices are accessed requires a ChildDevice to
# wrap the communications. Doing this properly would require creating
# a common interfaces for both IOT and SMART child devices.
# As a stop-gap solution, we perform an update instead.
await dev.update()
dev = dev.get_child_device(child)
if isinstance(dev, IotDevice):
res = await dev._query_helper(module, command, parameters)
elif isinstance(dev, SmartDevice):

@ -3,7 +3,7 @@ import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Union
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union
from .credentials import Credentials
from .device_type import DeviceType
@ -71,6 +71,8 @@ class Device(ABC):
self.modules: Dict[str, Any] = {}
self._features: Dict[str, Feature] = {}
self._parent: Optional["Device"] = None
self._children: Mapping[str, "Device"] = {}
@staticmethod
async def connect(
@ -182,9 +184,13 @@ class Device(ABC):
return await self.protocol.query(request=request)
@property
@abstractmethod
def children(self) -> Sequence["Device"]:
"""Returns the child devices."""
return list(self._children.values())
def get_child_device(self, id_: str) -> "Device":
"""Return child device by its ID."""
return self._children[id_]
@property
@abstractmethod

@ -14,6 +14,7 @@ class DeviceType(Enum):
StripSocket = "stripsocket"
Dimmer = "dimmer"
LightStrip = "lightstrip"
Sensor = "sensor"
Unknown = "unknown"
@staticmethod

@ -16,7 +16,7 @@ import functools
import inspect
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Sequence, Set
from typing import Any, Dict, List, Mapping, Optional, Sequence, Set
from ..device import Device, WifiNetwork
from ..deviceconfig import DeviceConfig
@ -183,19 +183,14 @@ class IotDevice(Device):
super().__init__(host=host, config=config, protocol=protocol)
self._sys_info: Any = None # TODO: this is here to avoid changing tests
self._children: Sequence["IotDevice"] = []
self._supported_modules: Optional[Dict[str, IotModule]] = None
self._legacy_features: Set[str] = set()
self._children: Mapping[str, "IotDevice"] = {}
@property
def children(self) -> Sequence["IotDevice"]:
"""Return list of children."""
return self._children
@children.setter
def children(self, children):
"""Initialize from a list of children."""
self._children = children
return list(self._children.values())
def add_module(self, name: str, module: IotModule):
"""Register a module."""
@ -408,15 +403,6 @@ class IotDevice(Device):
sys_info = self._sys_info
return str(sys_info["model"])
@property
def has_children(self) -> bool:
"""Return true if the device has children devices."""
# Ideally we would check for the 'child_num' key in sys_info,
# but devices that speak klap do not populate this key via
# update_from_discover_info so we check for the devices
# we know have children instead.
return self.is_strip
@property # type: ignore
def alias(self) -> Optional[str]:
"""Return device name (alias)."""

@ -115,10 +115,12 @@ class IotStrip(IotDevice):
if not self.children:
children = self.sys_info["children"]
_LOGGER.debug("Initializing %s child sockets", len(children))
self.children = [
IotStripPlug(self.host, parent=self, child_id=child["id"])
self._children = {
f"{self.mac}_{child['id']}": IotStripPlug(
self.host, parent=self, child_id=child["id"]
)
for child in children
]
}
if update_children and self.has_emeter:
for plug in self.children:

@ -1,4 +1,6 @@
"""Implementation for child devices."""
from typing import Dict
from ..smartmodule import SmartModule
@ -6,4 +8,12 @@ class ChildDeviceModule(SmartModule):
"""Implementation for child devices."""
REQUIRED_COMPONENT = "child_device"
QUERY_GETTER_NAME = "get_child_device_list"
def query(self) -> Dict:
"""Query to execute during the update cycle."""
# TODO: There is no need to fetch the component list every time,
# so this should be optimized only for the init.
return {
"get_child_device_list": None,
"get_child_device_component_list": None,
}

@ -1,4 +1,5 @@
"""Child device implementation."""
import logging
from typing import Optional
from ..device_type import DeviceType
@ -6,6 +7,8 @@ from ..deviceconfig import DeviceConfig
from ..smartprotocol import SmartProtocol, _ChildProtocolWrapper
from .smartdevice import SmartDevice
_LOGGER = logging.getLogger(__name__)
class SmartChildDevice(SmartDevice):
"""Presentation of a child device.
@ -16,23 +19,41 @@ class SmartChildDevice(SmartDevice):
def __init__(
self,
parent: SmartDevice,
child_id: str,
info,
component_info,
config: Optional[DeviceConfig] = None,
protocol: Optional[SmartProtocol] = None,
) -> None:
super().__init__(parent.host, config=parent.config, protocol=parent.protocol)
self._parent = parent
self._id = child_id
self.protocol = _ChildProtocolWrapper(child_id, parent.protocol)
self._device_type = DeviceType.StripSocket
self._update_internal_state(info)
self._components = component_info
self._id = info["device_id"]
self.protocol = _ChildProtocolWrapper(self._id, parent.protocol)
async def update(self, update_children: bool = True):
"""Noop update. The parent updates our internals."""
def update_internal_state(self, info):
"""Set internal state for the child."""
# TODO: cleanup the _last_update, _sys_info, _info, _data mess.
self._last_update = self._sys_info = self._info = info
@classmethod
async def create(cls, parent: SmartDevice, child_info, child_components):
"""Create a child device based on device info and component listing."""
child: "SmartChildDevice" = cls(parent, child_info, child_components)
await child._initialize_modules()
await child._initialize_features()
return child
@property
def device_type(self) -> DeviceType:
"""Return child device type."""
child_device_map = {
"plug.powerstrip.sub-plug": DeviceType.Plug,
"subg.trigger.temp-hmdt-sensor": DeviceType.Sensor,
}
dev_type = child_device_map.get(self.sys_info["category"])
if dev_type is None:
_LOGGER.warning("Unknown child device type, please open issue ")
dev_type = DeviceType.Unknown
return dev_type
def __repr__(self):
return f"<ChildDevice {self.alias} of {self._parent}>"

@ -2,7 +2,7 @@
import base64
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, cast
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, cast
from ..aestransport import AesTransport
from ..device import Device, WifiNetwork
@ -12,22 +12,12 @@ from ..emeterstatus import EmeterStatus
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
from ..feature import Feature, FeatureType
from ..smartprotocol import SmartProtocol
from .modules import ( # noqa: F401
AutoOffModule,
ChildDeviceModule,
CloudModule,
DeviceModule,
EnergyModule,
LedModule,
LightTransitionModule,
TimeModule,
)
from .smartmodule import SmartModule
from .modules import * # noqa: F403
_LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
from .smartchilddevice import SmartChildDevice
from .smartmodule import SmartModule
class SmartDevice(Device):
@ -47,23 +37,34 @@ class SmartDevice(Device):
self.protocol: SmartProtocol
self._components_raw: Optional[Dict[str, Any]] = None
self._components: Dict[str, int] = {}
self._children: Dict[str, "SmartChildDevice"] = {}
self._state_information: Dict[str, Any] = {}
self.modules: Dict[str, SmartModule] = {}
self.modules: Dict[str, "SmartModule"] = {}
self._parent: Optional["SmartDevice"] = None
self._children: Mapping[str, "SmartDevice"] = {}
async def _initialize_children(self):
"""Initialize children for power strips."""
children = self._last_update["child_info"]["child_device_list"]
# TODO: Use the type information to construct children,
# as hubs can also have them.
children = self.internal_state["child_info"]["child_device_list"]
children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in child["component_list"]
}
for child in self.internal_state["get_child_device_component_list"][
"child_component_list"
]
}
from .smartchilddevice import SmartChildDevice
self._children = {
child["device_id"]: SmartChildDevice(
parent=self, child_id=child["device_id"]
child_info["device_id"]: await SmartChildDevice.create(
parent=self,
child_info=child_info,
child_components=children_components[child_info["device_id"]],
)
for child in children
for child_info in children
}
# TODO: if all are sockets, then we are a strip, and otherwise a hub?
# doesn't work for the walldimmer with fancontrol...
self._device_type = DeviceType.Strip
@property
@ -126,8 +127,10 @@ class SmartDevice(Device):
if not self.children:
await self._initialize_children()
# TODO: we don't currently perform queries on children based on modules,
# but just update the information that is returned in the main query.
for info in child_info["child_device_list"]:
self._children[info["device_id"]].update_internal_state(info)
self._children[info["device_id"]]._update_internal_state(info)
# We can first initialize the features after the first update.
# We make here an assumption that every device has at least a single feature.
@ -153,6 +156,7 @@ class SmartDevice(Device):
async def _initialize_features(self):
"""Initialize device features."""
self._add_feature(Feature(self, "Device ID", attribute_getter="device_id"))
if "device_on" in self._info:
self._add_feature(
Feature(
@ -164,25 +168,32 @@ class SmartDevice(Device):
)
)
self._add_feature(
Feature(
self,
"Signal Level",
attribute_getter=lambda x: x._info["signal_level"],
icon="mdi:signal",
if "signal_level" in self._info:
self._add_feature(
Feature(
self,
"Signal Level",
attribute_getter=lambda x: x._info["signal_level"],
icon="mdi:signal",
)
)
)
self._add_feature(
Feature(
self,
"RSSI",
attribute_getter=lambda x: x._info["rssi"],
icon="mdi:signal",
if "rssi" in self._info:
self._add_feature(
Feature(
self,
"RSSI",
attribute_getter=lambda x: x._info["rssi"],
icon="mdi:signal",
)
)
if "ssid" in self._info:
self._add_feature(
Feature(
device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi"
)
)
)
self._add_feature(
Feature(device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi")
)
if "overheated" in self._info:
self._add_feature(
@ -232,7 +243,12 @@ class SmartDevice(Device):
@property
def time(self) -> datetime:
"""Return the time."""
_timemod = cast(TimeModule, self.modules["TimeModule"])
# TODO: Default to parent's time module for child devices
if self._parent and "TimeModule" in self.modules:
_timemod = cast(TimeModule, self._parent.modules["TimeModule"]) # noqa: F405
else:
_timemod = cast(TimeModule, self.modules["TimeModule"]) # noqa: F405
return _timemod.time
@property
@ -284,6 +300,14 @@ class SmartDevice(Device):
"""Return all the internal state data."""
return self._last_update
def _update_internal_state(self, info):
"""Update internal state.
This is used by the parent to push updates to its children
"""
# TODO: cleanup the _last_update, _info mess.
self._last_update = self._info = info
async def _query_helper(
self, method: str, params: Optional[Dict] = None, child_ids=None
) -> Any:
@ -347,19 +371,19 @@ class SmartDevice(Device):
@property
def emeter_realtime(self) -> EmeterStatus:
"""Get the emeter status."""
energy = cast(EnergyModule, self.modules["EnergyModule"])
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405
return energy.emeter_realtime
@property
def emeter_this_month(self) -> Optional[float]:
"""Get the emeter value for this month."""
energy = cast(EnergyModule, self.modules["EnergyModule"])
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405
return energy.emeter_this_month
@property
def emeter_today(self) -> Optional[float]:
"""Get the emeter value for today."""
energy = cast(EnergyModule, self.modules["EnergyModule"])
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405
return energy.emeter_today
@property
@ -372,7 +396,7 @@ class SmartDevice(Device):
return None
on_time = cast(float, on_time)
if (timemod := self.modules.get("TimeModule")) is not None:
timemod = cast(TimeModule, timemod)
timemod = cast(TimeModule, timemod) # noqa: F405
return timemod.time - timedelta(seconds=on_time)
else: # We have no device time, use current local time.
return datetime.now().replace(microsecond=0) - timedelta(seconds=on_time)

@ -57,16 +57,25 @@ class SmartModule(Module):
"""
q = self.query()
q_keys = list(q.keys())
# TODO: hacky way to check if update has been called.
if q_keys[0] not in self._device._last_update:
raise KasaException(
f"You need to call update() prior accessing module data"
f" for '{self._module}'"
)
query_key = q_keys[0]
dev = self._device
# TODO: hacky way to check if update has been called.
# The way this falls back to parent may not always be wanted.
# Especially, devices can have their own firmware updates.
if query_key not in dev._last_update:
if dev._parent and query_key in dev._parent._last_update:
_LOGGER.debug("%s not found child, but found on parent", query_key)
dev = dev._parent
else:
raise KasaException(
f"You need to call update() prior accessing module data"
f" for '{self._module}'"
)
filtered_data = {k: v for k, v in dev._last_update.items() if k in q_keys}
filtered_data = {
k: v for k, v in self._device._last_update.items() if k in q_keys
}
if len(filtered_data) == 1:
return next(iter(filtered_data.values()))

@ -47,7 +47,6 @@ async def test_childdevice_properties(dev: SmartChildDevice):
assert len(dev.children) > 0
first = dev.children[0]
assert first.is_strip_socket
# children do not have children
assert not first.children

@ -19,6 +19,7 @@ from kasa.cli import (
alias,
brightness,
cli,
cmd_command,
emeter,
raw_command,
reboot,
@ -136,6 +137,32 @@ async def test_raw_command(dev, mocker):
assert "Usage" in res.output
async def test_command_with_child(dev, mocker):
"""Test 'command' command with --child."""
runner = CliRunner()
update_mock = mocker.patch.object(dev, "update")
dummy_child = mocker.create_autospec(IotDevice)
query_mock = mocker.patch.object(
dummy_child, "_query_helper", return_value={"dummy": "response"}
)
mocker.patch.object(dev, "_children", {"XYZ": dummy_child})
mocker.patch.object(dev, "get_child_device", return_value=dummy_child)
res = await runner.invoke(
cmd_command,
["--child", "XYZ", "command", "'params'"],
obj=dev,
catch_exceptions=False,
)
update_mock.assert_called()
query_mock.assert_called()
assert '{"dummy": "response"}' in res.output
assert res.exit_code == 0
@device_smart
async def test_reboot(dev, mocker):
"""Test that reboot works on SMART devices."""

@ -37,6 +37,7 @@ from .conftest import (
lightstrip,
no_emeter_iot,
plug,
strip,
turn_on,
)
from .fakeprotocol_iot import FakeIotProtocol
@ -201,13 +202,12 @@ async def test_representation(dev):
assert pattern.match(str(dev))
@device_iot
async def test_childrens(dev):
"""Make sure that children property is exposed by every device."""
if dev.is_strip:
assert len(dev.children) > 0
else:
assert len(dev.children) == 0
@strip
def test_children_api(dev):
"""Test the child device API."""
first = dev.children[0]
first_by_get_child_device = dev.get_child_device(first.device_id)
assert first == first_by_get_child_device
@device_iot
@ -215,10 +215,8 @@ async def test_children(dev):
"""Make sure that children property is exposed by every device."""
if dev.is_strip:
assert len(dev.children) > 0
assert dev.has_children is True
else:
assert len(dev.children) == 0
assert dev.has_children is False
@device_iot
@ -260,7 +258,9 @@ async def test_device_class_ctors(device_class_name_obj):
klass = device_class_name_obj[1]
if issubclass(klass, SmartChildDevice):
parent = SmartDevice(host, config=config)
dev = klass(parent, 1)
dev = klass(
parent, {"dummy": "info", "device_id": "dummy"}, {"dummy": "components"}
)
else:
dev = klass(host, config=config)
assert dev.host == host