Add common energy module and deprecate device emeter attributes (#976)

Consolidates logic for energy monitoring across smart and iot devices.
Deprecates emeter attributes in favour of common names.
This commit is contained in:
Steven B
2024-06-17 11:22:05 +01:00
committed by GitHub
parent 51a972542f
commit b4a6df2b5c
14 changed files with 487 additions and 382 deletions

View File

@@ -9,16 +9,17 @@ from typing import Any
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..emeterstatus import EmeterStatus
from ..exceptions import KasaException
from ..feature import Feature
from ..interfaces import Energy
from ..module import Module
from ..protocol import BaseProtocol
from .iotdevice import (
EmeterStatus,
IotDevice,
merge,
requires_update,
)
from .iotmodule import IotModule
from .iotplug import IotPlug
from .modules import Antitheft, Countdown, Schedule, Time, Usage
@@ -97,11 +98,20 @@ class IotStrip(IotDevice):
super().__init__(host=host, config=config, protocol=protocol)
self.emeter_type = "emeter"
self._device_type = DeviceType.Strip
async def _initialize_modules(self):
"""Initialize modules."""
# Strip has different modules to plug so do not call super
self.add_module(Module.IotAntitheft, Antitheft(self, "anti_theft"))
self.add_module(Module.IotSchedule, Schedule(self, "schedule"))
self.add_module(Module.IotUsage, Usage(self, "schedule"))
self.add_module(Module.IotTime, Time(self, "time"))
self.add_module(Module.IotCountdown, Countdown(self, "countdown"))
if self.has_emeter:
_LOGGER.debug(
"The device has emeter, querying its information along sysinfo"
)
self.add_module(Module.Energy, StripEmeter(self, self.emeter_type))
@property # type: ignore
@requires_update
@@ -114,10 +124,12 @@ class IotStrip(IotDevice):
Needed for methods that are decorated with `requires_update`.
"""
# Super initializes modules and features
await super().update(update_children)
initialize_children = not self.children
# Initialize the child devices during the first update.
if not self.children:
if initialize_children:
children = self.sys_info["children"]
_LOGGER.debug("Initializing %s child sockets", len(children))
self._children = {
@@ -127,12 +139,22 @@ class IotStrip(IotDevice):
for child in children
}
for child in self._children.values():
await child._initialize_features()
await child._initialize_modules()
if update_children and self.has_emeter:
if update_children:
for plug in self.children:
await plug.update()
if not self.features:
await self._initialize_features()
async def _initialize_features(self):
"""Initialize common features."""
# Do not initialize features until children are created
if not self.children:
return
await super()._initialize_features()
async def turn_on(self, **kwargs):
"""Turn the strip on."""
await self._query_helper("system", "set_relay_state", {"state": 1})
@@ -150,21 +172,43 @@ class IotStrip(IotDevice):
return min(plug.on_since for plug in self.children if plug.on_since is not None)
async def current_consumption(self) -> float:
"""Get the current power consumption in watts."""
return sum([await plug.current_consumption() for plug in self.children])
@requires_update
async def get_emeter_realtime(self) -> EmeterStatus:
class StripEmeter(IotModule, Energy):
"""Energy module implementation to aggregate child modules."""
_supported = (
Energy.ModuleFeature.CONSUMPTION_TOTAL
| Energy.ModuleFeature.PERIODIC_STATS
| Energy.ModuleFeature.VOLTAGE_CURRENT
)
def supports(self, module_feature: Energy.ModuleFeature) -> bool:
"""Return True if module supports the feature."""
return module_feature in self._supported
def query(self):
"""Return the base query."""
return {}
@property
def current_consumption(self) -> float | None:
"""Get the current power consumption in watts."""
return sum(
v if (v := plug.modules[Module.Energy].current_consumption) else 0.0
for plug in self._device.children
)
async def get_status(self) -> EmeterStatus:
"""Retrieve current energy readings."""
emeter_rt = await self._async_get_emeter_sum("get_emeter_realtime", {})
emeter_rt = await self._async_get_emeter_sum("get_status", {})
# Voltage is averaged since each read will result
# in a slightly different voltage since they are not atomic
emeter_rt["voltage_mv"] = int(emeter_rt["voltage_mv"] / len(self.children))
emeter_rt["voltage_mv"] = int(
emeter_rt["voltage_mv"] / len(self._device.children)
)
return EmeterStatus(emeter_rt)
@requires_update
async def get_emeter_daily(
async def get_daily_stats(
self, year: int | None = None, month: int | None = None, kwh: bool = True
) -> dict:
"""Retrieve daily statistics for a given month.
@@ -176,11 +220,10 @@ class IotStrip(IotDevice):
:return: mapping of day of month to value
"""
return await self._async_get_emeter_sum(
"get_emeter_daily", {"year": year, "month": month, "kwh": kwh}
"get_daily_stats", {"year": year, "month": month, "kwh": kwh}
)
@requires_update
async def get_emeter_monthly(
async def get_monthly_stats(
self, year: int | None = None, kwh: bool = True
) -> dict:
"""Retrieve monthly statistics for a given year.
@@ -189,44 +232,68 @@ class IotStrip(IotDevice):
:param kwh: return usage in kWh (default: True)
"""
return await self._async_get_emeter_sum(
"get_emeter_monthly", {"year": year, "kwh": kwh}
"get_monthly_stats", {"year": year, "kwh": kwh}
)
async def _async_get_emeter_sum(self, func: str, kwargs: dict[str, Any]) -> dict:
"""Retreive emeter stats for a time period from children."""
self._verify_emeter()
"""Retrieve emeter stats for a time period from children."""
return merge_sums(
[await getattr(plug, func)(**kwargs) for plug in self.children]
[
await getattr(plug.modules[Module.Energy], func)(**kwargs)
for plug in self._device.children
]
)
@requires_update
async def erase_emeter_stats(self):
async def erase_stats(self):
"""Erase energy meter statistics for all plugs."""
for plug in self.children:
await plug.erase_emeter_stats()
for plug in self._device.children:
await plug.modules[Module.Energy].erase_stats()
@property # type: ignore
@requires_update
def emeter_this_month(self) -> float | None:
def consumption_this_month(self) -> float | None:
"""Return this month's energy consumption in kWh."""
return sum(v if (v := plug.emeter_this_month) else 0 for plug in self.children)
return sum(
v if (v := plug.modules[Module.Energy].consumption_this_month) else 0.0
for plug in self._device.children
)
@property # type: ignore
@requires_update
def emeter_today(self) -> float | None:
def consumption_today(self) -> float | None:
"""Return this month's energy consumption in kWh."""
return sum(v if (v := plug.emeter_today) else 0 for plug in self.children)
return sum(
v if (v := plug.modules[Module.Energy].consumption_today) else 0.0
for plug in self._device.children
)
@property # type: ignore
@requires_update
def emeter_realtime(self) -> EmeterStatus:
def consumption_total(self) -> float | None:
"""Return total energy consumption since reboot in kWh."""
return sum(
v if (v := plug.modules[Module.Energy].consumption_total) else 0.0
for plug in self._device.children
)
@property # type: ignore
def status(self) -> EmeterStatus:
"""Return current energy readings."""
emeter = merge_sums([plug.emeter_realtime for plug in self.children])
emeter = merge_sums(
[plug.modules[Module.Energy].status for plug in self._device.children]
)
# Voltage is averaged since each read will result
# in a slightly different voltage since they are not atomic
emeter["voltage_mv"] = int(emeter["voltage_mv"] / len(self.children))
emeter["voltage_mv"] = int(emeter["voltage_mv"] / len(self._device.children))
return EmeterStatus(emeter)
@property
def current(self) -> float | None:
"""Return the current in A."""
return self.status.current
@property
def voltage(self) -> float | None:
"""Get the current voltage in V."""
return self.status.voltage
class IotStripPlug(IotPlug):
"""Representation of a single socket in a power strip.
@@ -275,9 +342,10 @@ class IotStripPlug(IotPlug):
icon="mdi:clock",
)
)
# If the strip plug has it's own modules we should call initialize
# features for the modules here. However the _initialize_modules function
# above does not seem to be called.
for module in self._supported_modules.values():
module._initialize_features()
for module_feat in module._module_features.values():
self._add_feature(module_feat)
async def update(self, update_children: bool = True):
"""Query the device to update the data.
@@ -285,26 +353,8 @@ class IotStripPlug(IotPlug):
Needed for properties that are decorated with `requires_update`.
"""
await self._modular_update({})
def _create_emeter_request(self, year: int | None = None, month: int | None = None):
"""Create a request for requesting all emeter statistics at once."""
if year is None:
year = datetime.now().year
if month is None:
month = datetime.now().month
req: dict[str, Any] = {}
merge(req, self._create_request("emeter", "get_realtime"))
merge(req, self._create_request("emeter", "get_monthstat", {"year": year}))
merge(
req,
self._create_request(
"emeter", "get_daystat", {"month": month, "year": year}
),
)
return req
if not self._features:
await self._initialize_features()
def _create_request(
self, target: str, cmd: str, arg: dict | None = None, child_ids=None