Add Async ADC PIR Support

- Add: Async function to get the current state of the ADC PIR sensor.
- Fix: Move ADC PIR state and config into proper dataclasses.
- Add: Add CLI test for passing bad quotes to feature set.
This commit is contained in:
Ryan Nitcher 2024-11-25 23:19:42 -07:00
parent d2ce2d20d4
commit 28080067db
3 changed files with 140 additions and 18 deletions

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import logging import logging
import math import math
from dataclasses import dataclass
from enum import Enum from enum import Enum
from ...exceptions import KasaException from ...exceptions import KasaException
@ -25,6 +26,64 @@ class Range(Enum):
return self.name return self.name
@dataclass
class PIRConfig:
"""Dataclass representing a PIR sensor configuration."""
enabled: bool
adc_min: int
adc_max: int
range: Range
threshold: int
@property
def adc_mid(self) -> int:
"""Compute the ADC midpoint from the configured ADC Max and Min values."""
return math.floor(abs(self.adc_max - self.adc_min) / 2)
@dataclass
class PIRStatus:
"""Dataclass representing the current trigger state of an ADC PIR sensor."""
adc_value: int
def get_pir_value(self, config: PIRConfig) -> int:
"""
Get the PIR status value in integer form.
Computes the PIR status value that this object represents,
using the given PIR configuration.
"""
return config.adc_mid - self.adc_value
def get_pir_percent(self, config: PIRConfig) -> float:
"""
Get the PIR status value in percentile form.
Computes the PIR status percentage that this object represents,
using the given PIR configuration.
"""
value = self.get_pir_value(config)
divisor = (
(config.adc_mid - config.adc_min)
if (value < 0)
else (config.adc_max - config.adc_mid)
)
return (float(value) / divisor) * 100
def get_pir_triggered(self, config: PIRConfig) -> bool:
"""
Get the PIR status trigger state.
Compute the PIR trigger state this object represents,
using the given PIR configuration.
"""
return (config.enabled) and (
abs(self.get_pir_percent(config)) > (100 - config.threshold)
)
class Motion(IotModule): class Motion(IotModule):
"""Implements the motion detection (PIR) module.""" """Implements the motion detection (PIR) module."""
@ -150,7 +209,7 @@ class Motion(IotModule):
id="pir_adc_mid", id="pir_adc_mid",
name="PIR ADC Mid", name="PIR ADC Mid",
icon="mdi:motion-sensor", icon="mdi:motion-sensor",
attribute_getter="adc_midpoint", attribute_getter="adc_mid",
attribute_setter=None, attribute_setter=None,
type=Feature.Type.Sensor, type=Feature.Type.Sensor,
category=Feature.Category.Debug, category=Feature.Category.Debug,
@ -200,23 +259,35 @@ class Motion(IotModule):
"""Return current configuration.""" """Return current configuration."""
return self.data["get_config"] return self.data["get_config"]
@property
def pir_config(self) -> PIRConfig:
"""Return PIR sensor configuration."""
pir_range = Range(self.config["trigger_index"])
return PIRConfig(
enabled=bool(self.config["enable"]),
adc_min=int(self.config["min_adc"]),
adc_max=int(self.config["max_adc"]),
range=pir_range,
threshold=self.get_range_threshold(pir_range),
)
@property @property
def enabled(self) -> bool: def enabled(self) -> bool:
"""Return True if module is enabled.""" """Return True if module is enabled."""
return bool(self.config["enable"]) return self.pir_config.enabled
@property @property
def adc_min(self) -> int: def adc_min(self) -> int:
"""Return minimum ADC sensor value.""" """Return minimum ADC sensor value."""
return int(self.config["min_adc"]) return self.pir_config.adc_min
@property @property
def adc_max(self) -> int: def adc_max(self) -> int:
"""Return maximum ADC sensor value.""" """Return maximum ADC sensor value."""
return int(self.config["max_adc"]) return self.pir_config.adc_max
@property @property
def adc_midpoint(self) -> int: def adc_mid(self) -> int:
""" """
Return the midpoint for the ADC. Return the midpoint for the ADC.
@ -225,7 +296,7 @@ class Motion(IotModule):
Currently this is estimated by: Currently this is estimated by:
math.floor(abs(adc_max - adc_min) / 2) math.floor(abs(adc_max - adc_min) / 2)
""" """
return math.floor(abs(self.adc_max - self.adc_min) / 2) return self.pir_config.adc_mid
async def set_enabled(self, state: bool) -> dict: async def set_enabled(self, state: bool) -> dict:
"""Enable/disable PIR.""" """Enable/disable PIR."""
@ -245,7 +316,7 @@ class Motion(IotModule):
@property @property
def range(self) -> Range: def range(self) -> Range:
"""Return motion detection Range.""" """Return motion detection Range."""
return Range(self.config["trigger_index"]) return self.pir_config.range
async def set_range(self, range: Range) -> dict: async def set_range(self, range: Range) -> dict:
"""Set the Range for the sensor. """Set the Range for the sensor.
@ -280,7 +351,7 @@ class Motion(IotModule):
@property @property
def threshold(self) -> int: def threshold(self) -> int:
"""Return motion detection Range.""" """Return motion detection Range."""
return self.get_range_threshold(self.range) return self.pir_config.threshold
async def set_threshold(self, value: int) -> dict: async def set_threshold(self, value: int) -> dict:
"""Set the distance threshold at which the PIR sensor is will trigger.""" """Set the distance threshold at which the PIR sensor is will trigger."""
@ -300,28 +371,32 @@ class Motion(IotModule):
""" """
return await self.call("set_cold_time", {"cold_time": timeout}) return await self.call("set_cold_time", {"cold_time": timeout})
@property
def pir_state(self) -> PIRStatus:
"""Return cached PIR status."""
return PIRStatus(self.data["get_adc_value"]["value"])
async def get_pir_state(self) -> PIRStatus:
"""Return real-time PIR status."""
current = await self.call("get_adc_value")
return PIRStatus(current["value"])
@property @property
def adc_value(self) -> int: def adc_value(self) -> int:
"""Return motion adc value.""" """Return motion adc value."""
return self.data["get_adc_value"]["value"] return self.pir_state.adc_value
@property @property
def pir_value(self) -> int: def pir_value(self) -> int:
"""Return the computed PIR sensor value.""" """Return the computed PIR sensor value."""
return self.adc_midpoint - self.adc_value return self.pir_state.get_pir_value(self.pir_config)
@property @property
def pir_percent(self) -> float: def pir_percent(self) -> float:
"""Return the computed PIR sensor value, in percentile form.""" """Return the computed PIR sensor value, in percentile form."""
amp = self.pir_value return self.pir_state.get_pir_percent(self.pir_config)
per: float
if amp < 0:
per = (float(amp) / (self.adc_midpoint - self.adc_min)) * 100
else:
per = (float(amp) / (self.adc_max - self.adc_midpoint)) * 100
return per
@property @property
def pir_triggered(self) -> bool: def pir_triggered(self) -> bool:
"""Return if the motion sensor has been triggered.""" """Return if the motion sensor has been triggered."""
return (self.enabled) and (abs(self.pir_percent) > (100 - self.threshold)) return self.pir_state.get_pir_triggered(self.pir_config)

