Expose energy command to cli (#1307)

Co-authored-by: Steven B <51370195+sdb9696@users.noreply.github.com>
This commit is contained in:
Teemu R. 2024-11-26 10:42:55 +01:00 committed by GitHub
parent 3dfada7575
commit 69e08c2385
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 44 additions and 42 deletions

View File

@ -75,6 +75,7 @@ def _legacy_type_to_class(_type: str) -> Any:
"time": None, "time": None,
"schedule": None, "schedule": None,
"usage": None, "usage": None,
"energy": "usage",
# device commands runnnable at top level # device commands runnnable at top level
"state": "device", "state": "device",
"on": "device", "on": "device",

View File

@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
import logging
from typing import cast from typing import cast
import asyncclick as click import asyncclick as click
@ -21,21 +20,6 @@ from .common import (
) )
@click.command()
@click.option("--index", type=int, required=False)
@click.option("--name", type=str, required=False)
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@click.option("--erase", is_flag=True)
@click.pass_context
async def emeter(ctx: click.Context, index, name, year, month, erase):
"""Query emeter for historical consumption."""
logging.warning("Deprecated, use 'kasa energy'")
return await ctx.invoke(
energy, child_index=index, child=name, year=year, month=month, erase=erase
)
@click.command() @click.command()
@click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False) @click.option("--year", type=click.DateTime(["%Y"]), default=None, required=False)
@click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False) @click.option("--month", type=click.DateTime(["%Y-%m"]), default=None, required=False)
@ -46,7 +30,7 @@ async def energy(dev: Device, year, month, erase):
Daily and monthly data provided in CSV format. Daily and monthly data provided in CSV format.
""" """
echo("[bold]== Emeter ==[/bold]") echo("[bold]== Energy ==[/bold]")
if not (energy := dev.modules.get(Module.Energy)): if not (energy := dev.modules.get(Module.Energy)):
error("Device has no energy module.") error("Device has no energy module.")
return return
@ -71,7 +55,7 @@ async def energy(dev: Device, year, month, erase):
usage_data = await energy.get_daily_stats(year=month.year, month=month.month) usage_data = await energy.get_daily_stats(year=month.year, month=month.month)
else: else:
# Call with no argument outputs summary data and returns # Call with no argument outputs summary data and returns
emeter_status = await energy.get_status() emeter_status = energy.status
echo("Current: {} A".format(emeter_status["current"])) echo("Current: {} A".format(emeter_status["current"]))
echo("Voltage: {} V".format(emeter_status["voltage"])) echo("Voltage: {} V".format(emeter_status["voltage"]))

View File

@ -75,8 +75,12 @@ class Energy(SmartModule, EnergyInterface):
async def get_status(self) -> EmeterStatus: async def get_status(self) -> EmeterStatus:
"""Return real-time statistics.""" """Return real-time statistics."""
res = await self.call("get_energy_usage") if "get_emeter_data" in self.data:
return self._get_status_from_energy(res["get_energy_usage"]) res = await self.call("get_emeter_data")
return EmeterStatus(res["get_emeter_data"])
else:
res = await self.call("get_energy_usage")
return self._get_status_from_energy(res["get_energy_usage"])
@property @property
@raise_if_update_error @raise_if_update_error

View File

