Add emeter support for strip sockets (#203)

* Add support for plugs with emeters.

* Tweaks for emeter

* black

* tweaks

* tweaks

* more tweaks

* dry

* flake8

* flake8

* legacy typing

* Update kasa/smartstrip.py

Co-authored-by: Teemu R. <tpr@iki.fi>

* reduce

* remove useless delegation

* tweaks

* tweaks

* dry

* tweak

* tweak

* tweak

* tweak

* update tests

* wrap

* preen

* prune

* prune

* prune

* guard

* adjust

* robust

* prune

* prune

* reduce dict lookups by 1

* Update kasa/smartstrip.py

Co-authored-by: Teemu R. <tpr@iki.fi>

* delete utils

* isort

Co-authored-by: Brendan Burns <brendan.d.burns@gmail.com>
Co-authored-by: Teemu R. <tpr@iki.fi>
This commit is contained in:
J. Nick Koston
2021-09-23 17:24:44 -05:00
committed by GitHub
parent d7202883e9
commit 94e5a90ac4
4 changed files with 112 additions and 101 deletions

View File

@@ -11,13 +11,14 @@ Stroetmann which is licensed under the Apache License, Version 2.0.
You may obtain a copy of the license at
http://www.apache.org/licenses/LICENSE-2.0
"""
import collections.abc
import functools
import inspect
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Set
from .emeterstatus import EmeterStatus
from .exceptions import SmartDeviceException
@@ -51,6 +52,16 @@ class WifiNetwork:
rssi: Optional[int] = None
def merge(d, u):
"""Update dict recursively."""
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = merge(d.get(k, {}), v)
else:
d[k] = v
return d
def requires_update(f):
"""Indicate that `update` should be called before accessing this method.""" # noqa: D202
if inspect.iscoroutinefunction(f):
@@ -204,6 +215,11 @@ class SmartDevice:
return request
def _verify_emeter(self) -> None:
"""Raise an exception if there is no emeter."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
async def _query_helper(
self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None
) -> Any:
@@ -240,13 +256,17 @@ class SmartDevice:
return result
@property # type: ignore
@requires_update
def features(self) -> Set[str]:
"""Return a set of features that the device supports."""
return set(self.sys_info["feature"].split(":"))
@property # type: ignore
@requires_update
def has_emeter(self) -> bool:
"""Return True if device has an energy meter."""
sys_info = self.sys_info
features = sys_info["feature"].split(":")
return "ENE" in features
return "ENE" in self.features
async def get_sys_info(self) -> Dict[str, Any]:
"""Retrieve system information."""
@@ -374,10 +394,8 @@ class SmartDevice:
@requires_update
def rssi(self) -> Optional[int]:
"""Return WiFi signal strenth (rssi)."""
sys_info = self.sys_info
if "rssi" in sys_info:
return int(sys_info["rssi"])
return None
rssi = self.sys_info.get("rssi")
return None if rssi is None else int(rssi)
@property # type: ignore
@requires_update
@@ -410,16 +428,12 @@ class SmartDevice:
@requires_update
def emeter_realtime(self) -> EmeterStatus:
"""Return current energy readings."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
return EmeterStatus(self._last_update[self.emeter_type]["get_realtime"])
async def get_emeter_realtime(self) -> EmeterStatus:
"""Retrieve current energy readings."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
return EmeterStatus(await self._query_helper(self.emeter_type, "get_realtime"))
def _create_emeter_request(self, year: int = None, month: int = None):
@@ -429,23 +443,12 @@ class SmartDevice:
if month is None:
month = datetime.now().month
import collections.abc
def update(d, u):
"""Update dict recursively."""
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = update(d.get(k, {}), v)
else:
d[k] = v
return d
req: Dict[str, Any] = {}
update(req, self._create_request(self.emeter_type, "get_realtime"))
update(
merge(req, self._create_request(self.emeter_type, "get_realtime"))
merge(
req, self._create_request(self.emeter_type, "get_monthstat", {"year": year})
)
update(
merge(
req,
self._create_request(
self.emeter_type, "get_daystat", {"month": month, "year": year}
@@ -458,9 +461,7 @@ class SmartDevice:
@requires_update
def emeter_today(self) -> Optional[float]:
"""Return today's energy consumption in kWh."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
raw_data = self._last_update[self.emeter_type]["get_daystat"]["day_list"]
data = self._emeter_convert_emeter_data(raw_data)
today = datetime.now().day
@@ -474,9 +475,7 @@ class SmartDevice:
@requires_update
def emeter_this_month(self) -> Optional[float]:
"""Return this month's energy consumption in kWh."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
raw_data = self._last_update[self.emeter_type]["get_monthstat"]["month_list"]
data = self._emeter_convert_emeter_data(raw_data)
current_month = datetime.now().month
@@ -516,9 +515,7 @@ class SmartDevice:
:param kwh: return usage in kWh (default: True)
:return: mapping of day of month to value
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
if year is None:
year = datetime.now().year
if month is None:
@@ -538,9 +535,7 @@ class SmartDevice:
:param kwh: return usage in kWh (default: True)
:return: dict: mapping of month to value
"""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
if year is None:
year = datetime.now().year
@@ -553,17 +548,13 @@ class SmartDevice:
@requires_update
async def erase_emeter_stats(self) -> Dict:
"""Erase energy meter statistics."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
return await self._query_helper(self.emeter_type, "erase_emeter_stat", None)
@requires_update
async def current_consumption(self) -> float:
"""Get the current power consumption in Watt."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
self._verify_emeter()
response = EmeterStatus(await self.get_emeter_realtime())
return float(response["power"])