View File

@ -68,6 +68,15 @@ async def test_motion_threshold(dev: IotDimmer, mocker: MockerFixture):
) )
@dimmer_iot
async def test_motion_realtime(dev: IotDimmer, mocker: MockerFixture):
motion: Motion = dev.modules[Module.IotMotion]
query_helper = mocker.patch("kasa.iot.IotDimmer._query_helper")
await motion.get_pir_state()
query_helper.assert_called_with("smartlife.iot.PIR", "get_adc_value", None)
@dimmer_iot @dimmer_iot
def test_motion_feature(dev: IotDimmer): def test_motion_feature(dev: IotDimmer):
assert Module.IotMotion in dev.modules assert Module.IotMotion in dev.modules

View File

@ -1119,6 +1119,44 @@ async def test_feature_set_child(mocker, runner):
assert res.exit_code == 0 assert res.exit_code == 0
async def test_feature_set_unquoted(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"ES20M(US)_1.0_1.0.11.json", "IOT"
)
range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_cli")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "pir_range", "Far"],
catch_exceptions=False,
)
range_setter.assert_not_called()
assert "Error: Invalid value: " in res.output
assert res.exit_code != 0
async def test_feature_set_badquoted(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"ES20M(US)_1.0_1.0.11.json", "IOT"
)
range_setter = mocker.patch("kasa.iot.modules.motion.Motion._set_range_cli")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "pir_range", "`Far"],
catch_exceptions=False,
)
range_setter.assert_not_called()
assert "Error: Invalid value: " in res.output
assert res.exit_code != 0
async def test_cli_child_commands( async def test_cli_child_commands(
dev: Device, runner: CliRunner, mocker: MockerFixture dev: Device, runner: CliRunner, mocker: MockerFixture
): ):