Generalize smartdevice child support

* Initialize children's modules (and features) using the child component negotiation results
* Set device_type based on the device response
* Print out child features in cli 'state'
* Add --child option to cli 'command' to allow targeting child devices
* Guard "generic" features like rssi, ssid, etc. only to devices which have this information
This commit is contained in:
Teemu Rytilahti 2024-02-20 00:26:47 +01:00
parent efb4a0f31f
commit bced5e40c5
6 changed files with 124 additions and 51 deletions

View File

@ -558,9 +558,14 @@ async def state(ctx, dev: Device):
echo(f"\tPort: {dev.port}")
echo(f"\tDevice state: {dev.is_on}")
if dev.is_strip:
echo("\t[bold]== Plugs ==[/bold]")
for plug in dev.children: # type: ignore
echo(f"\t* Socket '{plug.alias}' state: {plug.is_on} since {plug.on_since}")
echo("\t[bold]== Children ==[/bold]")
for child in dev.children.values():
echo(f"\t* {child.alias} ({child.model}, {child.device_type})")
for feat in child.features.values():
try:
echo(f"\t\t{feat.name}: {feat.value}")
except Exception as ex:
echo(f"\t\t{feat.name}: got exception (%s)" % ex)
echo()
echo("\t[bold]== Generic information ==[/bold]")
@ -641,13 +646,20 @@ async def raw_command(ctx, dev: Device, module, command, parameters):
@cli.command(name="command")
@pass_dev
@click.option("--module", required=False, help="Module for IOT protocol.")
@click.option("--child", required=False, help="Child ID for controlling sub-devices")
@click.argument("command")
@click.argument("parameters", default=None, required=False)
async def cmd_command(dev: Device, module, command, parameters):
async def cmd_command(dev: Device, module, child, command, parameters):
"""Run a raw command on the device."""
if parameters is not None:
parameters = ast.literal_eval(parameters)
if child:
echo(f"Selecting child {child} from {dev}")
# TODO: Update required to initialize children
await dev.update()
dev = dev._children[child]
if isinstance(dev, IotDevice):
res = await dev._query_helper(module, command, parameters)
elif isinstance(dev, SmartDevice):

View File

@ -14,6 +14,7 @@ class DeviceType(Enum):
StripSocket = "stripsocket"
Dimmer = "dimmer"
LightStrip = "lightstrip"
Sensor = "sensor"
Unknown = "unknown"
@staticmethod

View File

@ -1,4 +1,6 @@
"""Implementation for child devices."""
from typing import Dict
from ..smartmodule import SmartModule
@ -6,4 +8,12 @@ class ChildDeviceModule(SmartModule):
"""Implementation for child devices."""
REQUIRED_COMPONENT = "child_device"
QUERY_GETTER_NAME = "get_child_device_list"
def query(self) -> Dict:
"""Query to execute during the update cycle."""
# TODO: There is no need to fetch the component list every time,
# so this should be optimized only for the init.
return {
"get_child_device_list": None,
"get_child_device_component_list": None,
}

View File

