From 84aa74546ef95dd78f87ac6de3a6802552c1e490 Mon Sep 17 00:00:00 2001 From: Ryan Nitcher Date: Thu, 14 Nov 2024 12:09:56 -0700 Subject: [PATCH] Add ADC Value to PIR Enabled Switches - Add: ADC value reporting to PIR enabled switches, so that it may be used in polling automations. - Add: ADC trigger state value reporting, for simpler automations. - Add: Ability for features to use custom value parsers. --- kasa/cli/feature.py | 2 +- kasa/feature.py | 44 +++++++- kasa/iot/modules/motion.py | 185 ++++++++++++++++++++++++++++--- tests/fakeprotocol_iot.py | 3 +- tests/iot/modules/test_motion.py | 19 +++- 5 files changed, 232 insertions(+), 21 deletions(-) diff --git a/kasa/cli/feature.py b/kasa/cli/feature.py index 522dee7f..35dba781 100644 --- a/kasa/cli/feature.py +++ b/kasa/cli/feature.py @@ -133,7 +133,7 @@ async def feature( echo(f"{feat.name} ({name}): {feat.value}{unit}") return feat.value - value = ast.literal_eval(value) + value = feat.parse_value(value, ast.literal_eval) echo(f"Changing {name} from {feat.value} to {value}") response = await dev.features[name].set_value(value) await dev.update() diff --git a/kasa/feature.py b/kasa/feature.py index d747338d..3b194ed4 100644 --- a/kasa/feature.py +++ b/kasa/feature.py @@ -163,6 +163,9 @@ class Feature: #: If set, this property will be used to get *choices*. choices_getter: str | Callable[[], list[str]] | None = None + #: Value converter, for when working with complex types. + value_parser: str | None = None + def __post_init__(self) -> None: """Handle late-binding of members.""" # Populate minimum & maximum values, if range_getter is given @@ -270,6 +273,26 @@ class Feature: return await attribute_setter(value) + def parse_value( + self, value: str, fallback: Callable[[str], Any | None] = lambda x: None + ) -> Any | None: + """Attempt to parse a given string into a value accepted by this feature.""" + parser = self._get_property_value(self.value_parser) + parser = parser if parser else fallback + allowed = f"{self.choices}" if self.choices else "Unknown" + try: + parsed = parser(value) + if parsed is None: + raise ValueError( + f"Unexpected value for {self.name}: {value}" + f" - allowed: {allowed}" + ) + return parsed + except SyntaxError as se: + raise ValueError( + f"{se.msg} for {self.name}: {value}" f" - allowed: {allowed}", + ) from se + def __repr__(self) -> str: try: value = self.value @@ -278,7 +301,18 @@ class Feature: return f"Unable to read value ({self.id}): {ex}" if self.type == Feature.Type.Choice: - if not isinstance(choices, list) or value not in choices: + if not isinstance(choices, list): + _LOGGER.critical( + "Choices are not properly defined for %s (%s). Type: <%s> Value: %s", # noqa: E501 + self.name, + self.id, + type(choices), + choices, + ) + return f"{self.name} ({self.id}): improperly defined choice set." + if (value not in choices) and ( + isinstance(value, Enum) and value.name not in choices + ): _LOGGER.warning( "Invalid value for for choice %s (%s): %s not in %s", self.name, @@ -290,7 +324,13 @@ class Feature: f"{self.name} ({self.id}): invalid value '{value}' not in {choices}" ) value = " ".join( - [f"*{choice}*" if choice == value else choice for choice in choices] + [ + f"*{choice}*" + if choice == value + or (isinstance(value, Enum) and choice == value.name) + else f"{choice}" + for choice in choices + ] ) if self.precision_hint is not None and isinstance(value, float): value = round(value, self.precision_hint) diff --git a/kasa/iot/modules/motion.py b/kasa/iot/modules/motion.py index e65cbd93..77c1f170 100644 --- a/kasa/iot/modules/motion.py +++ b/kasa/iot/modules/motion.py @@ -4,10 +4,11 @@ from __future__ import annotations import logging from enum import Enum +from typing import Literal, overload from ...exceptions import KasaException from ...feature import Feature -from ..iotmodule import IotModule +from ..iotmodule import IotModule, merge _LOGGER = logging.getLogger(__name__) @@ -20,6 +21,9 @@ class Range(Enum): Near = 2 Custom = 3 + def __str__(self) -> str: + return self.name + class Motion(IotModule): """Implements the motion detection (PIR) module.""" @@ -30,6 +34,11 @@ class Motion(IotModule): if "get_config" not in self.data: return + # Require that ADC value is also present. + if "get_adc_value" not in self.data: + _LOGGER.warning("%r initialized, but no get_adc_value in response") + return + if "enable" not in self.config: _LOGGER.warning("%r initialized, but no enable in response") return @@ -48,20 +57,78 @@ class Motion(IotModule): ) ) + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_range", + name="Motion Sensor Range", + icon="mdi:motion-sensor", + attribute_getter="range", + attribute_setter="_set_range_cli", + type=Feature.Type.Choice, + choices_getter="ranges", + value_parser="parse_range_value", + category=Feature.Category.Config, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_threshold", + name="Motion Sensor Threshold", + icon="mdi:motion-sensor", + attribute_getter="threshold", + attribute_setter="set_threshold", + type=Feature.Type.Number, + category=Feature.Category.Config, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_adc_value", + name="PIR ADC Value", + icon="mdi:motion-sensor", + attribute_getter="adc_value", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Primary, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_triggered", + name="PIR Triggered", + icon="mdi:motion-sensor", + attribute_getter="is_triggered", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Primary, + ) + ) + def query(self) -> dict: """Request PIR configuration.""" - return self.query_for_command("get_config") + req = merge( + self.query_for_command("get_config"), + self.query_for_command("get_adc_value"), + ) + + return req @property def config(self) -> dict: """Return current configuration.""" return self.data["get_config"] - @property - def range(self) -> Range: - """Return motion detection range.""" - return Range(self.config["trigger_index"]) - @property def enabled(self) -> bool: """Return True if module is enabled.""" @@ -71,23 +138,99 @@ class Motion(IotModule): """Enable/disable PIR.""" return await self.call("set_enable", {"enable": int(state)}) - async def set_range( - self, *, range: Range | None = None, custom_range: int | None = None - ) -> dict: - """Set the range for the sensor. + def _parse_range_value(self, value: str) -> int | Range | None: + """Attempt to parse a range value from the given string.""" + _LOGGER.debug("Parse Range Value: %s", value) + parsed: int | Range | None = None + try: + parsed = int(value) + _LOGGER.debug("Parse Range Value: %s is an integer.", value) + return parsed + except ValueError: + _LOGGER.debug("Parse Range Value: %s is not an integer.", value) + value = value.strip().upper() + if value in Range._member_names_: + _LOGGER.debug("Parse Range Value: %s is an enumeration.", value) + parsed = Range[value] + return parsed + _LOGGER.debug("Parse Range Value: %s is not a Range Value.", value) + return None - :param range: for using standard ranges - :param custom_range: range in decimeters, overrides the range parameter + @property + def ranges(self) -> list[Range]: + """Return set of supported range classes.""" + range_min = 0 + range_max = len(self.config["array"]) + valid_ranges = list() + for r in Range: + if (r.value >= range_min) and (r.value < range_max): + valid_ranges.append(r) + return valid_ranges + + @property + def range(self) -> Range: + """Return motion detection Range.""" + return Range(self.config["trigger_index"]) + + @overload + async def set_range(self, *, range: Range) -> dict: ... + + @overload + async def set_range(self, *, range: Literal[Range.Custom], value: int) -> dict: ... + + @overload + async def set_range(self, *, value: int) -> dict: ... + + async def set_range( + self, *, range: Range | None = None, value: int | None = None + ) -> dict: + """Set the Range for the sensor. + + :param Range: for using standard Ranges + :param custom_Range: Range in decimeters, overrides the Range parameter """ - if custom_range is not None: - payload = {"index": Range.Custom.value, "value": custom_range} + if value is not None: + if range is not None and range is not Range.Custom: + raise KasaException( + "Refusing to set non-custom range %s to value %d." % (range, value) + ) + elif value is None: + raise KasaException("Custom range threshold may not be set to None.") + payload = {"index": Range.Custom.value, "value": value} elif range is not None: payload = {"index": range.value} else: - raise KasaException("Either range or custom_range need to be defined") + raise KasaException("Either range or value needs to be defined") return await self.call("set_trigger_sens", payload) + async def _set_range_cli(self, input: Range | int) -> dict: + if isinstance(input, Range): + return await self.set_range(range=input) + elif isinstance(input, int): + return await self.set_range(value=input) + else: + raise KasaException( + "Invalid type: %s given to cli motion set." % (type(input)) + ) + + def get_range_threshold(self, range_type: Range) -> int: + """Get the distance threshold at which the PIR sensor is will trigger.""" + if range_type.value < 0 or range_type.value >= len(self.config["array"]): + raise KasaException( + "Range type is outside the bounds of the configured device ranges." + ) + return int(self.config["array"][range_type.value]) + + @property + def threshold(self) -> int: + """Return motion detection Range.""" + return self.get_range_threshold(self.range) + + async def set_threshold(self, value: int) -> dict: + """Set the distance threshold at which the PIR sensor is will trigger.""" + return await self.set_range(value=value) + @property def inactivity_timeout(self) -> int: """Return inactivity timeout in milliseconds.""" @@ -100,3 +243,13 @@ class Motion(IotModule): to avoid reverting this back to 60 seconds after a period of time. """ return await self.call("set_cold_time", {"cold_time": timeout}) + + @property + def adc_value(self) -> int: + """Return motion adc value.""" + return int(self.data["get_adc_value"]["value"]) + + @property + def is_triggered(self) -> bool: + """Return if the motion sensor has been triggered.""" + return (self.enabled) and (self.adc_value < self.threshold) diff --git a/tests/fakeprotocol_iot.py b/tests/fakeprotocol_iot.py index 88e34647..df8f82c3 100644 --- a/tests/fakeprotocol_iot.py +++ b/tests/fakeprotocol_iot.py @@ -192,6 +192,7 @@ AMBIENT_MODULE = { MOTION_MODULE = { + "get_adc_value": {"value": 50, "err_code": 0}, "get_config": { "enable": 0, "version": "1.0", @@ -201,7 +202,7 @@ MOTION_MODULE = { "max_adc": 4095, "array": [80, 50, 20, 0], "err_code": 0, - } + }, } LIGHT_DETAILS = { diff --git a/tests/iot/modules/test_motion.py b/tests/iot/modules/test_motion.py index a2b32a87..723557f5 100644 --- a/tests/iot/modules/test_motion.py +++ b/tests/iot/modules/test_motion.py @@ -1,6 +1,8 @@ +import pytest from pytest_mock import MockerFixture from kasa import Module +from kasa.exceptions import KasaException from kasa.iot import IotDimmer from kasa.iot.modules.motion import Motion, Range @@ -36,7 +38,14 @@ async def test_motion_range(dev: IotDimmer, mocker: MockerFixture): motion: Motion = dev.modules[Module.IotMotion] query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") - await motion.set_range(custom_range=123) + await motion.set_range(value=123) + query_helper.assert_called_with( + "smartlife.iot.PIR", + "set_trigger_sens", + {"index": Range.Custom.value, "value": 123}, + ) + + await motion.set_range(range=Range.Custom, value=123) query_helper.assert_called_with( "smartlife.iot.PIR", "set_trigger_sens", @@ -48,6 +57,14 @@ async def test_motion_range(dev: IotDimmer, mocker: MockerFixture): "smartlife.iot.PIR", "set_trigger_sens", {"index": Range.Far.value} ) + with pytest.raises(KasaException, match="Refusing to set non-custom range"): + await motion.set_range(range=Range.Near, value=100) # type: ignore[call-overload] + + with pytest.raises( + KasaException, match="Either range or value needs to be defined" + ): + await motion.set_range() # type: ignore[call-overload] + @dimmer_iot def test_motion_feature(dev: IotDimmer):