mirror of
https://github.com/python-kasa/python-kasa.git
synced 2024-12-23 03:33:35 +00:00
905a14895d
Ensures that all modules try to access their data in `_post_update_hook` in a safe manner and disable themselves if there's an error. Also adds parameters to get_preset_rules and get_on_off_gradually_info to fix issues with recent firmware updates. [#1033](https://github.com/python-kasa/python-kasa/issues/1033)
226 lines
7.4 KiB
Python
226 lines
7.4 KiB
Python
"""Implementation of firmware module."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from collections.abc import Coroutine
|
|
from datetime import date
|
|
from typing import TYPE_CHECKING, Any, Callable, Optional
|
|
|
|
# When support for cpython older than 3.11 is dropped
|
|
# async_timeout can be replaced with asyncio.timeout
|
|
from async_timeout import timeout as asyncio_timeout
|
|
from pydantic.v1 import BaseModel, Field, validator
|
|
|
|
from ...feature import Feature
|
|
from ..smartmodule import SmartModule
|
|
|
|
if TYPE_CHECKING:
|
|
from ..smartdevice import SmartDevice
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class DownloadState(BaseModel):
|
|
"""Download state."""
|
|
|
|
# Example:
|
|
# {'status': 0, 'download_progress': 0, 'reboot_time': 5,
|
|
# 'upgrade_time': 5, 'auto_upgrade': False}
|
|
status: int
|
|
progress: int = Field(alias="download_progress")
|
|
reboot_time: int
|
|
upgrade_time: int
|
|
auto_upgrade: bool
|
|
|
|
|
|
class UpdateInfo(BaseModel):
|
|
"""Update info status object."""
|
|
|
|
status: int = Field(alias="type")
|
|
version: Optional[str] = Field(alias="fw_ver", default=None) # noqa: UP007
|
|
release_date: Optional[date] = None # noqa: UP007
|
|
release_notes: Optional[str] = Field(alias="release_note", default=None) # noqa: UP007
|
|
fw_size: Optional[int] = None # noqa: UP007
|
|
oem_id: Optional[str] = None # noqa: UP007
|
|
needs_upgrade: bool = Field(alias="need_to_upgrade")
|
|
|
|
@validator("release_date", pre=True)
|
|
def _release_date_optional(cls, v):
|
|
if not v:
|
|
return None
|
|
|
|
return v
|
|
|
|
@property
|
|
def update_available(self):
|
|
"""Return True if update available."""
|
|
if self.status != 0:
|
|
return True
|
|
return False
|
|
|
|
|
|
class Firmware(SmartModule):
|
|
"""Implementation of firmware module."""
|
|
|
|
REQUIRED_COMPONENT = "firmware"
|
|
|
|
def __init__(self, device: SmartDevice, module: str):
|
|
super().__init__(device, module)
|
|
if self.supported_version > 1:
|
|
self._add_feature(
|
|
Feature(
|
|
device,
|
|
id="auto_update_enabled",
|
|
name="Auto update enabled",
|
|
container=self,
|
|
attribute_getter="auto_update_enabled",
|
|
attribute_setter="set_auto_update_enabled",
|
|
type=Feature.Type.Switch,
|
|
)
|
|
)
|
|
self._add_feature(
|
|
Feature(
|
|
device,
|
|
id="update_available",
|
|
name="Update available",
|
|
container=self,
|
|
attribute_getter="update_available",
|
|
type=Feature.Type.BinarySensor,
|
|
category=Feature.Category.Info,
|
|
)
|
|
)
|
|
self._add_feature(
|
|
Feature(
|
|
device,
|
|
id="current_firmware_version",
|
|
name="Current firmware version",
|
|
container=self,
|
|
attribute_getter="current_firmware",
|
|
category=Feature.Category.Debug,
|
|
type=Feature.Type.Sensor,
|
|
)
|
|
)
|
|
self._add_feature(
|
|
Feature(
|
|
device,
|
|
id="available_firmware_version",
|
|
name="Available firmware version",
|
|
container=self,
|
|
attribute_getter="latest_firmware",
|
|
category=Feature.Category.Debug,
|
|
type=Feature.Type.Sensor,
|
|
)
|
|
)
|
|
|
|
def query(self) -> dict:
|
|
"""Query to execute during the update cycle."""
|
|
req: dict[str, Any] = {"get_latest_fw": None}
|
|
if self.supported_version > 1:
|
|
req["get_auto_update_info"] = None
|
|
return req
|
|
|
|
def _post_update_hook(self):
|
|
"""Perform actions after a device update.
|
|
|
|
Overrides the default behaviour to disable a module if the query returns
|
|
an error because some of the module still functions.
|
|
"""
|
|
|
|
@property
|
|
def current_firmware(self) -> str:
|
|
"""Return the current firmware version."""
|
|
return self._device.hw_info["sw_ver"]
|
|
|
|
@property
|
|
def latest_firmware(self) -> str:
|
|
"""Return the latest firmware version."""
|
|
return self.firmware_update_info.version
|
|
|
|
@property
|
|
def firmware_update_info(self):
|
|
"""Return latest firmware information."""
|
|
if not self._device.is_cloud_connected or self._has_data_error():
|
|
# Error in response, probably disconnected from the cloud.
|
|
return UpdateInfo(type=0, need_to_upgrade=False)
|
|
|
|
fw = self.data.get("get_latest_fw") or self.data
|
|
return UpdateInfo.parse_obj(fw)
|
|
|
|
@property
|
|
def update_available(self) -> bool | None:
|
|
"""Return True if update is available."""
|
|
if not self._device.is_cloud_connected:
|
|
return None
|
|
return self.firmware_update_info.update_available
|
|
|
|
async def get_update_state(self) -> DownloadState:
|
|
"""Return update state."""
|
|
resp = await self.call("get_fw_download_state")
|
|
state = resp["get_fw_download_state"]
|
|
return DownloadState(**state)
|
|
|
|
async def update(
|
|
self, progress_cb: Callable[[DownloadState], Coroutine] | None = None
|
|
):
|
|
"""Update the device firmware."""
|
|
current_fw = self.current_firmware
|
|
_LOGGER.info(
|
|
"Going to upgrade from %s to %s",
|
|
current_fw,
|
|
self.firmware_update_info.version,
|
|
)
|
|
await self.call("fw_download")
|
|
|
|
# TODO: read timeout from get_auto_update_info or from get_fw_download_state?
|
|
async with asyncio_timeout(60 * 5):
|
|
while True:
|
|
await asyncio.sleep(0.5)
|
|
try:
|
|
state = await self.get_update_state()
|
|
except Exception as ex:
|
|
_LOGGER.warning(
|
|
"Got exception, maybe the device is rebooting? %s", ex
|
|
)
|
|
continue
|
|
|
|
_LOGGER.debug("Update state: %s" % state)
|
|
if progress_cb is not None:
|
|
asyncio.create_task(progress_cb(state))
|
|
|
|
if state.status == 0:
|
|
_LOGGER.info(
|
|
"Update idle, hopefully updated to %s",
|
|
self.firmware_update_info.version,
|
|
)
|
|
break
|
|
elif state.status == 2:
|
|
_LOGGER.info("Downloading firmware, progress: %s", state.progress)
|
|
elif state.status == 3:
|
|
upgrade_sleep = state.upgrade_time
|
|
_LOGGER.info(
|
|
"Flashing firmware, sleeping for %s before checking status",
|
|
upgrade_sleep,
|
|
)
|
|
await asyncio.sleep(upgrade_sleep)
|
|
elif state.status < 0:
|
|
_LOGGER.error("Got error: %s", state.status)
|
|
break
|
|
else:
|
|
_LOGGER.warning("Unhandled state code: %s", state)
|
|
|
|
@property
|
|
def auto_update_enabled(self):
|
|
"""Return True if autoupdate is enabled."""
|
|
return (
|
|
"get_auto_update_info" in self.data
|
|
and self.data["get_auto_update_info"]["enable"]
|
|
)
|
|
|
|
async def set_auto_update_enabled(self, enabled: bool):
|
|
"""Change autoupdate setting."""
|
|
data = {**self.data["get_auto_update_info"], "enable": enabled}
|
|
await self.call("set_auto_update_info", data)
|