Add support for cleaning records (#945)
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run

Co-authored-by: Steven B. <51370195+sdb9696@users.noreply.github.com>
This commit is contained in:
Teemu R.
2025-01-20 12:41:56 +01:00
committed by GitHub
parent bca5576425
commit 05085462d3
12 changed files with 448 additions and 15 deletions

View File

@@ -93,6 +93,7 @@ def _legacy_type_to_class(_type: str) -> Any:
"hsv": "light",
"temperature": "light",
"effect": "light",
"vacuum": "vacuum",
"hub": "hub",
},
result_callback=json_formatter_cb,

53
kasa/cli/vacuum.py Normal file
View File

@@ -0,0 +1,53 @@
"""Module for cli vacuum commands.."""
from __future__ import annotations
import asyncclick as click
from kasa import (
Device,
Module,
)
from .common import (
error,
pass_dev_or_child,
)
@click.group(invoke_without_command=False)
@click.pass_context
async def vacuum(ctx: click.Context) -> None:
"""Vacuum commands."""
@vacuum.group(invoke_without_command=True, name="records")
@pass_dev_or_child
async def records_group(dev: Device) -> None:
"""Access cleaning records."""
if not (rec := dev.modules.get(Module.CleanRecords)):
error("This device does not support records.")
data = rec.parsed_data
latest = data.last_clean
click.echo(
f"Totals: {rec.total_clean_area} {rec.area_unit} in {rec.total_clean_time} "
f"(cleaned {rec.total_clean_count} times)"
)
click.echo(f"Last clean: {latest.clean_area} {rec.area_unit} @ {latest.clean_time}")
click.echo("Execute `kasa vacuum records list` to list all records.")
@records_group.command(name="list")
@pass_dev_or_child
async def records_list(dev: Device) -> None:
"""List all cleaning records."""
if not (rec := dev.modules.get(Module.CleanRecords)):
error("This device does not support records.")
data = rec.parsed_data
for record in data.records:
click.echo(
f"* {record.timestamp}: cleaned {record.clean_area} {rec.area_unit}"
f" in {record.clean_time}"
)

View File

@@ -25,6 +25,7 @@ Signal Level (signal_level): 2
RSSI (rssi): -52
SSID (ssid): #MASKED_SSID#
Reboot (reboot): <Action>
Device time (device_time): 2024-02-23 02:40:15+01:00
Brightness (brightness): 100
Cloud connection (cloud_connection): True
HSV (hsv): HSV(hue=0, saturation=100, value=100)
@@ -39,7 +40,6 @@ Light preset (light_preset): Not set
Smooth transition on (smooth_transition_on): 2
Smooth transition off (smooth_transition_off): 2
Overheated (overheated): False
Device time (device_time): 2024-02-23 02:40:15+01:00
To see whether a device supports a feature, check for the existence of it:
@@ -299,8 +299,10 @@ class Feature:
if isinstance(value, Enum):
value = repr(value)
s = f"{self.name} ({self.id}): {value}"
if self.unit is not None:
s += f" {self.unit}"
if (unit := self.unit) is not None:
if isinstance(unit, Enum):
unit = repr(unit)
s += f" {unit}"
if self.type == Feature.Type.Number:
s += f" (range: {self.minimum_value}-{self.maximum_value})"

View File

@@ -168,6 +168,7 @@ class Module(ABC):
Dustbin: Final[ModuleName[smart.Dustbin]] = ModuleName("Dustbin")
Speaker: Final[ModuleName[smart.Speaker]] = ModuleName("Speaker")
Mop: Final[ModuleName[smart.Mop]] = ModuleName("Mop")
CleanRecords: Final[ModuleName[smart.CleanRecords]] = ModuleName("CleanRecords")
def __init__(self, device: Device, module: str) -> None:
self._device = device

View File

