Add ADC Value to PIR Enabled Switches

- Add: ADC value reporting to PIR enabled switches, so that it may be used in polling automations.
- Add: ADC trigger state value reporting, for simpler automations.
- Add: Ability for features to use custom value parsers.
This commit is contained in:
Ryan Nitcher 2024-11-14 12:09:56 -07:00
parent cb89342be1
commit 84aa74546e
5 changed files with 232 additions and 21 deletions

View File

@ -133,7 +133,7 @@ async def feature(
echo(f"{feat.name} ({name}): {feat.value}{unit}") echo(f"{feat.name} ({name}): {feat.value}{unit}")
return feat.value return feat.value
value = ast.literal_eval(value) value = feat.parse_value(value, ast.literal_eval)
echo(f"Changing {name} from {feat.value} to {value}") echo(f"Changing {name} from {feat.value} to {value}")
response = await dev.features[name].set_value(value) response = await dev.features[name].set_value(value)
await dev.update() await dev.update()

View File

@ -163,6 +163,9 @@ class Feature:
#: If set, this property will be used to get *choices*. #: If set, this property will be used to get *choices*.
choices_getter: str | Callable[[], list[str]] | None = None choices_getter: str | Callable[[], list[str]] | None = None
#: Value converter, for when working with complex types.
value_parser: str | None = None
def __post_init__(self) -> None: def __post_init__(self) -> None:
"""Handle late-binding of members.""" """Handle late-binding of members."""
# Populate minimum & maximum values, if range_getter is given # Populate minimum & maximum values, if range_getter is given
@ -270,6 +273,26 @@ class Feature:
return await attribute_setter(value) return await attribute_setter(value)
def parse_value(
self, value: str, fallback: Callable[[str], Any | None] = lambda x: None
) -> Any | None:
"""Attempt to parse a given string into a value accepted by this feature."""
parser = self._get_property_value(self.value_parser)
parser = parser if parser else fallback
allowed = f"{self.choices}" if self.choices else "Unknown"
try:
parsed = parser(value)
if parsed is None:
raise ValueError(
f"Unexpected value for {self.name}: {value}"
f" - allowed: {allowed}"
)
return parsed
except SyntaxError as se:
raise ValueError(
f"{se.msg} for {self.name}: {value}" f" - allowed: {allowed}",
) from se
def __repr__(self) -> str: def __repr__(self) -> str:
try: try:
value = self.value value = self.value
@ -278,7 +301,18 @@ class Feature:
return f"Unable to read value ({self.id}): {ex}" return f"Unable to read value ({self.id}): {ex}"
if self.type == Feature.Type.Choice: if self.type == Feature.Type.Choice:
if not isinstance(choices, list) or value not in choices: if not isinstance(choices, list):
_LOGGER.critical(
"Choices are not properly defined for %s (%s). Type: <%s> Value: %s", # noqa: E501
self.name,
self.id,
type(choices),
choices,
)
return f"{self.name} ({self.id}): improperly defined choice set."
if (value not in choices) and (
isinstance(value, Enum) and value.name not in choices
):
_LOGGER.warning( _LOGGER.warning(
"Invalid value for for choice %s (%s): %s not in %s", "Invalid value for for choice %s (%s): %s not in %s",
self.name, self.name,
@ -290,7 +324,13 @@ class Feature:
f"{self.name} ({self.id}): invalid value '{value}' not in {choices}" f"{self.name} ({self.id}): invalid value '{value}' not in {choices}"
) )
value = " ".join( value = " ".join(
[f"*{choice}*" if choice == value else choice for choice in choices] [
f"*{choice}*"
if choice == value
or (isinstance(value, Enum) and choice == value.name)
else f"{choice}"
for choice in choices
]
) )
if self.precision_hint is not None and isinstance(value, float): if self.precision_hint is not None and isinstance(value, float):
value = round(value, self.precision_hint) value = round(value, self.precision_hint)

View File

@ -4,10 +4,11 @@ from __future__ import annotations
import logging import logging
from enum import Enum from enum import Enum
from typing import Literal, overload
from ...exceptions import KasaException from ...exceptions import KasaException
from ...feature import Feature from ...feature import Feature
from ..iotmodule import IotModule from ..iotmodule import IotModule, merge
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -20,6 +21,9 @@ class Range(Enum):
Near = 2 Near = 2
Custom = 3 Custom = 3
def __str__(self) -> str:
return self.name
class Motion(IotModule): class Motion(IotModule):
"""Implements the motion detection (PIR) module.""" """Implements the motion detection (PIR) module."""
@ -30,6 +34,11 @@ class Motion(IotModule):
if "get_config" not in self.data: if "get_config" not in self.data:
return return
# Require that ADC value is also present.
if "get_adc_value" not in self.data:
_LOGGER.warning("%r initialized, but no get_adc_value in response")
return
if "enable" not in self.config: if "enable" not in self.config:
_LOGGER.warning("%r initialized, but no enable in response") _LOGGER.warning("%r initialized, but no enable in response")
return return
@ -48,20 +57,78 @@ class Motion(IotModule):
) )
) )
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_range",
name="Motion Sensor Range",
icon="mdi:motion-sensor",
attribute_getter="range",
attribute_setter="_set_range_cli",
type=Feature.Type.Choice,
choices_getter="ranges",
value_parser="parse_range_value",
category=Feature.Category.Config,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_threshold",
name="Motion Sensor Threshold",
icon="mdi:motion-sensor",
attribute_getter="threshold",
attribute_setter="set_threshold",
type=Feature.Type.Number,
category=Feature.Category.Config,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_value",
name="PIR ADC Value",
icon="mdi:motion-sensor",
attribute_getter="adc_value",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Primary,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_triggered",
name="PIR Triggered",
icon="mdi:motion-sensor",
attribute_getter="is_triggered",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Primary,
)
)
def query(self) -> dict: def query(self) -> dict:
"""Request PIR configuration.""" """Request PIR configuration."""
return self.query_for_command("get_config") req = merge(
self.query_for_command("get_config"),
self.query_for_command("get_adc_value"),
)
return req
@property @property
def config(self) -> dict: def config(self) -> dict:
"""Return current configuration.""" """Return current configuration."""
return self.data["get_config"] return self.data["get_config"]
@property
def range(self) -> Range:
"""Return motion detection range."""
return Range(self.config["trigger_index"])
@property @property
def enabled(self) -> bool: def enabled(self) -> bool:
"""Return True if module is enabled.""" """Return True if module is enabled."""
@ -71,23 +138,99 @@ class Motion(IotModule):
"""Enable/disable PIR.""" """Enable/disable PIR."""
return await self.call("set_enable", {"enable": int(state)}) return await self.call("set_enable", {"enable": int(state)})
async def set_range( def _parse_range_value(self, value: str) -> int | Range | None:
self, *, range: Range | None = None, custom_range: int | None = None """Attempt to parse a range value from the given string."""
) -> dict: _LOGGER.debug("Parse Range Value: %s", value)
"""Set the range for the sensor. parsed: int | Range | None = None
try:
parsed = int(value)
_LOGGER.debug("Parse Range Value: %s is an integer.", value)
return parsed
except ValueError:
_LOGGER.debug("Parse Range Value: %s is not an integer.", value)
value = value.strip().upper()
if value in Range._member_names_:
_LOGGER.debug("Parse Range Value: %s is an enumeration.", value)
parsed = Range[value]
return parsed
_LOGGER.debug("Parse Range Value: %s is not a Range Value.", value)
return None
:param range: for using standard ranges @property
:param custom_range: range in decimeters, overrides the range parameter def ranges(self) -> list[Range]:
"""Return set of supported range classes."""
range_min = 0
range_max = len(self.config["array"])
valid_ranges = list()
for r in Range:
if (r.value >= range_min) and (r.value < range_max):
valid_ranges.append(r)
return valid_ranges
@property
def range(self) -> Range:
"""Return motion detection Range."""
return Range(self.config["trigger_index"])
@overload
async def set_range(self, *, range: Range) -> dict: ...
@overload
async def set_range(self, *, range: Literal[Range.Custom], value: int) -> dict: ...
@overload
async def set_range(self, *, value: int) -> dict: ...
async def set_range(
self, *, range: Range | None = None, value: int | None = None
) -> dict:
"""Set the Range for the sensor.
:param Range: for using standard Ranges
:param custom_Range: Range in decimeters, overrides the Range parameter
""" """
if custom_range is not None: if value is not None:
payload = {"index": Range.Custom.value, "value": custom_range} if range is not None and range is not Range.Custom:
raise KasaException(
"Refusing to set non-custom range %s to value %d." % (range, value)
)
elif value is None:
raise KasaException("Custom range threshold may not be set to None.")
payload = {"index": Range.Custom.value, "value": value}
elif range is not None: elif range is not None:
payload = {"index": range.value} payload = {"index": range.value}
else: else:
raise KasaException("Either range or custom_range need to be defined") raise KasaException("Either range or value needs to be defined")
return await self.call("set_trigger_sens", payload) return await self.call("set_trigger_sens", payload)
async def _set_range_cli(self, input: Range | int) -> dict:
if isinstance(input, Range):
return await self.set_range(range=input)
elif isinstance(input, int):
return await self.set_range(value=input)
else:
raise KasaException(
"Invalid type: %s given to cli motion set." % (type(input))
)
def get_range_threshold(self, range_type: Range) -> int:
"""Get the distance threshold at which the PIR sensor is will trigger."""
if range_type.value < 0 or range_type.value >= len(self.config["array"]):
raise KasaException(
"Range type is outside the bounds of the configured device ranges."
)
return int(self.config["array"][range_type.value])
@property
def threshold(self) -> int:
"""Return motion detection Range."""
return self.get_range_threshold(self.range)
async def set_threshold(self, value: int) -> dict:
"""Set the distance threshold at which the PIR sensor is will trigger."""
return await self.set_range(value=value)
@property @property
def inactivity_timeout(self) -> int: def inactivity_timeout(self) -> int:
"""Return inactivity timeout in milliseconds.""" """Return inactivity timeout in milliseconds."""
@ -100,3 +243,13 @@ class Motion(IotModule):
to avoid reverting this back to 60 seconds after a period of time. to avoid reverting this back to 60 seconds after a period of time.
""" """
return await self.call("set_cold_time", {"cold_time": timeout}) return await self.call("set_cold_time", {"cold_time": timeout})
@property
def adc_value(self) -> int:
"""Return motion adc value."""
return int(self.data["get_adc_value"]["value"])
@property
def is_triggered(self) -> bool:
"""Return if the motion sensor has been triggered."""
return (self.enabled) and (self.adc_value < self.threshold)

