Add Fan interface for SMART devices (#873)

Enables the Fan interface for devices supporting that component.
Currently the only device with a fan is the ks240 which implements it as
a child device. This PR adds a method `get_module` to search the child
device for modules if it is a WallSwitch device type.
This commit is contained in:
Steven B 2024-04-30 17:42:53 +01:00 committed by GitHub
parent 7db989e2ec
commit 16f17a7729
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 124 additions and 18 deletions

23
kasa/fan.py Normal file
View File

@ -0,0 +1,23 @@
"""Module for Fan Interface."""
from __future__ import annotations
from abc import ABC, abstractmethod
class Fan(ABC):
"""Interface for a Fan."""
@property
@abstractmethod
def is_fan(self) -> bool:
"""Return True if the device is a fan."""
@property
@abstractmethod
def fan_speed_level(self) -> int:
"""Return fan speed level."""
@abstractmethod
async def set_fan_speed_level(self, level: int):
"""Set fan speed level."""

View File

@ -28,7 +28,7 @@ class FanModule(SmartModule):
attribute_setter="set_fan_speed_level", attribute_setter="set_fan_speed_level",
icon="mdi:fan", icon="mdi:fan",
type=Feature.Type.Number, type=Feature.Type.Number,
minimum_value=1, minimum_value=0,
maximum_value=4, maximum_value=4,
category=Feature.Category.Primary, category=Feature.Category.Primary,
) )
@ -55,10 +55,14 @@ class FanModule(SmartModule):
return self.data["fan_speed_level"] return self.data["fan_speed_level"]
async def set_fan_speed_level(self, level: int): async def set_fan_speed_level(self, level: int):
"""Set fan speed level.""" """Set fan speed level, 0 for off, 1-4 for on."""
if level < 1 or level > 4: if level < 0 or level > 4:
raise ValueError("Invalid level, should be in range 1-4.") raise ValueError("Invalid level, should be in range 0-4.")
return await self.call("set_device_info", {"fan_speed_level": level}) if level == 0:
return await self.call("set_device_info", {"device_on": False})
return await self.call(
"set_device_info", {"device_on": True, "fan_speed_level": level}
)
@property @property
def sleep_mode(self) -> bool: def sleep_mode(self) -> bool:

View File

