Initial support for vacuums (clean module) (#944)

Adds support for clean module:
- Show current vacuum state
- Start cleaning (all rooms)
- Return to dock
- Pausing & unpausing
- Controlling the fan speed

---------

Co-authored-by: Steven B <51370195+sdb9696@users.noreply.github.com>
This commit is contained in:
Teemu R.
2025-01-14 15:35:09 +01:00
committed by GitHub
parent be34dbd387
commit 1be87674bf
16 changed files with 799 additions and 13 deletions

View File

@@ -159,7 +159,7 @@ def get_device_class_from_family(
"SMART.KASAHUB": SmartDevice,
"SMART.KASASWITCH": SmartDevice,
"SMART.IPCAMERA.HTTPS": SmartCamDevice,
"SMART.TAPOROBOVAC": SmartDevice,
"SMART.TAPOROBOVAC.HTTPS": SmartDevice,
"IOT.SMARTPLUGSWITCH": IotPlug,
"IOT.SMARTBULB": IotBulb,
"IOT.IPCAMERA": IotCamera,
@@ -173,6 +173,9 @@ def get_device_class_from_family(
_LOGGER.debug("Unknown SMART device with %s, using SmartDevice", device_type)
cls = SmartDevice
if cls is not None:
_LOGGER.debug("Using %s for %s", cls.__name__, device_type)
return cls
@@ -188,6 +191,7 @@ def get_protocol(config: DeviceConfig, *, strict: bool = False) -> BaseProtocol
"""
ctype = config.connection_type
protocol_name = ctype.device_family.value.split(".")[0]
_LOGGER.debug("Finding protocol for %s", ctype.device_family)
if ctype.device_family is DeviceFamily.SmartIpCamera:
if strict and ctype.encryption_type is not DeviceEncryptionType.Aes:

View File

@@ -676,9 +676,14 @@ class Discover:
for key, val in candidates.items():
try:
prot, config = val
_LOGGER.debug("Trying to connect with %s", prot.__class__.__name__)
dev = await _connect(config, prot)
except Exception:
_LOGGER.debug("Unable to connect with %s", prot)
except Exception as ex:
_LOGGER.debug(
"Unable to connect with %s: %s",
prot.__class__.__name__,
ex,
)
if on_attempt:
ca = tuple.__new__(ConnectAttempt, key)
on_attempt(ca, False)
@@ -686,6 +691,7 @@ class Discover:
if on_attempt:
ca = tuple.__new__(ConnectAttempt, key)
on_attempt(ca, True)
_LOGGER.debug("Found working protocol %s", prot.__class__.__name__)
return dev
finally:
await prot.close()

View File

@@ -127,6 +127,8 @@ class SmartErrorCode(IntEnum):
DST_ERROR = -2301
DST_SAVE_ERROR = -2302
VACUUM_BATTERY_LOW = -3001
SYSTEM_ERROR = -40101
INVALID_ARGUMENTS = -40209

View File

@@ -161,6 +161,9 @@ class Module(ABC):
Camera: Final[ModuleName[smartcam.Camera]] = ModuleName("Camera")
LensMask: Final[ModuleName[smartcam.LensMask]] = ModuleName("LensMask")
# Vacuum modules
Clean: Final[ModuleName[smart.Clean]] = ModuleName("Clean")
def __init__(self, device: Device, module: str) -> None:
self._device = device
self._module = module

View File

@@ -7,6 +7,7 @@ from .batterysensor import BatterySensor
from .brightness import Brightness
from .childdevice import ChildDevice
from .childprotection import ChildProtection
from .clean import Clean
from .cloud import Cloud
from .color import Color
from .colortemperature import ColorTemperature
@@ -66,6 +67,7 @@ __all__ = [
"TriggerLogs",
"FrostProtection",
"Thermostat",
"Clean",
"SmartLightEffect",
"OverheatProtection",
"HomeKit",

267
kasa/smart/modules/clean.py Normal file
View File

@@ -0,0 +1,267 @@
"""Implementation of vacuum clean module."""
from __future__ import annotations
import logging
from enum import IntEnum
from typing import Annotated
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class Status(IntEnum):
"""Status of vacuum."""
Idle = 0
Cleaning = 1
Mapping = 2
GoingHome = 4
Charging = 5
Charged = 6
Paused = 7
Undocked = 8
Error = 100
UnknownInternal = -1000
class ErrorCode(IntEnum):
"""Error codes for vacuum."""
Ok = 0
SideBrushStuck = 2
MainBrushStuck = 3
WheelBlocked = 4
DustBinRemoved = 14
UnableToMove = 15
LidarBlocked = 16
UnableToFindDock = 21
BatteryLow = 22
UnknownInternal = -1000
class FanSpeed(IntEnum):
"""Fan speed level."""
Quiet = 1
Standard = 2
Turbo = 3
Max = 4
class Clean(SmartModule):
"""Implementation of vacuum clean module."""
REQUIRED_COMPONENT = "clean"
_error_code = ErrorCode.Ok
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="vacuum_return_home",
name="Return home",
container=self,
attribute_setter="return_home",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_start",
name="Start cleaning",
container=self,
attribute_setter="start",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_pause",
name="Pause",
container=self,
attribute_setter="pause",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_status",
name="Vacuum status",
container=self,
attribute_getter="status",
category=Feature.Category.Primary,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_error",
name="Error",
container=self,
attribute_getter="error",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="battery_level",
name="Battery level",
container=self,
attribute_getter="battery",
icon="mdi:battery",
unit_getter=lambda: "%",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_fan_speed",
name="Fan speed",
container=self,
attribute_getter="fan_speed_preset",
attribute_setter="set_fan_speed_preset",
icon="mdi:fan",
choices_getter=lambda: list(FanSpeed.__members__),
category=Feature.Category.Primary,
type=Feature.Type.Choice,
)
)
async def _post_update_hook(self) -> None:
"""Set error code after update."""
errors = self._vac_status.get("err_status")
if errors is None or not errors:
self._error_code = ErrorCode.Ok
return
if len(errors) > 1:
_LOGGER.warning(
"Multiple error codes, using the first one only: %s", errors
)
error = errors.pop(0)
try:
self._error_code = ErrorCode(error)
except ValueError:
_LOGGER.warning(
"Unknown error code, please create an issue describing the error: %s",
error,
)
self._error_code = ErrorCode.UnknownInternal
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getVacStatus": None,
"getBatteryInfo": None,
"getCleanStatus": None,
"getCleanAttr": {"type": "global"},
}
async def start(self) -> dict:
"""Start cleaning."""
# If we are paused, do not restart cleaning
if self.status is Status.Paused:
return await self.resume()
return await self.call(
"setSwitchClean",
{
"clean_mode": 0,
"clean_on": True,
"clean_order": True,
"force_clean": False,
},
)
async def pause(self) -> dict:
"""Pause cleaning."""
if self.status is Status.GoingHome:
return await self.set_return_home(False)
return await self.set_pause(True)
async def resume(self) -> dict:
"""Resume cleaning."""
return await self.set_pause(False)
async def set_pause(self, enabled: bool) -> dict:
"""Pause or resume cleaning."""
return await self.call("setRobotPause", {"pause": enabled})
async def return_home(self) -> dict:
"""Return home."""
return await self.set_return_home(True)
async def set_return_home(self, enabled: bool) -> dict:
"""Return home / pause returning."""
return await self.call("setSwitchCharge", {"switch_charge": enabled})
@property
def error(self) -> ErrorCode:
"""Return error."""
return self._error_code
@property
def fan_speed_preset(self) -> Annotated[str, FeatureAttribute()]:
"""Return fan speed preset."""
return FanSpeed(self._settings["suction"]).name
async def set_fan_speed_preset(
self, speed: str
) -> Annotated[dict, FeatureAttribute]:
"""Set fan speed preset."""
name_to_value = {x.name: x.value for x in FanSpeed}
if speed not in name_to_value:
raise ValueError("Invalid fan speed %s, available %s", speed, name_to_value)
return await self.call(
"setCleanAttr", {"suction": name_to_value[speed], "type": "global"}
)
@property
def battery(self) -> int:
"""Return battery level."""
return self.data["getBatteryInfo"]["battery_percentage"]
@property
def _vac_status(self) -> dict:
"""Return vac status container."""
return self.data["getVacStatus"]
@property
def _settings(self) -> dict:
"""Return cleaning settings."""
return self.data["getCleanAttr"]
@property
def status(self) -> Status:
"""Return current status."""
if self._error_code is not ErrorCode.Ok:
return Status.Error
status_code = self._vac_status["status"]
try:
return Status(status_code)
except ValueError:
_LOGGER.warning("Got unknown status code: %s (%s)", status_code, self.data)
return Status.UnknownInternal