mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-24 21:57:07 +00:00
Rename descriptor to feature
This commit is contained in:
parent
288b5cacce
commit
5dc190837c
@ -17,7 +17,6 @@ from warnings import warn
|
||||
|
||||
from kasa.bulb import Bulb
|
||||
from kasa.credentials import Credentials
|
||||
from kasa.descriptors import Descriptor, DescriptorCategory, DescriptorType
|
||||
from kasa.device import Device
|
||||
from kasa.device_type import DeviceType
|
||||
from kasa.deviceconfig import (
|
||||
@ -34,6 +33,7 @@ from kasa.exceptions import (
|
||||
TimeoutException,
|
||||
UnsupportedDeviceException,
|
||||
)
|
||||
from kasa.feature import Feature, FeatureCategory, FeatureType
|
||||
from kasa.iot.iotbulb import BulbPreset, TurnOnBehavior, TurnOnBehaviors
|
||||
from kasa.iotprotocol import (
|
||||
IotProtocol,
|
||||
@ -55,9 +55,9 @@ __all__ = [
|
||||
"TurnOnBehaviors",
|
||||
"TurnOnBehavior",
|
||||
"DeviceType",
|
||||
"Descriptor",
|
||||
"DescriptorType",
|
||||
"DescriptorCategory",
|
||||
"Feature",
|
||||
"FeatureType",
|
||||
"FeatureCategory",
|
||||
"EmeterStatus",
|
||||
"Device",
|
||||
"Bulb",
|
||||
|
29
kasa/cli.py
29
kasa/cli.py
@ -101,6 +101,7 @@ class ExceptionHandlerGroup(click.Group):
|
||||
asyncio.get_event_loop().run_until_complete(self.main(*args, **kwargs))
|
||||
except Exception as ex:
|
||||
echo(f"Got error: {ex!r}")
|
||||
raise
|
||||
|
||||
|
||||
def json_formatter_cb(result, **kwargs):
|
||||
@ -565,9 +566,9 @@ async def state(ctx, dev: Device):
|
||||
else:
|
||||
echo(f"\t{info_name}: {info_data}")
|
||||
|
||||
echo("\n\t[bold]== Descriptors == [/bold]")
|
||||
for id_, descriptor in dev.descriptors.items():
|
||||
echo(f"\t{descriptor.name} ({id_}): {descriptor.value}")
|
||||
echo("\n\t[bold]== Features == [/bold]")
|
||||
for id_, feature in dev.features.items():
|
||||
echo(f"\t{feature.name} ({id_}): {feature.value}")
|
||||
|
||||
if dev.has_emeter:
|
||||
echo("\n\t[bold]== Current State ==[/bold]")
|
||||
@ -585,8 +586,6 @@ async def state(ctx, dev: Device):
|
||||
echo("\n\t[bold]== Verbose information ==[/bold]")
|
||||
echo(f"\tCredentials hash: {dev.credentials_hash}")
|
||||
echo(f"\tDevice ID: {dev.device_id}")
|
||||
for feature in dev.features:
|
||||
echo(f"\tFeature: {feature}")
|
||||
echo()
|
||||
_echo_discovery_info(dev._discovery_info)
|
||||
return dev.internal_state
|
||||
@ -1106,11 +1105,11 @@ async def shell(dev: Device):
|
||||
loop.stop()
|
||||
|
||||
|
||||
@cli.command(name="descriptor")
|
||||
@cli.command(name="feature")
|
||||
@click.argument("name", required=False)
|
||||
@click.argument("value", required=False)
|
||||
@pass_dev
|
||||
async def descriptor(dev, name: str, value):
|
||||
async def feature(dev, name: str, value):
|
||||
"""Access and modify descriptor values.
|
||||
|
||||
If no *name* is given, lists available descriptors and their values.
|
||||
@ -1118,24 +1117,24 @@ async def descriptor(dev, name: str, value):
|
||||
If both *name* and *value* are set, the described setting is changed.
|
||||
"""
|
||||
if not name:
|
||||
echo("[bold]== Descriptors ==[/bold]")
|
||||
for name, desc in dev.descriptors.items():
|
||||
echo(f"{desc.name} ({name}): {desc.value}")
|
||||
echo("[bold]== Feature ==[/bold]")
|
||||
for name, feat in dev.features.items():
|
||||
echo(f"{feat.name} ({name}): {feat.value}")
|
||||
return
|
||||
|
||||
if name not in dev.descriptors:
|
||||
if name not in dev.features:
|
||||
echo(f"No descriptor by name {name}")
|
||||
return
|
||||
|
||||
desc = dev.descriptors[name]
|
||||
feat = dev.features[name]
|
||||
|
||||
if value is None:
|
||||
echo(f"{desc.name} ({name}): {desc.value}")
|
||||
return desc.value
|
||||
echo(f"{feat.name} ({name}): {feat.value}")
|
||||
return feat.value
|
||||
|
||||
echo(f"Setting {name} to {value}")
|
||||
value = ast.literal_eval(value)
|
||||
return await dev.descriptors[name].set_value(value)
|
||||
return await dev.features[name].set_value(value)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -3,14 +3,14 @@ import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional, Sequence, Set, Union
|
||||
from typing import Any, Dict, List, Optional, Sequence, Union
|
||||
|
||||
from .credentials import Credentials
|
||||
from .descriptors import Descriptor
|
||||
from .device_type import DeviceType
|
||||
from .deviceconfig import DeviceConfig
|
||||
from .emeterstatus import EmeterStatus
|
||||
from .exceptions import SmartDeviceException
|
||||
from .feature import Feature
|
||||
from .iotprotocol import IotProtocol
|
||||
from .protocol import BaseProtocol
|
||||
from .xortransport import XorTransport
|
||||
@ -70,7 +70,7 @@ class Device(ABC):
|
||||
self._discovery_info: Optional[Dict[str, Any]] = None
|
||||
|
||||
self.modules: Dict[str, Any] = {}
|
||||
self._descriptors: Dict[str, Descriptor] = {}
|
||||
self._features: Dict[str, Feature] = {}
|
||||
|
||||
@staticmethod
|
||||
async def connect(
|
||||
@ -298,9 +298,16 @@ class Device(ABC):
|
||||
"""Return the key state information."""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def features(self) -> Set[str]:
|
||||
def features(self) -> Dict[str, Feature]:
|
||||
"""Return the list of supported features."""
|
||||
return self._features
|
||||
|
||||
def add_feature(self, feature: Feature):
|
||||
"""Add a new feature to the device."""
|
||||
desc_name = feature.name.lower().replace(" ", "_")
|
||||
if desc_name in self._features:
|
||||
raise SmartDeviceException("Duplicate feature name %s" % desc_name)
|
||||
self._features[desc_name] = feature
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
@ -345,18 +352,6 @@ class Device(ABC):
|
||||
async def set_alias(self, alias: str):
|
||||
"""Set the device name (alias)."""
|
||||
|
||||
@property
|
||||
def descriptors(self) -> Dict[str, Descriptor]:
|
||||
"""Return the list of descriptors."""
|
||||
return self._descriptors
|
||||
|
||||
def add_descriptor(self, descriptor: "Descriptor"):
|
||||
"""Add a new descriptor to the device."""
|
||||
desc_name = descriptor.name.lower().replace(" ", "_")
|
||||
if desc_name in self._descriptors:
|
||||
raise SmartDeviceException("Duplicate descriptor name %s" % desc_name)
|
||||
self._descriptors[desc_name] = descriptor
|
||||
|
||||
def __repr__(self):
|
||||
if self._last_update is None:
|
||||
return f"<{self._device_type} at {self.host} - update() needed>"
|
||||
|
@ -1,10 +1,13 @@
|
||||
"""Generic interface for defining device features."""
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum, auto
|
||||
from typing import Any, Callable
|
||||
from typing import TYPE_CHECKING, Any, Callable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .device import Device
|
||||
|
||||
|
||||
class DescriptorCategory(Enum):
|
||||
class FeatureCategory(Enum):
|
||||
"""Descriptor category."""
|
||||
|
||||
# TODO: we could probably do better than using the scheme homeassistant is using
|
||||
@ -12,8 +15,8 @@ class DescriptorCategory(Enum):
|
||||
Diagnostic = auto()
|
||||
|
||||
|
||||
class DescriptorType(Enum):
|
||||
"""Type of the information defined by the descriptor."""
|
||||
class FeatureType(Enum):
|
||||
"""Type to help decide how to present the feature."""
|
||||
|
||||
Sensor = auto()
|
||||
BinarySensor = auto()
|
||||
@ -22,33 +25,39 @@ class DescriptorType(Enum):
|
||||
|
||||
|
||||
@dataclass
|
||||
class Descriptor:
|
||||
"""Descriptor defines a generic interface for device features."""
|
||||
class Feature:
|
||||
"""Feature defines a generic interface for device features."""
|
||||
|
||||
device: Any # TODO: rename to something else, this can also be a module.
|
||||
#: Device instance required for getting and setting values
|
||||
device: "Device"
|
||||
#: User-friendly short description
|
||||
name: str
|
||||
#: Name of the property that allows accessing the value
|
||||
attribute_getter: str | Callable
|
||||
#: Name of the method that allows changing the value
|
||||
attribute_setter: str | None = None
|
||||
#: Type of the information
|
||||
#: Container storing the data, this overrides 'device' for getters
|
||||
container: Any = None
|
||||
#: Icon suggestion
|
||||
icon: str | None = None
|
||||
#: Unit of the descriptor
|
||||
unit: str | None = None
|
||||
#: Hint for homeassistant
|
||||
#: TODO: Replace with a set of flags to allow homeassistant make its own decision?
|
||||
show_in_hass: bool = True
|
||||
category: DescriptorCategory = DescriptorCategory.Diagnostic
|
||||
type: DescriptorType = DescriptorType.Sensor
|
||||
category: FeatureCategory = FeatureCategory.Diagnostic
|
||||
type: FeatureType = FeatureType.Sensor
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
"""Return the current value."""
|
||||
container = self.container if self.container is not None else self.device
|
||||
if isinstance(self.attribute_getter, Callable):
|
||||
return self.attribute_getter(self.device)
|
||||
return getattr(self.device, self.attribute_getter)
|
||||
return self.attribute_getter(container)
|
||||
return getattr(container, self.attribute_getter)
|
||||
|
||||
async def set_value(self, value):
|
||||
"""Set the value."""
|
||||
if self.attribute_setter is None:
|
||||
raise ValueError("Tried to set read-only feature.")
|
||||
return await getattr(self.device, self.attribute_setter)(value)
|
@ -18,11 +18,11 @@ import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional, Sequence, Set
|
||||
|
||||
from ..descriptors import Descriptor
|
||||
from ..device import Device, WifiNetwork
|
||||
from ..deviceconfig import DeviceConfig
|
||||
from ..emeterstatus import EmeterStatus
|
||||
from ..exceptions import SmartDeviceException
|
||||
from ..feature import Feature
|
||||
from ..protocol import BaseProtocol
|
||||
from .modules import Emeter, IotModule
|
||||
|
||||
@ -185,9 +185,9 @@ class IotDevice(Device):
|
||||
super().__init__(host=host, config=config, protocol=protocol)
|
||||
|
||||
self._sys_info: Any = None # TODO: this is here to avoid changing tests
|
||||
self._features: Set[str] = set()
|
||||
self._children: Sequence["IotDevice"] = []
|
||||
self._supported_modules: Optional[Dict[str, IotModule]] = None
|
||||
self._legacy_features: Set[str] = set()
|
||||
|
||||
@property
|
||||
def children(self) -> Sequence["IotDevice"]:
|
||||
@ -262,7 +262,7 @@ class IotDevice(Device):
|
||||
|
||||
@property # type: ignore
|
||||
@requires_update
|
||||
def features(self) -> Set[str]:
|
||||
def features(self) -> Dict[str, Feature]:
|
||||
"""Return a set of features that the device supports."""
|
||||
return self._features
|
||||
|
||||
@ -278,7 +278,7 @@ class IotDevice(Device):
|
||||
@requires_update
|
||||
def has_emeter(self) -> bool:
|
||||
"""Return True if device has an energy meter."""
|
||||
return "ENE" in self.features
|
||||
return "ENE" in self._legacy_features
|
||||
|
||||
async def get_sys_info(self) -> Dict[str, Any]:
|
||||
"""Retrieve system information."""
|
||||
@ -301,26 +301,26 @@ class IotDevice(Device):
|
||||
self._last_update = response
|
||||
self._set_sys_info(response["system"]["get_sysinfo"])
|
||||
|
||||
if not self._descriptors:
|
||||
if not self._features:
|
||||
await self._initialize_descriptors()
|
||||
|
||||
await self._modular_update(req)
|
||||
self._set_sys_info(self._last_update["system"]["get_sysinfo"])
|
||||
|
||||
async def _initialize_descriptors(self):
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=self, name="RSSI", attribute_getter="rssi", icon="mdi:signal"
|
||||
)
|
||||
)
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=self, name="Time", attribute_getter="time", show_in_hass=False
|
||||
)
|
||||
)
|
||||
if "on_time" in self._sys_info:
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=self,
|
||||
name="On since",
|
||||
attribute_getter="on_since",
|
||||
@ -343,8 +343,8 @@ class IotDevice(Device):
|
||||
for module in self.modules.values():
|
||||
if module.is_supported:
|
||||
supported[module._module] = module
|
||||
for _, module_desc in module._module_descriptors.items():
|
||||
self.add_descriptor(module_desc)
|
||||
for module_feat in module._module_features.values():
|
||||
self.add_feature(module_feat)
|
||||
|
||||
self._supported_modules = supported
|
||||
|
||||
@ -395,9 +395,7 @@ class IotDevice(Device):
|
||||
"""Set sys_info."""
|
||||
self._sys_info = sys_info
|
||||
if features := sys_info.get("feature"):
|
||||
self._features = _parse_features(features)
|
||||
else:
|
||||
self._features = set()
|
||||
self._legacy_features = _parse_features(features)
|
||||
|
||||
@property # type: ignore
|
||||
@requires_update
|
||||
|
@ -2,9 +2,9 @@
|
||||
import logging
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from ..descriptors import Descriptor, DescriptorCategory, DescriptorType
|
||||
from ..device_type import DeviceType
|
||||
from ..deviceconfig import DeviceConfig
|
||||
from ..feature import Feature, FeatureCategory, FeatureType
|
||||
from ..protocol import BaseProtocol
|
||||
from .iotdevice import IotDevice, requires_update
|
||||
from .modules import Antitheft, Cloud, Schedule, Time, Usage
|
||||
@ -57,15 +57,15 @@ class IotPlug(IotDevice):
|
||||
self.add_module("time", Time(self, "time"))
|
||||
self.add_module("cloud", Cloud(self, "cnCloud"))
|
||||
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=self,
|
||||
name="LED",
|
||||
icon="mdi:led-{state}",
|
||||
attribute_getter="led",
|
||||
attribute_setter="set_led",
|
||||
category=DescriptorCategory.Config,
|
||||
type=DescriptorType.Switch,
|
||||
category=FeatureCategory.Config,
|
||||
type=FeatureType.Switch,
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -4,7 +4,7 @@ try:
|
||||
except ImportError:
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ...descriptors import Descriptor, DescriptorType
|
||||
from ...feature import Feature, FeatureType
|
||||
from .module import IotModule
|
||||
|
||||
|
||||
@ -28,13 +28,14 @@ class Cloud(IotModule):
|
||||
|
||||
def __init__(self, device, module):
|
||||
super().__init__(device, module)
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
device=self,
|
||||
name="Cloud Connection",
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=device,
|
||||
container=self,
|
||||
name="Cloud connection",
|
||||
icon="mdi:cloud",
|
||||
attribute_getter="is_connected",
|
||||
type=DescriptorType.BinarySensor,
|
||||
type=FeatureType.BinarySensor,
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -4,8 +4,8 @@ import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING, Dict
|
||||
|
||||
from ...descriptors import Descriptor
|
||||
from ...exceptions import SmartDeviceException
|
||||
from ...feature import Feature
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from kasa.iot import IotDevice
|
||||
@ -35,14 +35,14 @@ class IotModule(ABC):
|
||||
def __init__(self, device: "IotDevice", module: str):
|
||||
self._device = device
|
||||
self._module = module
|
||||
self._module_descriptors: Dict[str, Descriptor] = {}
|
||||
self._module_features: Dict[str, Feature] = {}
|
||||
|
||||
def add_descriptor(self, desc):
|
||||
def add_feature(self, feature: Feature):
|
||||
"""Add module descriptor."""
|
||||
module_desc_name = f"{self._module}_{desc.name}"
|
||||
if module_desc_name in self._module_descriptors:
|
||||
raise Exception("Duplicate name detected %s" % module_desc_name)
|
||||
self._module_descriptors[module_desc_name] = desc
|
||||
feature_name = f"{self._module}_{feature.name}"
|
||||
if feature_name in self._module_features:
|
||||
raise SmartDeviceException("Duplicate name detected %s" % feature_name)
|
||||
self._module_features[feature_name] = feature
|
||||
|
||||
@abstractmethod
|
||||
def query(self):
|
||||
|
@ -2,15 +2,15 @@
|
||||
import base64
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Set, cast
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, cast
|
||||
|
||||
from ..aestransport import AesTransport
|
||||
from ..descriptors import Descriptor, DescriptorType
|
||||
from ..device import Device, WifiNetwork
|
||||
from ..device_type import DeviceType
|
||||
from ..deviceconfig import DeviceConfig
|
||||
from ..emeterstatus import EmeterStatus
|
||||
from ..exceptions import AuthenticationException, SmartDeviceException
|
||||
from ..feature import Feature, FeatureType
|
||||
from ..smartprotocol import SmartProtocol
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -121,7 +121,7 @@ class SmartDevice(Device):
|
||||
|
||||
# We can first initialize the descriptors after the first update.
|
||||
# We make here an assumption that every device has at least a single descriptor.
|
||||
if not self._descriptors:
|
||||
if not self._features:
|
||||
await self._initialize_descriptors()
|
||||
|
||||
_LOGGER.debug("Got an update: %s", self._last_update)
|
||||
@ -133,49 +133,47 @@ class SmartDevice(Device):
|
||||
|
||||
async def _initialize_descriptors(self):
|
||||
"""Initialize device descriptors."""
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
self,
|
||||
"Signal Level",
|
||||
attribute_getter=lambda x: x._info["signal_level"],
|
||||
icon="mdi:signal",
|
||||
)
|
||||
)
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
self,
|
||||
"RSSI",
|
||||
attribute_getter=lambda x: x._info["rssi"],
|
||||
icon="mdi:signal",
|
||||
)
|
||||
)
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=self, name="Time", attribute_getter="time", show_in_hass=False
|
||||
)
|
||||
)
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi"
|
||||
)
|
||||
self.add_feature(
|
||||
Feature(device=self, name="SSID", attribute_getter="ssid", icon="mdi:wifi")
|
||||
)
|
||||
|
||||
if "overheated" in self._info:
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
self,
|
||||
"Overheated",
|
||||
attribute_getter=lambda x: x._info["overheated"],
|
||||
icon="mdi:heat-wave",
|
||||
type=DescriptorType.BinarySensor,
|
||||
type=FeatureType.BinarySensor,
|
||||
)
|
||||
)
|
||||
|
||||
# We check for the key available, and not for the property truthiness,
|
||||
# as the value is falsy when the device is off.
|
||||
if "on_time" in self._info:
|
||||
self.add_descriptor(
|
||||
Descriptor(
|
||||
self.add_feature(
|
||||
Feature(
|
||||
device=self,
|
||||
name="On since",
|
||||
attribute_getter="on_since",
|
||||
@ -288,12 +286,6 @@ class SmartDevice(Device):
|
||||
"SSID": self.ssid,
|
||||
}
|
||||
|
||||
@property
|
||||
def features(self) -> Set[str]:
|
||||
"""Return the list of supported features."""
|
||||
# TODO:
|
||||
return set()
|
||||
|
||||
@property
|
||||
def has_emeter(self) -> bool:
|
||||
"""Return if the device has emeter."""
|
||||
|
Loading…
Reference in New Issue
Block a user