mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-09 22:37:08 +00:00
5dac092227
Pick commit 7fd5c213e6
from 1052
Addresses stability issues on older hw device versions
- Handles module timeout errors better by querying modules individually on errors and disabling problematic modules like Firmware that go out to the internet to get updates.
- Addresses an issue with the Led module on P100 hardware version 1.0 which appears to have a memory leak and will cause the device to crash after approximately 500 calls.
- Delays updates of modules that do not have regular changes like LightPreset and LightEffect and enables them to be updated on the next update cycle only if required values have changed.
222 lines
7.3 KiB
Python
222 lines
7.3 KiB
Python
"""Implementation of firmware module."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from collections.abc import Coroutine
|
|
from datetime import date
|
|
from typing import TYPE_CHECKING, Any, Callable, Optional
|
|
|
|
# When support for cpython older than 3.11 is dropped
|
|
# async_timeout can be replaced with asyncio.timeout
|
|
from async_timeout import timeout as asyncio_timeout
|
|
from pydantic.v1 import BaseModel, Field, validator
|
|
|
|
from ...feature import Feature
|
|
from ..smartmodule import SmartModule, allow_update_after
|
|
|
|
if TYPE_CHECKING:
|
|
from ..smartdevice import SmartDevice
|
|
|
|
|
|
# Module-level logger, named after this module so log output can be filtered per module.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class DownloadState(BaseModel):
    """Firmware download/upgrade state as reported by the device.

    Sample payload::

        {'status': 0, 'download_progress': 0, 'reboot_time': 5,
         'upgrade_time': 5, 'auto_upgrade': False}
    """

    # Numeric state code of the download/upgrade process.
    status: int
    # Filled from the ``download_progress`` key of the payload.
    progress: int = Field(..., alias="download_progress")
    reboot_time: int
    upgrade_time: int
    auto_upgrade: bool
|
|
|
|
|
|
class UpdateInfo(BaseModel):
    """Firmware update information parsed from a device response.

    Several fields are populated through aliases: ``type`` -> ``status``,
    ``fw_ver`` -> ``version``, ``release_note`` -> ``release_notes`` and
    ``need_to_upgrade`` -> ``needs_upgrade``.
    """

    # Required; any non-zero value is reported as "update available".
    status: int = Field(..., alias="type")
    version: Optional[str] = Field(None, alias="fw_ver")  # noqa: UP007
    release_date: Optional[date] = None  # noqa: UP007
    release_notes: Optional[str] = Field(None, alias="release_note")  # noqa: UP007
    fw_size: Optional[int] = None  # noqa: UP007
    oem_id: Optional[str] = None  # noqa: UP007
    needs_upgrade: bool = Field(..., alias="need_to_upgrade")

    @validator("release_date", pre=True)
    def _release_date_optional(cls, v):
        """Turn any falsy raw release date into None before field validation."""
        return v or None

    @property
    def update_available(self):
        """Return True if update available."""
        return self.status != 0
|
|
|
|
|
|
class Firmware(SmartModule):
    """Implementation of firmware module.

    Exposes the installed and the latest available firmware version, an
    "update available" sensor, the auto-update switch (only when the
    supported component version is greater than 1) and a method to start
    a firmware upgrade and follow its progress.
    """

    REQUIRED_COMPONENT = "firmware"
    # 24 hours expressed in seconds.
    # NOTE(review): presumably consumed by the SmartModule update scheduling
    # to throttle how often this module is queried - confirm in the base class.
    MINIMUM_UPDATE_INTERVAL_SECS = 60 * 60 * 24

    def __init__(self, device: SmartDevice, module: str):
        """Register the firmware related features on the module.

        :param device: Device this module belongs to.
        :param module: Name of the module, passed on to the base class.
        """
        super().__init__(device, module)
        # The auto update queries are only issued for component versions > 1
        # (see query()), so only expose the switch in that case.
        if self.supported_version > 1:
            self._add_feature(
                Feature(
                    device,
                    id="auto_update_enabled",
                    name="Auto update enabled",
                    container=self,
                    attribute_getter="auto_update_enabled",
                    attribute_setter="set_auto_update_enabled",
                    type=Feature.Type.Switch,
                )
            )
        self._add_feature(
            Feature(
                device,
                id="update_available",
                name="Update available",
                container=self,
                attribute_getter="update_available",
                type=Feature.Type.BinarySensor,
                category=Feature.Category.Info,
            )
        )
        self._add_feature(
            Feature(
                device,
                id="current_firmware_version",
                name="Current firmware version",
                container=self,
                attribute_getter="current_firmware",
                category=Feature.Category.Debug,
                type=Feature.Type.Sensor,
            )
        )
        self._add_feature(
            Feature(
                device,
                id="available_firmware_version",
                name="Available firmware version",
                container=self,
                attribute_getter="latest_firmware",
                category=Feature.Category.Debug,
                type=Feature.Type.Sensor,
            )
        )

    def query(self) -> dict:
        """Query to execute during the update cycle.

        Always requests ``get_latest_fw``; ``get_auto_update_info`` is added
        when the supported component version is greater than 1.
        """
        req: dict[str, Any] = {"get_latest_fw": None}
        if self.supported_version > 1:
            req["get_auto_update_info"] = None
        return req

    @property
    def current_firmware(self) -> str:
        """Return the current firmware version."""
        return self._device.hw_info["sw_ver"]

    @property
    def latest_firmware(self) -> str | None:
        """Return the latest firmware version.

        The value is None when the update info carries no version, e.g. for
        the placeholder returned while the device is not cloud connected.
        """
        return self.firmware_update_info.version

    @property
    def firmware_update_info(self) -> UpdateInfo:
        """Return latest firmware information.

        Falls back to an "no update" placeholder when the device is not
        connected to the cloud or the module data contains an error.
        """
        if not self._device.is_cloud_connected or self._has_data_error():
            # Error in response, probably disconnected from the cloud.
            return UpdateInfo(type=0, need_to_upgrade=False)

        # Use the "get_latest_fw" entry when present (and non-empty),
        # otherwise treat the module data itself as the update info payload.
        fw = self.data.get("get_latest_fw") or self.data
        return UpdateInfo.parse_obj(fw)

    @property
    def update_available(self) -> bool | None:
        """Return True if update is available.

        Returns None when the device is not connected to the cloud, as the
        availability cannot be determined in that case.
        """
        if not self._device.is_cloud_connected:
            return None
        return self.firmware_update_info.update_available

    async def get_update_state(self) -> DownloadState:
        """Return update state."""
        resp = await self.call("get_fw_download_state")
        state = resp["get_fw_download_state"]
        return DownloadState(**state)

    @allow_update_after
    async def update(
        self, progress_cb: Callable[[DownloadState], Coroutine] | None = None
    ):
        """Update the device firmware.

        Starts the download with ``fw_download`` and then polls the download
        state roughly every 0.5 seconds, inside a five minute timeout, until
        the device reports status 0 (idle) or a negative (error) status.

        :param progress_cb: Optional coroutine function that is scheduled as
            a task with every successfully fetched :class:`DownloadState`.
        """
        current_fw = self.current_firmware
        _LOGGER.info(
            "Going to upgrade from %s to %s",
            current_fw,
            self.firmware_update_info.version,
        )
        await self.call("fw_download")

        # Keep strong references to the scheduled callback tasks while the
        # update is running; the event loop itself only holds weak references
        # and an unreferenced task may be garbage-collected mid-execution.
        progress_tasks: set[asyncio.Task] = set()

        # TODO: read timeout from get_auto_update_info or from get_fw_download_state?
        async with asyncio_timeout(60 * 5):
            while True:
                await asyncio.sleep(0.5)
                try:
                    state = await self.get_update_state()
                except Exception as ex:
                    # Best effort: the device may be unreachable while it
                    # reboots, so keep polling until the timeout expires.
                    _LOGGER.warning(
                        "Got exception, maybe the device is rebooting? %s", ex
                    )
                    continue

                # Lazy %-args: the message is only built when debug is enabled.
                _LOGGER.debug("Update state: %s", state)
                if progress_cb is not None:
                    task = asyncio.create_task(progress_cb(state))
                    progress_tasks.add(task)
                    task.add_done_callback(progress_tasks.discard)

                if state.status == 0:
                    _LOGGER.info(
                        "Update idle, hopefully updated to %s",
                        self.firmware_update_info.version,
                    )
                    break
                elif state.status == 2:
                    _LOGGER.info("Downloading firmware, progress: %s", state.progress)
                elif state.status == 3:
                    upgrade_sleep = state.upgrade_time
                    _LOGGER.info(
                        "Flashing firmware, sleeping for %s before checking status",
                        upgrade_sleep,
                    )
                    await asyncio.sleep(upgrade_sleep)
                elif state.status < 0:
                    _LOGGER.error("Got error: %s", state.status)
                    break
                else:
                    _LOGGER.warning("Unhandled state code: %s", state)

    @property
    def auto_update_enabled(self) -> bool:
        """Return True if autoupdate is enabled.

        Returns False when the module data has no ``get_auto_update_info``
        entry.
        """
        return (
            "get_auto_update_info" in self.data
            and self.data["get_auto_update_info"]["enable"]
        )

    @allow_update_after
    async def set_auto_update_enabled(self, enabled: bool):
        """Change autoupdate setting.

        Sends the current ``get_auto_update_info`` payload back with only
        the ``enable`` flag replaced.

        :param enabled: New value for the ``enable`` flag.
        """
        data = {**self.data["get_auto_update_info"], "enable": enabled}
        await self.call("set_auto_update_info", data)