Improve smartdevice update module

* Expose current and latest firmware as features
* Implement update loop that blocks until the update is complete
This commit is contained in:
Teemu Rytilahti 2024-02-23 02:32:38 +01:00
parent 253287c7b7
commit e9dfdf6cf7

View File

@ -1,6 +1,7 @@
"""Implementation of firmware module."""
from __future__ import annotations
import asyncio
from datetime import date
from typing import TYPE_CHECKING, Any, Optional
@ -11,6 +12,11 @@ from ...exceptions import SmartErrorCode
from ...feature import Feature
from ..smartmodule import SmartModule
# When support for cpython older than 3.11 is dropped
# async_timeout can be replaced with asyncio.timeout
from async_timeout import timeout as asyncio_timeout
if TYPE_CHECKING:
from ..smartdevice import SmartDevice
@ -19,7 +25,7 @@ class UpdateInfo(BaseModel):
"""Update info status object."""
status: int = Field(alias="type")
fw_ver: Optional[str] = None # noqa: UP007
version: Optional[str] = Field(alias="fw_ver", default=None) # noqa: UP007
release_date: Optional[date] = None # noqa: UP007
release_notes: Optional[str] = Field(alias="release_note", default=None) # noqa: UP007
fw_size: Optional[int] = None # noqa: UP007
@ -71,6 +77,12 @@ class Firmware(SmartModule):
category=Feature.Category.Info,
)
)
self._add_feature(
Feature(device, "Current firmware version", container=self, attribute_getter="current_firmware")
)
self._add_feature(
Feature(device, "Available firmware version", container=self, attribute_getter="latest_firmware")
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
@ -80,7 +92,18 @@ class Firmware(SmartModule):
return req
@property
def latest_firmware(self):
def current_firmware(self) -> str:
"""Return the current firmware version."""
return self._device.hw_info["sw_ver"]
@property
def latest_firmware(self) -> str:
"""Return the latest firmware version."""
return self.firmware_update_info.version
@property
def firmware_update_info(self):
"""Return latest firmware information."""
fw = self.data.get("get_latest_fw") or self.data
if not self._device.is_cloud_connected or isinstance(fw, SmartErrorCode):
@ -94,7 +117,7 @@ class Firmware(SmartModule):
"""Return True if update is available."""
if not self._device.is_cloud_connected:
return None
return self.latest_firmware.update_available
return self.firmware_update_info.update_available
async def get_update_state(self):
"""Return update state."""
@ -102,7 +125,21 @@ class Firmware(SmartModule):
async def update(self):
"""Update the device firmware."""
return await self.call("fw_download")
current_fw = self.current_firmware
_LOGGER.debug("Going to upgrade from %s to %s", current_fw, self.firmware_update_info.version)
resp = await self.call("fw_download")
_LOGGER.debug("Update request response: %s", resp)
# TODO: read timeout from get_auto_update_info or from get_fw_download_state?
async with asyncio_timeout(60*5):
while True:
await asyncio.sleep(0.5)
state = await self.get_update_state()
_LOGGER.debug("Update state: %s" % state)
# TODO: this could await a given callable for progress
if self.firmware_update_info.version != current_fw:
_LOGGER.info("Updated to %s", self.firmware_update_info.version)
break
@property
def auto_update_enabled(self):
@ -115,4 +152,4 @@ class Firmware(SmartModule):
async def set_auto_update_enabled(self, enabled: bool):
"""Change autoupdate setting."""
data = {**self.data["get_auto_update_info"], "enable": enabled}
await self.call("set_auto_update_info", data) # {"enable": enabled})
await self.call("set_auto_update_info", data)