Ryan Nitcher 7f2a1be392
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run
Add ADC Value to PIR Enabled Switches (#1263)
2025-01-25 10:45:48 +00:00

409 lines
12 KiB
Python

"""Implementation of the motion detection (PIR) module found in some dimmers."""
from __future__ import annotations
import logging
import math
from dataclasses import dataclass
from enum import Enum
from ...exceptions import KasaException
from ...feature import Feature
from ..iotmodule import IotModule, merge
_LOGGER = logging.getLogger(__name__)
class Range(Enum):
"""Range for motion detection."""
Far = 0
Mid = 1
Near = 2
Custom = 3
def __str__(self) -> str:
return self.name
@dataclass
class PIRConfig:
"""Dataclass representing a PIR sensor configuration."""
enabled: bool
adc_min: int
adc_max: int
range: Range
threshold: int
@property
def adc_mid(self) -> int:
"""Compute the ADC midpoint from the configured ADC Max and Min values."""
return math.floor(abs(self.adc_max - self.adc_min) / 2)
@dataclass
class PIRStatus:
"""Dataclass representing the current trigger state of an ADC PIR sensor."""
pir_config: PIRConfig
adc_value: int
@property
def pir_value(self) -> int:
"""
Get the PIR status value in integer form.
Computes the PIR status value that this object represents,
using the given PIR configuration.
"""
return self.pir_config.adc_mid - self.adc_value
@property
def pir_percent(self) -> float:
"""
Get the PIR status value in percentile form.
Computes the PIR status percentage that this object represents,
using the given PIR configuration.
"""
value = self.pir_value
divisor = (
(self.pir_config.adc_mid - self.pir_config.adc_min)
if (value < 0)
else (self.pir_config.adc_max - self.pir_config.adc_mid)
)
return (float(value) / divisor) * 100
@property
def pir_triggered(self) -> bool:
"""
Get the PIR status trigger state.
Compute the PIR trigger state this object represents,
using the given PIR configuration.
"""
return (self.pir_config.enabled) and (
abs(self.pir_percent) > (100 - self.pir_config.threshold)
)
class Motion(IotModule):
"""Implements the motion detection (PIR) module."""
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
# Only add features if the device supports the module
if "get_config" not in self.data:
return
# Require that ADC value is also present.
if "get_adc_value" not in self.data:
_LOGGER.warning("%r initialized, but no get_adc_value in response")
return
if "enable" not in self.config:
_LOGGER.warning("%r initialized, but no enable in response")
return
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_enabled",
name="PIR enabled",
icon="mdi:motion-sensor",
attribute_getter="enabled",
attribute_setter="set_enabled",
type=Feature.Type.Switch,
category=Feature.Category.Config,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_range",
name="Motion Sensor Range",
icon="mdi:motion-sensor",
attribute_getter="range",
attribute_setter="_set_range_from_str",
type=Feature.Type.Choice,
choices_getter="ranges",
category=Feature.Category.Config,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_threshold",
name="Motion Sensor Threshold",
icon="mdi:motion-sensor",
attribute_getter="threshold",
attribute_setter="set_threshold",
type=Feature.Type.Number,
category=Feature.Category.Config,
range_getter=lambda: (0, 100),
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_triggered",
name="PIR Triggered",
icon="mdi:motion-sensor",
attribute_getter="pir_triggered",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Primary,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_value",
name="PIR Value",
icon="mdi:motion-sensor",
attribute_getter="pir_value",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Info,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_value",
name="PIR ADC Value",
icon="mdi:motion-sensor",
attribute_getter="adc_value",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_min",
name="PIR ADC Min",
icon="mdi:motion-sensor",
attribute_getter="adc_min",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_mid",
name="PIR ADC Mid",
icon="mdi:motion-sensor",
attribute_getter="adc_mid",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_adc_max",
name="PIR ADC Max",
icon="mdi:motion-sensor",
attribute_getter="adc_max",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
device=self._device,
container=self,
id="pir_percent",
name="PIR Percentile",
icon="mdi:motion-sensor",
attribute_getter="pir_percent",
attribute_setter=None,
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
unit_getter=lambda: "%",
)
)
def query(self) -> dict:
"""Request PIR configuration."""
req = merge(
self.query_for_command("get_config"),
self.query_for_command("get_adc_value"),
)
return req
@property
def config(self) -> dict:
"""Return current configuration."""
return self.data["get_config"]
@property
def pir_config(self) -> PIRConfig:
"""Return PIR sensor configuration."""
pir_range = Range(self.config["trigger_index"])
return PIRConfig(
enabled=bool(self.config["enable"]),
adc_min=int(self.config["min_adc"]),
adc_max=int(self.config["max_adc"]),
range=pir_range,
threshold=self.get_range_threshold(pir_range),
)
@property
def enabled(self) -> bool:
"""Return True if module is enabled."""
return self.pir_config.enabled
@property
def adc_min(self) -> int:
"""Return minimum ADC sensor value."""
return self.pir_config.adc_min
@property
def adc_max(self) -> int:
"""Return maximum ADC sensor value."""
return self.pir_config.adc_max
@property
def adc_mid(self) -> int:
"""
Return the midpoint for the ADC.
The midpoint represents the zero point for the PIR sensor waveform.
Currently this is estimated by:
math.floor(abs(adc_max - adc_min) / 2)
"""
return self.pir_config.adc_mid
async def set_enabled(self, state: bool) -> dict:
"""Enable/disable PIR."""
return await self.call("set_enable", {"enable": int(state)})
@property
def ranges(self) -> list[str]:
"""Return set of supported range classes."""
range_min = 0
range_max = len(self.config["array"])
valid_ranges = list()
for r in Range:
if (r.value >= range_min) and (r.value < range_max):
valid_ranges.append(r.name)
return valid_ranges
@property
def range(self) -> Range:
"""Return motion detection Range."""
return self.pir_config.range
async def set_range(self, range: Range) -> dict:
"""Set the Range for the sensor.
:param Range: the range class to use.
"""
payload = {"index": range.value}
return await self.call("set_trigger_sens", payload)
def _parse_range_value(self, value: str) -> Range:
"""Attempt to parse a range value from the given string."""
value = value.strip().capitalize()
try:
return Range[value]
except KeyError:
raise KasaException(
f"Invalid range value: '{value}'."
f" Valid options are: {Range._member_names_}"
) from KeyError
async def _set_range_from_str(self, input: str) -> dict:
value = self._parse_range_value(input)
return await self.set_range(range=value)
def get_range_threshold(self, range_type: Range) -> int:
"""Get the distance threshold at which the PIR sensor is will trigger."""
if range_type.value < 0 or range_type.value >= len(self.config["array"]):
raise KasaException(
"Range type is outside the bounds of the configured device ranges."
)
return int(self.config["array"][range_type.value])
@property
def threshold(self) -> int:
"""Return motion detection Range."""
return self.pir_config.threshold
async def set_threshold(self, value: int) -> dict:
"""Set the distance threshold at which the PIR sensor is will trigger."""
payload = {"index": Range.Custom.value, "value": value}
return await self.call("set_trigger_sens", payload)
@property
def inactivity_timeout(self) -> int:
"""Return inactivity timeout in milliseconds."""
return self.config["cold_time"]
async def set_inactivity_timeout(self, timeout: int) -> dict:
"""Set inactivity timeout in milliseconds.
Note, that you need to delete the default "Smart Control" rule in the app
to avoid reverting this back to 60 seconds after a period of time.
"""
return await self.call("set_cold_time", {"cold_time": timeout})
@property
def pir_state(self) -> PIRStatus:
"""Return cached PIR status."""
return PIRStatus(self.pir_config, self.data["get_adc_value"]["value"])
async def get_pir_state(self) -> PIRStatus:
"""Return real-time PIR status."""
latest = await self.call("get_adc_value")
self.data["get_adc_value"] = latest
return PIRStatus(self.pir_config, latest["value"])
@property
def adc_value(self) -> int:
"""Return motion adc value."""
return self.pir_state.adc_value
@property
def pir_value(self) -> int:
"""Return the computed PIR sensor value."""
return self.pir_state.pir_value
@property
def pir_percent(self) -> float:
"""Return the computed PIR sensor value, in percentile form."""
return self.pir_state.pir_percent
@property
def pir_triggered(self) -> bool:
"""Return if the motion sensor has been triggered."""
return self.pir_state.pir_triggered