mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-11-06 07:31:55 +00:00
Improve usage module, consolidate API with emeter (#249)
* Consolidate API for both emeter&usage modules * Add new cli command 'usage' to query usage
This commit is contained in:
@@ -1,4 +1,7 @@
|
||||
"""Implementation of the emeter module."""
|
||||
from datetime import datetime
|
||||
from typing import Dict, Optional
|
||||
|
||||
from ..emeterstatus import EmeterStatus
|
||||
from .usage import Usage
|
||||
|
||||
@@ -6,15 +9,61 @@ from .usage import Usage
|
||||
class Emeter(Usage):
|
||||
"""Emeter module."""
|
||||
|
||||
def query(self):
|
||||
"""Prepare query for emeter data."""
|
||||
return self._device._create_emeter_request()
|
||||
|
||||
@property # type: ignore
|
||||
def realtime(self) -> EmeterStatus:
|
||||
"""Return current energy readings."""
|
||||
return EmeterStatus(self.data["get_realtime"])
|
||||
|
||||
@property
|
||||
def emeter_today(self) -> Optional[float]:
|
||||
"""Return today's energy consumption in kWh."""
|
||||
raw_data = self.daily_data
|
||||
today = datetime.now().day
|
||||
data = self._emeter_convert_emeter_data(raw_data)
|
||||
|
||||
return data.get(today)
|
||||
|
||||
@property
|
||||
def emeter_this_month(self) -> Optional[float]:
|
||||
"""Return this month's energy consumption in kWh."""
|
||||
raw_data = self.monthly_data
|
||||
current_month = datetime.now().month
|
||||
data = self._emeter_convert_emeter_data(raw_data)
|
||||
|
||||
return data.get(current_month)
|
||||
|
||||
async def erase_stats(self):
|
||||
"""Erase all stats."""
|
||||
"""Erase all stats.
|
||||
|
||||
Uses different query than usage meter.
|
||||
"""
|
||||
return await self.call("erase_emeter_stat")
|
||||
|
||||
async def get_daystat(self, *, year, month, kwh=True):
|
||||
"""Return daily stats for the given year & month."""
|
||||
raw_data = await super().get_daystat(year=year, month=month)
|
||||
return self._emeter_convert_emeter_data(raw_data["day_list"], kwh)
|
||||
|
||||
async def get_monthstat(self, *, year, kwh=True):
|
||||
"""Return monthly stats for the given year."""
|
||||
raw_data = await super().get_monthstat(year=year)
|
||||
return self._emeter_convert_emeter_data(raw_data["month_list"], kwh)
|
||||
|
||||
def _emeter_convert_emeter_data(self, data, kwh=True) -> Dict:
|
||||
"""Return emeter information keyed with the day/month.."""
|
||||
response = [EmeterStatus(**x) for x in data]
|
||||
|
||||
if not response:
|
||||
return {}
|
||||
|
||||
energy_key = "energy_wh"
|
||||
if kwh:
|
||||
energy_key = "energy"
|
||||
|
||||
entry_key = "month"
|
||||
if "day" in response[0]:
|
||||
entry_key = "day"
|
||||
|
||||
data = {entry[entry_key]: entry[energy_key] for entry in response}
|
||||
|
||||
return data
|
||||
|
||||
Reference in New Issue
Block a user