Add ADC Value to PIR Enabled Switches (#1263)
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run

This commit is contained in:
Ryan Nitcher 2025-01-25 03:45:48 -07:00 committed by GitHub
parent 0aa1242a00
commit 7f2a1be392
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 491 additions and 39 deletions

View File

@ -6,10 +6,7 @@ import ast
import asyncclick as click
from kasa import (
Device,
Feature,
)
from kasa import Device, Feature
from .common import (
echo,
@ -133,7 +130,22 @@ async def feature(
echo(f"{feat.name} ({name}): {feat.value}{unit}")
return feat.value
try:
# Attempt to parse as python literal.
value = ast.literal_eval(value)
except ValueError:
# The value is probably an unquoted string, so we'll raise an error,
# and tell the user to quote the string.
raise click.exceptions.BadParameter(
f'{repr(value)} for {name} (Perhaps you forgot to "quote" the value?)'
) from SyntaxError
except SyntaxError:
# There are likely miss-matched quotes or odd characters in the input,
# so abort and complain to the user.
raise click.exceptions.BadParameter(
f"{repr(value)} for {name}"
) from SyntaxError
echo(f"Changing {name} from {feat.value} to {value}")
response = await dev.features[name].set_value(value)
await dev.update()

View File

@ -256,7 +256,7 @@ class Feature:
elif self.type == Feature.Type.Choice: # noqa: SIM102
if not self.choices or value not in self.choices:
raise ValueError(
f"Unexpected value for {self.name}: {value}"
f"Unexpected value for {self.name}: '{value}'"
f" - allowed: {self.choices}"
)
@ -279,7 +279,18 @@ class Feature:
return f"Unable to read value ({self.id}): {ex}"
if self.type == Feature.Type.Choice:
if not isinstance(choices, list) or value not in choices:
if not isinstance(choices, list):
_LOGGER.error(
"Choices are not properly defined for %s (%s). Type: <%s> Value: %s", # noqa: E501
self.name,
self.id,
type(choices),
choices,
)
return f"{self.name} ({self.id}): improperly defined choice set."
if (value not in choices) and (
isinstance(value, Enum) and value.name not in choices
):
_LOGGER.warning(
"Invalid value for for choice %s (%s): %s not in %s",
self.name,
@ -291,7 +302,13 @@ class Feature:
f"{self.name} ({self.id}): invalid value '{value}' not in {choices}"
)
value = " ".join(
[f"*{choice}*" if choice == value else choice for choice in choices]
[
f"*{choice}*"
if choice == value
or (isinstance(value, Enum) and choice == value.name)
else f"{choice}"
for choice in choices
]
)
if self.precision_hint is not None and isinstance(value, float):
value = round(value, self.precision_hint)

View File

@ -3,11 +3,13 @@
from __future__ import annotations
import logging
import math
from dataclasses import dataclass
from enum import Enum
from ...exceptions import KasaException
from ...feature import Feature
from ..iotmodule import IotModule
from ..iotmodule import IotModule, merge
_LOGGER = logging.getLogger(__name__)
@ -20,6 +22,71 @@ class Range(Enum):
Near = 2
Custom = 3
def __str__(self) -> str:
return self.name
@dataclass
class PIRConfig:
"""Dataclass representing a PIR sensor configuration."""
enabled: bool
adc_min: int
adc_max: int
range: Range
threshold: int
@property
def adc_mid(self) -> int:
"""Compute the ADC midpoint from the configured ADC Max and Min values."""
return math.floor(abs(self.adc_max - self.adc_min) / 2)
@dataclass
class PIRStatus:
"""Dataclass representing the current trigger state of an ADC PIR sensor."""
pir_config: PIRConfig
adc_value: int
@property
def pir_value(self) -> int:
"""
Get the PIR status value in integer form.
Computes the PIR status value that this object represents,
using the given PIR configuration.
"""
return self.pir_config.adc_mid - self.adc_value
@property
def pir_percent(self) -> float:
"""
Get the PIR status value in percentile form.
Computes the PIR status percentage that this object represents,
using the given PIR configuration.
"""
value = self.pir_value
divisor = (
(self.pir_config.adc_mid - self.pir_config.adc_min)
if (value < 0)
else (self.pir_config.adc_max - self.pir_config.adc_mid)
)
return (float(value) / divisor) * 100
@property
def pir_triggered(self) -> bool:
"""
Get the PIR status trigger state.
Compute the PIR trigger state this object represents,
using the given PIR configuration.
"""
return (self.pir_config.enabled) and (
abs(self.pir_percent) > (100 - self.pir_config.threshold)
)
class Motion(IotModule):
"""Implements the motion detection (PIR) module."""
@ -30,6 +97,11 @@ class Motion(IotModule):
if "get_config" not in self.data:
return
# Require that ADC value is also present.
if "get_adc_value" not in self.data:
_LOGGER.warning("%r initialized, but no get_adc_value in response")
return
if "enable" not in self.config:
_LOGGER.warning("%r initialized, but no enable in response")
return
@ -48,9 +120,143 @@ class Motion(IotModule):
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_range",
name="Motion Sensor Range",
icon="mdi:motion-sensor",
attribute_getter="range",
attribute_setter="_set_range_from_str",
type=Feature.Type.Choice,
choices_getter="ranges",
category=Feature.Category.Config,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_threshold",
name="Motion Sensor Threshold",
icon="mdi:motion-sensor",
attribute_getter="threshold",
attribute_setter="set_threshold",
type=Feature.Type.Number,
category=Feature.Category.Config,
range_getter=lambda: (0, 100),
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_triggered",
name="PIR Triggered",
icon="mdi:motion-sensor",
attribute_getter="pir_triggered",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Primary,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_value",
name="PIR Value",
icon="mdi:motion-sensor",
attribute_getter="pir_value",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Info,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_value",
name="PIR ADC Value",
icon="mdi:motion-sensor",
attribute_getter="adc_value",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_min",
name="PIR ADC Min",
icon="mdi:motion-sensor",
attribute_getter="adc_min",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_mid",
name="PIR ADC Mid",
icon="mdi:motion-sensor",
attribute_getter="adc_mid",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_max",
name="PIR ADC Max",
icon="mdi:motion-sensor",
attribute_getter="adc_max",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_percent",
name="PIR Percentile",
icon="mdi:motion-sensor",
attribute_getter="pir_percent",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
unit_getter=lambda: "%",
)
)
def query(self) -> dict:
"""Request PIR configuration."""
return self.query_for_command("get_config")
req = merge(
self.query_for_command("get_config"),
self.query_for_command("get_adc_value"),
)
return req
@property
def config(self) -> dict:
@ -58,34 +264,103 @@ class Motion(IotModule):
return self.data["get_config"]
@property
def range(self) -> Range:
"""Return motion detection range."""
return Range(self.config["trigger_index"])
def pir_config(self) -> PIRConfig:
"""Return PIR sensor configuration."""
pir_range = Range(self.config["trigger_index"])
return PIRConfig(
enabled=bool(self.config["enable"]),
adc_min=int(self.config["min_adc"]),
adc_max=int(self.config["max_adc"]),
range=pir_range,
threshold=self.get_range_threshold(pir_range),
)
@property
def enabled(self) -> bool:
"""Return True if module is enabled."""
return bool(self.config["enable"])
return self.pir_config.enabled
@property
def adc_min(self) -> int:
"""Return minimum ADC sensor value."""
return self.pir_config.adc_min
@property
def adc_max(self) -> int:
"""Return maximum ADC sensor value."""
return self.pir_config.adc_max
@property
def adc_mid(self) -> int:
"""
Return the midpoint for the ADC.
The midpoint represents the zero point for the PIR sensor waveform.
Currently this is estimated by:
math.floor(abs(adc_max - adc_min) / 2)
"""
return self.pir_config.adc_mid
async def set_enabled(self, state: bool) -> dict:
"""Enable/disable PIR."""
return await self.call("set_enable", {"enable": int(state)})
async def set_range(
self, *, range: Range | None = None, custom_range: int | None = None
) -> dict:
"""Set the range for the sensor.
@property
def ranges(self) -> list[str]:
"""Return set of supported range classes."""
range_min = 0
range_max = len(self.config["array"])
valid_ranges = list()
for r in Range:
if (r.value >= range_min) and (r.value < range_max):
valid_ranges.append(r.name)
return valid_ranges
:param range: for using standard ranges
:param custom_range: range in decimeters, overrides the range parameter
@property
def range(self) -> Range:
"""Return motion detection Range."""
return self.pir_config.range
async def set_range(self, range: Range) -> dict:
"""Set the Range for the sensor.
:param Range: the range class to use.
"""
if custom_range is not None:
payload = {"index": Range.Custom.value, "value": custom_range}
elif range is not None:
payload = {"index": range.value}
else:
raise KasaException("Either range or custom_range need to be defined")
return await self.call("set_trigger_sens", payload)
def _parse_range_value(self, value: str) -> Range:
"""Attempt to parse a range value from the given string."""
value = value.strip().capitalize()
try:
return Range[value]
except KeyError:
raise KasaException(
f"Invalid range value: '{value}'."
f" Valid options are: {Range._member_names_}"
) from KeyError
async def _set_range_from_str(self, input: str) -> dict:
value = self._parse_range_value(input)
return await self.set_range(range=value)
def get_range_threshold(self, range_type: Range) -> int:
"""Get the distance threshold at which the PIR sensor is will trigger."""
if range_type.value < 0 or range_type.value >= len(self.config["array"]):
raise KasaException(
"Range type is outside the bounds of the configured device ranges."
)
return int(self.config["array"][range_type.value])
@property
def threshold(self) -> int:
"""Return motion detection Range."""
return self.pir_config.threshold
async def set_threshold(self, value: int) -> dict:
"""Set the distance threshold at which the PIR sensor is will trigger."""
payload = {"index": Range.Custom.value, "value": value}
return await self.call("set_trigger_sens", payload)
@property
@ -100,3 +375,34 @@ class Motion(IotModule):
to avoid reverting this back to 60 seconds after a period of time.
"""
return await self.call("set_cold_time", {"cold_time": timeout})
@property
def pir_state(self) -> PIRStatus:
"""Return cached PIR status."""
return PIRStatus(self.pir_config, self.data["get_adc_value"]["value"])
async def get_pir_state(self) -> PIRStatus:
"""Return real-time PIR status."""
latest = await self.call("get_adc_value")
self.data["get_adc_value"] = latest
return PIRStatus(self.pir_config, latest["value"])
@property
def adc_value(self) -> int:
"""Return motion adc value."""
return self.pir_state.adc_value
@property
def pir_value(self) -> int:
"""Return the computed PIR sensor value."""
return self.pir_state.pir_value
@property
def pir_percent(self) -> float:
"""Return the computed PIR sensor value, in percentile form."""
return self.pir_state.pir_percent
@property
def pir_triggered(self) -> bool:
"""Return if the motion sensor has been triggered."""
return self.pir_state.pir_triggered

View File

@ -192,6 +192,7 @@ AMBIENT_MODULE = {
MOTION_MODULE = {
"get_adc_value": {"value": 50, "err_code": 0},
"get_config": {
"enable": 0,
"version": "1.0",
@ -201,7 +202,7 @@ MOTION_MODULE = {
"max_adc": 4095,
"array": [80, 50, 20, 0],
"err_code": 0,
}
},
}
LIGHT_DETAILS = {

View File

@ -1,6 +1,7 @@
import pytest
from pytest_mock import MockerFixture
from kasa import Module
from kasa import KasaException, Module
from kasa.iot import IotDimmer
from kasa.iot.modules.motion import Motion, Range
@ -36,17 +37,72 @@ async def test_motion_range(dev: IotDimmer, mocker: MockerFixture):
motion: Motion = dev.modules[Module.IotMotion]
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
await motion.set_range(custom_range=123)
for range in Range:
await motion.set_range(range)
query_helper.assert_called_with(
"smartlife.iot.PIR",
"set_trigger_sens",
{"index": range.value},
)
@dimmer_iot
async def test_motion_range_from_string(dev: IotDimmer, mocker: MockerFixture):
motion: Motion = dev.modules[Module.IotMotion]
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
ranges_good = {
"near": Range.Near,
"MID": Range.Mid,
"fAr": Range.Far,
" Custom ": Range.Custom,
}
for range_str, range in ranges_good.items():
await motion._set_range_from_str(range_str)
query_helper.assert_called_with(
"smartlife.iot.PIR",
"set_trigger_sens",
{"index": range.value},
)
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
ranges_bad = ["near1", "MD", "F\nAR", "Custom Near", '"FAR"', "'FAR'"]
for range_str in ranges_bad:
with pytest.raises(KasaException):
await motion._set_range_from_str(range_str)
query_helper.assert_not_called()
@dimmer_iot
async def test_motion_threshold(dev: IotDimmer, mocker: MockerFixture):
motion: Motion = dev.modules[Module.IotMotion]
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
for range in Range:
# Switch to a given range.
await motion.set_range(range)
query_helper.assert_called_with(
"smartlife.iot.PIR",
"set_trigger_sens",
{"index": range.value},
)
# Assert that the range always goes to custom, regardless of current range.
await motion.set_threshold(123)
query_helper.assert_called_with(
"smartlife.iot.PIR",
"set_trigger_sens",
{"index": Range.Custom.value, "value": 123},
)
await motion.set_range(range=Range.Far)
query_helper.assert_called_with(
"smartlife.iot.PIR", "set_trigger_sens", {"index": Range.Far.value}
)
@dimmer_iot
async def test_motion_realtime(dev: IotDimmer, mocker: MockerFixture):
motion: Motion = dev.modules[Module.IotMotion]
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
await motion.get_pir_state()
query_helper.assert_called_with("smartlife.iot.PIR", "get_adc_value", None)
@dimmer_iot

View File

@ -1180,6 +1180,63 @@ async def test_feature_set_child(mocker, runner):
assert res.exit_code == 0
async def test_feature_set_unquoted(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"ES20M(US)_1.0_1.0.11.json", "IOT"
)
range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_from_str")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "pir_range", "Far"],
catch_exceptions=False,
)
range_setter.assert_not_called()
assert "Error: Invalid value: " in res.output
assert res.exit_code != 0
async def test_feature_set_badquoted(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"ES20M(US)_1.0_1.0.11.json", "IOT"
)
range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_from_str")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "pir_range", "`Far"],
catch_exceptions=False,
)
range_setter.assert_not_called()
assert "Error: Invalid value: " in res.output
assert res.exit_code != 0
async def test_feature_set_goodquoted(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"ES20M(US)_1.0_1.0.11.json", "IOT"
)
range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_from_str")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "pir_range", "'Far'"],
catch_exceptions=False,
)
range_setter.assert_called()
assert "Error: Invalid value: " not in res.output
assert res.exit_code == 0
async def test_cli_child_commands(
dev: Device, runner: CliRunner, mocker: MockerFixture
):

View File

@ -141,7 +141,10 @@ async def test_feature_choice_list(dummy_feature, caplog, mocker: MockerFixture)
mock_setter.assert_called_with("first")
mock_setter.reset_mock()
with pytest.raises(ValueError, match="Unexpected value for dummy_feature: invalid"): # noqa: PT012
with pytest.raises( # noqa: PT012
ValueError,
match="Unexpected value for dummy_feature: 'invalid' (?: - allowed: .*)?",
):
await dummy_feature.set_value("invalid")
assert "Unexpected value" in caplog.text