Disable automatic updating of latest firmware (#1103)

To resolve issues with the calls to the tplink cloud to get the latest firmware.
Disables the automatic calling of `get_latest_fw` and requires firmware update checks to be triggered manually.
This commit is contained in:
Steven B.
2024-08-30 18:01:54 +01:00
committed by GitHub
parent 6a86ffbbba
commit 520b9d7a38
4 changed files with 88 additions and 29 deletions

View File

@@ -6,13 +6,14 @@ import asyncio
import logging
from collections.abc import Coroutine
from datetime import date
from typing import TYPE_CHECKING, Any, Callable, Optional
from typing import TYPE_CHECKING, Callable, Optional
# When support for cpython older than 3.11 is dropped
# async_timeout can be replaced with asyncio.timeout
from async_timeout import timeout as asyncio_timeout
from pydantic.v1 import BaseModel, Field, validator
from ...exceptions import KasaException
from ...feature import Feature
from ..smartmodule import SmartModule, allow_update_after
@@ -70,6 +71,11 @@ class Firmware(SmartModule):
def __init__(self, device: SmartDevice, module: str):
super().__init__(device, module)
self._firmware_update_info: UpdateInfo | None = None
def _initialize_features(self):
"""Initialize features."""
device = self._device
if self.supported_version > 1:
self._add_feature(
Feature(
@@ -115,13 +121,34 @@ class Firmware(SmartModule):
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
device,
id="check_latest_firmware",
name="Check latest firmware",
container=self,
attribute_setter="check_latest_firmware",
category=Feature.Category.Info,
type=Feature.Type.Action,
)
)
def query(self) -> dict:
"""Query to execute during the update cycle."""
req: dict[str, Any] = {"get_latest_fw": None}
if self.supported_version > 1:
req["get_auto_update_info"] = None
return req
return {"get_auto_update_info": None}
return {}
async def check_latest_firmware(self) -> UpdateInfo | None:
"""Check for the latest firmware for the device."""
try:
fw = await self.call("get_latest_fw")
self._firmware_update_info = UpdateInfo.parse_obj(fw["get_latest_fw"])
return self._firmware_update_info
except Exception:
_LOGGER.exception("Error getting latest firmware for %s:", self._device)
self._firmware_update_info = None
return None
@property
def current_firmware(self) -> str:
@@ -129,26 +156,23 @@ class Firmware(SmartModule):
return self._device.hw_info["sw_ver"]
@property
def latest_firmware(self) -> str:
def latest_firmware(self) -> str | None:
"""Return the latest firmware version."""
return self.firmware_update_info.version
if not self._firmware_update_info:
return None
return self._firmware_update_info.version
@property
def firmware_update_info(self):
def firmware_update_info(self) -> UpdateInfo | None:
"""Return latest firmware information."""
if not self._device.is_cloud_connected or self._has_data_error():
# Error in response, probably disconnected from the cloud.
return UpdateInfo(type=0, need_to_upgrade=False)
fw = self.data.get("get_latest_fw") or self.data
return UpdateInfo.parse_obj(fw)
return self._firmware_update_info
@property
def update_available(self) -> bool | None:
"""Return True if update is available."""
if not self._device.is_cloud_connected:
if not self._device.is_cloud_connected or not self._firmware_update_info:
return None
return self.firmware_update_info.update_available
return self._firmware_update_info.update_available
async def get_update_state(self) -> DownloadState:
"""Return update state."""
@@ -161,11 +185,17 @@ class Firmware(SmartModule):
self, progress_cb: Callable[[DownloadState], Coroutine] | None = None
):
"""Update the device firmware."""
if not self._firmware_update_info:
raise KasaException(
"You must call check_latest_firmware before calling update"
)
if not self.update_available:
raise KasaException("A new update must be available to call update")
current_fw = self.current_firmware
_LOGGER.info(
"Going to upgrade from %s to %s",
current_fw,
self.firmware_update_info.version,
self._firmware_update_info.version,
)
await self.call("fw_download")
@@ -188,7 +218,7 @@ class Firmware(SmartModule):
if state.status == 0:
_LOGGER.info(
"Update idle, hopefully updated to %s",
self.firmware_update_info.version,
self._firmware_update_info.version,
)
break
elif state.status == 2:
@@ -207,15 +237,12 @@ class Firmware(SmartModule):
_LOGGER.warning("Unhandled state code: %s", state)
@property
def auto_update_enabled(self):
def auto_update_enabled(self) -> bool:
"""Return True if autoupdate is enabled."""
return (
"get_auto_update_info" in self.data
and self.data["get_auto_update_info"]["enable"]
)
return "enable" in self.data and self.data["enable"]
@allow_update_after
async def set_auto_update_enabled(self, enabled: bool):
"""Change autoupdate setting."""
data = {**self.data["get_auto_update_info"], "enable": enabled}
data = {**self.data, "enable": enabled}
await self.call("set_auto_update_info", data)