Create common Time module and add time set cli command (#1157)

This commit is contained in:
Steven B. 2024-10-15 08:59:25 +01:00 committed by GitHub
parent 885a04d24f
commit 7fd8c14c1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 350 additions and 69 deletions

View File

@ -201,6 +201,8 @@ def CatchAllExceptions(cls):
# Handle exit request from click.
if isinstance(exc, click.exceptions.Exit):
sys.exit(exc.exit_code)
if isinstance(exc, click.exceptions.Abort):
sys.exit(0)
echo(f"Raised error: {exc}")
if debug:

View File

@ -5,15 +5,18 @@ from __future__ import annotations
from datetime import datetime
import asyncclick as click
import zoneinfo
from kasa import (
Device,
Module,
)
from kasa.smart import SmartDevice
from kasa.iot import IotDevice
from kasa.iot.iottimezone import get_matching_timezones
from .common import (
echo,
error,
pass_dev,
)
@ -31,25 +34,127 @@ async def time(ctx: click.Context):
async def time_get(dev: Device):
"""Get the device time."""
res = dev.time
echo(f"Current time: {res}")
echo(f"Current time: {dev.time} ({dev.timezone})")
return res
@time.command(name="sync")
@click.option(
"--timezone",
type=str,
required=False,
default=None,
help="IANA timezone name, will use current device timezone if not provided.",
)
@click.option(
"--skip-confirm",
type=str,
required=False,
default=False,
is_flag=True,
help="Do not ask to confirm the timezone if an exact match is not found.",
)
@pass_dev
async def time_sync(dev: Device):
async def time_sync(dev: Device, timezone: str | None, skip_confirm: bool):
"""Set the device time to current time."""
if not isinstance(dev, SmartDevice):
raise NotImplementedError("setting time currently only implemented on smart")
if (time := dev.modules.get(Module.Time)) is None:
echo("Device does not have time module")
return
echo("Old time: %s" % time.time)
now = datetime.now()
local_tz = datetime.now().astimezone().tzinfo
await time.set_time(datetime.now(tz=local_tz))
tzinfo: zoneinfo.ZoneInfo | None = None
if timezone:
tzinfo = await _get_timezone(dev, timezone, skip_confirm)
if tzinfo.utcoffset(now) != now.astimezone().utcoffset():
error(
f"{timezone} has a different utc offset to local time,"
+ "syncing will produce unexpected results."
)
now = now.replace(tzinfo=tzinfo)
echo(f"Old time: {time.time} ({time.timezone})")
await time.set_time(now)
await dev.update()
echo("New time: %s" % time.time)
echo(f"New time: {time.time} ({time.timezone})")
@time.command(name="set")
@click.argument("year", type=int)
@click.argument("month", type=int)
@click.argument("day", type=int)
@click.argument("hour", type=int)
@click.argument("minute", type=int)
@click.argument("seconds", type=int, required=False, default=0)
@click.option(
"--timezone",
type=str,
required=False,
default=None,
help="IANA timezone name, will use current device timezone if not provided.",
)
@click.option(
"--skip-confirm",
type=bool,
required=False,
default=False,
is_flag=True,
help="Do not ask to confirm the timezone if an exact match is not found.",
)
@pass_dev
async def time_set(
dev: Device,
year: int,
month: int,
day: int,
hour: int,
minute: int,
seconds: int,
timezone: str | None,
skip_confirm: bool,
):
"""Set the device time to the provided time."""
if (time := dev.modules.get(Module.Time)) is None:
echo("Device does not have time module")
return
tzinfo: zoneinfo.ZoneInfo | None = None
if timezone:
tzinfo = await _get_timezone(dev, timezone, skip_confirm)
echo(f"Old time: {time.time} ({time.timezone})")
await time.set_time(datetime(year, month, day, hour, minute, seconds, 0, tzinfo))
await dev.update()
echo(f"New time: {time.time} ({time.timezone})")
async def _get_timezone(dev, timezone, skip_confirm) -> zoneinfo.ZoneInfo:
"""Get the tzinfo from the timezone or return none."""
tzinfo: zoneinfo.ZoneInfo | None = None
if timezone not in zoneinfo.available_timezones():
error(f"{timezone} is not a valid IANA timezone.")
tzinfo = zoneinfo.ZoneInfo(timezone)
if skip_confirm is False and isinstance(dev, IotDevice):
matches = await get_matching_timezones(tzinfo)
if not matches:
error(f"Device cannot support {timezone} timezone.")
first = matches[0]
msg = (
f"An exact match for {timezone} could not be found, "
+ f"timezone will be set to {first}"
)
if len(matches) == 1:
click.confirm(msg, abort=True)
else:
msg = (
f"Supported timezones matching {timezone} are {', '.join(matches)}\n"
+ msg
)
click.confirm(msg, abort=True)
return tzinfo

View File

@ -51,7 +51,7 @@ Energy
schedule
usage
anti_theft
time
Time
cloud
Led

View File

@ -6,6 +6,7 @@ from .led import Led
from .light import Light, LightState
from .lighteffect import LightEffect
from .lightpreset import LightPreset
from .time import Time
__all__ = [
"Fan",
@ -15,4 +16,5 @@ __all__ = [
"LightEffect",
"LightState",
"LightPreset",
"Time",
]

26
kasa/interfaces/time.py Normal file
View File

@ -0,0 +1,26 @@
"""Module for time interface."""
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime, tzinfo
from ..module import Module
class Time(Module, ABC):
"""Base class for tplink time module."""
@property
@abstractmethod
def time(self) -> datetime:
"""Return timezone aware current device time."""
@property
@abstractmethod
def timezone(self) -> tzinfo:
"""Return current timezone."""
@abstractmethod
async def set_time(self, dt: datetime) -> dict:
"""Set the device time."""

View File

@ -219,7 +219,7 @@ class IotBulb(IotDevice):
self.add_module(
Module.IotAntitheft, Antitheft(self, "smartlife.iot.common.anti_theft")
)
self.add_module(Module.IotTime, Time(self, "smartlife.iot.common.timesetting"))
self.add_module(Module.Time, Time(self, "smartlife.iot.common.timesetting"))
self.add_module(Module.Energy, Emeter(self, self.emeter_type))
self.add_module(Module.IotCountdown, Countdown(self, "countdown"))
self.add_module(Module.IotCloud, Cloud(self, "smartlife.iot.common.cloud"))

View File

@ -20,6 +20,7 @@ import logging
from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, cast
from warnings import warn
from ..device import Device, WifiNetwork
from ..deviceconfig import DeviceConfig
@ -460,27 +461,27 @@ class IotDevice(Device):
@requires_update
def time(self) -> datetime:
"""Return current time from the device."""
return self.modules[Module.IotTime].time
return self.modules[Module.Time].time
@property
@requires_update
def timezone(self) -> tzinfo:
"""Return the current timezone."""
return self.modules[Module.IotTime].timezone
return self.modules[Module.Time].timezone
async def get_time(self) -> datetime | None:
async def get_time(self) -> datetime:
"""Return current time from the device, if available."""
_LOGGER.warning(
"Use `time` property instead, this call will be removed in the future."
)
return await self.modules[Module.IotTime].get_time()
msg = "Use `time` property instead, this call will be removed in the future."
warn(msg, DeprecationWarning, stacklevel=1)
return self.time
async def get_timezone(self) -> dict:
async def get_timezone(self) -> tzinfo:
"""Return timezone information."""
_LOGGER.warning(
msg = (
"Use `timezone` property instead, this call will be removed in the future."
)
return await self.modules[Module.IotTime].get_timezone()
warn(msg, DeprecationWarning, stacklevel=1)
return self.timezone
@property # type: ignore
@requires_update

View File

@ -60,7 +60,7 @@ class IotPlug(IotDevice):
self.add_module(Module.IotSchedule, Schedule(self, "schedule"))
self.add_module(Module.IotUsage, Usage(self, "schedule"))
self.add_module(Module.IotAntitheft, Antitheft(self, "anti_theft"))
self.add_module(Module.IotTime, Time(self, "time"))
self.add_module(Module.Time, Time(self, "time"))
self.add_module(Module.IotCloud, Cloud(self, "cnCloud"))
self.add_module(Module.Led, Led(self, "system"))

View File

@ -105,7 +105,7 @@ class IotStrip(IotDevice):
self.add_module(Module.IotAntitheft, Antitheft(self, "anti_theft"))
self.add_module(Module.IotSchedule, Schedule(self, "schedule"))
self.add_module(Module.IotUsage, Usage(self, "schedule"))
self.add_module(Module.IotTime, Time(self, "time"))
self.add_module(Module.Time, Time(self, "time"))
self.add_module(Module.IotCountdown, Countdown(self, "countdown"))
self.add_module(Module.Led, Led(self, "system"))
self.add_module(Module.IotCloud, Cloud(self, "cnCloud"))

View File

@ -3,7 +3,10 @@
from __future__ import annotations
import logging
from datetime import datetime, tzinfo
from datetime import datetime, timedelta, tzinfo
from typing import cast
from zoneinfo import ZoneInfo
from ..cachedzoneinfo import CachedZoneInfo
@ -22,26 +25,53 @@ async def get_timezone(index: int) -> tzinfo:
return await CachedZoneInfo.get_cached_zone_info(name)
async def get_timezone_index(name: str) -> int:
async def get_timezone_index(tzone: tzinfo) -> int:
"""Return the iot firmware index for a valid IANA timezone key."""
if isinstance(tzone, ZoneInfo):
name = tzone.key
rev = {val: key for key, val in TIMEZONE_INDEX.items()}
if name in rev:
return rev[name]
# Try to find a supported timezone matching dst true/false
zone = await CachedZoneInfo.get_cached_zone_info(name)
now = datetime.now()
winter = datetime(now.year, 1, 1, 12)
summer = datetime(now.year, 7, 1, 12)
for i in range(110):
configured_zone = await get_timezone(i)
if zone.utcoffset(winter) == configured_zone.utcoffset(
winter
) and zone.utcoffset(summer) == configured_zone.utcoffset(summer):
if _is_same_timezone(tzone, await get_timezone(i)):
return i
raise ValueError("Device does not support timezone %s", name)
async def get_matching_timezones(tzone: tzinfo) -> list[str]:
"""Return the iot firmware index for a valid IANA timezone key."""
matches = []
if isinstance(tzone, ZoneInfo):
name = tzone.key
vals = {val for val in TIMEZONE_INDEX.values()}
if name in vals:
matches.append(name)
for i in range(110):
fw_tz = await get_timezone(i)
if _is_same_timezone(tzone, fw_tz):
match_key = cast(ZoneInfo, fw_tz).key
if match_key not in matches:
matches.append(match_key)
return matches
def _is_same_timezone(tzone1: tzinfo, tzone2: tzinfo) -> bool:
"""Return true if the timezones have the same utcffset and dst offset.
Iot devices only support a limited static list of IANA timezones; this is used to
check if a static timezone matches the same utc offset and dst settings.
"""
now = datetime.now()
start_day = datetime(now.year, 1, 1, 12)
for i in range(365):
the_day = start_day + timedelta(days=i)
if tzone1.utcoffset(the_day) != tzone2.utcoffset(the_day):
return False
return True
TIMEZONE_INDEX = {
0: "Etc/GMT+12",
1: "Pacific/Samoa",

View File

@ -5,11 +5,12 @@ from __future__ import annotations
from datetime import datetime, timezone, tzinfo
from ...exceptions import KasaException
from ...interfaces import Time as TimeInterface
from ..iotmodule import IotModule, merge
from ..iottimezone import get_timezone
from ..iottimezone import get_timezone, get_timezone_index
class Time(IotModule):
class Time(IotModule, TimeInterface):
"""Implements the timezone settings."""
_timezone: tzinfo = timezone.utc
@ -57,10 +58,36 @@ class Time(IotModule):
res["hour"],
res["min"],
res["sec"],
tzinfo=self.timezone,
)
except KasaException:
return None
async def set_time(self, dt: datetime) -> dict:
"""Set the device time."""
params = {
"year": dt.year,
"month": dt.month,
"mday": dt.day,
"hour": dt.hour,
"min": dt.minute,
"sec": dt.second,
}
if dt.tzinfo:
index = await get_timezone_index(dt.tzinfo)
current_index = self.data.get("get_timezone", {}).get("index", -1)
if current_index != -1 and current_index != index:
params["index"] = index
method = "set_timezone"
else:
method = "set_time"
else:
method = "set_time"
try:
return await self.call(method, params)
except Exception as ex:
raise KasaException(ex) from ex
async def get_timezone(self):
"""Request timezone information from the device."""
return await self.call("get_timezone")

View File

@ -77,6 +77,7 @@ class Module(ABC):
Led: Final[ModuleName[interfaces.Led]] = ModuleName("Led")
Light: Final[ModuleName[interfaces.Light]] = ModuleName("Light")
LightPreset: Final[ModuleName[interfaces.LightPreset]] = ModuleName("LightPreset")
Time: Final[ModuleName[interfaces.Time]] = ModuleName("Time")
# IOT only Modules
IotAmbientLight: Final[ModuleName[iot.AmbientLight]] = ModuleName("ambient")
@ -86,7 +87,6 @@ class Module(ABC):
IotSchedule: Final[ModuleName[iot.Schedule]] = ModuleName("schedule")
IotUsage: Final[ModuleName[iot.Usage]] = ModuleName("usage")
IotCloud: Final[ModuleName[iot.Cloud]] = ModuleName("cloud")
IotTime: Final[ModuleName[iot.Time]] = ModuleName("time")
# SMART only Modules
Alarm: Final[ModuleName[smart.Alarm]] = ModuleName("Alarm")
@ -123,7 +123,6 @@ class Module(ABC):
TemperatureControl: Final[ModuleName[smart.TemperatureControl]] = ModuleName(
"TemperatureControl"
)
Time: Final[ModuleName[smart.Time]] = ModuleName("Time")
WaterleakSensor: Final[ModuleName[smart.WaterleakSensor]] = ModuleName(
"WaterleakSensor"
)

View File

@ -3,17 +3,17 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone, tzinfo
from time import mktime
from typing import cast
from zoneinfo import ZoneInfoNotFoundError
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from ...cachedzoneinfo import CachedZoneInfo
from ...feature import Feature
from ...interfaces import Time as TimeInterface
from ..smartmodule import SmartModule
class Time(SmartModule):
class Time(SmartModule, TimeInterface):
"""Implementation of device_local_time."""
REQUIRED_COMPONENT = "time"
@ -63,16 +63,23 @@ class Time(SmartModule):
tz=self.timezone,
)
async def set_time(self, dt: datetime):
async def set_time(self, dt: datetime) -> dict:
"""Set device time."""
unixtime = mktime(dt.timetuple())
offset = cast(timedelta, dt.utcoffset())
diff = offset / timedelta(minutes=1)
return await self.call(
"set_device_time",
{
"timestamp": int(unixtime),
"time_diff": int(diff),
"region": dt.tzname(),
},
)
if not dt.tzinfo:
timestamp = dt.replace(tzinfo=self.timezone).timestamp()
utc_offset = cast(timedelta, self.timezone.utcoffset(dt))
else:
timestamp = dt.timestamp()
utc_offset = cast(timedelta, dt.utcoffset())
time_diff = utc_offset / timedelta(minutes=1)
params: dict[str, int | str] = {
"timestamp": int(timestamp),
"time_diff": int(time_diff),
}
if tz := dt.tzinfo:
region = tz.key if isinstance(tz, ZoneInfo) else dt.tzname()
# tzname can return null if a simple timezone object is provided.
if region:
params["region"] = region
return await self.call("set_device_time", params)

View File

@ -118,7 +118,6 @@ TIME_MODULE = {
"index": 12,
"tz_str": "test2",
},
"set_timezone": None,
}
CLOUD_MODULE = {
@ -353,6 +352,19 @@ class FakeIotTransport(BaseTransport):
else:
return light_state
def set_time(self, new_state: dict, *args):
"""Implement set_time."""
mods = [
v
for k, v in self.proto.items()
if k in {"time", "smartlife.iot.common.timesetting"}
]
index = new_state.pop("index", None)
for mod in mods:
mod["get_time"] = new_state
if index is not None:
mod["get_timezone"]["index"] = index
baseproto = {
"system": {
"set_relay_state": set_relay_state,
@ -391,8 +403,12 @@ class FakeIotTransport(BaseTransport):
"smartlife.iot.common.system": {
"set_dev_alias": set_alias,
},
"time": TIME_MODULE,
"smartlife.iot.common.timesetting": TIME_MODULE,
"time": {**TIME_MODULE, "set_time": set_time, "set_timezone": set_time},
"smartlife.iot.common.timesetting": {
**TIME_MODULE,
"set_time": set_time,
"set_timezone": set_time,
},
# HS220 brightness, different setter and getter
"smartlife.iot.dimmer": {
"set_brightness": set_hs220_brightness,

View File

@ -1,11 +1,13 @@
import json
import os
import re
from datetime import datetime
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
from pytest_mock import MockerFixture
from zoneinfo import ZoneInfo
from kasa import (
AuthenticationError,
@ -308,12 +310,8 @@ async def test_time_get(dev, runner):
assert "Current time: " in res.output
@device_smart
async def test_time_sync(dev, mocker, runner):
"""Test time sync command.
Currently implemented only for SMART.
"""
"""Test time sync command."""
update = mocker.patch.object(dev, "update")
set_time_mock = mocker.spy(dev.modules[Module.Time], "set_time")
res = await runner.invoke(
@ -329,6 +327,48 @@ async def test_time_sync(dev, mocker, runner):
assert "New time: " in res.output
async def test_time_set(dev: Device, mocker, runner):
"""Test time set command."""
time_mod = dev.modules[Module.Time]
set_time_mock = mocker.spy(time_mod, "set_time")
dt = datetime(2024, 10, 15, 8, 15)
res = await runner.invoke(
time,
["set", str(dt.year), str(dt.month), str(dt.day), str(dt.hour), str(dt.minute)],
obj=dev,
)
set_time_mock.assert_called()
assert time_mod.time == dt.replace(tzinfo=time_mod.timezone)
assert res.exit_code == 0
assert "Old time: " in res.output
assert "New time: " in res.output
zone = ZoneInfo("Europe/Berlin")
dt = dt.replace(tzinfo=zone)
res = await runner.invoke(
time,
[
"set",
str(dt.year),
str(dt.month),
str(dt.day),
str(dt.hour),
str(dt.minute),
"--timezone",
zone.key,
],
input="y\n",
obj=dev,
)
assert time_mod.time == dt
assert res.exit_code == 0
assert "Old time: " in res.output
assert "New time: " in res.output
async def test_emeter(dev: Device, mocker, runner):
res = await runner.invoke(emeter, obj=dev)
if not dev.has_emeter:

View File

@ -1,5 +1,9 @@
from datetime import datetime
import pytest
from freezegun.api import FrozenDateTimeFactory
from pytest_mock import MockerFixture
from zoneinfo import ZoneInfo
from kasa import Device, LightState, Module
from kasa.tests.device_fixtures import (
@ -319,3 +323,24 @@ async def test_light_preset_save(dev: Device, mocker: MockerFixture):
assert new_preset_state.hue == new_preset.hue
assert new_preset_state.saturation == new_preset.saturation
assert new_preset_state.color_temp == new_preset.color_temp
async def test_set_time(dev: Device, freezer: FrozenDateTimeFactory):
"""Test setting the device time."""
freezer.move_to("2021-01-09 12:00:00+00:00")
time_mod = dev.modules[Module.Time]
tz_info = time_mod.timezone
now = datetime.now(tz=tz_info)
now = now.replace(microsecond=0)
assert time_mod.time != now
await time_mod.set_time(now)
await dev.update()
assert time_mod.time == now
zone = ZoneInfo("Europe/Berlin")
now = datetime.now(tz=zone)
now = now.replace(microsecond=0)
await time_mod.set_time(now)
await dev.update()
assert time_mod.time == now

View File

@ -321,13 +321,14 @@ async def test_device_timezones():
# Get an index from a timezone
for index, zone in TIMEZONE_INDEX.items():
found_index = await get_timezone_index(zone)
zone_info = zoneinfo.ZoneInfo(zone)
found_index = await get_timezone_index(zone_info)
assert found_index == index
# Try a timezone not hardcoded finds another match
index = await get_timezone_index("Asia/Katmandu")
index = await get_timezone_index(zoneinfo.ZoneInfo("Asia/Katmandu"))
assert index == 77
# Try a timezone not hardcoded no match
with pytest.raises(zoneinfo.ZoneInfoNotFoundError):
await get_timezone_index("Foo/bar")
await get_timezone_index(zoneinfo.ZoneInfo("Foo/bar"))

View File

@ -184,7 +184,7 @@ async def test_time(dev):
@device_iot
async def test_timezone(dev):
TZ_SCHEMA(await dev.get_timezone())
TZ_SCHEMA(await dev.modules[Module.Time].get_timezone())
@device_iot