Merge remote-tracking branch 'upstream/master' into feat/power_protection

This commit is contained in:
Steven B
2025-01-23 15:17:26 +00:00
302 changed files with 28083 additions and 3492 deletions

View File

@@ -6,16 +6,23 @@ from .autooff import AutoOff
from .batterysensor import BatterySensor
from .brightness import Brightness
from .childdevice import ChildDevice
from .childlock import ChildLock
from .childprotection import ChildProtection
from .childsetup import ChildSetup
from .clean import Clean
from .cleanrecords import CleanRecords
from .cloud import Cloud
from .color import Color
from .colortemperature import ColorTemperature
from .consumables import Consumables
from .contactsensor import ContactSensor
from .devicemodule import DeviceModule
from .dustbin import Dustbin
from .energy import Energy
from .fan import Fan
from .firmware import Firmware
from .frostprotection import FrostProtection
from .homekit import HomeKit
from .humiditysensor import HumiditySensor
from .led import Led
from .light import Light
@@ -23,9 +30,13 @@ from .lighteffect import LightEffect
from .lightpreset import LightPreset
from .lightstripeffect import LightStripEffect
from .lighttransition import LightTransition
from .matter import Matter
from .mop import Mop
from .motionsensor import MotionSensor
from .overheatprotection import OverheatProtection
from .powerprotection import PowerProtection
from .reportmode import ReportMode
from .speaker import Speaker
from .temperaturecontrol import TemperatureControl
from .temperaturesensor import TemperatureSensor
from .thermostat import Thermostat
@@ -39,6 +50,8 @@ __all__ = [
"Energy",
"DeviceModule",
"ChildDevice",
"ChildLock",
"ChildSetup",
"BatterySensor",
"HumiditySensor",
"TemperatureSensor",
@@ -64,6 +77,15 @@ __all__ = [
"TriggerLogs",
"FrostProtection",
"Thermostat",
"Clean",
"Consumables",
"CleanRecords",
"SmartLightEffect",
"PowerProtection",
"OverheatProtection",
"Speaker",
"HomeKit",
"Matter",
"Dustbin",
"Mop",
]

View File

@@ -2,7 +2,11 @@
from __future__ import annotations
from typing import Annotated
from ...exceptions import KasaException
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import SmartModule
@@ -14,18 +18,22 @@ class BatterySensor(SmartModule):
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
"battery_low",
"Battery low",
container=self,
attribute_getter="battery_low",
icon="mdi:alert",
type=Feature.Type.BinarySensor,
category=Feature.Category.Debug,
if (
"at_low_battery" in self._device.sys_info
or "is_low" in self._device.sys_info
):
self._add_feature(
Feature(
self._device,
"battery_low",
"Battery low",
container=self,
attribute_getter="battery_low",
icon="mdi:alert",
type=Feature.Type.BinarySensor,
category=Feature.Category.Debug,
)
)
)
# Some devices, like T110 contact sensor do not report the battery percentage
if "battery_percentage" in self._device.sys_info:
@@ -48,11 +56,17 @@ class BatterySensor(SmartModule):
return {}
@property
def battery(self) -> int:
def battery(self) -> Annotated[int, FeatureAttribute()]:
"""Return battery level."""
return self._device.sys_info["battery_percentage"]
@property
def battery_low(self) -> bool:
def battery_low(self) -> Annotated[bool, FeatureAttribute()]:
"""Return True if battery is low."""
return self._device.sys_info["at_low_battery"]
is_low = self._device.sys_info.get(
"at_low_battery", self._device.sys_info.get("is_low")
)
if is_low is None:
raise KasaException("Device does not report battery low status")
return is_low

View File

@@ -38,6 +38,7 @@ Plug 3: False
True
"""
from ...device_type import DeviceType
from ..smartmodule import SmartModule
@@ -46,3 +47,10 @@ class ChildDevice(SmartModule):
REQUIRED_COMPONENT = "child_device"
QUERY_GETTER_NAME = "get_child_device_list"
def query(self) -> dict:
"""Query to execute during the update cycle."""
q = super().query()
if self._device.device_type is DeviceType.Hub:
q["get_child_device_component_list"] = None
return q

View File

@@ -0,0 +1,37 @@
"""Child lock module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class ChildLock(SmartModule):
"""Implementation for child lock."""
REQUIRED_COMPONENT = "button_and_led"
QUERY_GETTER_NAME = "getChildLockInfo"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
device=self._device,
id="child_lock",
name="Child lock",
container=self,
attribute_getter="enabled",
attribute_setter="set_enabled",
type=Feature.Type.Switch,
category=Feature.Category.Config,
)
)
@property
def enabled(self) -> bool:
"""Return True if child lock is enabled."""
return self.data["child_lock_status"]
async def set_enabled(self, enabled: bool) -> dict:
"""Set child lock."""
return await self.call("setChildLockInfo", {"child_lock_status": enabled})

View File

@@ -0,0 +1,87 @@
"""Implementation for child device setup.
This module allows pairing and disconnecting child devices.
"""
from __future__ import annotations
import asyncio
import logging
from ...feature import Feature
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class ChildSetup(SmartModule):
"""Implementation for child device setup."""
REQUIRED_COMPONENT = "child_quick_setup"
QUERY_GETTER_NAME = "get_support_child_device_category"
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="pair",
name="Pair",
container=self,
attribute_setter="pair",
category=Feature.Category.Config,
type=Feature.Type.Action,
)
)
async def get_supported_device_categories(self) -> list[dict]:
"""Get supported device categories."""
categories = await self.call("get_support_child_device_category")
return categories["get_support_child_device_category"]["device_category_list"]
async def pair(self, *, timeout: int = 10) -> list[dict]:
"""Scan for new devices and pair after discovering first new device."""
await self.call("begin_scanning_child_device")
_LOGGER.info("Waiting %s seconds for discovering new devices", timeout)
await asyncio.sleep(timeout)
detected = await self._get_detected_devices()
if not detected["child_device_list"]:
_LOGGER.warning(
"No devices found, make sure to activate pairing "
"mode on the devices to be added."
)
return []
_LOGGER.info(
"Discovery done, found %s devices: %s",
len(detected["child_device_list"]),
detected,
)
await self._add_devices(detected)
return detected["child_device_list"]
async def unpair(self, device_id: str) -> dict:
"""Remove device from the hub."""
_LOGGER.info("Going to unpair %s from %s", device_id, self)
payload = {"child_device_list": [{"device_id": device_id}]}
return await self.call("remove_child_device_list", payload)
async def _add_devices(self, devices: dict) -> dict:
"""Add devices based on get_detected_device response.
Pass the output from :ref:_get_detected_devices: as a parameter.
"""
res = await self.call("add_child_device_list", devices)
return res
async def _get_detected_devices(self) -> dict:
"""Return list of devices detected during scanning."""
param = {"scan_list": await self.get_supported_device_categories()}
res = await self.call("get_scan_child_device_list", param)
_LOGGER.debug("Scan status: %s", res)
return res["get_scan_child_device_list"]

427
kasa/smart/modules/clean.py Normal file
View File

