diff --git a/kasa/cli/feature.py b/kasa/cli/feature.py index 522dee7f..a4c739f6 100644 --- a/kasa/cli/feature.py +++ b/kasa/cli/feature.py @@ -6,10 +6,7 @@ import ast import asyncclick as click -from kasa import ( - Device, - Feature, -) +from kasa import Device, Feature from .common import ( echo, @@ -133,7 +130,22 @@ async def feature( echo(f"{feat.name} ({name}): {feat.value}{unit}") return feat.value - value = ast.literal_eval(value) + try: + # Attempt to parse as python literal. + value = ast.literal_eval(value) + except ValueError: + # The value is probably an unquoted string, so we'll raise an error, + # and tell the user to quote the string. + raise click.exceptions.BadParameter( + f'{repr(value)} for {name} (Perhaps you forgot to "quote" the value?)' + ) from SyntaxError + except SyntaxError: + # There are likely miss-matched quotes or odd characters in the input, + # so abort and complain to the user. + raise click.exceptions.BadParameter( + f"{repr(value)} for {name}" + ) from SyntaxError + echo(f"Changing {name} from {feat.value} to {value}") response = await dev.features[name].set_value(value) await dev.update() diff --git a/kasa/feature.py b/kasa/feature.py index 3c6beb0d..0c4c6e23 100644 --- a/kasa/feature.py +++ b/kasa/feature.py @@ -256,7 +256,7 @@ class Feature: elif self.type == Feature.Type.Choice: # noqa: SIM102 if not self.choices or value not in self.choices: raise ValueError( - f"Unexpected value for {self.name}: {value}" + f"Unexpected value for {self.name}: '{value}'" f" - allowed: {self.choices}" ) @@ -279,7 +279,18 @@ class Feature: return f"Unable to read value ({self.id}): {ex}" if self.type == Feature.Type.Choice: - if not isinstance(choices, list) or value not in choices: + if not isinstance(choices, list): + _LOGGER.error( + "Choices are not properly defined for %s (%s). Type: <%s> Value: %s", # noqa: E501 + self.name, + self.id, + type(choices), + choices, + ) + return f"{self.name} ({self.id}): improperly defined choice set." + if (value not in choices) and ( + isinstance(value, Enum) and value.name not in choices + ): _LOGGER.warning( "Invalid value for for choice %s (%s): %s not in %s", self.name, @@ -291,7 +302,13 @@ class Feature: f"{self.name} ({self.id}): invalid value '{value}' not in {choices}" ) value = " ".join( - [f"*{choice}*" if choice == value else choice for choice in choices] + [ + f"*{choice}*" + if choice == value + or (isinstance(value, Enum) and choice == value.name) + else f"{choice}" + for choice in choices + ] ) if self.precision_hint is not None and isinstance(value, float): value = round(value, self.precision_hint) diff --git a/kasa/iot/modules/motion.py b/kasa/iot/modules/motion.py index e65cbd93..a795b449 100644 --- a/kasa/iot/modules/motion.py +++ b/kasa/iot/modules/motion.py @@ -3,11 +3,13 @@ from __future__ import annotations import logging +import math +from dataclasses import dataclass from enum import Enum from ...exceptions import KasaException from ...feature import Feature -from ..iotmodule import IotModule +from ..iotmodule import IotModule, merge _LOGGER = logging.getLogger(__name__) @@ -20,6 +22,71 @@ class Range(Enum): Near = 2 Custom = 3 + def __str__(self) -> str: + return self.name + + +@dataclass +class PIRConfig: + """Dataclass representing a PIR sensor configuration.""" + + enabled: bool + adc_min: int + adc_max: int + range: Range + threshold: int + + @property + def adc_mid(self) -> int: + """Compute the ADC midpoint from the configured ADC Max and Min values.""" + return math.floor(abs(self.adc_max - self.adc_min) / 2) + + +@dataclass +class PIRStatus: + """Dataclass representing the current trigger state of an ADC PIR sensor.""" + + pir_config: PIRConfig + adc_value: int + + @property + def pir_value(self) -> int: + """ + Get the PIR status value in integer form. + + Computes the PIR status value that this object represents, + using the given PIR configuration. + """ + return self.pir_config.adc_mid - self.adc_value + + @property + def pir_percent(self) -> float: + """ + Get the PIR status value in percentile form. + + Computes the PIR status percentage that this object represents, + using the given PIR configuration. + """ + value = self.pir_value + divisor = ( + (self.pir_config.adc_mid - self.pir_config.adc_min) + if (value < 0) + else (self.pir_config.adc_max - self.pir_config.adc_mid) + ) + return (float(value) / divisor) * 100 + + @property + def pir_triggered(self) -> bool: + """ + Get the PIR status trigger state. + + Compute the PIR trigger state this object represents, + using the given PIR configuration. + """ + return (self.pir_config.enabled) and ( + abs(self.pir_percent) > (100 - self.pir_config.threshold) + ) + class Motion(IotModule): """Implements the motion detection (PIR) module.""" @@ -30,6 +97,11 @@ class Motion(IotModule): if "get_config" not in self.data: return + # Require that ADC value is also present. + if "get_adc_value" not in self.data: + _LOGGER.warning("%r initialized, but no get_adc_value in response") + return + if "enable" not in self.config: _LOGGER.warning("%r initialized, but no enable in response") return @@ -48,9 +120,143 @@ class Motion(IotModule): ) ) + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_range", + name="Motion Sensor Range", + icon="mdi:motion-sensor", + attribute_getter="range", + attribute_setter="_set_range_from_str", + type=Feature.Type.Choice, + choices_getter="ranges", + category=Feature.Category.Config, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_threshold", + name="Motion Sensor Threshold", + icon="mdi:motion-sensor", + attribute_getter="threshold", + attribute_setter="set_threshold", + type=Feature.Type.Number, + category=Feature.Category.Config, + range_getter=lambda: (0, 100), + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_triggered", + name="PIR Triggered", + icon="mdi:motion-sensor", + attribute_getter="pir_triggered", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Primary, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_value", + name="PIR Value", + icon="mdi:motion-sensor", + attribute_getter="pir_value", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Info, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_adc_value", + name="PIR ADC Value", + icon="mdi:motion-sensor", + attribute_getter="adc_value", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Debug, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_adc_min", + name="PIR ADC Min", + icon="mdi:motion-sensor", + attribute_getter="adc_min", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Debug, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_adc_mid", + name="PIR ADC Mid", + icon="mdi:motion-sensor", + attribute_getter="adc_mid", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Debug, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_adc_max", + name="PIR ADC Max", + icon="mdi:motion-sensor", + attribute_getter="adc_max", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Debug, + ) + ) + + self._add_feature( + Feature( + device=self._device, + container=self, + id="pir_percent", + name="PIR Percentile", + icon="mdi:motion-sensor", + attribute_getter="pir_percent", + attribute_setter=None, + type=Feature.Type.Sensor, + category=Feature.Category.Debug, + unit_getter=lambda: "%", + ) + ) + def query(self) -> dict: """Request PIR configuration.""" - return self.query_for_command("get_config") + req = merge( + self.query_for_command("get_config"), + self.query_for_command("get_adc_value"), + ) + + return req @property def config(self) -> dict: @@ -58,34 +264,103 @@ class Motion(IotModule): return self.data["get_config"] @property - def range(self) -> Range: - """Return motion detection range.""" - return Range(self.config["trigger_index"]) + def pir_config(self) -> PIRConfig: + """Return PIR sensor configuration.""" + pir_range = Range(self.config["trigger_index"]) + return PIRConfig( + enabled=bool(self.config["enable"]), + adc_min=int(self.config["min_adc"]), + adc_max=int(self.config["max_adc"]), + range=pir_range, + threshold=self.get_range_threshold(pir_range), + ) @property def enabled(self) -> bool: """Return True if module is enabled.""" - return bool(self.config["enable"]) + return self.pir_config.enabled + + @property + def adc_min(self) -> int: + """Return minimum ADC sensor value.""" + return self.pir_config.adc_min + + @property + def adc_max(self) -> int: + """Return maximum ADC sensor value.""" + return self.pir_config.adc_max + + @property + def adc_mid(self) -> int: + """ + Return the midpoint for the ADC. + + The midpoint represents the zero point for the PIR sensor waveform. + + Currently this is estimated by: + math.floor(abs(adc_max - adc_min) / 2) + """ + return self.pir_config.adc_mid async def set_enabled(self, state: bool) -> dict: """Enable/disable PIR.""" return await self.call("set_enable", {"enable": int(state)}) - async def set_range( - self, *, range: Range | None = None, custom_range: int | None = None - ) -> dict: - """Set the range for the sensor. + @property + def ranges(self) -> list[str]: + """Return set of supported range classes.""" + range_min = 0 + range_max = len(self.config["array"]) + valid_ranges = list() + for r in Range: + if (r.value >= range_min) and (r.value < range_max): + valid_ranges.append(r.name) + return valid_ranges - :param range: for using standard ranges - :param custom_range: range in decimeters, overrides the range parameter + @property + def range(self) -> Range: + """Return motion detection Range.""" + return self.pir_config.range + + async def set_range(self, range: Range) -> dict: + """Set the Range for the sensor. + + :param Range: the range class to use. """ - if custom_range is not None: - payload = {"index": Range.Custom.value, "value": custom_range} - elif range is not None: - payload = {"index": range.value} - else: - raise KasaException("Either range or custom_range need to be defined") + payload = {"index": range.value} + return await self.call("set_trigger_sens", payload) + def _parse_range_value(self, value: str) -> Range: + """Attempt to parse a range value from the given string.""" + value = value.strip().capitalize() + try: + return Range[value] + except KeyError: + raise KasaException( + f"Invalid range value: '{value}'." + f" Valid options are: {Range._member_names_}" + ) from KeyError + + async def _set_range_from_str(self, input: str) -> dict: + value = self._parse_range_value(input) + return await self.set_range(range=value) + + def get_range_threshold(self, range_type: Range) -> int: + """Get the distance threshold at which the PIR sensor is will trigger.""" + if range_type.value < 0 or range_type.value >= len(self.config["array"]): + raise KasaException( + "Range type is outside the bounds of the configured device ranges." + ) + return int(self.config["array"][range_type.value]) + + @property + def threshold(self) -> int: + """Return motion detection Range.""" + return self.pir_config.threshold + + async def set_threshold(self, value: int) -> dict: + """Set the distance threshold at which the PIR sensor is will trigger.""" + payload = {"index": Range.Custom.value, "value": value} return await self.call("set_trigger_sens", payload) @property @@ -100,3 +375,34 @@ class Motion(IotModule): to avoid reverting this back to 60 seconds after a period of time. """ return await self.call("set_cold_time", {"cold_time": timeout}) + + @property + def pir_state(self) -> PIRStatus: + """Return cached PIR status.""" + return PIRStatus(self.pir_config, self.data["get_adc_value"]["value"]) + + async def get_pir_state(self) -> PIRStatus: + """Return real-time PIR status.""" + latest = await self.call("get_adc_value") + self.data["get_adc_value"] = latest + return PIRStatus(self.pir_config, latest["value"]) + + @property + def adc_value(self) -> int: + """Return motion adc value.""" + return self.pir_state.adc_value + + @property + def pir_value(self) -> int: + """Return the computed PIR sensor value.""" + return self.pir_state.pir_value + + @property + def pir_percent(self) -> float: + """Return the computed PIR sensor value, in percentile form.""" + return self.pir_state.pir_percent + + @property + def pir_triggered(self) -> bool: + """Return if the motion sensor has been triggered.""" + return self.pir_state.pir_triggered diff --git a/tests/fakeprotocol_iot.py b/tests/fakeprotocol_iot.py index 23ce7827..238e555c 100644 --- a/tests/fakeprotocol_iot.py +++ b/tests/fakeprotocol_iot.py @@ -192,6 +192,7 @@ AMBIENT_MODULE = { MOTION_MODULE = { + "get_adc_value": {"value": 50, "err_code": 0}, "get_config": { "enable": 0, "version": "1.0", @@ -201,7 +202,7 @@ MOTION_MODULE = { "max_adc": 4095, "array": [80, 50, 20, 0], "err_code": 0, - } + }, } LIGHT_DETAILS = { diff --git a/tests/iot/modules/test_motion.py b/tests/iot/modules/test_motion.py index a2b32a87..2d1ccbcc 100644 --- a/tests/iot/modules/test_motion.py +++ b/tests/iot/modules/test_motion.py @@ -1,6 +1,7 @@ +import pytest from pytest_mock import MockerFixture -from kasa import Module +from kasa import KasaException, Module from kasa.iot import IotDimmer from kasa.iot.modules.motion import Motion, Range @@ -36,17 +37,72 @@ async def test_motion_range(dev: IotDimmer, mocker: MockerFixture): motion: Motion = dev.modules[Module.IotMotion] query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") - await motion.set_range(custom_range=123) - query_helper.assert_called_with( - "smartlife.iot.PIR", - "set_trigger_sens", - {"index": Range.Custom.value, "value": 123}, - ) + for range in Range: + await motion.set_range(range) + query_helper.assert_called_with( + "smartlife.iot.PIR", + "set_trigger_sens", + {"index": range.value}, + ) - await motion.set_range(range=Range.Far) - query_helper.assert_called_with( - "smartlife.iot.PIR", "set_trigger_sens", {"index": Range.Far.value} - ) + +@dimmer_iot +async def test_motion_range_from_string(dev: IotDimmer, mocker: MockerFixture): + motion: Motion = dev.modules[Module.IotMotion] + query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") + + ranges_good = { + "near": Range.Near, + "MID": Range.Mid, + "fAr": Range.Far, + " Custom ": Range.Custom, + } + for range_str, range in ranges_good.items(): + await motion._set_range_from_str(range_str) + query_helper.assert_called_with( + "smartlife.iot.PIR", + "set_trigger_sens", + {"index": range.value}, + ) + + query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") + ranges_bad = ["near1", "MD", "F\nAR", "Custom Near", '"FAR"', "'FAR'"] + for range_str in ranges_bad: + with pytest.raises(KasaException): + await motion._set_range_from_str(range_str) + query_helper.assert_not_called() + + +@dimmer_iot +async def test_motion_threshold(dev: IotDimmer, mocker: MockerFixture): + motion: Motion = dev.modules[Module.IotMotion] + query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") + + for range in Range: + # Switch to a given range. + await motion.set_range(range) + query_helper.assert_called_with( + "smartlife.iot.PIR", + "set_trigger_sens", + {"index": range.value}, + ) + + # Assert that the range always goes to custom, regardless of current range. + await motion.set_threshold(123) + query_helper.assert_called_with( + "smartlife.iot.PIR", + "set_trigger_sens", + {"index": Range.Custom.value, "value": 123}, + ) + + +@dimmer_iot +async def test_motion_realtime(dev: IotDimmer, mocker: MockerFixture): + motion: Motion = dev.modules[Module.IotMotion] + query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper") + + await motion.get_pir_state() + query_helper.assert_called_with("smartlife.iot.PIR", "get_adc_value", None) @dimmer_iot diff --git a/tests/test_cli.py b/tests/test_cli.py index c7a93970..627959e7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1180,6 +1180,63 @@ async def test_feature_set_child(mocker, runner): assert res.exit_code == 0 +async def test_feature_set_unquoted(mocker, runner): + """Test feature command's set value.""" + dummy_device = await get_device_for_fixture_protocol( + "ES20M(US)_1.0_1.0.11.json", "IOT" + ) + range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_from_str") + mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device) + + res = await runner.invoke( + cli, + ["--host", "127.0.0.123", "--debug", "feature", "pir_range", "Far"], + catch_exceptions=False, + ) + + range_setter.assert_not_called() + assert "Error: Invalid value: " in res.output + assert res.exit_code != 0 + + +async def test_feature_set_badquoted(mocker, runner): + """Test feature command's set value.""" + dummy_device = await get_device_for_fixture_protocol( + "ES20M(US)_1.0_1.0.11.json", "IOT" + ) + range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_from_str") + mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device) + + res = await runner.invoke( + cli, + ["--host", "127.0.0.123", "--debug", "feature", "pir_range", "`Far"], + catch_exceptions=False, + ) + + range_setter.assert_not_called() + assert "Error: Invalid value: " in res.output + assert res.exit_code != 0 + + +async def test_feature_set_goodquoted(mocker, runner): + """Test feature command's set value.""" + dummy_device = await get_device_for_fixture_protocol( + "ES20M(US)_1.0_1.0.11.json", "IOT" + ) + range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_from_str") + mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device) + + res = await runner.invoke( + cli, + ["--host", "127.0.0.123", "--debug", "feature", "pir_range", "'Far'"], + catch_exceptions=False, + ) + + range_setter.assert_called() + assert "Error: Invalid value: " not in res.output + assert res.exit_code == 0 + + async def test_cli_child_commands( dev: Device, runner: CliRunner, mocker: MockerFixture ): diff --git a/tests/test_feature.py b/tests/test_feature.py index 0d621032..bb707688 100644 --- a/tests/test_feature.py +++ b/tests/test_feature.py @@ -141,7 +141,10 @@ async def test_feature_choice_list(dummy_feature, caplog, mocker: MockerFixture) mock_setter.assert_called_with("first") mock_setter.reset_mock() - with pytest.raises(ValueError, match="Unexpected value for dummy_feature: invalid"): # noqa: PT012 + with pytest.raises( # noqa: PT012 + ValueError, + match="Unexpected value for dummy_feature: 'invalid' (?: - allowed: .*)?", + ): await dummy_feature.set_value("invalid") assert "Unexpected value" in caplog.text