From 5b486074e27ea63a2c371266aa863df41de13149 Mon Sep 17 00:00:00 2001 From: Steven B <51370195+sdb9696@users.noreply.github.com> Date: Thu, 2 May 2024 15:31:12 +0100 Subject: [PATCH] Add LightEffectModule for dynamic light effects on SMART bulbs (#887) Support the `light_effect` module which allows setting the effect to Off or Party or Relax. Uses the new `Feature.Type.Choice`. Does not currently allow editing of effects. --- kasa/cli.py | 38 +++--- kasa/feature.py | 7 ++ kasa/smart/modules/__init__.py | 2 + kasa/smart/modules/lighteffectmodule.py | 112 ++++++++++++++++++ kasa/smart/smartdevice.py | 58 +-------- kasa/tests/fakeprotocol_smart.py | 16 +++ kasa/tests/smart/modules/test_light_effect.py | 42 +++++++ kasa/tests/test_cli.py | 19 ++- 8 files changed, 217 insertions(+), 77 deletions(-) create mode 100644 kasa/smart/modules/lighteffectmodule.py create mode 100644 kasa/tests/smart/modules/test_light_effect.py diff --git a/kasa/cli.py b/kasa/cli.py index 0ef3eccb..696dee27 100755 --- a/kasa/cli.py +++ b/kasa/cli.py @@ -586,6 +586,7 @@ def _echo_features( title: str, category: Feature.Category | None = None, verbose: bool = False, + indent: str = "\t", ): """Print out a listing of features and their values.""" if category is not None: @@ -598,13 +599,13 @@ def _echo_features( echo(f"[bold]{title}[/bold]") for _, feat in features.items(): try: - echo(f"\t{feat}") + echo(f"{indent}{feat}") if verbose: - echo(f"\t\tType: {feat.type}") - echo(f"\t\tCategory: {feat.category}") - echo(f"\t\tIcon: {feat.icon}") + echo(f"{indent}\tType: {feat.type}") + echo(f"{indent}\tCategory: {feat.category}") + echo(f"{indent}\tIcon: {feat.icon}") except Exception as ex: - echo(f"\t{feat.name} ({feat.id}): got exception (%s)" % ex) + echo(f"{indent}{feat.name} ({feat.id}): [red]got exception ({ex})[/red]") def _echo_all_features(features, *, verbose=False, title_prefix=None): @@ -1219,22 +1220,15 @@ async def feature(dev: Device, child: str, name: str, value): echo(f"Targeting child device {child}") dev = dev.get_child_device(child) if not name: - - def _print_features(dev): - for name, feat in dev.features.items(): - try: - unit = f" {feat.unit}" if feat.unit else "" - echo(f"\t{feat.name} ({name}): {feat.value}{unit}") - except Exception as ex: - echo(f"\t{feat.name} ({name}): [red]{ex}[/red]") - - echo("[bold]== Features ==[/bold]") - _print_features(dev) + _echo_features(dev.features, "\n[bold]== Features ==[/bold]\n", indent="") if dev.children: for child_dev in dev.children: - echo(f"[bold]== Child {child_dev.alias} ==") - _print_features(child_dev) + _echo_features( + child_dev.features, + f"\n[bold]== Child {child_dev.alias} ==\n", + indent="", + ) return @@ -1249,9 +1243,13 @@ async def feature(dev: Device, child: str, name: str, value): echo(f"{feat.name} ({name}): {feat.value}{unit}") return feat.value - echo(f"Setting {name} to {value}") value = ast.literal_eval(value) - return await dev.features[name].set_value(value) + echo(f"Changing {name} from {feat.value} to {value}") + response = await dev.features[name].set_value(value) + await dev.update() + echo(f"New state: {feat.value}") + + return response if __name__ == "__main__": diff --git a/kasa/feature.py b/kasa/feature.py index b6e933ce..2000b21a 100644 --- a/kasa/feature.py +++ b/kasa/feature.py @@ -174,9 +174,16 @@ class Feature: def __repr__(self): try: value = self.value + choices = self.choices except Exception as ex: return f"Unable to read value ({self.id}): {ex}" + if self.type == Feature.Type.Choice: + if not isinstance(choices, list) or value not in choices: + return f"Value {value} is not a valid choice ({self.id}): {choices}" + value = " ".join( + [f"*{choice}*" if choice == value else choice for choice in choices] + ) if self.precision_hint is not None and value is not None: value = round(self.value, self.precision_hint) diff --git a/kasa/smart/modules/__init__.py b/kasa/smart/modules/__init__.py index ee2b8442..64722079 100644 --- a/kasa/smart/modules/__init__.py +++ b/kasa/smart/modules/__init__.py @@ -15,6 +15,7 @@ from .firmware import Firmware from .frostprotection import FrostProtectionModule from .humidity import HumiditySensor from .ledmodule import LedModule +from .lighteffectmodule import LightEffectModule from .lighttransitionmodule import LightTransitionModule from .reportmodule import ReportModule from .temperature import TemperatureSensor @@ -39,6 +40,7 @@ __all__ = [ "FanModule", "Firmware", "CloudModule", + "LightEffectModule", "LightTransitionModule", "ColorTemperatureModule", "ColorModule", diff --git a/kasa/smart/modules/lighteffectmodule.py b/kasa/smart/modules/lighteffectmodule.py new file mode 100644 index 00000000..7f03b8ff --- /dev/null +++ b/kasa/smart/modules/lighteffectmodule.py @@ -0,0 +1,112 @@ +"""Module for light effects.""" + +from __future__ import annotations + +import base64 +import copy +from typing import TYPE_CHECKING, Any + +from ...feature import Feature +from ..smartmodule import SmartModule + +if TYPE_CHECKING: + from ..smartdevice import SmartDevice + + +class LightEffectModule(SmartModule): + """Implementation of dynamic light effects.""" + + REQUIRED_COMPONENT = "light_effect" + QUERY_GETTER_NAME = "get_dynamic_light_effect_rules" + AVAILABLE_BULB_EFFECTS = { + "L1": "Party", + "L2": "Relax", + } + LIGHT_EFFECTS_OFF = "Off" + + def __init__(self, device: SmartDevice, module: str): + super().__init__(device, module) + self._scenes_names_to_id: dict[str, str] = {} + + def _initialize_features(self): + """Initialize features.""" + device = self._device + self._add_feature( + Feature( + device, + "Light effect", + container=self, + attribute_getter="effect", + attribute_setter="set_effect", + category=Feature.Category.Config, + type=Feature.Type.Choice, + choices_getter="effect_list", + ) + ) + + def _initialize_effects(self) -> dict[str, dict[str, Any]]: + """Return built-in effects.""" + # Copy the effects so scene name updates do not update the underlying dict. + effects = copy.deepcopy( + {effect["id"]: effect for effect in self.data["rule_list"]} + ) + for effect in effects.values(): + if not effect["scene_name"]: + # If the name has not been edited scene_name will be an empty string + effect["scene_name"] = self.AVAILABLE_BULB_EFFECTS[effect["id"]] + else: + # Otherwise it will be b64 encoded + effect["scene_name"] = base64.b64decode(effect["scene_name"]).decode() + self._scenes_names_to_id = { + effect["scene_name"]: effect["id"] for effect in effects.values() + } + return effects + + @property + def effect_list(self) -> list[str] | None: + """Return built-in effects list. + + Example: + ['Party', 'Relax', ...] + """ + effects = [self.LIGHT_EFFECTS_OFF] + effects.extend( + [effect["scene_name"] for effect in self._initialize_effects().values()] + ) + return effects + + @property + def effect(self) -> str: + """Return effect name.""" + # get_dynamic_light_effect_rules also has an enable property and current_rule_id + # property that could be used here as an alternative + if self._device._info["dynamic_light_effect_enable"]: + return self._initialize_effects()[ + self._device._info["dynamic_light_effect_id"] + ]["scene_name"] + return self.LIGHT_EFFECTS_OFF + + async def set_effect( + self, + effect: str, + ) -> None: + """Set an effect for the device. + + The device doesn't store an active effect while not enabled so store locally. + """ + if effect != self.LIGHT_EFFECTS_OFF and effect not in self._scenes_names_to_id: + raise ValueError( + f"Cannot set light effect to {effect}, possible values " + f"are: {self.LIGHT_EFFECTS_OFF} " + f"{' '.join(self._scenes_names_to_id.keys())}" + ) + enable = effect != self.LIGHT_EFFECTS_OFF + params: dict[str, bool | str] = {"enable": enable} + if enable: + effect_id = self._scenes_names_to_id[effect] + params["id"] = effect_id + return await self.call("set_dynamic_light_effect_rule_enable", params) + + def query(self) -> dict: + """Query to execute during the update cycle.""" + return {self.QUERY_GETTER_NAME: {"start_index": 0}} diff --git a/kasa/smart/smartdevice.py b/kasa/smart/smartdevice.py index 733f3157..e5df10be 100644 --- a/kasa/smart/smartdevice.py +++ b/kasa/smart/smartdevice.py @@ -40,11 +40,6 @@ if TYPE_CHECKING: # same issue, homekit perhaps? WALL_SWITCH_PARENT_ONLY_MODULES = [DeviceModule, TimeModule, Firmware, CloudModule] -AVAILABLE_BULB_EFFECTS = { - "L1": "Party", - "L2": "Relax", -} - # Device must go last as the other interfaces also inherit Device # and python needs a consistent method resolution order. @@ -683,44 +678,6 @@ class SmartDevice(Bulb, Fan, Device): ColorTemperatureModule, self.modules["ColorTemperatureModule"] ).valid_temperature_range - @property - def has_effects(self) -> bool: - """Return True if the device supports effects.""" - return "dynamic_light_effect_enable" in self._info - - @property - def effect(self) -> dict: - """Return effect state. - - This follows the format used by SmartLightStrip. - - Example: - {'brightness': 50, - 'custom': 0, - 'enable': 0, - 'id': '', - 'name': ''} - """ - # If no effect is active, dynamic_light_effect_id does not appear in info - current_effect = self._info.get("dynamic_light_effect_id", "") - data = { - "brightness": self.brightness, - "enable": current_effect != "", - "id": current_effect, - "name": AVAILABLE_BULB_EFFECTS.get(current_effect, ""), - } - - return data - - @property - def effect_list(self) -> list[str] | None: - """Return built-in effects list. - - Example: - ['Party', 'Relax', ...] - """ - return list(AVAILABLE_BULB_EFFECTS.keys()) if self.has_effects else None - @property def hsv(self) -> HSV: """Return the current HSV state of the bulb. @@ -807,17 +764,12 @@ class SmartDevice(Bulb, Fan, Device): brightness ) - async def set_effect( - self, - effect: str, - *, - brightness: int | None = None, - transition: int | None = None, - ) -> None: - """Set an effect on the device.""" - raise NotImplementedError() - @property def presets(self) -> list[BulbPreset]: """Return a list of available bulb setting presets.""" return [] + + @property + def has_effects(self) -> bool: + """Return True if the device supports effects.""" + return "LightEffectModule" in self.modules diff --git a/kasa/tests/fakeprotocol_smart.py b/kasa/tests/fakeprotocol_smart.py index 7340b5b7..ae1a7ad6 100644 --- a/kasa/tests/fakeprotocol_smart.py +++ b/kasa/tests/fakeprotocol_smart.py @@ -176,6 +176,19 @@ class FakeSmartTransport(BaseTransport): "Method %s not implemented for children" % child_method ) + def _set_light_effect(self, info, params): + """Set or remove values as per the device behaviour.""" + info["get_device_info"]["dynamic_light_effect_enable"] = params["enable"] + info["get_dynamic_light_effect_rules"]["enable"] = params["enable"] + if params["enable"]: + info["get_device_info"]["dynamic_light_effect_id"] = params["id"] + info["get_dynamic_light_effect_rules"]["current_rule_id"] = params["enable"] + else: + if "dynamic_light_effect_id" in info["get_device_info"]: + del info["get_device_info"]["dynamic_light_effect_id"] + if "current_rule_id" in info["get_dynamic_light_effect_rules"]: + del info["get_dynamic_light_effect_rules"]["current_rule_id"] + def _send_request(self, request_dict: dict): method = request_dict["method"] params = request_dict["params"] @@ -223,6 +236,9 @@ class FakeSmartTransport(BaseTransport): return retval elif method == "set_qs_info": return {"error_code": 0} + elif method == "set_dynamic_light_effect_rule_enable": + self._set_light_effect(info, params) + return {"error_code": 0} elif method[:4] == "set_": target_method = f"get_{method[4:]}" info[target_method].update(params) diff --git a/kasa/tests/smart/modules/test_light_effect.py b/kasa/tests/smart/modules/test_light_effect.py new file mode 100644 index 00000000..ba1b2293 --- /dev/null +++ b/kasa/tests/smart/modules/test_light_effect.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from itertools import chain +from typing import cast + +import pytest +from pytest_mock import MockerFixture + +from kasa import Device, Feature +from kasa.smart.modules import LightEffectModule +from kasa.tests.device_fixtures import parametrize + +light_effect = parametrize( + "has light effect", component_filter="light_effect", protocol_filter={"SMART"} +) + + +@light_effect +async def test_light_effect(dev: Device, mocker: MockerFixture): + """Test light effect.""" + light_effect = cast(LightEffectModule, dev.modules.get("LightEffectModule")) + assert light_effect + + feature = light_effect._module_features["light_effect"] + assert feature.type == Feature.Type.Choice + + call = mocker.spy(light_effect, "call") + assert feature.choices == light_effect.effect_list + assert feature.choices + for effect in chain(reversed(feature.choices), feature.choices): + await light_effect.set_effect(effect) + enable = effect != LightEffectModule.LIGHT_EFFECTS_OFF + params: dict[str, bool | str] = {"enable": enable} + if enable: + params["id"] = light_effect._scenes_names_to_id[effect] + call.assert_called_with("set_dynamic_light_effect_rule_enable", params) + await dev.update() + assert light_effect.effect == effect + assert feature.value == effect + + with pytest.raises(ValueError): + await light_effect.set_effect("foobar") diff --git a/kasa/tests/test_cli.py b/kasa/tests/test_cli.py index 3d80ee47..7addd434 100644 --- a/kasa/tests/test_cli.py +++ b/kasa/tests/test_cli.py @@ -689,6 +689,17 @@ async def test_feature(mocker, runner): assert res.exit_code == 0 +async def test_features_all(discovery_mock, mocker, runner): + """Test feature command on all fixtures.""" + res = await runner.invoke( + cli, + ["--host", "127.0.0.123", "--debug", "feature"], + catch_exceptions=False, + ) + assert "== Features ==" in res.output + assert res.exit_code == 0 + + async def test_feature_single(mocker, runner): """Test feature command returning single value.""" dummy_device = await get_device_for_fixture_protocol( @@ -736,7 +747,7 @@ async def test_feature_set(mocker, runner): ) led_setter.assert_called_with(True) - assert "Setting led to True" in res.output + assert "Changing led from False to True" in res.output assert res.exit_code == 0 @@ -762,14 +773,14 @@ async def test_feature_set_child(mocker, runner): "--child", child_id, "state", - "False", + "True", ], catch_exceptions=False, ) get_child_device.assert_called() - setter.assert_called_with(False) + setter.assert_called_with(True) assert f"Targeting child device {child_id}" - assert "Setting state to False" in res.output + assert "Changing state from False to True" in res.output assert res.exit_code == 0