From 520b9d7a384fda6eb80d22533ce1258bad32dcb1 Mon Sep 17 00:00:00 2001 From: "Steven B." <51370195+sdb9696@users.noreply.github.com> Date: Fri, 30 Aug 2024 18:01:54 +0100 Subject: [PATCH] Disable automatic updating of latest firmware (#1103) To resolve issues with the calls to the tplink cloud to get the latest firmware. Disables the automatic calling of `get_latest_fw` and requires firmware update checks to be triggered manually. --- docs/tutorial.py | 2 +- kasa/feature.py | 5 +- kasa/smart/modules/firmware.py | 73 ++++++++++++++++------- kasa/tests/smart/modules/test_firmware.py | 37 +++++++++++- 4 files changed, 88 insertions(+), 29 deletions(-) diff --git a/docs/tutorial.py b/docs/tutorial.py index f2b777b1..8d0a1435 100644 --- a/docs/tutorial.py +++ b/docs/tutorial.py @@ -91,5 +91,5 @@ False True >>> for feat in dev.features.values(): >>> print(f"{feat.name}: {feat.value}") -Device ID: 0000000000000000000000000000000000000000\nState: True\nSignal Level: 2\nRSSI: -52\nSSID: #MASKED_SSID#\nOverheated: False\nReboot: \nBrightness: 50\nCloud connection: True\nHSV: HSV(hue=0, saturation=100, value=50)\nColor temperature: 2700\nAuto update enabled: True\nUpdate available: False\nCurrent firmware version: 1.1.6 Build 240130 Rel.173828\nAvailable firmware version: 1.1.6 Build 240130 Rel.173828\nLight effect: Party\nLight preset: Light preset 1\nSmooth transition on: 2\nSmooth transition off: 2\nDevice time: 2024-02-23 02:40:15+01:00 +Device ID: 0000000000000000000000000000000000000000\nState: True\nSignal Level: 2\nRSSI: -52\nSSID: #MASKED_SSID#\nOverheated: False\nReboot: \nBrightness: 50\nCloud connection: True\nHSV: HSV(hue=0, saturation=100, value=50)\nColor temperature: 2700\nAuto update enabled: True\nUpdate available: None\nCurrent firmware version: 1.1.6 Build 240130 Rel.173828\nAvailable firmware version: None\nCheck latest firmware: \nLight effect: Party\nLight preset: Light preset 1\nSmooth transition on: 2\nSmooth transition off: 2\nDevice time: 2024-02-23 02:40:15+01:00 """ diff --git a/kasa/feature.py b/kasa/feature.py index ad709424..e20a926d 100644 --- a/kasa/feature.py +++ b/kasa/feature.py @@ -31,9 +31,10 @@ Cloud connection (cloud_connection): True HSV (hsv): HSV(hue=0, saturation=100, value=100) Color temperature (color_temperature): 2700 Auto update enabled (auto_update_enabled): False -Update available (update_available): False +Update available (update_available): None Current firmware version (current_firmware_version): 1.1.6 Build 240130 Rel.173828 -Available firmware version (available_firmware_version): 1.1.6 Build 240130 Rel.173828 +Available firmware version (available_firmware_version): None +Check latest firmware (check_latest_firmware): Light effect (light_effect): Off Light preset (light_preset): Not set Smooth transition on (smooth_transition_on): 2 diff --git a/kasa/smart/modules/firmware.py b/kasa/smart/modules/firmware.py index 21026676..036c0b6c 100644 --- a/kasa/smart/modules/firmware.py +++ b/kasa/smart/modules/firmware.py @@ -6,13 +6,14 @@ import asyncio import logging from collections.abc import Coroutine from datetime import date -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import TYPE_CHECKING, Callable, Optional # When support for cpython older than 3.11 is dropped # async_timeout can be replaced with asyncio.timeout from async_timeout import timeout as asyncio_timeout from pydantic.v1 import BaseModel, Field, validator +from ...exceptions import KasaException from ...feature import Feature from ..smartmodule import SmartModule, allow_update_after @@ -70,6 +71,11 @@ class Firmware(SmartModule): def __init__(self, device: SmartDevice, module: str): super().__init__(device, module) + self._firmware_update_info: UpdateInfo | None = None + + def _initialize_features(self): + """Initialize features.""" + device = self._device if self.supported_version > 1: self._add_feature( Feature( @@ -115,13 +121,34 @@ class Firmware(SmartModule): type=Feature.Type.Sensor, ) ) + self._add_feature( + Feature( + device, + id="check_latest_firmware", + name="Check latest firmware", + container=self, + attribute_setter="check_latest_firmware", + category=Feature.Category.Info, + type=Feature.Type.Action, + ) + ) def query(self) -> dict: """Query to execute during the update cycle.""" - req: dict[str, Any] = {"get_latest_fw": None} if self.supported_version > 1: - req["get_auto_update_info"] = None - return req + return {"get_auto_update_info": None} + return {} + + async def check_latest_firmware(self) -> UpdateInfo | None: + """Check for the latest firmware for the device.""" + try: + fw = await self.call("get_latest_fw") + self._firmware_update_info = UpdateInfo.parse_obj(fw["get_latest_fw"]) + return self._firmware_update_info + except Exception: + _LOGGER.exception("Error getting latest firmware for %s:", self._device) + self._firmware_update_info = None + return None @property def current_firmware(self) -> str: @@ -129,26 +156,23 @@ class Firmware(SmartModule): return self._device.hw_info["sw_ver"] @property - def latest_firmware(self) -> str: + def latest_firmware(self) -> str | None: """Return the latest firmware version.""" - return self.firmware_update_info.version + if not self._firmware_update_info: + return None + return self._firmware_update_info.version @property - def firmware_update_info(self): + def firmware_update_info(self) -> UpdateInfo | None: """Return latest firmware information.""" - if not self._device.is_cloud_connected or self._has_data_error(): - # Error in response, probably disconnected from the cloud. - return UpdateInfo(type=0, need_to_upgrade=False) - - fw = self.data.get("get_latest_fw") or self.data - return UpdateInfo.parse_obj(fw) + return self._firmware_update_info @property def update_available(self) -> bool | None: """Return True if update is available.""" - if not self._device.is_cloud_connected: + if not self._device.is_cloud_connected or not self._firmware_update_info: return None - return self.firmware_update_info.update_available + return self._firmware_update_info.update_available async def get_update_state(self) -> DownloadState: """Return update state.""" @@ -161,11 +185,17 @@ class Firmware(SmartModule): self, progress_cb: Callable[[DownloadState], Coroutine] | None = None ): """Update the device firmware.""" + if not self._firmware_update_info: + raise KasaException( + "You must call check_latest_firmware before calling update" + ) + if not self.update_available: + raise KasaException("A new update must be available to call update") current_fw = self.current_firmware _LOGGER.info( "Going to upgrade from %s to %s", current_fw, - self.firmware_update_info.version, + self._firmware_update_info.version, ) await self.call("fw_download") @@ -188,7 +218,7 @@ class Firmware(SmartModule): if state.status == 0: _LOGGER.info( "Update idle, hopefully updated to %s", - self.firmware_update_info.version, + self._firmware_update_info.version, ) break elif state.status == 2: @@ -207,15 +237,12 @@ class Firmware(SmartModule): _LOGGER.warning("Unhandled state code: %s", state) @property - def auto_update_enabled(self): + def auto_update_enabled(self) -> bool: """Return True if autoupdate is enabled.""" - return ( - "get_auto_update_info" in self.data - and self.data["get_auto_update_info"]["enable"] - ) + return "enable" in self.data and self.data["enable"] @allow_update_after async def set_auto_update_enabled(self, enabled: bool): """Change autoupdate setting.""" - data = {**self.data["get_auto_update_info"], "enable": enabled} + data = {**self.data, "enable": enabled} await self.call("set_auto_update_info", data) diff --git a/kasa/tests/smart/modules/test_firmware.py b/kasa/tests/smart/modules/test_firmware.py index 6e7c3314..c10d9086 100644 --- a/kasa/tests/smart/modules/test_firmware.py +++ b/kasa/tests/smart/modules/test_firmware.py @@ -2,12 +2,13 @@ from __future__ import annotations import asyncio import logging +from contextlib import nullcontext from typing import TypedDict import pytest from pytest_mock import MockerFixture -from kasa import Module +from kasa import KasaException, Module from kasa.smart import SmartDevice from kasa.smart.modules.firmware import DownloadState from kasa.tests.device_fixtures import parametrize @@ -33,10 +34,12 @@ async def test_firmware_features( """Test light effect.""" fw = dev.modules.get(Module.Firmware) assert fw + assert fw.firmware_update_info is None if not dev.is_cloud_connected: pytest.skip("Device is not cloud connected, skipping test") + await fw.check_latest_firmware() if fw.supported_version < required_version: pytest.skip("Feature %s requires newer version" % feature) @@ -53,20 +56,36 @@ async def test_update_available_without_cloud(dev: SmartDevice): """Test that update_available returns None when disconnected.""" fw = dev.modules.get(Module.Firmware) assert fw + assert fw.firmware_update_info is None if dev.is_cloud_connected: + await fw.check_latest_firmware() assert isinstance(fw.update_available, bool) else: assert fw.update_available is None @firmware +@pytest.mark.parametrize( + ("update_available", "expected_result"), + [ + pytest.param(True, nullcontext(), id="available"), + pytest.param(False, pytest.raises(KasaException), id="not-available"), + ], +) async def test_firmware_update( - dev: SmartDevice, mocker: MockerFixture, caplog: pytest.LogCaptureFixture + dev: SmartDevice, + mocker: MockerFixture, + caplog: pytest.LogCaptureFixture, + update_available, + expected_result, ): """Test updating firmware.""" caplog.set_level(logging.INFO) + if not dev.is_cloud_connected: + pytest.skip("Device is not cloud connected, skipping test") + fw = dev.modules.get(Module.Firmware) assert fw @@ -101,7 +120,19 @@ async def test_firmware_update( cb_mock = mocker.AsyncMock() - await fw.update(progress_cb=cb_mock) + assert fw.firmware_update_info is None + with pytest.raises(KasaException): + await fw.update(progress_cb=cb_mock) + await fw.check_latest_firmware() + assert fw.firmware_update_info is not None + + fw._firmware_update_info.status = 1 if update_available else 0 + + with expected_result: + await fw.update(progress_cb=cb_mock) + + if not update_available: + return # This is necessary to allow the eventloop to process the created tasks await asyncio_sleep(0)