@ -14,6 +14,7 @@ from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig from ..deviceconfig import DeviceConfig
from ..emeterstatus import EmeterStatus from ..emeterstatus import EmeterStatus
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
from ..fan import Fan
from ..feature import Feature from ..feature import Feature
from ..smartprotocol import SmartProtocol from ..smartprotocol import SmartProtocol
from .modules import ( from .modules import (
@ -23,6 +24,7 @@ from .modules import (
ColorTemperatureModule, ColorTemperatureModule,
DeviceModule, DeviceModule,
EnergyModule, EnergyModule,
FanModule,
Firmware, Firmware,
TimeModule, TimeModule,
) )
@ -36,7 +38,7 @@ if TYPE_CHECKING:
# the child but only work on the parent. See longer note below in _initialize_modules. # the child but only work on the parent. See longer note below in _initialize_modules.
# This list should be updated when creating new modules that could have the # This list should be updated when creating new modules that could have the
# same issue, homekit perhaps? # same issue, homekit perhaps?
WALL_SWITCH_PARENT_ONLY_MODULES = [DeviceModule, TimeModule, Firmware, CloudModule] # noqa: F405 WALL_SWITCH_PARENT_ONLY_MODULES = [DeviceModule, TimeModule, Firmware, CloudModule]
AVAILABLE_BULB_EFFECTS = { AVAILABLE_BULB_EFFECTS = {
"L1": "Party", "L1": "Party",
@ -44,7 +46,7 @@ AVAILABLE_BULB_EFFECTS = {
} }
class SmartDevice(Device, Bulb): class SmartDevice(Device, Bulb, Fan):
"""Base class to represent a SMART protocol based device.""" """Base class to represent a SMART protocol based device."""
def __init__( def __init__(
@ -221,9 +223,6 @@ class SmartDevice(Device, Bulb):
if await module._check_supported(): if await module._check_supported():
self._modules[module.name] = module self._modules[module.name] = module
if self._exposes_child_modules:
self._modules.update(**child_modules_to_skip)
async def _initialize_features(self): async def _initialize_features(self):
"""Initialize device features.""" """Initialize device features."""
self._add_feature( self._add_feature(
@ -309,6 +308,16 @@ class SmartDevice(Device, Bulb):
for feat in module._module_features.values(): for feat in module._module_features.values():
self._add_feature(feat) self._add_feature(feat)
def get_module(self, module_name) -> SmartModule | None:
"""Return the module from the device modules or None if not present."""
if module_name in self.modules:
return self.modules[module_name]
elif self._exposes_child_modules:
for child in self._children.values():
if module_name in child.modules:
return child.modules[module_name]
return None
@property @property
def is_cloud_connected(self): def is_cloud_connected(self):
"""Returns if the device is connected to the cloud.""" """Returns if the device is connected to the cloud."""
@ -460,19 +469,19 @@ class SmartDevice(Device, Bulb):
@property @property
def emeter_realtime(self) -> EmeterStatus: def emeter_realtime(self) -> EmeterStatus:
"""Get the emeter status.""" """Get the emeter status."""
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405 energy = cast(EnergyModule, self.modules["EnergyModule"])
return energy.emeter_realtime return energy.emeter_realtime
@property @property
def emeter_this_month(self) -> float | None: def emeter_this_month(self) -> float | None:
"""Get the emeter value for this month.""" """Get the emeter value for this month."""
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405 energy = cast(EnergyModule, self.modules["EnergyModule"])
return energy.emeter_this_month return energy.emeter_this_month
@property @property
def emeter_today(self) -> float | None: def emeter_today(self) -> float | None:
"""Get the emeter value for today.""" """Get the emeter value for today."""
energy = cast(EnergyModule, self.modules["EnergyModule"]) # noqa: F405 energy = cast(EnergyModule, self.modules["EnergyModule"])
return energy.emeter_today return energy.emeter_today
@property @property
@ -635,6 +644,26 @@ class SmartDevice(Device, Bulb):
_LOGGER.warning("Unknown device type, falling back to plug") _LOGGER.warning("Unknown device type, falling back to plug")
return DeviceType.Plug return DeviceType.Plug
# Fan interface methods
@property
def is_fan(self) -> bool:
"""Return True if the device is a fan."""
return "FanModule" in self.modules
@property
def fan_speed_level(self) -> int:
"""Return fan speed level."""
if not self.is_fan:
raise KasaException("Device is not a Fan")
return cast(FanModule, self.modules["FanModule"]).fan_speed_level
async def set_fan_speed_level(self, level: int):
"""Set fan speed level."""
if not self.is_fan:
raise KasaException("Device is not a Fan")
await cast(FanModule, self.modules["FanModule"]).set_fan_speed_level(level)
# Bulb interface methods # Bulb interface methods
@property @property

View File

@ -10,7 +10,7 @@ brightness = parametrize("brightness smart", component_filter="brightness")
@brightness @brightness
async def test_brightness_component(dev: SmartDevice): async def test_brightness_component(dev: SmartDevice):
"""Test brightness feature.""" """Test brightness feature."""
brightness = dev.modules.get("Brightness") brightness = dev.get_module("Brightness")
assert brightness assert brightness
assert isinstance(dev, SmartDevice) assert isinstance(dev, SmartDevice)
assert "brightness" in dev._components assert "brightness" in dev._components

View File

@ -1,8 +1,9 @@
from typing import cast from typing import cast
import pytest
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from kasa import SmartDevice from kasa.smart import SmartDevice
from kasa.smart.modules import FanModule from kasa.smart.modules import FanModule
from kasa.tests.device_fixtures import parametrize from kasa.tests.device_fixtures import parametrize
@ -12,7 +13,7 @@ fan = parametrize("has fan", component_filter="fan_control", protocol_filter={"S
@fan @fan
async def test_fan_speed(dev: SmartDevice, mocker: MockerFixture): async def test_fan_speed(dev: SmartDevice, mocker: MockerFixture):
"""Test fan speed feature.""" """Test fan speed feature."""
fan = cast(FanModule, dev.modules.get("FanModule")) fan = cast(FanModule, dev.get_module("FanModule"))
assert fan assert fan
level_feature = fan._module_features["fan_speed_level"] level_feature = fan._module_features["fan_speed_level"]
@ -24,7 +25,9 @@ async def test_fan_speed(dev: SmartDevice, mocker: MockerFixture):
call = mocker.spy(fan, "call") call = mocker.spy(fan, "call")
await fan.set_fan_speed_level(3) await fan.set_fan_speed_level(3)
call.assert_called_with("set_device_info", {"fan_speed_level": 3}) call.assert_called_with(
"set_device_info", {"device_on": True, "fan_speed_level": 3}
)
await dev.update() await dev.update()
@ -35,7 +38,7 @@ async def test_fan_speed(dev: SmartDevice, mocker: MockerFixture):
@fan @fan
async def test_sleep_mode(dev: SmartDevice, mocker: MockerFixture): async def test_sleep_mode(dev: SmartDevice, mocker: MockerFixture):
"""Test sleep mode feature.""" """Test sleep mode feature."""
fan = cast(FanModule, dev.modules.get("FanModule")) fan = cast(FanModule, dev.get_module("FanModule"))
assert fan assert fan
sleep_feature = fan._module_features["fan_sleep_mode"] sleep_feature = fan._module_features["fan_sleep_mode"]
assert isinstance(sleep_feature.value, bool) assert isinstance(sleep_feature.value, bool)
@ -48,3 +51,31 @@ async def test_sleep_mode(dev: SmartDevice, mocker: MockerFixture):
assert fan.sleep_mode is True assert fan.sleep_mode is True
assert sleep_feature.value is True assert sleep_feature.value is True
@fan
async def test_fan_interface(dev: SmartDevice, mocker: MockerFixture):
"""Test fan speed on device interface."""
assert isinstance(dev, SmartDevice)
fan = cast(FanModule, dev.get_module("FanModule"))
device = fan._device
assert device.is_fan
await device.set_fan_speed_level(1)
await dev.update()
assert device.fan_speed_level == 1
assert device.is_on
await device.set_fan_speed_level(4)
await dev.update()
assert device.fan_speed_level == 4
await device.set_fan_speed_level(0)
await dev.update()
assert not device.is_on
with pytest.raises(ValueError):
await device.set_fan_speed_level(-1)
with pytest.raises(ValueError):
await device.set_fan_speed_level(5)

View File

@ -16,6 +16,7 @@ from kasa.smart import SmartDevice
from .conftest import ( from .conftest import (
bulb_smart, bulb_smart,
device_smart, device_smart,
get_device_for_fixture_protocol,
) )
@ -121,6 +122,24 @@ async def test_update_module_queries(dev: SmartDevice, mocker: MockerFixture):
spies[device].assert_not_called() spies[device].assert_not_called()
async def test_get_modules(mocker):
"""Test get_modules for child and parent modules."""
dummy_device = await get_device_for_fixture_protocol(
"KS240(US)_1.0_1.0.5.json", "SMART"
)
module = dummy_device.get_module("CloudModule")
assert module
assert module._device == dummy_device
module = dummy_device.get_module("FanModule")
assert module
assert module._device != dummy_device
assert module._device._parent == dummy_device
module = dummy_device.get_module("DummyModule")
assert module is None
@bulb_smart @bulb_smart
async def test_smartdevice_brightness(dev: SmartDevice): async def test_smartdevice_brightness(dev: SmartDevice):
"""Test brightness setter and getter.""" """Test brightness setter and getter."""