View File

@ -192,6 +192,7 @@ AMBIENT_MODULE = {
MOTION_MODULE = { MOTION_MODULE = {
"get_adc_value": {"value": 50, "err_code": 0},
"get_config": { "get_config": {
"enable": 0, "enable": 0,
"version": "1.0", "version": "1.0",
@ -201,7 +202,7 @@ MOTION_MODULE = {
"max_adc": 4095, "max_adc": 4095,
"array": [80, 50, 20, 0], "array": [80, 50, 20, 0],
"err_code": 0, "err_code": 0,
} },
} }
LIGHT_DETAILS = { LIGHT_DETAILS = {

View File

@ -1,6 +1,8 @@
import pytest
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from kasa import Module from kasa import Module
from kasa.exceptions import KasaException
from kasa.iot import IotDimmer from kasa.iot import IotDimmer
from kasa.iot.modules.motion import Motion, Range from kasa.iot.modules.motion import Motion, Range
@ -36,7 +38,14 @@ async def test_motion_range(dev: IotDimmer, mocker: MockerFixture):
motion: Motion = dev.modules[Module.IotMotion] motion: Motion = dev.modules[Module.IotMotion]
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
await motion.set_range(custom_range=123) await motion.set_range(value=123)
query_helper.assert_called_with(
"smartlife.iot.PIR",
"set_trigger_sens",
{"index": Range.Custom.value, "value": 123},
)
await motion.set_range(range=Range.Custom, value=123)
query_helper.assert_called_with( query_helper.assert_called_with(
"smartlife.iot.PIR", "smartlife.iot.PIR",
"set_trigger_sens", "set_trigger_sens",
@ -48,6 +57,14 @@ async def test_motion_range(dev: IotDimmer, mocker: MockerFixture):
"smartlife.iot.PIR", "set_trigger_sens", {"index": Range.Far.value} "smartlife.iot.PIR", "set_trigger_sens", {"index": Range.Far.value}
) )
with pytest.raises(KasaException, match="Refusing to set non-custom range"):
await motion.set_range(range=Range.Near, value=100) # type: ignore[call-overload]
with pytest.raises(
KasaException, match="Either range or value needs to be defined"
):
await motion.set_range() # type: ignore[call-overload]
@dimmer_iot @dimmer_iot
def test_motion_feature(dev: IotDimmer): def test_motion_feature(dev: IotDimmer):