Avoid linear search for emeter realtime and emeter_today (#622)

* Avoid linear search for emeter realtime and emeter_today

Most of the time the data we want is at the end of the
list so we now search backwards to avoid having to
scale all the data and throw most of it away

* more tweaks

* coverage

* coverage

* preen

* coverage

* branch cover
This commit is contained in:
J. Nick Koston 2024-01-04 15:01:00 -10:00 committed by GitHub
parent efd67b9261
commit 554fe0a96e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 133 additions and 20 deletions

View File

@ -1,6 +1,6 @@
"""Implementation of the emeter module."""
from datetime import datetime
from typing import Dict, Optional
from typing import Dict, List, Optional, Union
from ..emeterstatus import EmeterStatus
from .usage import Usage
@ -19,8 +19,7 @@ class Emeter(Usage):
"""Return today's energy consumption in kWh."""
raw_data = self.daily_data
today = datetime.now().day
data = self._convert_stat_data(raw_data, entry_key="day")
data = self._convert_stat_data(raw_data, entry_key="day", key=today)
return data.get(today)
@property
@ -28,8 +27,7 @@ class Emeter(Usage):
"""Return this month's energy consumption in kWh."""
raw_data = self.monthly_data
current_month = datetime.now().month
data = self._convert_stat_data(raw_data, entry_key="month")
data = self._convert_stat_data(raw_data, entry_key="month", key=current_month)
return data.get(current_month)
async def erase_stats(self):
@ -61,7 +59,13 @@ class Emeter(Usage):
data = self._convert_stat_data(data["month_list"], entry_key="month", kwh=kwh)
return data
def _convert_stat_data(self, data, entry_key, kwh=True) -> Dict:
def _convert_stat_data(
self,
data: List[Dict[str, Union[int, float]]],
entry_key: str,
kwh: bool=True,
key: Optional[int] = None,
) -> Dict[Union[int, float], Union[int, float]]:
"""Return emeter information keyed with the day/month.
The incoming data is a list of dictionaries::
@ -89,6 +93,19 @@ class Emeter(Usage):
if not kwh:
scale = 1000
data = {entry[entry_key]: entry[value_key] * scale for entry in data}
if key is None:
# Return all the data
return {entry[entry_key]: entry[value_key] * scale for entry in data}
return data
# In this case we want a specific key in the data
# i.e. the current day or month.
#
# Since we usually want the data at the end of the list so we can
# optimize the search by starting at the end and avoid scaling
# the data we don't need.
#
for entry in reversed(data):
if entry[entry_key] == key:
return {entry[entry_key]: entry[value_key] * scale}
return {}

View File

@ -10,8 +10,9 @@ class Usage(Module):
def query(self):
"""Return the base query."""
year = datetime.now().year
month = datetime.now().month
now = datetime.now()
year = now.year
month = now.month
req = self.query_for_command("get_realtime")
req = merge(
@ -40,21 +41,21 @@ class Usage(Module):
def usage_today(self):
"""Return today's usage in minutes."""
today = datetime.now().day
converted = [x["time"] for x in self.daily_data if x["day"] == today]
if not converted:
return None
return converted.pop()
# Traverse the list in reverse order to find the latest entry.
for entry in reversed(self.daily_data):
if entry["day"] == today:
return entry["time"]
return None
@property
def usage_this_month(self):
"""Return usage in this month in minutes."""
this_month = datetime.now().month
converted = [x["time"] for x in self.monthly_data if x["month"] == this_month]
if not converted:
return None
return converted.pop()
# Traverse the list in reverse order to find the latest entry.
for entry in reversed(self.monthly_data):
if entry["month"] == this_month:
return entry["time"]
return None
async def get_raw_daystat(self, *, year=None, month=None) -> Dict:
"""Return raw daily stats for the given year & month."""

View File

@ -1,6 +1,10 @@
import datetime
from unittest.mock import Mock
import pytest
from kasa import EmeterStatus, SmartDeviceException
from kasa.modules.emeter import Emeter
from .conftest import has_emeter, has_emeter_iot, no_emeter
from .newfakes import CURRENT_CONSUMPTION_SCHEMA
@ -116,3 +120,29 @@ async def test_emeterstatus_missing_current():
missing_current = EmeterStatus({"err_code": 0, "power_mw": 0, "total_wh": 13})
assert missing_current["current"] is None
async def test_emeter_daily():
"""Test fetching the emeter for today.
This test uses inline data since the fixtures
will not have data for the current day.
"""
emeter_data = {
"get_daystat": {
"day_list": [{"day": 1, "energy_wh": 8, "month": 1, "year": 2023}],
"err_code": 0,
}
}
class MockEmeter(Emeter):
@property
def data(self):
return emeter_data
emeter = MockEmeter(Mock(), "emeter")
now = datetime.datetime.now()
emeter_data["get_daystat"]["day_list"].append(
{"day": now.day, "energy_wh": 500, "month": now.month, "year": now.year}
)
assert emeter.emeter_today == 0.500

View File

@ -1,3 +1,6 @@
import datetime
from unittest.mock import Mock
import pytest
from kasa.modules import Usage
@ -20,3 +23,65 @@ def test_usage_convert_stat_data():
assert isinstance(k, int)
assert isinstance(v, int)
assert k == 4 and v == 30
def test_usage_today():
"""Test fetching the usage for today.
This test uses inline data since the fixtures
will not have data for the current day.
"""
emeter_data = {
"get_daystat": {
"day_list": [],
"err_code": 0,
}
}
class MockUsage(Usage):
@property
def data(self):
return emeter_data
usage = MockUsage(Mock(), "usage")
assert usage.usage_today is None
now = datetime.datetime.now()
emeter_data["get_daystat"]["day_list"].extend(
[
{"day": now.day - 1, "time": 200, "month": now.month - 1, "year": now.year},
{"day": now.day, "time": 500, "month": now.month, "year": now.year},
{"day": now.day + 1, "time": 100, "month": now.month + 1, "year": now.year},
]
)
assert usage.usage_today == 500
def test_usage_this_month():
"""Test fetching the usage for this month.
This test uses inline data since the fixtures
will not have data for the current month.
"""
emeter_data = {
"get_monthstat": {
"month_list": [],
"err_code": 0,
}
}
class MockUsage(Usage):
@property
def data(self):
return emeter_data
usage = MockUsage(Mock(), "usage")
assert usage.usage_this_month is None
now = datetime.datetime.now()
emeter_data["get_monthstat"]["month_list"].extend(
[
{"time": 200, "month": now.month - 1, "year": now.year},
{"time": 500, "month": now.month, "year": now.year},
{"time": 100, "month": now.month + 1, "year": now.year},
]
)
assert usage.usage_this_month == 500