@@ -0,0 +1,427 @@
"""Implementation of vacuum clean module."""
from __future__ import annotations
import logging
from datetime import timedelta
from enum import IntEnum, StrEnum
from typing import Annotated, Literal
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class Status(IntEnum):
"""Status of vacuum."""
Idle = 0
Cleaning = 1
Mapping = 2
GoingHome = 4
Charging = 5
Charged = 6
Paused = 7
Undocked = 8
Error = 100
UnknownInternal = -1000
class ErrorCode(IntEnum):
"""Error codes for vacuum."""
Ok = 0
SideBrushStuck = 2
MainBrushStuck = 3
WheelBlocked = 4
Trapped = 6
TrappedCliff = 7
DustBinRemoved = 14
UnableToMove = 15
LidarBlocked = 16
UnableToFindDock = 21
BatteryLow = 22
UnknownInternal = -1000
class FanSpeed(IntEnum):
"""Fan speed level."""
Quiet = 1
Standard = 2
Turbo = 3
Max = 4
Ultra = 5
class CarpetCleanMode(StrEnum):
"""Carpet clean mode."""
Normal = "normal"
Boost = "boost"
class AreaUnit(IntEnum):
"""Area unit."""
#: Square meter
Sqm = 0
#: Square feet
Sqft = 1
#: Taiwanese unit: https://en.wikipedia.org/wiki/Taiwanese_units_of_measurement#Area
Ping = 2
class Clean(SmartModule):
"""Implementation of vacuum clean module."""
REQUIRED_COMPONENT = "clean"
_error_code = ErrorCode.Ok
_logged_error_code_warnings: set | None = None
_logged_status_code_warnings: set
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="vacuum_return_home",
name="Return home",
container=self,
attribute_setter="return_home",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_start",
name="Start cleaning",
container=self,
attribute_setter="start",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_pause",
name="Pause",
container=self,
attribute_setter="pause",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_status",
name="Vacuum status",
container=self,
attribute_getter="status",
category=Feature.Category.Primary,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_error",
name="Error",
container=self,
attribute_getter="error",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="battery_level",
name="Battery level",
container=self,
attribute_getter="battery",
icon="mdi:battery",
unit_getter=lambda: "%",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="vacuum_fan_speed",
name="Fan speed",
container=self,
attribute_getter="fan_speed_preset",
attribute_setter="set_fan_speed_preset",
icon="mdi:fan",
choices_getter=lambda: list(FanSpeed.__members__),
category=Feature.Category.Primary,
type=Feature.Type.Choice,
)
)
self._add_feature(
Feature(
self._device,
id="clean_count",
name="Clean count",
container=self,
attribute_getter="clean_count",
attribute_setter="set_clean_count",
range_getter=lambda: (1, 3),
category=Feature.Category.Config,
type=Feature.Type.Number,
)
)
self._add_feature(
Feature(
self._device,
id="carpet_clean_mode",
name="Carpet clean mode",
container=self,
attribute_getter="carpet_clean_mode",
attribute_setter="set_carpet_clean_mode",
icon="mdi:rug",
choices_getter=lambda: list(CarpetCleanMode.__members__),
category=Feature.Category.Config,
type=Feature.Type.Choice,
)
)
self._add_feature(
Feature(
self._device,
id="clean_area",
name="Cleaning area",
container=self,
attribute_getter="clean_area",
unit_getter="area_unit",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="clean_time",
name="Cleaning time",
container=self,
attribute_getter="clean_time",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="clean_progress",
name="Cleaning progress",
container=self,
attribute_getter="clean_progress",
unit_getter=lambda: "%",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
async def _post_update_hook(self) -> None:
"""Set error code after update."""
if self._logged_error_code_warnings is None:
self._logged_error_code_warnings = set()
self._logged_status_code_warnings = set()
errors = self._vac_status.get("err_status")
if errors is None or not errors:
self._error_code = ErrorCode.Ok
return
if len(errors) > 1 and "multiple" not in self._logged_error_code_warnings:
self._logged_error_code_warnings.add("multiple")
_LOGGER.warning(
"Multiple error codes, using the first one only: %s", errors
)
error = errors.pop(0)
try:
self._error_code = ErrorCode(error)
except ValueError:
if error not in self._logged_error_code_warnings:
self._logged_error_code_warnings.add(error)
_LOGGER.warning(
"Unknown error code, please create an issue "
"describing the error: %s",
error,
)
self._error_code = ErrorCode.UnknownInternal
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getVacStatus": {},
"getCleanInfo": {},
"getCarpetClean": {},
"getAreaUnit": {},
"getBatteryInfo": {},
"getCleanStatus": {},
"getCleanAttr": {"type": "global"},
}
async def start(self) -> dict:
"""Start cleaning."""
# If we are paused, do not restart cleaning
if self.status is Status.Paused:
return await self.resume()
return await self.call(
"setSwitchClean",
{
"clean_mode": 0,
"clean_on": True,
"clean_order": True,
"force_clean": False,
},
)
async def pause(self) -> dict:
"""Pause cleaning."""
if self.status is Status.GoingHome:
return await self.set_return_home(False)
return await self.set_pause(True)
async def resume(self) -> dict:
"""Resume cleaning."""
return await self.set_pause(False)
async def set_pause(self, enabled: bool) -> dict:
"""Pause or resume cleaning."""
return await self.call("setRobotPause", {"pause": enabled})
async def return_home(self) -> dict:
"""Return home."""
return await self.set_return_home(True)
async def set_return_home(self, enabled: bool) -> dict:
"""Return home / pause returning."""
return await self.call("setSwitchCharge", {"switch_charge": enabled})
@property
def error(self) -> ErrorCode:
"""Return error."""
return self._error_code
@property
def fan_speed_preset(self) -> Annotated[str, FeatureAttribute()]:
"""Return fan speed preset."""
return FanSpeed(self._settings["suction"]).name
async def set_fan_speed_preset(
self, speed: str
) -> Annotated[dict, FeatureAttribute]:
"""Set fan speed preset."""
name_to_value = {x.name: x.value for x in FanSpeed}
if speed not in name_to_value:
raise ValueError("Invalid fan speed %s, available %s", speed, name_to_value)
return await self._change_setting("suction", name_to_value[speed])
async def _change_setting(
self, name: str, value: int, *, scope: Literal["global", "pose"] = "global"
) -> dict:
"""Change device setting."""
params = {
name: value,
"type": scope,
}
return await self.call("setCleanAttr", params)
@property
def battery(self) -> int:
"""Return battery level."""
return self.data["getBatteryInfo"]["battery_percentage"]
@property
def _vac_status(self) -> dict:
"""Return vac status container."""
return self.data["getVacStatus"]
@property
def _info(self) -> dict:
"""Return current cleaning info."""
return self.data["getCleanInfo"]
@property
def _settings(self) -> dict:
"""Return cleaning settings."""
return self.data["getCleanAttr"]
@property
def status(self) -> Status:
"""Return current status."""
if self._error_code is not ErrorCode.Ok:
return Status.Error
status_code = self._vac_status["status"]
try:
return Status(status_code)
except ValueError:
if status_code not in self._logged_status_code_warnings:
self._logged_status_code_warnings.add(status_code)
_LOGGER.warning(
"Got unknown status code: %s (%s)", status_code, self.data
)
return Status.UnknownInternal
@property
def carpet_clean_mode(self) -> Annotated[str, FeatureAttribute()]:
"""Return carpet clean mode."""
return CarpetCleanMode(self.data["getCarpetClean"]["carpet_clean_prefer"]).name
async def set_carpet_clean_mode(
self, mode: str
) -> Annotated[dict, FeatureAttribute()]:
"""Set carpet clean mode."""
name_to_value = {x.name: x.value for x in CarpetCleanMode}
if mode not in name_to_value:
raise ValueError(
"Invalid carpet clean mode %s, available %s", mode, name_to_value
)
return await self.call(
"setCarpetClean", {"carpet_clean_prefer": name_to_value[mode]}
)
@property
def area_unit(self) -> AreaUnit:
"""Return area unit."""
return AreaUnit(self.data["getAreaUnit"]["area_unit"])
@property
def clean_area(self) -> Annotated[int, FeatureAttribute()]:
"""Return currently cleaned area."""
return self._info["clean_area"]
@property
def clean_time(self) -> timedelta:
"""Return current cleaning time."""
return timedelta(minutes=self._info["clean_time"])
@property
def clean_progress(self) -> int:
"""Return amount of currently cleaned area."""
return self._info["clean_percent"]
@property
def clean_count(self) -> Annotated[int, FeatureAttribute()]:
"""Return number of times to clean."""
return self._settings["clean_number"]
async def set_clean_count(self, count: int) -> Annotated[dict, FeatureAttribute()]:
"""Set number of times to clean."""
return await self._change_setting("clean_number", count)

View File

@@ -0,0 +1,205 @@
"""Implementation of vacuum cleaning records."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta, tzinfo
from typing import Annotated, cast
from mashumaro import DataClassDictMixin, field_options
from mashumaro.config import ADD_DIALECT_SUPPORT
from mashumaro.dialect import Dialect
from mashumaro.types import SerializationStrategy
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import Module, SmartModule
from .clean import AreaUnit, Clean
_LOGGER = logging.getLogger(__name__)
@dataclass
class Record(DataClassDictMixin):
"""Historical cleanup result."""
class Config:
"""Configuration class."""
code_generation_options = [ADD_DIALECT_SUPPORT]
#: Total time cleaned (in minutes)
clean_time: timedelta = field(
metadata=field_options(deserialize=lambda x: timedelta(minutes=x))
)
#: Total area cleaned
clean_area: int
dust_collection: bool
timestamp: datetime
info_num: int | None = None
message: int | None = None
map_id: int | None = None
start_type: int | None = None
task_type: int | None = None
record_index: int | None = None
#: Error code from cleaning
error: int = field(default=0)
class _DateTimeSerializationStrategy(SerializationStrategy):
def __init__(self, tz: tzinfo) -> None:
self.tz = tz
def deserialize(self, value: float) -> datetime:
return datetime.fromtimestamp(value, self.tz)
def _get_tz_strategy(tz: tzinfo) -> type[Dialect]:
"""Return a timezone aware de-serialization strategy."""
class TimezoneDialect(Dialect):
serialization_strategy = {datetime: _DateTimeSerializationStrategy(tz)}
return TimezoneDialect
@dataclass
class Records(DataClassDictMixin):
"""Response payload for getCleanRecords."""
class Config:
"""Configuration class."""
code_generation_options = [ADD_DIALECT_SUPPORT]
total_time: timedelta = field(
metadata=field_options(deserialize=lambda x: timedelta(minutes=x))
)
total_area: int
total_count: int = field(metadata=field_options(alias="total_number"))
records: list[Record] = field(metadata=field_options(alias="record_list"))
last_clean: Record = field(metadata=field_options(alias="lastest_day_record"))
@classmethod
def __pre_deserialize__(cls, d: dict) -> dict:
if ldr := d.get("lastest_day_record"):
d["lastest_day_record"] = {
"timestamp": ldr[0],
"clean_time": ldr[1],
"clean_area": ldr[2],
"dust_collection": ldr[3],
}
return d
class CleanRecords(SmartModule):
"""Implementation of vacuum cleaning records."""
REQUIRED_COMPONENT = "clean_percent"
_parsed_data: Records
async def _post_update_hook(self) -> None:
"""Cache parsed data after an update."""
self._parsed_data = Records.from_dict(
self.data, dialect=_get_tz_strategy(self._device.timezone)
)
def _initialize_features(self) -> None:
"""Initialize features."""
for type_ in ["total", "last"]:
self._add_feature(
Feature(
self._device,
id=f"{type_}_clean_area",
name=f"{type_.capitalize()} area cleaned",
container=self,
attribute_getter=f"{type_}_clean_area",
unit_getter="area_unit",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id=f"{type_}_clean_time",
name=f"{type_.capitalize()} time cleaned",
container=self,
attribute_getter=f"{type_}_clean_time",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="total_clean_count",
name="Total clean count",
container=self,
attribute_getter="total_clean_count",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id="last_clean_timestamp",
name="Last clean timestamp",
container=self,
attribute_getter="last_clean_timestamp",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getCleanRecords": {},
}
@property
def total_clean_area(self) -> Annotated[int, FeatureAttribute()]:
"""Return total cleaning area."""
return self._parsed_data.total_area
@property
def total_clean_time(self) -> timedelta:
"""Return total cleaning time."""
return self._parsed_data.total_time
@property
def total_clean_count(self) -> int:
"""Return total clean count."""
return self._parsed_data.total_count
@property
def last_clean_area(self) -> Annotated[int, FeatureAttribute()]:
"""Return latest cleaning area."""
return self._parsed_data.last_clean.clean_area
@property
def last_clean_time(self) -> timedelta:
"""Return total cleaning time."""
return self._parsed_data.last_clean.clean_time
@property
def last_clean_timestamp(self) -> datetime:
"""Return latest cleaning timestamp."""
return self._parsed_data.last_clean.timestamp
@property
def area_unit(self) -> AreaUnit:
"""Return area unit."""
clean = cast(Clean, self._device.modules[Module.Clean])
return clean.area_unit
@property
def parsed_data(self) -> Records:
"""Return parsed records data."""
return self._parsed_data

View File

@@ -0,0 +1,170 @@
"""Implementation of vacuum consumables."""
from __future__ import annotations
import logging
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import timedelta
from ...feature import Feature
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
@dataclass
class _ConsumableMeta:
"""Consumable meta container."""
#: Name of the consumable.
name: str
#: Internal id of the consumable
id: str
#: Data key in the device reported data
data_key: str
#: Lifetime
lifetime: timedelta
@dataclass
class Consumable:
"""Consumable container."""
#: Name of the consumable.
name: str
#: Id of the consumable
id: str
#: Lifetime
lifetime: timedelta
#: Used
used: timedelta
#: Remaining
remaining: timedelta
#: Device data key
_data_key: str
CONSUMABLE_METAS = [
_ConsumableMeta(
"Main brush",
id="main_brush",
data_key="roll_brush_time",
lifetime=timedelta(hours=400),
),
_ConsumableMeta(
"Side brush",
id="side_brush",
data_key="edge_brush_time",
lifetime=timedelta(hours=200),
),
_ConsumableMeta(
"Filter",
id="filter",
data_key="filter_time",
lifetime=timedelta(hours=200),
),
_ConsumableMeta(
"Sensor",
id="sensor",
data_key="sensor_time",
lifetime=timedelta(hours=30),
),
_ConsumableMeta(
"Charging contacts",
id="charging_contacts",
data_key="charge_contact_time",
lifetime=timedelta(hours=30),
),
# Unknown keys: main_brush_lid_time, rag_time
]
class Consumables(SmartModule):
"""Implementation of vacuum consumables."""
REQUIRED_COMPONENT = "consumables"
QUERY_GETTER_NAME = "getConsumablesInfo"
_consumables: dict[str, Consumable] = {}
def _initialize_features(self) -> None:
"""Initialize features."""
for c_meta in CONSUMABLE_METAS:
if c_meta.data_key not in self.data:
continue
self._add_feature(
Feature(
self._device,
id=f"{c_meta.id}_used",
name=f"{c_meta.name} used",
container=self,
attribute_getter=lambda _, c_id=c_meta.id: self._consumables[
c_id
].used,
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id=f"{c_meta.id}_remaining",
name=f"{c_meta.name} remaining",
container=self,
attribute_getter=lambda _, c_id=c_meta.id: self._consumables[
c_id
].remaining,
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
id=f"{c_meta.id}_reset",
name=f"Reset {c_meta.name.lower()} consumable",
container=self,
attribute_setter=lambda c_id=c_meta.id: self.reset_consumable(c_id),
category=Feature.Category.Debug,
type=Feature.Type.Action,
)
)
async def _post_update_hook(self) -> None:
"""Update the consumables."""
if not self._consumables:
for consumable_meta in CONSUMABLE_METAS:
if consumable_meta.data_key not in self.data:
continue
used = timedelta(minutes=self.data[consumable_meta.data_key])
consumable = Consumable(
id=consumable_meta.id,
name=consumable_meta.name,
lifetime=consumable_meta.lifetime,
used=used,
remaining=consumable_meta.lifetime - used,
_data_key=consumable_meta.data_key,
)
self._consumables[consumable_meta.id] = consumable
else:
for consumable in self._consumables.values():
consumable.used = timedelta(minutes=self.data[consumable._data_key])
consumable.remaining = consumable.lifetime - consumable.used
async def reset_consumable(self, consumable_id: str) -> dict:
"""Reset consumable stats."""
consumable_name = self._consumables[consumable_id]._data_key.removesuffix(
"_time"
)
return await self.call(
"resetConsumablesTime", {"reset_list": [consumable_name]}
)
@property
def consumables(self) -> Mapping[str, Consumable]:
"""Get list of consumables on the device."""
return self._consumables

View File

@@ -10,7 +10,7 @@ class ContactSensor(SmartModule):
"""Implementation of contact sensor module."""
REQUIRED_COMPONENT = None # we depend on availability of key
REQUIRED_KEY_ON_PARENT = "open"
SYSINFO_LOOKUP_KEYS = ["open"]
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""

View File

@@ -19,12 +19,15 @@ class DeviceModule(SmartModule):
def query(self) -> dict:
"""Query to execute during the update cycle."""
if self._device._is_hub_child:
# Child devices get their device info updated by the parent device.
return {}
query = {
"get_device_info": None,
}
# Device usage is not available on older firmware versions
# or child devices of hubs
if self.supported_version >= 2 and not self._device._is_hub_child:
if self.supported_version >= 2:
query["get_device_usage"] = None
return query

View File

@@ -0,0 +1,117 @@
"""Implementation of vacuum dustbin."""
from __future__ import annotations
import logging
from enum import IntEnum
from ...feature import Feature
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class Mode(IntEnum):
"""Dust collection modes."""
Smart = 0
Light = 1
Balanced = 2
Max = 3
class Dustbin(SmartModule):
"""Implementation of vacuum dustbin."""
REQUIRED_COMPONENT = "dust_bucket"
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="dustbin_empty",
name="Empty dustbin",
container=self,
attribute_setter="start_emptying",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="dustbin_autocollection_enabled",
name="Automatic emptying enabled",
container=self,
attribute_getter="auto_collection",
attribute_setter="set_auto_collection",
category=Feature.Category.Config,
type=Feature.Switch,
)
)
self._add_feature(
Feature(
self._device,
id="dustbin_mode",
name="Automatic emptying mode",
container=self,
attribute_getter="mode",
attribute_setter="set_mode",
icon="mdi:fan",
choices_getter=lambda: list(Mode.__members__),
category=Feature.Category.Config,
type=Feature.Type.Choice,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getAutoDustCollection": {},
"getDustCollectionInfo": {},
}
async def start_emptying(self) -> dict:
"""Start emptying the bin."""
return await self.call(
"setSwitchDustCollection",
{
"switch_dust_collection": True,
},
)
@property
def _settings(self) -> dict:
"""Return auto-empty settings."""
return self.data["getDustCollectionInfo"]
@property
def mode(self) -> str:
"""Return auto-emptying mode."""
return Mode(self._settings["dust_collection_mode"]).name
async def set_mode(self, mode: str) -> dict:
"""Set auto-emptying mode."""
name_to_value = {x.name: x.value for x in Mode}
if mode not in name_to_value:
raise ValueError(
"Invalid auto/emptying mode speed %s, available %s", mode, name_to_value
)
settings = self._settings.copy()
settings["dust_collection_mode"] = name_to_value[mode]
return await self.call("setDustCollectionInfo", settings)
@property
def auto_collection(self) -> dict:
"""Return auto-emptying config."""
return self._settings["auto_dust_collection"]
async def set_auto_collection(self, on: bool) -> dict:
"""Toggle auto-emptying."""
settings = self._settings.copy()
settings["auto_dust_collection"] = on
return await self.call("setDustCollectionInfo", settings)

View File

@@ -2,10 +2,10 @@
from __future__ import annotations
from typing import NoReturn
from typing import Any, NoReturn
from ...emeterstatus import EmeterStatus
from ...exceptions import KasaException
from ...exceptions import DeviceError, KasaException
from ...interfaces.energy import Energy as EnergyInterface
from ..smartmodule import SmartModule, raise_if_update_error
@@ -15,12 +15,39 @@ class Energy(SmartModule, EnergyInterface):
REQUIRED_COMPONENT = "energy_monitoring"
_energy: dict[str, Any]
_current_consumption: float | None
async def _post_update_hook(self) -> None:
if "voltage_mv" in self.data.get("get_emeter_data", {}):
try:
data = self.data
except DeviceError as de:
self._energy = {}
self._current_consumption = None
raise de
# If version is 1 then data is get_energy_usage
self._energy = data.get("get_energy_usage", data)
if "voltage_mv" in data.get("get_emeter_data", {}):
self._supported = (
self._supported | EnergyInterface.ModuleFeature.VOLTAGE_CURRENT
)
if (power := self._energy.get("current_power")) is not None or (
power := data.get("get_emeter_data", {}).get("power_mw")
) is not None:
self._current_consumption = power / 1_000
# Fallback if get_energy_usage does not provide current_power,
# which can happen on some newer devices (e.g. P304M).
# This may not be valid scenario as it pre-dates trying get_emeter_data
elif (
power := self.data.get("get_current_power", {}).get("current_power")
) is not None:
self._current_consumption = power
else:
self._current_consumption = None
def query(self) -> dict:
"""Query to execute during the update cycle."""
req = {
@@ -33,28 +60,21 @@ class Energy(SmartModule, EnergyInterface):
return req
@property
@raise_if_update_error
def current_consumption(self) -> float | None:
"""Current power in watts."""
if (power := self.energy.get("current_power")) is not None or (
power := self.data.get("get_emeter_data", {}).get("power_mw")
) is not None:
return power / 1_000
# Fallback if get_energy_usage does not provide current_power,
# which can happen on some newer devices (e.g. P304M).
elif (
power := self.data.get("get_current_power", {}).get("current_power")
) is not None:
return power
return None
def optional_response_keys(self) -> list[str]:
"""Return optional response keys for the module."""
if self.supported_version > 1:
return ["get_energy_usage"]
return []
@property
def current_consumption(self) -> float | None:
"""Current power in watts."""
return self._current_consumption
@property
@raise_if_update_error
def energy(self) -> dict:
"""Return get_energy_usage results."""
if en := self.data.get("get_energy_usage"):
return en
return self.data
return self._energy
def _get_status_from_energy(self, energy: dict) -> EmeterStatus:
return EmeterStatus(
@@ -83,16 +103,18 @@ class Energy(SmartModule, EnergyInterface):
return self._get_status_from_energy(res["get_energy_usage"])
@property
@raise_if_update_error
def consumption_this_month(self) -> float | None:
"""Get the emeter value for this month in kWh."""
return self.energy.get("month_energy", 0) / 1_000
if (month := self.energy.get("month_energy")) is not None:
return month / 1_000
return None
@property
@raise_if_update_error
def consumption_today(self) -> float | None:
"""Get the emeter value for today in kWh."""
return self.energy.get("today_energy", 0) / 1_000
if (today := self.energy.get("today_energy")) is not None:
return today / 1_000
return None
@property
@raise_if_update_error

View File

@@ -0,0 +1,32 @@
"""Implementation of homekit module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class HomeKit(SmartModule):
"""Implementation of homekit module."""
QUERY_GETTER_NAME: str = "get_homekit_info"
REQUIRED_COMPONENT = "homekit"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="homekit_setup_code",
name="Homekit setup code",
container=self,
attribute_getter=lambda x: x.info["mfi_setup_code"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
@property
def info(self) -> dict[str, str]:
"""Homekit mfi setup info."""
return self.data

View File

@@ -7,7 +7,7 @@ from typing import Annotated
from ...exceptions import KasaException
from ...feature import Feature
from ...interfaces.light import HSV, ColorTempRange, LightState
from ...interfaces.light import HSV, LightState
from ...interfaces.light import Light as LightInterface
from ...module import FeatureAttribute, Module
from ..smartmodule import SmartModule
@@ -34,39 +34,13 @@ class Light(SmartModule, LightInterface):
"""Query to execute during the update cycle."""
return {}
@property
def is_color(self) -> bool:
"""Whether the bulb supports color changes."""
return Module.Color in self._device.modules
@property
def is_dimmable(self) -> bool:
"""Whether the bulb supports brightness changes."""
return Module.Brightness in self._device.modules
@property
def is_variable_color_temp(self) -> bool:
"""Whether the bulb supports color temperature changes."""
return Module.ColorTemperature in self._device.modules
@property
def valid_temperature_range(self) -> ColorTempRange:
"""Return the device-specific white temperature range (in Kelvin).
:return: White temperature range in Kelvin (minimum, maximum)
"""
if not self.is_variable_color_temp:
raise KasaException("Color temperature not supported")
return self._device.modules[Module.ColorTemperature].valid_temperature_range
@property
def hsv(self) -> Annotated[HSV, FeatureAttribute()]:
"""Return the current HSV state of the bulb.
:return: hue, saturation and value (degrees, %, %)
"""
if not self.is_color:
if Module.Color not in self._device.modules:
raise KasaException("Bulb does not support color.")
return self._device.modules[Module.Color].hsv
@@ -74,7 +48,7 @@ class Light(SmartModule, LightInterface):
@property
def color_temp(self) -> Annotated[int, FeatureAttribute()]:
"""Whether the bulb supports color temperature changes."""
if not self.is_variable_color_temp:
if Module.ColorTemperature not in self._device.modules:
raise KasaException("Bulb does not support colortemp.")
return self._device.modules[Module.ColorTemperature].color_temp
@@ -82,7 +56,7 @@ class Light(SmartModule, LightInterface):
@property
def brightness(self) -> Annotated[int, FeatureAttribute()]:
"""Return the current brightness in percentage."""
if not self.is_dimmable: # pragma: no cover
if Module.Brightness not in self._device.modules: # pragma: no cover
raise KasaException("Bulb is not dimmable.")
return self._device.modules[Module.Brightness].brightness
@@ -104,7 +78,7 @@ class Light(SmartModule, LightInterface):
:param int value: value between 1 and 100
:param int transition: transition in milliseconds.
"""
if not self.is_color:
if Module.Color not in self._device.modules:
raise KasaException("Bulb does not support color.")
return await self._device.modules[Module.Color].set_hsv(hue, saturation, value)
@@ -119,7 +93,7 @@ class Light(SmartModule, LightInterface):
:param int temp: The new color temperature, in Kelvin
:param int transition: transition in milliseconds.
"""
if not self.is_variable_color_temp:
if Module.ColorTemperature not in self._device.modules:
raise KasaException("Bulb does not support colortemp.")
return await self._device.modules[Module.ColorTemperature].set_color_temp(
temp, brightness=brightness
@@ -135,16 +109,11 @@ class Light(SmartModule, LightInterface):
:param int brightness: brightness in percent
:param int transition: transition in milliseconds.
"""
if not self.is_dimmable: # pragma: no cover
if Module.Brightness not in self._device.modules: # pragma: no cover
raise KasaException("Bulb is not dimmable.")
return await self._device.modules[Module.Brightness].set_brightness(brightness)
@property
def has_effects(self) -> bool:
"""Return True if the device supports effects."""
return Module.LightEffect in self._device.modules
async def set_state(self, state: LightState) -> dict:
"""Set the light state."""
state_dict = asdict(state)
@@ -167,16 +136,17 @@ class Light(SmartModule, LightInterface):
return self._light_state
async def _post_update_hook(self) -> None:
if self._device.is_on is False:
device = self._device
if device.is_on is False:
state = LightState(light_on=False)
else:
state = LightState(light_on=True)
if self.is_dimmable:
if Module.Brightness in device.modules:
state.brightness = self.brightness
if self.is_color:
if Module.Color in device.modules:
hsv = self.hsv
state.hue = hsv.hue
state.saturation = hsv.saturation
if self.is_variable_color_temp:
if Module.ColorTemperature in device.modules:
state.color_temp = self.color_temp
self._light_state = state

View File

@@ -96,13 +96,18 @@ class LightPreset(SmartModule, LightPresetInterface):
"""Return current preset name."""
light = self._device.modules[SmartModule.Light]
brightness = light.brightness
color_temp = light.color_temp if light.is_variable_color_temp else None
h, s = (light.hsv.hue, light.hsv.saturation) if light.is_color else (None, None)
color_temp = light.color_temp if light.has_feature("color_temp") else None
h, s = (
(light.hsv.hue, light.hsv.saturation)
if light.has_feature("hsv")
else (None, None)
)
for preset_name, preset in self._presets.items():
if (
preset.brightness == brightness
and (
preset.color_temp == color_temp or not light.is_variable_color_temp
preset.color_temp == color_temp
or not light.has_feature("color_temp")
)
and preset.hue == h
and preset.saturation == s
@@ -117,7 +122,7 @@ class LightPreset(SmartModule, LightPresetInterface):
"""Set a light preset for the device."""
light = self._device.modules[SmartModule.Light]
if preset_name == self.PRESET_NOT_SET:
if light.is_color:
if light.has_feature("hsv"):
preset = LightState(hue=0, saturation=0, brightness=100)
else:
preset = LightState(brightness=100)

View File

@@ -0,0 +1,43 @@
"""Implementation of matter module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class Matter(SmartModule):
"""Implementation of matter module."""
QUERY_GETTER_NAME: str = "get_matter_setup_info"
REQUIRED_COMPONENT = "matter"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="matter_setup_code",
name="Matter setup code",
container=self,
attribute_getter=lambda x: x.info["setup_code"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
self._device,
id="matter_setup_payload",
name="Matter setup payload",
container=self,
attribute_getter=lambda x: x.info["setup_payload"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
@property
def info(self) -> dict[str, str]:
"""Matter setup info."""
return self.data

90
kasa/smart/modules/mop.py Normal file
View File

@@ -0,0 +1,90 @@
"""Implementation of vacuum mop."""
from __future__ import annotations
import logging
from enum import IntEnum
from typing import Annotated
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class Waterlevel(IntEnum):
"""Water level for mopping."""
Disable = 0
Low = 1
Medium = 2
High = 3
class Mop(SmartModule):
"""Implementation of vacuum mop."""
REQUIRED_COMPONENT = "mop"
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="mop_attached",
name="Mop attached",
container=self,
icon="mdi:square-rounded",
attribute_getter="mop_attached",
category=Feature.Category.Info,
type=Feature.BinarySensor,
)
)
self._add_feature(
Feature(
self._device,
id="mop_waterlevel",
name="Mop water level",
container=self,
attribute_getter="waterlevel",
attribute_setter="set_waterlevel",
icon="mdi:water",
choices_getter=lambda: list(Waterlevel.__members__),
category=Feature.Category.Config,
type=Feature.Type.Choice,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getMopState": {},
"getCleanAttr": {"type": "global"},
}
@property
def mop_attached(self) -> bool:
"""Return True if mop is attached."""
return self.data["getMopState"]["mop_state"]
@property
def _settings(self) -> dict:
"""Return settings settings."""
return self.data["getCleanAttr"]
@property
def waterlevel(self) -> Annotated[str, FeatureAttribute()]:
"""Return water level."""
return Waterlevel(int(self._settings["cistern"])).name
async def set_waterlevel(self, mode: str) -> Annotated[dict, FeatureAttribute()]:
"""Set waterlevel mode."""
name_to_value = {x.name: x.value for x in Waterlevel}
if mode not in name_to_value:
raise ValueError("Invalid waterlevel %s, available %s", mode, name_to_value)
settings = self._settings.copy()
settings["cistern"] = name_to_value[mode]
return await self.call("setCleanAttr", settings)

View File

@@ -0,0 +1,41 @@
"""Overheat module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class OverheatProtection(SmartModule):
"""Implementation for overheat_protection."""
SYSINFO_LOOKUP_KEYS = ["overheated", "overheat_status"]
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
container=self,
id="overheated",
name="Overheated",
attribute_getter="overheated",
icon="mdi:heat-wave",
type=Feature.Type.BinarySensor,
category=Feature.Category.Info,
)
)
@property
def overheated(self) -> bool:
"""Return True if device reports overheating."""
if (value := self._device.sys_info.get("overheat_status")) is not None:
# Value can be normal, cooldown, or overheated.
# We report all but normal as overheated.
return value != "normal"
return self._device.sys_info["overheated"]
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {}

View File

@@ -0,0 +1,67 @@
"""Implementation of vacuum speaker."""
from __future__ import annotations
import logging
from typing import Annotated
from ...feature import Feature
from ...module import FeatureAttribute
from ..smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
class Speaker(SmartModule):
"""Implementation of vacuum speaker."""
REQUIRED_COMPONENT = "speaker"
def _initialize_features(self) -> None:
"""Initialize features."""
self._add_feature(
Feature(
self._device,
id="locate",
name="Locate device",
container=self,
attribute_setter="locate",
category=Feature.Category.Primary,
type=Feature.Action,
)
)
self._add_feature(
Feature(
self._device,
id="volume",
name="Volume",
container=self,
attribute_getter="volume",
attribute_setter="set_volume",
range_getter=lambda: (0, 100),
category=Feature.Category.Config,
type=Feature.Type.Number,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
return {
"getVolume": None,
}
@property
def volume(self) -> Annotated[str, FeatureAttribute()]:
"""Return volume."""
return self.data["volume"]
async def set_volume(self, volume: int) -> Annotated[dict, FeatureAttribute()]:
"""Set volume."""
if volume < 0 or volume > 100:
raise ValueError("Volume must be between 0 and 100")
return await self.call("setVolume", {"volume": volume})
async def locate(self) -> dict:
"""Play sound to locate the device."""
return await self.call("playSelectAudio", {"audio_type": "seek_me"})

View File

@@ -6,10 +6,11 @@ import logging
import time
from typing import Any
from ..device import DeviceInfo
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper
from .smartdevice import SmartDevice
from .smartdevice import ComponentsRaw, SmartDevice
from .smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
@@ -23,6 +24,7 @@ class SmartChildDevice(SmartDevice):
CHILD_DEVICE_TYPE_MAP = {
"plug.powerstrip.sub-plug": DeviceType.Plug,
"subg.plugswitch.switch": DeviceType.WallSwitch,
"subg.trigger.contact-sensor": DeviceType.Sensor,
"subg.trigger.temp-hmdt-sensor": DeviceType.Sensor,
"subg.trigger.water-leak-sensor": DeviceType.Sensor,
@@ -37,7 +39,7 @@ class SmartChildDevice(SmartDevice):
self,
parent: SmartDevice,
info: dict,
component_info: dict,
component_info_raw: ComponentsRaw,
*,
config: DeviceConfig | None = None,
protocol: SmartProtocol | None = None,
@@ -47,7 +49,24 @@ class SmartChildDevice(SmartDevice):
super().__init__(parent.host, config=parent.config, protocol=_protocol)
self._parent = parent
self._update_internal_state(info)
self._components = component_info
self._components_raw = component_info_raw
self._components = self._parse_components(self._components_raw)
@property
def device_info(self) -> DeviceInfo:
"""Return device info.
Child device does not have it info and components in _last_update so
this overrides the base implementation to call _get_device_info with
info and components combined as they would be in _last_update.
"""
return self._get_device_info(
{
"get_device_info": self._info,
"component_nego": self._components_raw,
},
None,
)
async def update(self, update_children: bool = True) -> None:
"""Update child module info.
@@ -67,11 +86,22 @@ class SmartChildDevice(SmartDevice):
module_queries: list[SmartModule] = []
req: dict[str, Any] = {}
for module in self.modules.values():
if module.disabled is False and (mod_query := module.query()):
if (
module.disabled is False
and (mod_query := module.query())
and module._should_update(now)
):
module_queries.append(module)
req.update(mod_query)
if req:
self._last_update = await self.protocol.query(req)
first_update = self._last_update != {}
try:
resp = await self.protocol.query(req)
except Exception as ex:
resp = await self._handle_modular_update_error(
ex, first_update, ", ".join(mod.name for mod in module_queries), req
)
self._last_update = resp
for module in self.modules.values():
await self._handle_module_post_update(
@@ -79,12 +109,17 @@ class SmartChildDevice(SmartDevice):
)
self._last_update_time = now
# We can first initialize the features after the first update.
# We make here an assumption that every device has at least a single feature.
if not self._features:
await self._initialize_features()
@classmethod
async def create(
cls,
parent: SmartDevice,
child_info: dict,
child_components: dict,
child_components_raw: ComponentsRaw,
protocol: SmartProtocol | None = None,
*,
last_update: dict | None = None,
@@ -97,7 +132,7 @@ class SmartChildDevice(SmartDevice):
derived from the parent.
"""
child: SmartChildDevice = cls(
parent, child_info, child_components, protocol=protocol
parent, child_info, child_components_raw, protocol=protocol
)
if last_update:
child._last_update = last_update

View File

@@ -5,11 +5,12 @@ from __future__ import annotations
import base64
import logging
import time
from collections.abc import Mapping, Sequence
from collections import OrderedDict
from collections.abc import Sequence
from datetime import UTC, datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, TypeAlias, cast
from ..device import Device, WifiNetwork, _DeviceInfo
from ..device import Device, DeviceInfo, WifiNetwork
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
@@ -40,6 +41,8 @@ _LOGGER = logging.getLogger(__name__)
# same issue, homekit perhaps?
NON_HUB_PARENT_ONLY_MODULES = [DeviceModule, Time, Firmware, Cloud]
ComponentsRaw: TypeAlias = dict[str, list[dict[str, int | str]]]
# Device must go last as the other interfaces also inherit Device
# and python needs a consistent method resolution order.
@@ -61,16 +64,18 @@ class SmartDevice(Device):
)
super().__init__(host=host, config=config, protocol=_protocol)
self.protocol: SmartProtocol
self._components_raw: dict[str, Any] | None = None
self._components_raw: ComponentsRaw | None = None
self._components: dict[str, int] = {}
self._state_information: dict[str, Any] = {}
self._modules: dict[str | ModuleName[Module], SmartModule] = {}
self._modules: OrderedDict[str | ModuleName[Module], SmartModule] = (
OrderedDict()
)
self._parent: SmartDevice | None = None
self._children: Mapping[str, SmartDevice] = {}
self._last_update = {}
self._children: dict[str, SmartDevice] = {}
self._last_update_time: float | None = None
self._on_since: datetime | None = None
self._info: dict[str, Any] = {}
self._logged_missing_child_ids: set[str] = set()
async def _initialize_children(self) -> None:
"""Initialize children for power strips."""
@@ -81,25 +86,86 @@ class SmartDevice(Device):
resp = await self.protocol.query(child_info_query)
self.internal_state.update(resp)
children = self.internal_state["get_child_device_list"]["child_device_list"]
children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in child["component_list"]
}
for child in self.internal_state["get_child_device_component_list"][
"child_component_list"
]
}
async def _try_create_child(
self, info: dict, child_components: dict
) -> SmartDevice | None:
from .smartchilddevice import SmartChildDevice
self._children = {
child_info["device_id"]: await SmartChildDevice.create(
parent=self,
child_info=child_info,
child_components=children_components[child_info["device_id"]],
)
for child_info in children
return await SmartChildDevice.create(
parent=self,
child_info=info,
child_components_raw=child_components,
)
async def _create_delete_children(
self,
child_device_resp: dict[str, list],
child_device_components_resp: dict[str, list],
) -> bool:
"""Create and delete children. Return True if children changed.
Adds newly found children and deletes children that are no longer
reported by the device. It will only log once per child_id that
can't be created to avoid spamming the logs on every update.
"""
changed = False
smart_children_components = {
child["device_id"]: child
for child in child_device_components_resp["child_component_list"]
}
children = self._children
child_ids: set[str] = set()
existing_child_ids = set(self._children.keys())
for info in child_device_resp["child_device_list"]:
if (child_id := info.get("device_id")) and (
child_components := smart_children_components.get(child_id)
):
child_ids.add(child_id)
if child_id in existing_child_ids:
continue
child = await self._try_create_child(info, child_components)
if child:
_LOGGER.debug("Created child device %s for %s", child, self.host)
changed = True
children[child_id] = child
continue
if child_id not in self._logged_missing_child_ids:
self._logged_missing_child_ids.add(child_id)
_LOGGER.debug("Child device type not supported: %s", info)
continue
if child_id:
if child_id not in self._logged_missing_child_ids:
self._logged_missing_child_ids.add(child_id)
_LOGGER.debug(
"Could not find child components for device %s, "
"child_id %s, components: %s: ",
self.host,
child_id,
smart_children_components,
)
continue
# If we couldn't get a child device id we still only want to
# log once to avoid spamming the logs on every update cycle
# so store it under an empty string
if "" not in self._logged_missing_child_ids:
self._logged_missing_child_ids.add("")
_LOGGER.debug(
"Could not find child id for device %s, info: %s", self.host, info
)
removed_ids = existing_child_ids - child_ids
for removed_id in removed_ids:
changed = True
removed = children.pop(removed_id)
_LOGGER.debug("Removed child device %s from %s", removed, self.host)
return changed
@property
def children(self) -> Sequence[SmartDevice]:
@@ -131,6 +197,13 @@ class SmartDevice(Device):
f"{request} not found in {responses} for device {self.host}"
)
@staticmethod
def _parse_components(components_raw: ComponentsRaw) -> dict[str, int]:
return {
str(comp["id"]): int(comp["ver_code"])
for comp in components_raw["component_list"]
}
async def _negotiate(self) -> None:
"""Perform initialization.
@@ -151,29 +224,41 @@ class SmartDevice(Device):
self._info = self._try_get_response(resp, "get_device_info")
# Create our internal presentation of available components
self._components_raw = cast(dict, resp["component_nego"])
self._components_raw = cast(ComponentsRaw, resp["component_nego"])
self._components = {
comp["id"]: int(comp["ver_code"])
for comp in self._components_raw["component_list"]
}
self._components = self._parse_components(self._components_raw)
if "child_device" in self._components and not self.children:
await self._initialize_children()
def _update_children_info(self) -> None:
"""Update the internal child device info from the parent info."""
async def _update_children_info(self) -> bool:
"""Update the internal child device info from the parent info.
Return true if children added or deleted.
"""
changed = False
if child_info := self._try_get_response(
self._last_update, "get_child_device_list", {}
):
changed = await self._create_delete_children(
child_info, self._last_update["get_child_device_component_list"]
)
for info in child_info["child_device_list"]:
self._children[info["device_id"]]._update_internal_state(info)
child_id = info.get("device_id")
if child_id not in self._children:
# _create_delete_children has already logged a message
continue
self._children[child_id]._update_internal_state(info)
return changed
def _update_internal_info(self, info_resp: dict) -> None:
"""Update the internal device info."""
self._info = self._try_get_response(info_resp, "get_device_info")
async def update(self, update_children: bool = False) -> None:
async def update(self, update_children: bool = True) -> None:
"""Update the device."""
if self.credentials is None and self.credentials_hash is None:
raise AuthenticationError("Tapo plug requires authentication.")
@@ -191,13 +276,13 @@ class SmartDevice(Device):
resp = await self._modular_update(first_update, now)
self._update_children_info()
children_changed = await self._update_children_info()
# Call child update which will only update module calls, info is updated
# from get_child_device_list. update_children only affects hub devices, other
# devices will always update children to prevent errors on module access.
# This needs to go after updating the internal state of the children so that
# child modules have access to their sysinfo.
if update_children or self.device_type != DeviceType.Hub:
if children_changed or update_children or self.device_type != DeviceType.Hub:
for child in self._children.values():
if TYPE_CHECKING:
assert isinstance(child, SmartChildDevice)
@@ -250,11 +335,7 @@ class SmartDevice(Device):
if first_update and module.__class__ in self.FIRST_UPDATE_MODULES:
module._last_update_time = update_time
continue
if (
not module.update_interval
or not module._last_update_time
or (update_time - module._last_update_time) >= module.update_interval
):
if module._should_update(update_time):
module_queries.append(module)
req.update(query)
@@ -342,9 +423,8 @@ class SmartDevice(Device):
) or mod.__name__ in child_modules_to_skip:
continue
required_component = cast(str, mod.REQUIRED_COMPONENT)
if required_component in self._components or (
mod.REQUIRED_KEY_ON_PARENT
and self.sys_info.get(mod.REQUIRED_KEY_ON_PARENT) is not None
if required_component in self._components or any(
self.sys_info.get(key) is not None for key in mod.SYSINFO_LOOKUP_KEYS
):
_LOGGER.debug(
"Device %s, found required %s, adding %s to modules.",
@@ -368,6 +448,11 @@ class SmartDevice(Device):
):
self._modules[Thermostat.__name__] = Thermostat(self, "thermostat")
# We move time to the beginning so other modules can access the
# time and timezone after update if required. e.g. cleanrecords
if Time.__name__ in self._modules:
self._modules.move_to_end(Time.__name__, last=False)
async def _initialize_features(self) -> None:
"""Initialize device features."""
self._add_feature(
@@ -433,19 +518,6 @@ class SmartDevice(Device):
)
)
if "overheated" in self._info:
self._add_feature(
Feature(
self,
id="overheated",
name="Overheated",
attribute_getter=lambda x: x._info["overheated"],
icon="mdi:heat-wave",
type=Feature.Type.BinarySensor,
category=Feature.Category.Info,
)
)
# We check for the key available, and not for the property truthiness,
# as the value is falsy when the device is off.
if "on_time" in self._info:
@@ -473,12 +545,25 @@ class SmartDevice(Device):
)
)
if self.parent is not None and (
cs := self.parent.modules.get(Module.ChildSetup)
):
self._add_feature(
Feature(
device=self,
id="unpair",
name="Unpair device",
container=cs,
attribute_setter=lambda: cs.unpair(self.device_id),
category=Feature.Category.Debug,
type=Feature.Type.Action,
)
)
for module in self.modules.values():
module._initialize_features()
for feat in module._module_features.values():
self._add_feature(feat)
for child in self._children.values():
await child._initialize_features()
@property
def _is_hub_child(self) -> bool:
@@ -500,18 +585,13 @@ class SmartDevice(Device):
@property
def model(self) -> str:
"""Returns the device model."""
return str(self._info.get("model"))
# If update hasn't been called self._device_info can't be used
if self._last_update:
return self.device_info.short_name
@property
def _model_region(self) -> str:
"""Return device full model name and region."""
if (disco := self._discovery_info) and (
disco_model := disco.get("device_model")
):
return disco_model
# Some devices have the region in the specs element.
region = f"({specs})" if (specs := self._info.get("specs")) else ""
return f"{self.model}{region}"
disco_model = str(self._info.get("device_model"))
long_name, _, _ = disco_model.partition("(")
return long_name
@property
def alias(self) -> str | None:
@@ -611,12 +691,8 @@ class SmartDevice(Device):
"""
self._info = info
async def _query_helper(
self, method: str, params: dict | None = None, child_ids: None = None
) -> dict:
res = await self.protocol.query({method: params})
return res
async def _query_helper(self, method: str, params: dict | None = None) -> dict:
return await self.protocol.query({method: params})
@property
def ssid(self) -> str:
@@ -765,10 +841,11 @@ class SmartDevice(Device):
if self._device_type is not DeviceType.Unknown:
return self._device_type
# Fallback to device_type (from disco info)
type_str = self._info.get("type", self._info.get("device_type"))
if not type_str: # no update or discovery info
if (
not (type_str := self._info.get("type", self._info.get("device_type")))
or not self._components
):
# no update or discovery info
return self._device_type
self._device_type = self._get_device_type_from_components(
@@ -804,13 +881,15 @@ class SmartDevice(Device):
return DeviceType.Thermostat
if "ROBOVAC" in device_type:
return DeviceType.Vacuum
if "TAPOCHIME" in device_type:
return DeviceType.Chime
_LOGGER.warning("Unknown device type, falling back to plug")
return DeviceType.Plug
@staticmethod
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
) -> DeviceInfo:
"""Get model information for a device."""
di = info["get_device_info"]
components = [comp["id"] for comp in info["component_nego"]["component_list"]]
@@ -839,7 +918,7 @@ class SmartDevice(Device):
# Brand inferred from SMART.KASAPLUG/SMART.TAPOPLUG etc.
brand = devicetype[:4].lower()
return _DeviceInfo(
return DeviceInfo(
short_name=short_name,
long_name=long_name,
brand=brand,

View File

@@ -54,14 +54,16 @@ class SmartModule(Module):
NAME: str
#: Module is initialized, if the given component is available
REQUIRED_COMPONENT: str | None = None
#: Module is initialized, if the given key available in the main sysinfo
REQUIRED_KEY_ON_PARENT: str | None = None
#: Module is initialized, if any of the given keys exists in the sysinfo
SYSINFO_LOOKUP_KEYS: list[str] = []
#: Query to execute during the main update cycle
QUERY_GETTER_NAME: str
QUERY_GETTER_NAME: str = ""
REGISTERED_MODULES: dict[str, type[SmartModule]] = {}
MINIMUM_UPDATE_INTERVAL_SECS = 0
MINIMUM_HUB_CHILD_UPDATE_INTERVAL_SECS = 60 * 60 * 24
UPDATE_INTERVAL_AFTER_ERROR_SECS = 30
DISABLE_AFTER_ERROR_COUNT = 10
@@ -72,6 +74,7 @@ class SmartModule(Module):
self._last_update_time: float | None = None
self._last_update_error: KasaException | None = None
self._error_count = 0
self._logged_remove_keys: list[str] = []
def __init_subclass__(cls, **kwargs) -> None:
# We only want to register submodules in a modules package so that
@@ -106,16 +109,27 @@ class SmartModule(Module):
@property
def update_interval(self) -> int:
"""Time to wait between updates."""
if self._last_update_error is None:
return self.MINIMUM_UPDATE_INTERVAL_SECS
if self._last_update_error:
return self.UPDATE_INTERVAL_AFTER_ERROR_SECS * self._error_count
return self.UPDATE_INTERVAL_AFTER_ERROR_SECS * self._error_count
if self._device._is_hub_child:
return self.MINIMUM_HUB_CHILD_UPDATE_INTERVAL_SECS
return self.MINIMUM_UPDATE_INTERVAL_SECS
@property
def disabled(self) -> bool:
"""Return true if the module is disabled due to errors."""
return self._error_count >= self.DISABLE_AFTER_ERROR_COUNT
def _should_update(self, update_time: float) -> bool:
"""Return true if module should update based on delay parameters."""
return (
not self.update_interval
or not self._last_update_time
or (update_time - self._last_update_time) >= self.update_interval
)
@classmethod
def _module_name(cls) -> str:
return getattr(cls, "NAME", cls.__name__)
@@ -138,7 +152,9 @@ class SmartModule(Module):
Default implementation uses the raw query getter w/o parameters.
"""
return {self.QUERY_GETTER_NAME: None}
if self.QUERY_GETTER_NAME:
return {self.QUERY_GETTER_NAME: None}
return {}
async def call(self, method: str, params: dict | None = None) -> dict:
"""Call a method.
@@ -147,6 +163,15 @@ class SmartModule(Module):
"""
return await self._device._query_helper(method, params)
@property
def optional_response_keys(self) -> list[str]:
"""Return optional response keys for the module.
Defaults to no keys. Overriding this and providing keys will remove
instead of raise on error.
"""
return []
@property
def data(self) -> dict[str, Any]:
"""Return response data for the module.
@@ -179,12 +204,31 @@ class SmartModule(Module):
filtered_data = {k: v for k, v in dev._last_update.items() if k in q_keys}
remove_keys: list[str] = []
for data_item in filtered_data:
if isinstance(filtered_data[data_item], SmartErrorCode):
raise DeviceError(
f"{data_item} for {self.name}", error_code=filtered_data[data_item]
if data_item in self.optional_response_keys:
remove_keys.append(data_item)
else:
raise DeviceError(
f"{data_item} for {self.name}",
error_code=filtered_data[data_item],
)
for key in remove_keys:
if key not in self._logged_remove_keys:
self._logged_remove_keys.append(key)
_LOGGER.debug(
"Removed key %s from response for device %s as it returned "
"error: %s. This message will only be logged once per key.",
key,
self._device.host,
filtered_data[key],
)
if len(filtered_data) == 1:
filtered_data.pop(key)
if len(filtered_data) == 1 and not remove_keys:
return next(iter(filtered_data.values()))
return filtered_data