@ -1,4 +1,5 @@
"""Child device implementation."""
import logging
from typing import Optional
from ..device_type import DeviceType
@ -6,6 +7,8 @@ from ..deviceconfig import DeviceConfig
from ..smartprotocol import SmartProtocol, _ChildProtocolWrapper
from .smartdevice import SmartDevice
_LOGGER = logging.getLogger(__name__)
class SmartChildDevice(SmartDevice):
"""Presentation of a child device.
@ -16,23 +19,41 @@ class SmartChildDevice(SmartDevice):
def __init__(
self,
parent: SmartDevice,
child_id: str,
info,
component_info,
config: Optional[DeviceConfig] = None,
protocol: Optional[SmartProtocol] = None,
) -> None:
super().__init__(parent.host, config=parent.config, protocol=parent.protocol)
self._parent = parent
self._id = child_id
self.protocol = _ChildProtocolWrapper(child_id, parent.protocol)
self._device_type = DeviceType.StripSocket
self._update_internal_state(info)
self._components = component_info
self._id = info["device_id"]
self.protocol = _ChildProtocolWrapper(self._id, parent.protocol)
async def update(self, update_children: bool = True):
"""Noop update. The parent updates our internals."""
def update_internal_state(self, info):
"""Set internal state for the child."""
# TODO: cleanup the _last_update, _sys_info, _info, _data mess.
self._last_update = self._sys_info = self._info = info
@classmethod
async def create(cls, parent: SmartDevice, child_info, child_components):
"""Create a child device based on device info and component listing."""
child: "SmartChildDevice" = cls(parent, child_info, child_components)
await child._initialize_modules()
await child._initialize_features()
return child
@property
def device_type(self) -> DeviceType:
"""Return child device type."""
child_device_map = {
"plug.powerstrip.sub-plug": DeviceType.Plug,
"subg.trigger.temp-hmdt-sensor": DeviceType.Sensor,
}
dev_type = child_device_map.get(self.sys_info["category"])
if dev_type is None:
_LOGGER.warning("Unknown child device type, please open issue ")
dev_type = DeviceType.Unknown
return dev_type
def __repr__(self):
return f"<ChildDevice {self.alias} of {self._parent}>"

View File

@ -12,22 +12,16 @@ from ..emeterstatus import EmeterStatus
from ..exceptions import AuthenticationException, SmartDeviceException, SmartErrorCode
from ..feature import Feature, FeatureType
from ..smartprotocol import SmartProtocol
from .modules import ( # noqa: F401
from .modules import * # noqa: F403
AutoOffModule,
ChildDeviceModule,
CloudModule,
DeviceModule,
EnergyModule,
LedModule,
LightTransitionModule,
TimeModule,
)
from .smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
from .smartchilddevice import SmartChildDevice
from .smartmodule import SmartModule
class SmartDevice(Device):
@ -49,21 +43,32 @@ class SmartDevice(Device):
self._components: Dict[str, int] = {}
self._children: Dict[str, "SmartChildDevice"] = {}
self._state_information: Dict[str, Any] = {}
self.modules: Dict[str, SmartModule] = {}
self.modules: Dict[str, "SmartModule"] = {}
self._parent: Optional["SmartDevice"] = None
async def _initialize_children(self):
"""Initialize children for power strips."""
children = self._last_update["child_info"]["child_device_list"]
# TODO: Use the type information to construct children,
# as hubs can also have them.
children = self.internal_state["child_info"]["child_device_list"]
children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in child["component_list"]
}
for child in self.internal_state["get_child_device_component_list"][
"child_component_list"
]
}
from .smartchilddevice import SmartChildDevice
self._children = {
child["device_id"]: SmartChildDevice(
parent=self, child_id=child["device_id"]
child_info["device_id"]: await SmartChildDevice.create(
parent=self,
child_info=child_info,
child_components=children_components[child_info["device_id"]],
)
for child in children
for child_info in children
}
# TODO: if all are sockets, then we are a strip, and otherwise a hub?
# doesn't work for the walldimmer with fancontrol...
self._device_type = DeviceType.Strip
@property
@ -153,6 +158,7 @@ class SmartDevice(Device):
async def _initialize_features(self):
"""Initialize device features."""
self._add_feature(Feature(self, "Device ID", attribute_getter="device_id"))
if "device_on" in self._info:
self._add_feature(
Feature(
@ -164,6 +170,8 @@ class SmartDevice(Device):
)
)
if "signal_level" in self._info:
self._add_feature(
Feature(
self,
@ -172,6 +180,8 @@ class SmartDevice(Device):
icon="mdi:signal",
)
)
if "rssi" in self._info:
self._add_feature(
Feature(
self,
@ -180,8 +190,12 @@ class SmartDevice(Device):
icon="mdi:signal",
)
)
if "ssid" in self._info:
self._add_feature(
Feature(device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi")
Feature(
device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi"
)
)
if "overheated" in self._info:
@ -232,7 +246,12 @@ class SmartDevice(Device):
@property
def time(self) -> datetime:
"""Return the time."""
_timemod = cast(TimeModule, self.modules["TimeModule"])
# TODO: Default to parent's time module for child devices
if self._parent and "TimeModule" in self.modules:
_timemod = cast(TimeModule, self._parent.modules["TimeModule"]) # noqa: F405
else:
_timemod = cast(TimeModule, self.modules["TimeModule"]) # noqa: F405
return _timemod.time
@property
@ -284,6 +303,14 @@ class SmartDevice(Device):
"""Return all the internal state data."""
return self._last_update
def _update_internal_state(self, info):
"""Update internal state.
This is used by the parent to push updates to its children
"""
# TODO: cleanup the _last_update, _info mess.
self._last_update = self._info = info
async def _query_helper(
self, method: str, params: Optional[Dict] = None, child_ids=None
) -> Any:
@ -347,19 +374,19 @@ class SmartDevice(Device):
@property
def emeter_realtime(self) -> EmeterStatus:
"""Get the emeter status."""
energy = cast(EnergyModule, self.modules["EnergyModule"])
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405
return energy.emeter_realtime
@property
def emeter_this_month(self) -> Optional[float]:
"""Get the emeter value for this month."""
energy = cast(EnergyModule, self.modules["EnergyModule"])
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405
return energy.emeter_this_month
@property
def emeter_today(self) -> Optional[float]:
"""Get the emeter value for today."""
energy = cast(EnergyModule, self.modules["EnergyModule"])
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405
return energy.emeter_today
@property
@ -372,7 +399,7 @@ class SmartDevice(Device):
return None
on_time = cast(float, on_time)
if (timemod := self.modules.get("TimeModule")) is not None:
timemod = cast(TimeModule, timemod)
timemod = cast(TimeModule, timemod) # noqa: F405
return timemod.time - timedelta(seconds=on_time)
else: # We have no device time, use current local time.
return datetime.now().replace(microsecond=0) - timedelta(seconds=on_time)

View File

@ -255,7 +255,9 @@ async def test_device_class_ctors(device_class_name_obj):
klass = device_class_name_obj[1]
if issubclass(klass, SmartChildDevice):
parent = SmartDevice(host, config=config)
dev = klass(parent, 1)
dev = klass(
parent, {"dummy": "info", "device_id": "dummy"}, {"dummy": "components"}
)
else:
dev = klass(host, config=config)
assert dev.host == host