From 62c1dd87dc9e41cdc893ac1db42805afbfca3069 Mon Sep 17 00:00:00 2001 From: "Teemu R." Date: Sun, 26 Jan 2025 01:43:02 +0100 Subject: [PATCH] Add powerprotection module (#1337) Implements power protection on supported devices. If the power usage is above the given threshold and the feature is enabled, the device will be turned off. Adds the following features: * `overloaded` binary sensor * `power_protection_threshold` number, setting this to `0` turns the feature off. --------- Co-authored-by: Steven B <51370195+sdb9696@users.noreply.github.com> --- kasa/module.py | 19 ++- kasa/smart/modules/__init__.py | 2 + kasa/smart/modules/powerprotection.py | 124 ++++++++++++++++++++ tests/fakeprotocol_smart.py | 8 ++ tests/smart/modules/test_powerprotection.py | 98 ++++++++++++++++ 5 files changed, 247 insertions(+), 4 deletions(-) create mode 100644 kasa/smart/modules/powerprotection.py create mode 100644 tests/smart/modules/test_powerprotection.py diff --git a/kasa/module.py b/kasa/module.py index 107ce1e6..c58c6b40 100644 --- a/kasa/module.py +++ b/kasa/module.py @@ -81,6 +81,9 @@ ModuleT = TypeVar("ModuleT", bound="Module") class FeatureAttribute: """Class for annotating attributes bound to feature.""" + def __init__(self, feature_name: str | None = None) -> None: + self.feature_name = feature_name + def __repr__(self) -> str: return "FeatureAttribute" @@ -155,6 +158,9 @@ class Module(ABC): ) ChildLock: Final[ModuleName[smart.ChildLock]] = ModuleName("ChildLock") TriggerLogs: Final[ModuleName[smart.TriggerLogs]] = ModuleName("TriggerLogs") + PowerProtection: Final[ModuleName[smart.PowerProtection]] = ModuleName( + "PowerProtection" + ) HomeKit: Final[ModuleName[smart.HomeKit]] = ModuleName("HomeKit") Matter: Final[ModuleName[smart.Matter]] = ModuleName("Matter") @@ -234,7 +240,7 @@ class Module(ABC): ) -def _is_bound_feature(attribute: property | Callable) -> bool: +def _get_feature_attribute(attribute: property | Callable) -> FeatureAttribute | None: """Check if an attribute is bound to a feature with FeatureAttribute.""" if isinstance(attribute, property): hints = get_type_hints(attribute.fget, include_extras=True) @@ -245,9 +251,9 @@ def _is_bound_feature(attribute: property | Callable) -> bool: metadata = hints["return"].__metadata__ for meta in metadata: if isinstance(meta, FeatureAttribute): - return True + return meta - return False + return None @cache @@ -274,12 +280,17 @@ def _get_bound_feature( f"module {module.__class__.__name__}" ) - if not _is_bound_feature(attribute_callable): + if not (fa := _get_feature_attribute(attribute_callable)): raise KasaException( f"Attribute {attribute_name} of module {module.__class__.__name__}" " is not bound to a feature" ) + # If a feature_name was passed to the FeatureAttribute use that to check + # for the feature. Otherwise check the getters and setters in the features + if fa.feature_name: + return module._all_features.get(fa.feature_name) + check = {attribute_name, attribute_callable} for feature in module._all_features.values(): if (getter := feature.attribute_getter) and getter in check: diff --git a/kasa/smart/modules/__init__.py b/kasa/smart/modules/__init__.py index 9215277e..15404239 100644 --- a/kasa/smart/modules/__init__.py +++ b/kasa/smart/modules/__init__.py @@ -34,6 +34,7 @@ from .matter import Matter from .mop import Mop from .motionsensor import MotionSensor from .overheatprotection import OverheatProtection +from .powerprotection import PowerProtection from .reportmode import ReportMode from .speaker import Speaker from .temperaturecontrol import TemperatureControl @@ -80,6 +81,7 @@ __all__ = [ "Consumables", "CleanRecords", "SmartLightEffect", + "PowerProtection", "OverheatProtection", "Speaker", "HomeKit", diff --git a/kasa/smart/modules/powerprotection.py b/kasa/smart/modules/powerprotection.py new file mode 100644 index 00000000..ff7e726d --- /dev/null +++ b/kasa/smart/modules/powerprotection.py @@ -0,0 +1,124 @@ +"""Power protection module.""" + +from __future__ import annotations + +from typing import Annotated + +from ...feature import Feature +from ...module import FeatureAttribute +from ..smartmodule import SmartModule + + +class PowerProtection(SmartModule): + """Implementation for power_protection.""" + + REQUIRED_COMPONENT = "power_protection" + + def _initialize_features(self) -> None: + """Initialize features after the initial update.""" + self._add_feature( + Feature( + device=self._device, + id="overloaded", + name="Overloaded", + container=self, + attribute_getter="overloaded", + type=Feature.Type.BinarySensor, + category=Feature.Category.Info, + ) + ) + self._add_feature( + Feature( + device=self._device, + id="power_protection_threshold", + name="Power protection threshold", + container=self, + attribute_getter="_threshold_or_zero", + attribute_setter="_set_threshold_auto_enable", + unit_getter=lambda: "W", + type=Feature.Type.Number, + range_getter=lambda: (0, self._max_power), + category=Feature.Category.Config, + ) + ) + + def query(self) -> dict: + """Query to execute during the update cycle.""" + return {"get_protection_power": {}, "get_max_power": {}} + + @property + def overloaded(self) -> bool: + """Return True is power protection has been triggered. + + This value remains True until the device is turned on again. + """ + return self._device.sys_info["power_protection_status"] == "overloaded" + + @property + def enabled(self) -> bool: + """Return True if child protection is enabled.""" + return self.data["get_protection_power"]["enabled"] + + async def set_enabled(self, enabled: bool, *, threshold: int | None = None) -> dict: + """Set power protection enabled. + + If power protection has never been enabled before the threshold will + be 0 so if threshold is not provided it will be set to half the max. + """ + if threshold is None and enabled and self.protection_threshold == 0: + threshold = int(self._max_power / 2) + + if threshold and (threshold < 0 or threshold > self._max_power): + raise ValueError( + "Threshold out of range: %s (%s)", threshold, self.protection_threshold + ) + + params = {**self.data["get_protection_power"], "enabled": enabled} + if threshold is not None: + params["protection_power"] = threshold + return await self.call("set_protection_power", params) + + async def _set_threshold_auto_enable(self, threshold: int) -> dict: + """Set power protection and enable.""" + if threshold == 0: + return await self.set_enabled(False) + else: + return await self.set_enabled(True, threshold=threshold) + + @property + def _threshold_or_zero(self) -> int: + """Get power protection threshold. 0 if not enabled.""" + return self.protection_threshold if self.enabled else 0 + + @property + def _max_power(self) -> int: + """Return max power.""" + return self.data["get_max_power"]["max_power"] + + @property + def protection_threshold( + self, + ) -> Annotated[int, FeatureAttribute("power_protection_threshold")]: + """Return protection threshold in watts.""" + # If never configured, there is no value set. + return self.data["get_protection_power"].get("protection_power", 0) + + async def set_protection_threshold(self, threshold: int) -> dict: + """Set protection threshold.""" + if threshold < 0 or threshold > self._max_power: + raise ValueError( + "Threshold out of range: %s (%s)", threshold, self.protection_threshold + ) + + params = { + **self.data["get_protection_power"], + "protection_power": threshold, + } + return await self.call("set_protection_power", params) + + async def _check_supported(self) -> bool: + """Return True if module is supported. + + This is needed, as strips like P304M report the status only for children. + """ + return "power_protection_status" in self._device.sys_info diff --git a/tests/fakeprotocol_smart.py b/tests/fakeprotocol_smart.py index d2367d9f..2006b52e 100644 --- a/tests/fakeprotocol_smart.py +++ b/tests/fakeprotocol_smart.py @@ -164,6 +164,14 @@ class FakeSmartTransport(BaseTransport): "energy_monitoring", {"igain": 10861, "vgain": 118657}, ), + "get_protection_power": ( + "power_protection", + {"enabled": False, "protection_power": 0}, + ), + "get_max_power": ( + "power_protection", + {"max_power": 3904}, + ), "get_matter_setup_info": ( "matter", { diff --git a/tests/smart/modules/test_powerprotection.py b/tests/smart/modules/test_powerprotection.py new file mode 100644 index 00000000..7f03c0e9 --- /dev/null +++ b/tests/smart/modules/test_powerprotection.py @@ -0,0 +1,98 @@ +import pytest +from pytest_mock import MockerFixture + +from kasa import Module, SmartDevice + +from ...device_fixtures import get_parent_and_child_modules, parametrize + +powerprotection = parametrize( + "has powerprotection", + component_filter="power_protection", + protocol_filter={"SMART"}, +) + + +@powerprotection +@pytest.mark.parametrize( + ("feature", "prop_name", "type"), + [ + ("overloaded", "overloaded", bool), + ("power_protection_threshold", "protection_threshold", int), + ], +) +async def test_features(dev, feature, prop_name, type): + """Test that features are registered and work as expected.""" + powerprot = next(get_parent_and_child_modules(dev, Module.PowerProtection)) + assert powerprot + device = powerprot._device + + prop = getattr(powerprot, prop_name) + assert isinstance(prop, type) + + feat = device.features[feature] + assert feat.value == prop + assert isinstance(feat.value, type) + + +@powerprotection +async def test_set_enable(dev: SmartDevice, mocker: MockerFixture): + """Test enable.""" + powerprot = next(get_parent_and_child_modules(dev, Module.PowerProtection)) + assert powerprot + device = powerprot._device + + original_enabled = powerprot.enabled + original_threshold = powerprot.protection_threshold + + try: + # Simple enable with an existing threshold + call_spy = mocker.spy(powerprot, "call") + await powerprot.set_enabled(True) + params = { + "enabled": True, + "protection_power": mocker.ANY, + } + call_spy.assert_called_with("set_protection_power", params) + + # Enable with no threshold param when 0 + call_spy.reset_mock() + await powerprot.set_protection_threshold(0) + await device.update() + await powerprot.set_enabled(True) + params = { + "enabled": True, + "protection_power": int(powerprot._max_power / 2), + } + call_spy.assert_called_with("set_protection_power", params) + + # Enable false should not update the threshold + call_spy.reset_mock() + await powerprot.set_protection_threshold(0) + await device.update() + await powerprot.set_enabled(False) + params = { + "enabled": False, + "protection_power": 0, + } + call_spy.assert_called_with("set_protection_power", params) + + finally: + await powerprot.set_enabled(original_enabled, threshold=original_threshold) + + +@powerprotection +async def test_set_threshold(dev: SmartDevice, mocker: MockerFixture): + """Test enable.""" + powerprot = next(get_parent_and_child_modules(dev, Module.PowerProtection)) + assert powerprot + + call_spy = mocker.spy(powerprot, "call") + await powerprot.set_protection_threshold(123) + params = { + "enabled": mocker.ANY, + "protection_power": 123, + } + call_spy.assert_called_with("set_protection_power", params) + + with pytest.raises(ValueError, match="Threshold out of range"): + await powerprot.set_protection_threshold(-10)