mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-22 20:57:07 +00:00
Add LightEffectModule for dynamic light effects on SMART bulbs (#887)
Support the `light_effect` module which allows setting the effect to Off or Party or Relax. Uses the new `Feature.Type.Choice`. Does not currently allow editing of effects.
This commit is contained in:
parent
5ef81f4669
commit
5b486074e2
38
kasa/cli.py
38
kasa/cli.py
@ -586,6 +586,7 @@ def _echo_features(
|
||||
title: str,
|
||||
category: Feature.Category | None = None,
|
||||
verbose: bool = False,
|
||||
indent: str = "\t",
|
||||
):
|
||||
"""Print out a listing of features and their values."""
|
||||
if category is not None:
|
||||
@ -598,13 +599,13 @@ def _echo_features(
|
||||
echo(f"[bold]{title}[/bold]")
|
||||
for _, feat in features.items():
|
||||
try:
|
||||
echo(f"\t{feat}")
|
||||
echo(f"{indent}{feat}")
|
||||
if verbose:
|
||||
echo(f"\t\tType: {feat.type}")
|
||||
echo(f"\t\tCategory: {feat.category}")
|
||||
echo(f"\t\tIcon: {feat.icon}")
|
||||
echo(f"{indent}\tType: {feat.type}")
|
||||
echo(f"{indent}\tCategory: {feat.category}")
|
||||
echo(f"{indent}\tIcon: {feat.icon}")
|
||||
except Exception as ex:
|
||||
echo(f"\t{feat.name} ({feat.id}): got exception (%s)" % ex)
|
||||
echo(f"{indent}{feat.name} ({feat.id}): [red]got exception ({ex})[/red]")
|
||||
|
||||
|
||||
def _echo_all_features(features, *, verbose=False, title_prefix=None):
|
||||
@ -1219,22 +1220,15 @@ async def feature(dev: Device, child: str, name: str, value):
|
||||
echo(f"Targeting child device {child}")
|
||||
dev = dev.get_child_device(child)
|
||||
if not name:
|
||||
|
||||
def _print_features(dev):
|
||||
for name, feat in dev.features.items():
|
||||
try:
|
||||
unit = f" {feat.unit}" if feat.unit else ""
|
||||
echo(f"\t{feat.name} ({name}): {feat.value}{unit}")
|
||||
except Exception as ex:
|
||||
echo(f"\t{feat.name} ({name}): [red]{ex}[/red]")
|
||||
|
||||
echo("[bold]== Features ==[/bold]")
|
||||
_print_features(dev)
|
||||
_echo_features(dev.features, "\n[bold]== Features ==[/bold]\n", indent="")
|
||||
|
||||
if dev.children:
|
||||
for child_dev in dev.children:
|
||||
echo(f"[bold]== Child {child_dev.alias} ==")
|
||||
_print_features(child_dev)
|
||||
_echo_features(
|
||||
child_dev.features,
|
||||
f"\n[bold]== Child {child_dev.alias} ==\n",
|
||||
indent="",
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
@ -1249,9 +1243,13 @@ async def feature(dev: Device, child: str, name: str, value):
|
||||
echo(f"{feat.name} ({name}): {feat.value}{unit}")
|
||||
return feat.value
|
||||
|
||||
echo(f"Setting {name} to {value}")
|
||||
value = ast.literal_eval(value)
|
||||
return await dev.features[name].set_value(value)
|
||||
echo(f"Changing {name} from {feat.value} to {value}")
|
||||
response = await dev.features[name].set_value(value)
|
||||
await dev.update()
|
||||
echo(f"New state: {feat.value}")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -174,9 +174,16 @@ class Feature:
|
||||
def __repr__(self):
|
||||
try:
|
||||
value = self.value
|
||||
choices = self.choices
|
||||
except Exception as ex:
|
||||
return f"Unable to read value ({self.id}): {ex}"
|
||||
|
||||
if self.type == Feature.Type.Choice:
|
||||
if not isinstance(choices, list) or value not in choices:
|
||||
return f"Value {value} is not a valid choice ({self.id}): {choices}"
|
||||
value = " ".join(
|
||||
[f"*{choice}*" if choice == value else choice for choice in choices]
|
||||
)
|
||||
if self.precision_hint is not None and value is not None:
|
||||
value = round(self.value, self.precision_hint)
|
||||
|
||||
|
@ -15,6 +15,7 @@ from .firmware import Firmware
|
||||
from .frostprotection import FrostProtectionModule
|
||||
from .humidity import HumiditySensor
|
||||
from .ledmodule import LedModule
|
||||
from .lighteffectmodule import LightEffectModule
|
||||
from .lighttransitionmodule import LightTransitionModule
|
||||
from .reportmodule import ReportModule
|
||||
from .temperature import TemperatureSensor
|
||||
@ -39,6 +40,7 @@ __all__ = [
|
||||
"FanModule",
|
||||
"Firmware",
|
||||
"CloudModule",
|
||||
"LightEffectModule",
|
||||
"LightTransitionModule",
|
||||
"ColorTemperatureModule",
|
||||
"ColorModule",
|
||||
|
112
kasa/smart/modules/lighteffectmodule.py
Normal file
112
kasa/smart/modules/lighteffectmodule.py
Normal file
@ -0,0 +1,112 @@
|
||||
"""Module for light effects."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import copy
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartmodule import SmartModule
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..smartdevice import SmartDevice
|
||||
|
||||
|
||||
class LightEffectModule(SmartModule):
|
||||
"""Implementation of dynamic light effects."""
|
||||
|
||||
REQUIRED_COMPONENT = "light_effect"
|
||||
QUERY_GETTER_NAME = "get_dynamic_light_effect_rules"
|
||||
AVAILABLE_BULB_EFFECTS = {
|
||||
"L1": "Party",
|
||||
"L2": "Relax",
|
||||
}
|
||||
LIGHT_EFFECTS_OFF = "Off"
|
||||
|
||||
def __init__(self, device: SmartDevice, module: str):
|
||||
super().__init__(device, module)
|
||||
self._scenes_names_to_id: dict[str, str] = {}
|
||||
|
||||
def _initialize_features(self):
|
||||
"""Initialize features."""
|
||||
device = self._device
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
"Light effect",
|
||||
container=self,
|
||||
attribute_getter="effect",
|
||||
attribute_setter="set_effect",
|
||||
category=Feature.Category.Config,
|
||||
type=Feature.Type.Choice,
|
||||
choices_getter="effect_list",
|
||||
)
|
||||
)
|
||||
|
||||
def _initialize_effects(self) -> dict[str, dict[str, Any]]:
|
||||
"""Return built-in effects."""
|
||||
# Copy the effects so scene name updates do not update the underlying dict.
|
||||
effects = copy.deepcopy(
|
||||
{effect["id"]: effect for effect in self.data["rule_list"]}
|
||||
)
|
||||
for effect in effects.values():
|
||||
if not effect["scene_name"]:
|
||||
# If the name has not been edited scene_name will be an empty string
|
||||
effect["scene_name"] = self.AVAILABLE_BULB_EFFECTS[effect["id"]]
|
||||
else:
|
||||
# Otherwise it will be b64 encoded
|
||||
effect["scene_name"] = base64.b64decode(effect["scene_name"]).decode()
|
||||
self._scenes_names_to_id = {
|
||||
effect["scene_name"]: effect["id"] for effect in effects.values()
|
||||
}
|
||||
return effects
|
||||
|
||||
@property
|
||||
def effect_list(self) -> list[str] | None:
|
||||
"""Return built-in effects list.
|
||||
|
||||
Example:
|
||||
['Party', 'Relax', ...]
|
||||
"""
|
||||
effects = [self.LIGHT_EFFECTS_OFF]
|
||||
effects.extend(
|
||||
[effect["scene_name"] for effect in self._initialize_effects().values()]
|
||||
)
|
||||
return effects
|
||||
|
||||
@property
|
||||
def effect(self) -> str:
|
||||
"""Return effect name."""
|
||||
# get_dynamic_light_effect_rules also has an enable property and current_rule_id
|
||||
# property that could be used here as an alternative
|
||||
if self._device._info["dynamic_light_effect_enable"]:
|
||||
return self._initialize_effects()[
|
||||
self._device._info["dynamic_light_effect_id"]
|
||||
]["scene_name"]
|
||||
return self.LIGHT_EFFECTS_OFF
|
||||
|
||||
async def set_effect(
|
||||
self,
|
||||
effect: str,
|
||||
) -> None:
|
||||
"""Set an effect for the device.
|
||||
|
||||
The device doesn't store an active effect while not enabled so store locally.
|
||||
"""
|
||||
if effect != self.LIGHT_EFFECTS_OFF and effect not in self._scenes_names_to_id:
|
||||
raise ValueError(
|
||||
f"Cannot set light effect to {effect}, possible values "
|
||||
f"are: {self.LIGHT_EFFECTS_OFF} "
|
||||
f"{' '.join(self._scenes_names_to_id.keys())}"
|
||||
)
|
||||
enable = effect != self.LIGHT_EFFECTS_OFF
|
||||
params: dict[str, bool | str] = {"enable": enable}
|
||||
if enable:
|
||||
effect_id = self._scenes_names_to_id[effect]
|
||||
params["id"] = effect_id
|
||||
return await self.call("set_dynamic_light_effect_rule_enable", params)
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
return {self.QUERY_GETTER_NAME: {"start_index": 0}}
|
@ -40,11 +40,6 @@ if TYPE_CHECKING:
|
||||
# same issue, homekit perhaps?
|
||||
WALL_SWITCH_PARENT_ONLY_MODULES = [DeviceModule, TimeModule, Firmware, CloudModule]
|
||||
|
||||
AVAILABLE_BULB_EFFECTS = {
|
||||
"L1": "Party",
|
||||
"L2": "Relax",
|
||||
}
|
||||
|
||||
|
||||
# Device must go last as the other interfaces also inherit Device
|
||||
# and python needs a consistent method resolution order.
|
||||
@ -683,44 +678,6 @@ class SmartDevice(Bulb, Fan, Device):
|
||||
ColorTemperatureModule, self.modules["ColorTemperatureModule"]
|
||||
).valid_temperature_range
|
||||
|
||||
@property
|
||||
def has_effects(self) -> bool:
|
||||
"""Return True if the device supports effects."""
|
||||
return "dynamic_light_effect_enable" in self._info
|
||||
|
||||
@property
|
||||
def effect(self) -> dict:
|
||||
"""Return effect state.
|
||||
|
||||
This follows the format used by SmartLightStrip.
|
||||
|
||||
Example:
|
||||
{'brightness': 50,
|
||||
'custom': 0,
|
||||
'enable': 0,
|
||||
'id': '',
|
||||
'name': ''}
|
||||
"""
|
||||
# If no effect is active, dynamic_light_effect_id does not appear in info
|
||||
current_effect = self._info.get("dynamic_light_effect_id", "")
|
||||
data = {
|
||||
"brightness": self.brightness,
|
||||
"enable": current_effect != "",
|
||||
"id": current_effect,
|
||||
"name": AVAILABLE_BULB_EFFECTS.get(current_effect, ""),
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
@property
|
||||
def effect_list(self) -> list[str] | None:
|
||||
"""Return built-in effects list.
|
||||
|
||||
Example:
|
||||
['Party', 'Relax', ...]
|
||||
"""
|
||||
return list(AVAILABLE_BULB_EFFECTS.keys()) if self.has_effects else None
|
||||
|
||||
@property
|
||||
def hsv(self) -> HSV:
|
||||
"""Return the current HSV state of the bulb.
|
||||
@ -807,17 +764,12 @@ class SmartDevice(Bulb, Fan, Device):
|
||||
brightness
|
||||
)
|
||||
|
||||
async def set_effect(
|
||||
self,
|
||||
effect: str,
|
||||
*,
|
||||
brightness: int | None = None,
|
||||
transition: int | None = None,
|
||||
) -> None:
|
||||
"""Set an effect on the device."""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def presets(self) -> list[BulbPreset]:
|
||||
"""Return a list of available bulb setting presets."""
|
||||
return []
|
||||
|
||||
@property
|
||||
def has_effects(self) -> bool:
|
||||
"""Return True if the device supports effects."""
|
||||
return "LightEffectModule" in self.modules
|
||||
|
@ -176,6 +176,19 @@ class FakeSmartTransport(BaseTransport):
|
||||
"Method %s not implemented for children" % child_method
|
||||
)
|
||||
|
||||
def _set_light_effect(self, info, params):
|
||||
"""Set or remove values as per the device behaviour."""
|
||||
info["get_device_info"]["dynamic_light_effect_enable"] = params["enable"]
|
||||
info["get_dynamic_light_effect_rules"]["enable"] = params["enable"]
|
||||
if params["enable"]:
|
||||
info["get_device_info"]["dynamic_light_effect_id"] = params["id"]
|
||||
info["get_dynamic_light_effect_rules"]["current_rule_id"] = params["enable"]
|
||||
else:
|
||||
if "dynamic_light_effect_id" in info["get_device_info"]:
|
||||
del info["get_device_info"]["dynamic_light_effect_id"]
|
||||
if "current_rule_id" in info["get_dynamic_light_effect_rules"]:
|
||||
del info["get_dynamic_light_effect_rules"]["current_rule_id"]
|
||||
|
||||
def _send_request(self, request_dict: dict):
|
||||
method = request_dict["method"]
|
||||
params = request_dict["params"]
|
||||
@ -223,6 +236,9 @@ class FakeSmartTransport(BaseTransport):
|
||||
return retval
|
||||
elif method == "set_qs_info":
|
||||
return {"error_code": 0}
|
||||
elif method == "set_dynamic_light_effect_rule_enable":
|
||||
self._set_light_effect(info, params)
|
||||
return {"error_code": 0}
|
||||
elif method[:4] == "set_":
|
||||
target_method = f"get_{method[4:]}"
|
||||
info[target_method].update(params)
|
||||
|
42
kasa/tests/smart/modules/test_light_effect.py
Normal file
42
kasa/tests/smart/modules/test_light_effect.py
Normal file
@ -0,0 +1,42 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from itertools import chain
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from kasa import Device, Feature
|
||||
from kasa.smart.modules import LightEffectModule
|
||||
from kasa.tests.device_fixtures import parametrize
|
||||
|
||||
light_effect = parametrize(
|
||||
"has light effect", component_filter="light_effect", protocol_filter={"SMART"}
|
||||
)
|
||||
|
||||
|
||||
@light_effect
|
||||
async def test_light_effect(dev: Device, mocker: MockerFixture):
|
||||
"""Test light effect."""
|
||||
light_effect = cast(LightEffectModule, dev.modules.get("LightEffectModule"))
|
||||
assert light_effect
|
||||
|
||||
feature = light_effect._module_features["light_effect"]
|
||||
assert feature.type == Feature.Type.Choice
|
||||
|
||||
call = mocker.spy(light_effect, "call")
|
||||
assert feature.choices == light_effect.effect_list
|
||||
assert feature.choices
|
||||
for effect in chain(reversed(feature.choices), feature.choices):
|
||||
await light_effect.set_effect(effect)
|
||||
enable = effect != LightEffectModule.LIGHT_EFFECTS_OFF
|
||||
params: dict[str, bool | str] = {"enable": enable}
|
||||
if enable:
|
||||
params["id"] = light_effect._scenes_names_to_id[effect]
|
||||
call.assert_called_with("set_dynamic_light_effect_rule_enable", params)
|
||||
await dev.update()
|
||||
assert light_effect.effect == effect
|
||||
assert feature.value == effect
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
await light_effect.set_effect("foobar")
|
@ -689,6 +689,17 @@ async def test_feature(mocker, runner):
|
||||
assert res.exit_code == 0
|
||||
|
||||
|
||||
async def test_features_all(discovery_mock, mocker, runner):
|
||||
"""Test feature command on all fixtures."""
|
||||
res = await runner.invoke(
|
||||
cli,
|
||||
["--host", "127.0.0.123", "--debug", "feature"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
assert "== Features ==" in res.output
|
||||
assert res.exit_code == 0
|
||||
|
||||
|
||||
async def test_feature_single(mocker, runner):
|
||||
"""Test feature command returning single value."""
|
||||
dummy_device = await get_device_for_fixture_protocol(
|
||||
@ -736,7 +747,7 @@ async def test_feature_set(mocker, runner):
|
||||
)
|
||||
|
||||
led_setter.assert_called_with(True)
|
||||
assert "Setting led to True" in res.output
|
||||
assert "Changing led from False to True" in res.output
|
||||
assert res.exit_code == 0
|
||||
|
||||
|
||||
@ -762,14 +773,14 @@ async def test_feature_set_child(mocker, runner):
|
||||
"--child",
|
||||
child_id,
|
||||
"state",
|
||||
"False",
|
||||
"True",
|
||||
],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
|
||||
get_child_device.assert_called()
|
||||
setter.assert_called_with(False)
|
||||
setter.assert_called_with(True)
|
||||
|
||||
assert f"Targeting child device {child_id}"
|
||||
assert "Setting state to False" in res.output
|
||||
assert "Changing state from False to True" in res.output
|
||||
assert res.exit_code == 0
|
||||
|
Loading…
Reference in New Issue
Block a user