Add generic interface (descriptors) for obtaining device features

This commit is contained in:
Teemu Rytilahti 2024-02-03 19:42:08 +01:00
parent 458949157a
commit f0f1e478c4
9 changed files with 245 additions and 6 deletions

View File

@ -17,6 +17,7 @@ from warnings import warn
from kasa.bulb import Bulb
from kasa.credentials import Credentials
from kasa.descriptors import Descriptor, DescriptorCategory, DescriptorType
from kasa.device import Device
from kasa.device_type import DeviceType
from kasa.deviceconfig import (
@ -54,6 +55,9 @@ __all__ = [
"TurnOnBehaviors",
"TurnOnBehavior",
"DeviceType",
"Descriptor",
"DescriptorType",
"DescriptorCategory",
"EmeterStatus",
"Device",
"Bulb",

View File

@ -565,6 +565,10 @@ async def state(ctx, dev: Device):
else:
echo(f"\t{info_name}: {info_data}")
echo("\n\t[bold]== Descriptors == [/bold]")
for id_, descriptor in dev.descriptors.items():
echo(f"\t{descriptor.name} ({id_}): {descriptor.value}")
if dev.has_emeter:
echo("\n\t[bold]== Current State ==[/bold]")
emeter_status = dev.emeter_realtime
@ -1102,5 +1106,37 @@ async def shell(dev: Device):
loop.stop()
@cli.command(name="descriptor")
@click.argument("name", required=False)
@click.argument("value", required=False)
@pass_dev
async def descriptor(dev, name: str, value):
"""Access and modify descriptor values.
If no *name* is given, lists available descriptors and their values.
If only *name* is given, the value of named descriptor is returned.
If both *name* and *value* are set, the described setting is changed.
"""
if not name:
echo("[bold]== Descriptors ==[/bold]")
for name, desc in dev.descriptors.items():
echo(f"{desc.name} ({name}): {desc.value}")
return
if name not in dev.descriptors:
echo(f"No descriptor by name {name}")
return
desc = dev.descriptors[name]
if value is None:
echo(f"{desc.name} ({name}): {desc.value}")
return desc.value
echo(f"Setting {name} to {value}")
value = ast.literal_eval(value)
return await dev.descriptors[name].set_value(value)
if __name__ == "__main__":
cli()

54
kasa/descriptors.py Normal file
View File

@ -0,0 +1,54 @@
"""Generic interface for defining device features."""
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Callable
class DescriptorCategory(Enum):
"""Descriptor category."""
# TODO: we could probably do better than using the scheme homeassistant is using
Config = auto()
Diagnostic = auto()
class DescriptorType(Enum):
"""Type of the information defined by the descriptor."""
Sensor = auto()
BinarySensor = auto()
Switch = auto()
Button = auto()
@dataclass
class Descriptor:
"""Descriptor defines a generic interface for device features."""
device: Any # TODO: rename to something else, this can also be a module.
#: User-friendly short description
name: str
#: Name of the property that allows accessing the value
attribute_getter: str | Callable
#: Name of the method that allows changing the value
attribute_setter: str | None = None
#: Type of the information
icon: str | None = None
#: Unit of the descriptor
unit: str | None = None
#: Hint for homeassistant
#: TODO: Replace with a set of flags to allow homeassistant make its own decision?
show_in_hass: bool = True
category: DescriptorCategory = DescriptorCategory.Diagnostic
type: DescriptorType = DescriptorType.Sensor
@property
def value(self):
"""Return the current value."""
if isinstance(self.attribute_getter, Callable):
return self.attribute_getter(self.device)
return getattr(self.device, self.attribute_getter)
async def set_value(self, value):
"""Set the value."""
return await getattr(self.device, self.attribute_setter)(value)

View File

@ -6,6 +6,7 @@ from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Set, Union
from .credentials import Credentials
from .descriptors import Descriptor
from .device_type import DeviceType
from .deviceconfig import DeviceConfig
from .emeterstatus import EmeterStatus
@ -69,6 +70,7 @@ class Device(ABC):
self._discovery_info: Optional[Dict[str, Any]] = None
self.modules: Dict[str, Any] = {}
self._descriptors: Dict[str, Descriptor] = {}
@staticmethod
async def connect(
@ -343,6 +345,18 @@ class Device(ABC):
async def set_alias(self, alias: str):
"""Set the device name (alias)."""
@property
def descriptors(self) -> Dict[str, Descriptor]:
"""Return the list of descriptors."""
return self._descriptors
def add_descriptor(self, descriptor: "Descriptor"):
"""Add a new descriptor to the device."""
desc_name = descriptor.name.lower().replace(" ", "_")
if desc_name in self._descriptors:
raise SmartDeviceException("Duplicate descriptor name %s" % desc_name)
self._descriptors[desc_name] = descriptor
def __repr__(self):
if self._last_update is None:
return f"<{self._device_type} at {self.host} - update() needed>"

View File

@ -18,6 +18,7 @@ import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Sequence, Set
from ..descriptors import Descriptor
from ..device import Device, WifiNetwork
from ..deviceconfig import DeviceConfig
from ..emeterstatus import EmeterStatus
@ -186,6 +187,7 @@ class IotDevice(Device):
self._sys_info: Any = None # TODO: this is here to avoid changing tests
self._features: Set[str] = set()
self._children: Sequence["IotDevice"] = []
self._supported_modules: Optional[Dict[str, IotModule]] = None
@property
def children(self) -> Sequence["IotDevice"]:
@ -299,9 +301,33 @@ class IotDevice(Device):
self._last_update = response
self._set_sys_info(response["system"]["get_sysinfo"])
if not self._descriptors:
await self._initialize_descriptors()
await self._modular_update(req)
self._set_sys_info(self._last_update["system"]["get_sysinfo"])
async def _initialize_descriptors(self):
self.add_descriptor(
Descriptor(
device=self, name="RSSI", attribute_getter="rssi", icon="mdi:signal"
)
)
self.add_descriptor(
Descriptor(
device=self, name="Time", attribute_getter="time", show_in_hass=False
)
)
if "on_time" in self._sys_info:
self.add_descriptor(
Descriptor(
device=self,
name="On since",
attribute_getter="on_since",
icon="mdi:clock",
)
)
async def _modular_update(self, req: dict) -> None:
"""Execute an update query."""
if self.has_emeter:
@ -310,6 +336,18 @@ class IotDevice(Device):
)
self.add_module("emeter", Emeter(self, self.emeter_type))
# TODO: perhaps modules should not have unsupported modules,
# making separate handling for this unnecessary
if self._supported_modules is None:
supported = {}
for module in self.modules.values():
if module.is_supported:
supported[module._module] = module
for _, module_desc in module._module_descriptors.items():
self.add_descriptor(module_desc)
self._supported_modules = supported
request_list = []
est_response_size = 1024 if "system" in req else 0
for module in self.modules.values():

View File

@ -2,6 +2,7 @@
import logging
from typing import Any, Dict, Optional
from ..descriptors import Descriptor, DescriptorCategory, DescriptorType
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..protocol import BaseProtocol
@ -56,6 +57,18 @@ class IotPlug(IotDevice):
self.add_module("time", Time(self, "time"))
self.add_module("cloud", Cloud(self, "cnCloud"))
self.add_descriptor(
Descriptor(
device=self,
name="LED",
icon="mdi:led-{state}",
attribute_getter="led",
attribute_setter="set_led",
category=DescriptorCategory.Config,
type=DescriptorType.Switch,
)
)
@property # type: ignore
@requires_update
def is_on(self) -> bool:
@ -88,5 +101,4 @@ class IotPlug(IotDevice):
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return switch-specific state information."""
info = {"LED state": self.led, "On since": self.on_since}
return info
return {}

View File

@ -4,6 +4,7 @@ try:
except ImportError:
from pydantic import BaseModel
from ...descriptors import Descriptor, DescriptorType
from .module import IotModule
@ -25,6 +26,23 @@ class CloudInfo(BaseModel):
class Cloud(IotModule):
"""Module implementing support for cloud services."""
def __init__(self, device, module):
super().__init__(device, module)
self.add_descriptor(
Descriptor(
device=self,
name="Cloud Connection",
icon="mdi:cloud",
attribute_getter="is_connected",
type=DescriptorType.BinarySensor,
)
)
@property
def is_connected(self) -> bool:
"""Return true if device is connected to the cloud."""
return self.info.binded
def query(self):
"""Request cloud connectivity info."""
return self.query_for_command("get_info")

View File

@ -2,8 +2,9 @@
import collections
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Dict
from ...descriptors import Descriptor
from ...exceptions import SmartDeviceException
if TYPE_CHECKING:
@ -34,6 +35,14 @@ class IotModule(ABC):
def __init__(self, device: "IotDevice", module: str):
self._device = device
self._module = module
self._module_descriptors: Dict[str, Descriptor] = {}
def add_descriptor(self, desc):
"""Add module descriptor."""
module_desc_name = f"{self._module}_{desc.name}"
if module_desc_name in self._module_descriptors:
raise Exception("Duplicate name detected %s" % module_desc_name)
self._module_descriptors[module_desc_name] = desc
@abstractmethod
def query(self):

View File

@ -5,6 +5,7 @@ from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Set, cast
from ..aestransport import AesTransport
from ..descriptors import Descriptor, DescriptorType
from ..device import Device, WifiNetwork
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
@ -118,6 +119,11 @@ class SmartDevice(Device):
for info in child_info["child_device_list"]:
self._children[info["device_id"]].update_internal_state(info)
# We can first initialize the descriptors after the first update.
# We make here an assumption that every device has at least a single descriptor.
if not self._descriptors:
await self._initialize_descriptors()
_LOGGER.debug("Got an update: %s", self._last_update)
async def _initialize_modules(self):
@ -125,6 +131,50 @@ class SmartDevice(Device):
if "energy_monitoring" in self._components:
self.emeter_type = "emeter"
async def _initialize_descriptors(self):
"""Initialize device descriptors."""
self.add_descriptor(
Descriptor(
self,
"Signal Level",
attribute_getter=lambda x: x._info["signal_level"],
icon="mdi:signal",
)
)
self.add_descriptor(
Descriptor(
device=self, name="Time", attribute_getter="time", show_in_hass=False
)
)
self.add_descriptor(
Descriptor(
device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi"
)
)
if "overheated" in self._info:
self.add_descriptor(
Descriptor(
self,
"Overheated",
attribute_getter=lambda x: x._info["overheated"],
icon="mdi:heat-wave",
type=DescriptorType.BinarySensor,
)
)
# We check for the key available, and not for the property truthiness,
# as the value is falsy when the device is off.
if "on_since" in self._info:
self.add_descriptor(
Descriptor(
device=self,
name="On since",
attribute_getter="on_since",
icon="mdi:clock",
)
)
@property
def sys_info(self) -> Dict[str, Any]:
"""Returns the device info."""
@ -215,15 +265,19 @@ class SmartDevice(Device):
return res
@property
def state_information(self) -> Dict[str, Any]:
"""Return the key state information."""
def ssid(self) -> str:
"""Return ssid of the connected wifi ap."""
ssid = self._info.get("ssid")
ssid = base64.b64decode(ssid).decode() if ssid else "No SSID"
return ssid
@property
def state_information(self) -> Dict[str, Any]:
"""Return the key state information."""
return {
"overheated": self._info.get("overheated"),
"signal_level": self._info.get("signal_level"),
"SSID": ssid,
"SSID": self.ssid,
}
@property