mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-08-06 10:44:04 +00:00
Add voltage and current monitoring to smart Devices (#1281)
This commit is contained in:
@@ -15,6 +15,12 @@ class Energy(SmartModule, EnergyInterface):
|
||||
|
||||
REQUIRED_COMPONENT = "energy_monitoring"
|
||||
|
||||
async def _post_update_hook(self) -> None:
|
||||
if "voltage_mv" in self.data.get("get_emeter_data", {}):
|
||||
self._supported = (
|
||||
self._supported | EnergyInterface.ModuleFeature.VOLTAGE_CURRENT
|
||||
)
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
req = {
|
||||
@@ -22,13 +28,17 @@ class Energy(SmartModule, EnergyInterface):
|
||||
}
|
||||
if self.supported_version > 1:
|
||||
req["get_current_power"] = None
|
||||
req["get_emeter_data"] = None
|
||||
req["get_emeter_vgain_igain"] = None
|
||||
return req
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
def current_consumption(self) -> float | None:
|
||||
"""Current power in watts."""
|
||||
if (power := self.energy.get("current_power")) is not None:
|
||||
if (power := self.energy.get("current_power")) is not None or (
|
||||
power := self.data.get("get_emeter_data", {}).get("power_mw")
|
||||
) is not None:
|
||||
return power / 1_000
|
||||
# Fallback if get_energy_usage does not provide current_power,
|
||||
# which can happen on some newer devices (e.g. P304M).
|
||||
@@ -58,7 +68,10 @@ class Energy(SmartModule, EnergyInterface):
|
||||
@raise_if_update_error
|
||||
def status(self) -> EmeterStatus:
|
||||
"""Get the emeter status."""
|
||||
return self._get_status_from_energy(self.energy)
|
||||
if "get_emeter_data" in self.data:
|
||||
return EmeterStatus(self.data["get_emeter_data"])
|
||||
else:
|
||||
return self._get_status_from_energy(self.energy)
|
||||
|
||||
async def get_status(self) -> EmeterStatus:
|
||||
"""Return real-time statistics."""
|
||||
@@ -87,13 +100,15 @@ class Energy(SmartModule, EnergyInterface):
|
||||
@raise_if_update_error
|
||||
def current(self) -> float | None:
|
||||
"""Return the current in A."""
|
||||
return None
|
||||
ma = self.data.get("get_emeter_data", {}).get("current_ma")
|
||||
return ma / 1000 if ma else None
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
def voltage(self) -> float | None:
|
||||
"""Get the current voltage in V."""
|
||||
return None
|
||||
mv = self.data.get("get_emeter_data", {}).get("voltage_mv")
|
||||
return mv / 1000 if mv else None
|
||||
|
||||
async def _deprecated_get_realtime(self) -> EmeterStatus:
|
||||
"""Retrieve current energy readings."""
|
||||
|
Reference in New Issue
Block a user