mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-23 05:07:09 +00:00
7b3dde9aa0
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run
263 lines
8.7 KiB
Python
263 lines
8.7 KiB
Python
"""Module for SmartCamProtocol."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pprint import pformat as pf
|
|
from typing import Any, cast
|
|
|
|
from ..exceptions import (
|
|
AuthenticationError,
|
|
DeviceError,
|
|
KasaException,
|
|
_RetryableError,
|
|
)
|
|
from ..json import dumps as json_dumps
|
|
from ..transports.sslaestransport import (
|
|
SMART_AUTHENTICATION_ERRORS,
|
|
SMART_RETRYABLE_ERRORS,
|
|
SmartErrorCode,
|
|
)
|
|
from .smartprotocol import SmartProtocol
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# List of getMethodNames that should be sent as {"method":"do"}
|
|
# https://md.depau.eu/s/r1Ys_oWoP#Modules
|
|
GET_METHODS_AS_DO = {
|
|
"getSdCardFormatStatus",
|
|
"getConnectionType",
|
|
"getUserID",
|
|
"getP2PSharePassword",
|
|
"getAESEncryptKey",
|
|
"getFirmwareAFResult",
|
|
"getWhitelampStatus",
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SingleRequest:
|
|
"""Class for returning single request details from helper functions."""
|
|
|
|
method_type: str
|
|
method_name: str
|
|
param_name: str
|
|
request: dict[str, Any]
|
|
|
|
|
|
class SmartCamProtocol(SmartProtocol):
|
|
"""Class for SmartCam Protocol."""
|
|
|
|
def _get_list_request(
|
|
self, method: str, params: dict | None, start_index: int
|
|
) -> dict:
|
|
# All smartcam requests have params
|
|
params = cast(dict, params)
|
|
module_name = next(iter(params))
|
|
return {method: {module_name: {"start_index": start_index}}}
|
|
|
|
def _handle_response_error_code(
|
|
self, resp_dict: dict, method: str, raise_on_error: bool = True
|
|
) -> None:
|
|
error_code_raw = resp_dict.get("error_code")
|
|
try:
|
|
error_code = SmartErrorCode.from_int(error_code_raw)
|
|
except ValueError:
|
|
_LOGGER.warning(
|
|
"Device %s received unknown error code: %s", self._host, error_code_raw
|
|
)
|
|
error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR
|
|
|
|
if error_code is SmartErrorCode.SUCCESS:
|
|
return
|
|
|
|
if not raise_on_error:
|
|
resp_dict["result"] = error_code
|
|
return
|
|
|
|
msg = (
|
|
f"Error querying device: {self._host}: "
|
|
+ f"{error_code.name}({error_code.value})"
|
|
+ f" for method: {method}"
|
|
)
|
|
if error_code in SMART_RETRYABLE_ERRORS:
|
|
raise _RetryableError(msg, error_code=error_code)
|
|
if error_code in SMART_AUTHENTICATION_ERRORS:
|
|
raise AuthenticationError(msg, error_code=error_code)
|
|
raise DeviceError(msg, error_code=error_code)
|
|
|
|
async def close(self) -> None:
|
|
"""Close the underlying transport."""
|
|
await self._transport.close()
|
|
|
|
@staticmethod
|
|
def _get_smart_camera_single_request(
|
|
request: dict[str, dict[str, Any]],
|
|
) -> SingleRequest:
|
|
method = next(iter(request))
|
|
if method == "multipleRequest":
|
|
method_type = "multi"
|
|
params = request["multipleRequest"]
|
|
req = {"method": "multipleRequest", "params": params}
|
|
return SingleRequest("multi", "multipleRequest", "", req)
|
|
|
|
param = next(iter(request[method]))
|
|
method_type = method
|
|
req = {
|
|
"method": method,
|
|
param: request[method][param],
|
|
}
|
|
return SingleRequest(method_type, method, param, req)
|
|
|
|
@staticmethod
|
|
def _make_snake_name(name: str) -> str:
|
|
"""Convert camel or pascal case to snake name."""
|
|
sn = "".join(["_" + i.lower() if i.isupper() else i for i in name]).lstrip("_")
|
|
return sn
|
|
|
|
@staticmethod
|
|
def _make_smart_camera_single_request(
|
|
request: str,
|
|
) -> SingleRequest:
|
|
"""Make a single request given a method name and no params.
|
|
|
|
If method like getSomeThing then module will be some_thing.
|
|
"""
|
|
method = request
|
|
method_type = request[:3]
|
|
snake_name = SmartCamProtocol._make_snake_name(request)
|
|
param = snake_name[4:]
|
|
if (
|
|
(short_method := method[:3])
|
|
and short_method in {"get", "set"}
|
|
and method not in GET_METHODS_AS_DO
|
|
):
|
|
method_type = short_method
|
|
param = snake_name[4:]
|
|
else:
|
|
method_type = "do"
|
|
param = snake_name
|
|
req = {"method": method_type, param: {}}
|
|
return SingleRequest(method_type, method, param, req)
|
|
|
|
async def _execute_query(
|
|
self, request: str | dict, *, retry_count: int, iterate_list_pages: bool = True
|
|
) -> dict:
|
|
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
|
|
if isinstance(request, dict):
|
|
method = next(iter(request))
|
|
if len(request) == 1 and method in {"get", "set", "do", "multipleRequest"}:
|
|
single_request = self._get_smart_camera_single_request(request)
|
|
else:
|
|
return await self._execute_multiple_query(
|
|
request, retry_count, iterate_list_pages
|
|
)
|
|
else:
|
|
single_request = self._make_smart_camera_single_request(request)
|
|
|
|
smart_request = json_dumps(single_request.request)
|
|
if debug_enabled:
|
|
_LOGGER.debug(
|
|
"%s >> %s",
|
|
self._host,
|
|
pf(smart_request),
|
|
)
|
|
response_data = await self._transport.send(smart_request)
|
|
|
|
if debug_enabled:
|
|
_LOGGER.debug(
|
|
"%s << %s",
|
|
self._host,
|
|
pf(response_data),
|
|
)
|
|
|
|
if "error_code" in response_data:
|
|
# H200 does not return an error code
|
|
self._handle_response_error_code(response_data, single_request.method_name)
|
|
# Requests that are invalid and raise PROTOCOL_FORMAT_ERROR when sent
|
|
# as a multipleRequest will return {} when sent as a single request.
|
|
if single_request.method_type == "get" and (
|
|
not (section := next(iter(response_data))) or response_data[section] == {}
|
|
):
|
|
raise DeviceError(
|
|
f"No results for get request {single_request.method_name}"
|
|
)
|
|
|
|
# TODO need to update handle response lists
|
|
|
|
if single_request.method_type == "do":
|
|
return {single_request.method_name: response_data}
|
|
if single_request.method_type == "set":
|
|
return {}
|
|
if single_request.method_type == "multi":
|
|
return {single_request.method_name: response_data["result"]}
|
|
return {
|
|
single_request.method_name: {
|
|
single_request.param_name: response_data[single_request.param_name]
|
|
}
|
|
}
|
|
|
|
|
|
class _ChildCameraProtocolWrapper(SmartProtocol):
|
|
"""Protocol wrapper for controlling child devices.
|
|
|
|
This is an internal class used to communicate with child devices,
|
|
and should not be used directly.
|
|
|
|
This class overrides query() method of the protocol to modify all
|
|
outgoing queries to use ``controlChild`` command, and unwraps the
|
|
device responses before returning to the caller.
|
|
"""
|
|
|
|
def __init__(self, device_id: str, base_protocol: SmartProtocol) -> None:
|
|
self._device_id = device_id
|
|
self._protocol = base_protocol
|
|
self._transport = base_protocol._transport
|
|
|
|
async def query(self, request: str | dict, retry_count: int = 3) -> dict:
|
|
"""Wrap request inside controlChild envelope."""
|
|
return await self._query(request, retry_count)
|
|
|
|
async def _query(self, request: str | dict, retry_count: int = 3) -> dict:
|
|
"""Wrap request inside controlChild envelope."""
|
|
if not isinstance(request, dict):
|
|
raise KasaException("Child requests must be dictionaries.")
|
|
requests = []
|
|
methods = []
|
|
for key, val in request.items():
|
|
request = {
|
|
"method": "controlChild",
|
|
"params": {
|
|
"childControl": {
|
|
"device_id": self._device_id,
|
|
"request_data": {"method": key, "params": val},
|
|
}
|
|
},
|
|
}
|
|
methods.append(key)
|
|
requests.append(request)
|
|
|
|
multipleRequest = {"multipleRequest": {"requests": requests}}
|
|
|
|
response = await self._protocol.query(multipleRequest, retry_count)
|
|
|
|
responses = response["multipleRequest"]["responses"]
|
|
response_dict = {}
|
|
|
|
# Raise errors for single calls
|
|
raise_on_error = len(requests) == 1
|
|
|
|
for index_id, response in enumerate(responses):
|
|
response_data = response["result"]["response_data"]
|
|
method = methods[index_id]
|
|
self._handle_response_error_code(
|
|
response_data, method, raise_on_error=raise_on_error
|
|
)
|
|
response_dict[method] = response_data.get("result")
|
|
|
|
return response_dict
|
|
|
|
async def close(self) -> None:
|
|
"""Do nothing as the parent owns the protocol."""
|