Enable and convert to future annotations (#838)

This commit is contained in:
Steven B
2024-04-17 14:39:24 +01:00
committed by GitHub
parent 82d92aeea5
commit 203bd79253
59 changed files with 562 additions and 462 deletions

View File

@@ -4,13 +4,15 @@ Based on the work of https://github.com/petretiandrea/plugp100
under compatible GNU GPL3 license.
"""
from __future__ import annotations
import asyncio
import base64
import logging
import time
import uuid
from pprint import pformat as pf
from typing import Any, Dict, Union
from typing import Any
from .exceptions import (
SMART_AUTHENTICATION_ERRORS,
@@ -57,12 +59,12 @@ class SmartProtocol(BaseProtocol):
}
return json_dumps(request)
async def query(self, request: Union[str, Dict], retry_count: int = 3) -> Dict:
async def query(self, request: str | dict, retry_count: int = 3) -> dict:
"""Query the device retrying for retry_count on failure."""
async with self._query_lock:
return await self._query(request, retry_count)
async def _query(self, request: Union[str, Dict], retry_count: int = 3) -> Dict:
async def _query(self, request: str | dict, retry_count: int = 3) -> dict:
for retry in range(retry_count + 1):
try:
return await self._execute_query(request, retry)
@@ -103,9 +105,9 @@ class SmartProtocol(BaseProtocol):
# make mypy happy, this should never be reached..
raise KasaException("Query reached somehow to unreachable")
async def _execute_multiple_query(self, request: Dict, retry_count: int) -> Dict:
async def _execute_multiple_query(self, request: dict, retry_count: int) -> dict:
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
multi_result: Dict[str, Any] = {}
multi_result: dict[str, Any] = {}
smart_method = "multipleRequest"
requests = [
{"method": method, "params": params} for method, params in request.items()
@@ -146,7 +148,7 @@ class SmartProtocol(BaseProtocol):
multi_result[method] = result
return multi_result
async def _execute_query(self, request: Union[str, Dict], retry_count: int) -> Dict:
async def _execute_query(self, request: str | dict, retry_count: int) -> dict:
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if isinstance(request, dict):
@@ -322,7 +324,7 @@ class _ChildProtocolWrapper(SmartProtocol):
return smart_method, smart_params
async def query(self, request: Union[str, Dict], retry_count: int = 3) -> Dict:
async def query(self, request: str | dict, retry_count: int = 3) -> dict:
"""Wrap request inside control_child envelope."""
method, params = self._get_method_and_params_for_request(request)
request_data = {