@ -2,7 +2,7 @@ import json
import os import os
import re import re
from datetime import datetime from datetime import datetime
from unittest.mock import ANY from unittest.mock import ANY, PropertyMock, patch
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
import asyncclick as click import asyncclick as click
@ -40,7 +40,7 @@ from kasa.cli.light import (
) )
from kasa.cli.main import TYPES, _legacy_type_to_class, cli, cmd_command, raw_command from kasa.cli.main import TYPES, _legacy_type_to_class, cli, cmd_command, raw_command
from kasa.cli.time import time from kasa.cli.time import time
from kasa.cli.usage import emeter, energy from kasa.cli.usage import energy
from kasa.cli.wifi import wifi from kasa.cli.wifi import wifi
from kasa.discover import Discover, DiscoveryResult from kasa.discover import Discover, DiscoveryResult
from kasa.iot import IotDevice from kasa.iot import IotDevice
@ -432,38 +432,45 @@ async def test_time_set(dev: Device, mocker, runner):
async def test_emeter(dev: Device, mocker, runner): async def test_emeter(dev: Device, mocker, runner):
res = await runner.invoke(emeter, obj=dev) mocker.patch("kasa.Discover.discover_single", return_value=dev)
base_cmd = ["--host", "dummy", "energy"]
res = await runner.invoke(cli, base_cmd, obj=dev)
if not (energy := dev.modules.get(Module.Energy)): if not (energy := dev.modules.get(Module.Energy)):
assert "Device has no energy module." in res.output assert "Device has no energy module." in res.output
return return
assert "== Emeter ==" in res.output assert "== Energy ==" in res.output
if dev.device_type is not DeviceType.Strip: if dev.device_type is not DeviceType.Strip:
res = await runner.invoke(emeter, ["--index", "0"], obj=dev) res = await runner.invoke(cli, [*base_cmd, "--index", "0"], obj=dev)
assert f"Device: {dev.host} does not have children" in res.output assert f"Device: {dev.host} does not have children" in res.output
res = await runner.invoke(emeter, ["--name", "mock"], obj=dev) res = await runner.invoke(cli, [*base_cmd, "--name", "mock"], obj=dev)
assert f"Device: {dev.host} does not have children" in res.output assert f"Device: {dev.host} does not have children" in res.output
if dev.device_type is DeviceType.Strip and len(dev.children) > 0: if dev.device_type is DeviceType.Strip and len(dev.children) > 0:
child_energy = dev.children[0].modules.get(Module.Energy) child_energy = dev.children[0].modules.get(Module.Energy)
assert child_energy assert child_energy
realtime_emeter = mocker.patch.object(child_energy, "get_status")
realtime_emeter.return_value = EmeterStatus({"voltage_mv": 122066})
res = await runner.invoke(emeter, ["--index", "0"], obj=dev) with patch.object(
assert "Voltage: 122.066 V" in res.output type(child_energy), "status", new_callable=PropertyMock
realtime_emeter.assert_called() ) as child_status:
assert realtime_emeter.call_count == 1 child_status.return_value = EmeterStatus({"voltage_mv": 122066})
res = await runner.invoke(emeter, ["--name", dev.children[0].alias], obj=dev) res = await runner.invoke(cli, [*base_cmd, "--index", "0"], obj=dev)
assert "Voltage: 122.066 V" in res.output assert "Voltage: 122.066 V" in res.output
assert realtime_emeter.call_count == 2 child_status.assert_called()
assert child_status.call_count == 1
res = await runner.invoke(
cli, [*base_cmd, "--name", dev.children[0].alias], obj=dev
)
assert "Voltage: 122.066 V" in res.output
assert child_status.call_count == 2
if isinstance(dev, IotDevice): if isinstance(dev, IotDevice):
monthly = mocker.patch.object(energy, "get_monthly_stats") monthly = mocker.patch.object(energy, "get_monthly_stats")
monthly.return_value = {1: 1234} monthly.return_value = {1: 1234}
res = await runner.invoke(emeter, ["--year", "1900"], obj=dev) res = await runner.invoke(cli, [*base_cmd, "--year", "1900"], obj=dev)
if not isinstance(dev, IotDevice): if not isinstance(dev, IotDevice):
assert "Device does not support historical statistics" in res.output assert "Device does not support historical statistics" in res.output
return return
@ -474,7 +481,7 @@ async def test_emeter(dev: Device, mocker, runner):
if isinstance(dev, IotDevice): if isinstance(dev, IotDevice):
daily = mocker.patch.object(energy, "get_daily_stats") daily = mocker.patch.object(energy, "get_daily_stats")
daily.return_value = {1: 1234} daily.return_value = {1: 1234}
res = await runner.invoke(emeter, ["--month", "1900-12"], obj=dev) res = await runner.invoke(cli, [*base_cmd, "--month", "1900-12"], obj=dev)
if not isinstance(dev, IotDevice): if not isinstance(dev, IotDevice):
assert "Device has no historical statistics" in res.output assert "Device has no historical statistics" in res.output
return return

View File

@ -23,14 +23,16 @@ from .conftest import has_emeter, has_emeter_iot, no_emeter
CURRENT_CONSUMPTION_SCHEMA = Schema( CURRENT_CONSUMPTION_SCHEMA = Schema(
Any( Any(
{ {
"voltage": Any(All(float, Range(min=0, max=300)), None),
"power": Any(Coerce(float), None),
"total": Any(Coerce(float), None),
"current": Any(All(float), None),
"voltage_mv": Any(All(float, Range(min=0, max=300000)), int, None), "voltage_mv": Any(All(float, Range(min=0, max=300000)), int, None),
"power_mw": Any(Coerce(float), None), "power_mw": Any(Coerce(float), None),
"total_wh": Any(Coerce(float), None),
"current_ma": Any(All(float), int, None), "current_ma": Any(All(float), int, None),
"energy_wh": Any(Coerce(float), None),
"total_wh": Any(Coerce(float), None),
"voltage": Any(All(float, Range(min=0, max=300)), None),
"power": Any(Coerce(float), None),
"current": Any(All(float), None),
"total": Any(Coerce(float), None),
"energy": Any(Coerce(float), None),
"slot_id": Any(Coerce(int), None), "slot_id": Any(Coerce(int), None),
}, },
None, None,
@ -65,6 +67,10 @@ async def test_get_emeter_realtime(dev):
emeter = dev.modules[Module.Energy] emeter = dev.modules[Module.Energy]
current_emeter = await emeter.get_status() current_emeter = await emeter.get_status()
# Check realtime query gets the same value as status property
# iot _query_helper strips out the error code from module responses.
# but it's not stripped out of the _modular_update queries.
assert current_emeter == {k: v for k, v in emeter.status.items() if k != "err_code"}
CURRENT_CONSUMPTION_SCHEMA(current_emeter) CURRENT_CONSUMPTION_SCHEMA(current_emeter)