@@ -10,6 +10,7 @@ from .childlock import ChildLock
from .childprotection import ChildProtection
from .childsetup import ChildSetup
from .clean import Clean
from .cleanrecords import CleanRecords
from .cloud import Cloud
from .color import Color
from .colortemperature import ColorTemperature
@@ -75,6 +76,7 @@ __all__ = [
"FrostProtection",
"Thermostat",
"Clean",
"CleanRecords",
"SmartLightEffect",
"OverheatProtection",
"Speaker",

View File

@@ -0,0 +1,205 @@
"""Implementation of vacuum cleaning records."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta, tzinfo
from typing import Annotated, cast
from mashumaro import DataClassDictMixin, field_options
from mashumaro.config import ADD_DIALECT_SUPPORT
from mashumaro.dialect import Dialect
from mashumaro.types import SerializationStrategy
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import Module, SmartModule
from .clean import AreaUnit, Clean
_LOGGER = logging.getLogger(__name__)
@dataclass
class Record(DataClassDictMixin):
"""Historical cleanup result."""
class Config:
"""Configuration class."""
code_generation_options = [ADD_DIALECT_SUPPORT]
#: Total time cleaned (in minutes)
clean_time: timedelta = field(
metadata=field_options(deserialize=lambda x: timedelta(minutes=x))
)
#: Total area cleaned
clean_area: int
dust_collection: bool
timestamp: datetime
info_num: int | None = None
message: int | None = None
map_id: int | None = None
start_type: int | None = None
task_type: int | None = None
record_index: int | None = None
#: Error code from cleaning
error: int = field(default=0)
class _DateTimeSerializationStrategy(SerializationStrategy):
def __init__(self, tz: tzinfo) -> None:
self.tz = tz
def deserialize(self, value: float) -> datetime:
return datetime.fromtimestamp(value, self.tz)
def _get_tz_strategy(tz: tzinfo) -> type[Dialect]:
"""Return a timezone aware de-serialization strategy."""
class TimezoneDialect(Dialect):
serialization_strategy = {datetime: _DateTimeSerializationStrategy(tz)}
return TimezoneDialect
@dataclass
class Records(DataClassDictMixin):
"""Response payload for getCleanRecords."""
class Config:
"""Configuration class."""
code_generation_options = [ADD_DIALECT_SUPPORT]
total_time: timedelta = field(
metadata=field_options(deserialize=lambda x: timedelta(minutes=x))
)
total_area: int
total_count: int = field(metadata=field_options(alias="total_number"))
records: list[Record] = field(metadata=field_options(alias="record_list"))
last_clean: Record = field(metadata=field_options(alias="lastest_day_record"))
@classmethod
def __pre_deserialize__(cls, d: dict) -> dict:
if ldr := d.get("lastest_day_record"):
d["lastest_day_record"] = {
"timestamp": ldr[0],
"clean_time": ldr[1],
"clean_area": ldr[2],
"dust_collection": ldr[3],
}
return d
class CleanRecords(SmartModule):
"""Implementation of vacuum cleaning records."""
REQUIRED_COMPONENT = "clean_percent"
_parsed_data: Records
async def _post_update_hook(self) -> None:
"""Cache parsed data after an update."""
self._parsed_data = Records.from_dict(
self.data, dialect=_get_tz_strategy(self._device.timezone)
)
def _initialize_features(self) -> None:
"""Initialize features."""
for type_ in ["total", "last"]:
self._add_feature(
Feature(
self._device,
id=f"{type_}_clean_area",
name=f"{type_.capitalize()} area cleaned",
container=self,
attribute_getter=f"{type_}_clean_area",
unit_getter="area_unit",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id=f"{type_}_clean_time",
name=f"{type_.capitalize()} time cleaned",
container=self,
attribute_getter=f"{type_}_clean_time",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="total_clean_count",
name="Total clean count",
container=self,
attribute_getter="total_clean_count",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="last_clean_timestamp",
name="Last clean timestamp",
container=self,
attribute_getter="last_clean_timestamp",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getCleanRecords": {},
}
@property
def total_clean_area(self) -> Annotated[int, FeatureAttribute()]:
"""Return total cleaning area."""
return self._parsed_data.total_area
@property
def total_clean_time(self) -> timedelta:
"""Return total cleaning time."""
return self._parsed_data.total_time
@property
def total_clean_count(self) -> int:
"""Return total clean count."""
return self._parsed_data.total_count
@property
def last_clean_area(self) -> Annotated[int, FeatureAttribute()]:
"""Return latest cleaning area."""
return self._parsed_data.last_clean.clean_area
@property
def last_clean_time(self) -> timedelta:
"""Return total cleaning time."""
return self._parsed_data.last_clean.clean_time
@property
def last_clean_timestamp(self) -> datetime:
"""Return latest cleaning timestamp."""
return self._parsed_data.last_clean.timestamp
@property
def area_unit(self) -> AreaUnit:
"""Return area unit."""
clean = cast(Clean, self._device.modules[Module.Clean])
return clean.area_unit
@property
def parsed_data(self) -> Records:
"""Return parsed records data."""
return self._parsed_data

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import base64
import logging
import time
from collections import OrderedDict
from collections.abc import Sequence
from datetime import UTC, datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, TypeAlias, cast
@@ -66,7 +67,9 @@ class SmartDevice(Device):
self._components_raw: ComponentsRaw | None = None
self._components: dict[str, int] = {}
self._state_information: dict[str, Any] = {}
self._modules: dict[str | ModuleName[Module], SmartModule] = {}
self._modules: OrderedDict[str | ModuleName[Module], SmartModule] = (
OrderedDict()
)
self._parent: SmartDevice | None = None
self._children: dict[str, SmartDevice] = {}
self._last_update_time: float | None = None
@@ -445,6 +448,11 @@ class SmartDevice(Device):
):
self._modules[Thermostat.__name__] = Thermostat(self, "thermostat")
# We move time to the beginning so other modules can access the
# time and timezone after update if required. e.g. cleanrecords
if Time.__name__ in self._modules:
self._modules.move_to_end(Time.__name__, last=False)
async def _initialize_features(self) -> None:
"""Initialize device features."""
self._add_feature(