mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-08-09 20:24:02 +00:00
Merge remote-tracking branch 'upstream/master' into feat/motion
This commit is contained in:
@@ -2,12 +2,14 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
from contextlib import contextmanager
|
||||
from functools import singledispatch, update_wrapper, wraps
|
||||
from gettext import gettext
|
||||
from typing import TYPE_CHECKING, Any, Final
|
||||
|
||||
import asyncclick as click
|
||||
@@ -238,4 +240,19 @@ def CatchAllExceptions(cls):
|
||||
except Exception as exc:
|
||||
_handle_exception(self._debug, exc)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
"""Run the coroutine in the event loop and print any exceptions.
|
||||
|
||||
python click catches KeyboardInterrupt in main, raises Abort()
|
||||
and does sys.exit. asyncclick doesn't properly handle a coroutine
|
||||
receiving CancelledError on a KeyboardInterrupt, so we catch the
|
||||
KeyboardInterrupt here once asyncio.run has re-raised it. This
|
||||
avoids large stacktraces when a user presses Ctrl-C.
|
||||
"""
|
||||
try:
|
||||
asyncio.run(self.main(*args, **kwargs))
|
||||
except KeyboardInterrupt:
|
||||
click.echo(gettext("\nAborted!"), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
return _CommandCls
|
||||
|
@@ -498,7 +498,7 @@ class Discover:
|
||||
try:
|
||||
_LOGGER.debug("Waiting %s seconds for responses...", discovery_timeout)
|
||||
await protocol.wait_for_discovery_to_complete()
|
||||
except KasaException as ex:
|
||||
except (KasaException, asyncio.CancelledError) as ex:
|
||||
for device in protocol.discovered_devices.values():
|
||||
await device.protocol.close()
|
||||
raise ex
|
||||
|
@@ -113,10 +113,23 @@ class HttpClient:
|
||||
ssl=ssl,
|
||||
)
|
||||
async with resp:
|
||||
if resp.status == 200:
|
||||
response_data = await resp.read()
|
||||
if return_json:
|
||||
response_data = await resp.read()
|
||||
|
||||
if resp.status == 200:
|
||||
if return_json:
|
||||
response_data = json_loads(response_data.decode())
|
||||
else:
|
||||
_LOGGER.debug(
|
||||
"Device %s received status code %s with response %s",
|
||||
self._config.host,
|
||||
resp.status,
|
||||
str(response_data),
|
||||
)
|
||||
if response_data and return_json:
|
||||
try:
|
||||
response_data = json_loads(response_data.decode())
|
||||
except Exception:
|
||||
_LOGGER.debug("Device %s response could not be parsed as json")
|
||||
|
||||
except (aiohttp.ServerDisconnectedError, aiohttp.ClientOSError) as ex:
|
||||
if not self._wait_between_requests:
|
||||
|
@@ -160,7 +160,9 @@ class Module(ABC):
|
||||
# SMARTCAM only modules
|
||||
Camera: Final[ModuleName[smartcam.Camera]] = ModuleName("Camera")
|
||||
LensMask: Final[ModuleName[smartcam.LensMask]] = ModuleName("LensMask")
|
||||
Motion: Final[ModuleName[smartcam.Motion]] = ModuleName("Motion")
|
||||
MotionDetection: Final[ModuleName[smartcam.MotionDetection]] = ModuleName(
|
||||
"MotionDetection"
|
||||
)
|
||||
|
||||
def __init__(self, device: Device, module: str) -> None:
|
||||
self._device = device
|
||||
|
@@ -2,10 +2,10 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import NoReturn
|
||||
from typing import Any, NoReturn
|
||||
|
||||
from ...emeterstatus import EmeterStatus
|
||||
from ...exceptions import KasaException
|
||||
from ...exceptions import DeviceError, KasaException
|
||||
from ...interfaces.energy import Energy as EnergyInterface
|
||||
from ..smartmodule import SmartModule, raise_if_update_error
|
||||
|
||||
@@ -15,12 +15,39 @@ class Energy(SmartModule, EnergyInterface):
|
||||
|
||||
REQUIRED_COMPONENT = "energy_monitoring"
|
||||
|
||||
_energy: dict[str, Any]
|
||||
_current_consumption: float | None
|
||||
|
||||
async def _post_update_hook(self) -> None:
|
||||
if "voltage_mv" in self.data.get("get_emeter_data", {}):
|
||||
try:
|
||||
data = self.data
|
||||
except DeviceError as de:
|
||||
self._energy = {}
|
||||
self._current_consumption = None
|
||||
raise de
|
||||
|
||||
# If version is 1 then data is get_energy_usage
|
||||
self._energy = data.get("get_energy_usage", data)
|
||||
|
||||
if "voltage_mv" in data.get("get_emeter_data", {}):
|
||||
self._supported = (
|
||||
self._supported | EnergyInterface.ModuleFeature.VOLTAGE_CURRENT
|
||||
)
|
||||
|
||||
if (power := self._energy.get("current_power")) is not None or (
|
||||
power := data.get("get_emeter_data", {}).get("power_mw")
|
||||
) is not None:
|
||||
self._current_consumption = power / 1_000
|
||||
# Fallback if get_energy_usage does not provide current_power,
|
||||
# which can happen on some newer devices (e.g. P304M).
|
||||
# This may not be valid scenario as it pre-dates trying get_emeter_data
|
||||
elif (
|
||||
power := self.data.get("get_current_power", {}).get("current_power")
|
||||
) is not None:
|
||||
self._current_consumption = power
|
||||
else:
|
||||
self._current_consumption = None
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
req = {
|
||||
@@ -33,28 +60,21 @@ class Energy(SmartModule, EnergyInterface):
|
||||
return req
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
def current_consumption(self) -> float | None:
|
||||
"""Current power in watts."""
|
||||
if (power := self.energy.get("current_power")) is not None or (
|
||||
power := self.data.get("get_emeter_data", {}).get("power_mw")
|
||||
) is not None:
|
||||
return power / 1_000
|
||||
# Fallback if get_energy_usage does not provide current_power,
|
||||
# which can happen on some newer devices (e.g. P304M).
|
||||
elif (
|
||||
power := self.data.get("get_current_power", {}).get("current_power")
|
||||
) is not None:
|
||||
return power
|
||||
return None
|
||||
def optional_response_keys(self) -> list[str]:
|
||||
"""Return optional response keys for the module."""
|
||||
if self.supported_version > 1:
|
||||
return ["get_energy_usage"]
|
||||
return []
|
||||
|
||||
@property
|
||||
def current_consumption(self) -> float | None:
|
||||
"""Current power in watts."""
|
||||
return self._current_consumption
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
def energy(self) -> dict:
|
||||
"""Return get_energy_usage results."""
|
||||
if en := self.data.get("get_energy_usage"):
|
||||
return en
|
||||
return self.data
|
||||
return self._energy
|
||||
|
||||
def _get_status_from_energy(self, energy: dict) -> EmeterStatus:
|
||||
return EmeterStatus(
|
||||
@@ -83,16 +103,18 @@ class Energy(SmartModule, EnergyInterface):
|
||||
return self._get_status_from_energy(res["get_energy_usage"])
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
def consumption_this_month(self) -> float | None:
|
||||
"""Get the emeter value for this month in kWh."""
|
||||
return self.energy.get("month_energy", 0) / 1_000
|
||||
if (month := self.energy.get("month_energy")) is not None:
|
||||
return month / 1_000
|
||||
return None
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
def consumption_today(self) -> float | None:
|
||||
"""Get the emeter value for today in kWh."""
|
||||
return self.energy.get("today_energy", 0) / 1_000
|
||||
if (today := self.energy.get("today_energy")) is not None:
|
||||
return today / 1_000
|
||||
return None
|
||||
|
||||
@property
|
||||
@raise_if_update_error
|
||||
|
@@ -72,6 +72,7 @@ class SmartModule(Module):
|
||||
self._last_update_time: float | None = None
|
||||
self._last_update_error: KasaException | None = None
|
||||
self._error_count = 0
|
||||
self._logged_remove_keys: list[str] = []
|
||||
|
||||
def __init_subclass__(cls, **kwargs) -> None:
|
||||
# We only want to register submodules in a modules package so that
|
||||
@@ -149,6 +150,15 @@ class SmartModule(Module):
|
||||
"""
|
||||
return await self._device._query_helper(method, params)
|
||||
|
||||
@property
|
||||
def optional_response_keys(self) -> list[str]:
|
||||
"""Return optional response keys for the module.
|
||||
|
||||
Defaults to no keys. Overriding this and providing keys will remove
|
||||
instead of raise on error.
|
||||
"""
|
||||
return []
|
||||
|
||||
@property
|
||||
def data(self) -> dict[str, Any]:
|
||||
"""Return response data for the module.
|
||||
@@ -181,12 +191,31 @@ class SmartModule(Module):
|
||||
|
||||
filtered_data = {k: v for k, v in dev._last_update.items() if k in q_keys}
|
||||
|
||||
remove_keys: list[str] = []
|
||||
for data_item in filtered_data:
|
||||
if isinstance(filtered_data[data_item], SmartErrorCode):
|
||||
raise DeviceError(
|
||||
f"{data_item} for {self.name}", error_code=filtered_data[data_item]
|
||||
if data_item in self.optional_response_keys:
|
||||
remove_keys.append(data_item)
|
||||
else:
|
||||
raise DeviceError(
|
||||
f"{data_item} for {self.name}",
|
||||
error_code=filtered_data[data_item],
|
||||
)
|
||||
|
||||
for key in remove_keys:
|
||||
if key not in self._logged_remove_keys:
|
||||
self._logged_remove_keys.append(key)
|
||||
_LOGGER.debug(
|
||||
"Removed key %s from response for device %s as it returned "
|
||||
"error: %s. This message will only be logged once per key.",
|
||||
key,
|
||||
self._device.host,
|
||||
filtered_data[key],
|
||||
)
|
||||
if len(filtered_data) == 1:
|
||||
|
||||
filtered_data.pop(key)
|
||||
|
||||
if len(filtered_data) == 1 and not remove_keys:
|
||||
return next(iter(filtered_data.values()))
|
||||
|
||||
return filtered_data
|
||||
|
@@ -1,6 +1,7 @@
|
||||
"""Modules for SMARTCAM devices."""
|
||||
|
||||
from .alarm import Alarm
|
||||
from .babycrydetection import BabyCryDetection
|
||||
from .camera import Camera
|
||||
from .childdevice import ChildDevice
|
||||
from .device import DeviceModule
|
||||
@@ -8,20 +9,25 @@ from .homekit import HomeKit
|
||||
from .led import Led
|
||||
from .lensmask import LensMask
|
||||
from .matter import Matter
|
||||
from .motion import Motion
|
||||
from .motiondetection import MotionDetection
|
||||
from .pantilt import PanTilt
|
||||
from .persondetection import PersonDetection
|
||||
from .tamperdetection import TamperDetection
|
||||
from .time import Time
|
||||
|
||||
__all__ = [
|
||||
"Alarm",
|
||||
"BabyCryDetection",
|
||||
"Camera",
|
||||
"ChildDevice",
|
||||
"DeviceModule",
|
||||
"Led",
|
||||
"PanTilt",
|
||||
"PersonDetection",
|
||||
"Time",
|
||||
"HomeKit",
|
||||
"Matter",
|
||||
"Motion",
|
||||
"MotionDetection",
|
||||
"LensMask",
|
||||
"TamperDetection",
|
||||
]
|
||||
|
47
kasa/smartcam/modules/babycrydetection.py
Normal file
47
kasa/smartcam/modules/babycrydetection.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Implementation of baby cry detection module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BabyCryDetection(SmartCamModule):
|
||||
"""Implementation of baby cry detection module."""
|
||||
|
||||
REQUIRED_COMPONENT = "babyCryDetection"
|
||||
|
||||
QUERY_GETTER_NAME = "getBCDConfig"
|
||||
QUERY_MODULE_NAME = "sound_detection"
|
||||
QUERY_SECTION_NAMES = "bcd"
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
id="baby_cry_detection",
|
||||
name="Baby cry detection",
|
||||
container=self,
|
||||
attribute_getter="enabled",
|
||||
attribute_setter="set_enabled",
|
||||
type=Feature.Type.Switch,
|
||||
category=Feature.Category.Primary,
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
"""Return the baby cry detection enabled state."""
|
||||
return self.data["bcd"]["enabled"] == "on"
|
||||
|
||||
async def set_enabled(self, enable: bool) -> dict:
|
||||
"""Set the baby cry detection enabled state."""
|
||||
params = {"enabled": "on" if enable else "off"}
|
||||
return await self._device._query_setter_helper(
|
||||
"setBCDConfig", self.QUERY_MODULE_NAME, "bcd", params
|
||||
)
|
@@ -14,6 +14,13 @@ class DeviceModule(SmartCamModule):
|
||||
QUERY_MODULE_NAME = "device_info"
|
||||
QUERY_SECTION_NAMES = ["basic_info", "info"]
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
q = super().query()
|
||||
q["getConnectionType"] = {"network": {"get_connection_type": []}}
|
||||
|
||||
return q
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
@@ -26,6 +33,32 @@ class DeviceModule(SmartCamModule):
|
||||
type=Feature.Type.Sensor,
|
||||
)
|
||||
)
|
||||
if self.rssi is not None:
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
container=self,
|
||||
id="rssi",
|
||||
name="RSSI",
|
||||
attribute_getter="rssi",
|
||||
icon="mdi:signal",
|
||||
unit_getter=lambda: "dBm",
|
||||
category=Feature.Category.Debug,
|
||||
type=Feature.Type.Sensor,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
container=self,
|
||||
id="signal_level",
|
||||
name="Signal Level",
|
||||
attribute_getter="signal_level",
|
||||
icon="mdi:signal",
|
||||
category=Feature.Category.Info,
|
||||
type=Feature.Type.Sensor,
|
||||
)
|
||||
)
|
||||
|
||||
async def _post_update_hook(self) -> None:
|
||||
"""Overriden to prevent module disabling.
|
||||
@@ -37,4 +70,14 @@ class DeviceModule(SmartCamModule):
|
||||
@property
|
||||
def device_id(self) -> str:
|
||||
"""Return the device id."""
|
||||
return self.data["basic_info"]["dev_id"]
|
||||
return self.data[self.QUERY_GETTER_NAME]["basic_info"]["dev_id"]
|
||||
|
||||
@property
|
||||
def rssi(self) -> int | None:
|
||||
"""Return the device id."""
|
||||
return self.data["getConnectionType"].get("rssiValue")
|
||||
|
||||
@property
|
||||
def signal_level(self) -> int | None:
|
||||
"""Return the device id."""
|
||||
return self.data["getConnectionType"].get("rssi")
|
||||
|
@@ -10,7 +10,7 @@ from ..smartcammodule import SmartCamModule
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Motion(SmartCamModule):
|
||||
class MotionDetection(SmartCamModule):
|
||||
"""Implementation of motion detection module."""
|
||||
|
||||
REQUIRED_COMPONENT = "detection"
|
||||
@@ -24,8 +24,8 @@ class Motion(SmartCamModule):
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
id="motion_detection_enabled",
|
||||
name="Motion detection enabled",
|
||||
id="motion_detection",
|
||||
name="Motion detection",
|
||||
container=self,
|
||||
attribute_getter="enabled",
|
||||
attribute_setter="set_enabled",
|
||||
@@ -36,12 +36,12 @@ class Motion(SmartCamModule):
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
"""Return the lens mask state."""
|
||||
"""Return the motion detection enabled state."""
|
||||
return self.data["motion_det"]["enabled"] == "on"
|
||||
|
||||
async def set_enabled(self, state: bool) -> dict:
|
||||
"""Set the lens mask state."""
|
||||
params = {"enabled": "on" if state else "off"}
|
||||
async def set_enabled(self, enable: bool) -> dict:
|
||||
"""Set the motion detection enabled state."""
|
||||
params = {"enabled": "on" if enable else "off"}
|
||||
return await self._device._query_setter_helper(
|
||||
"setLensMaskConfig", self.QUERY_MODULE_NAME, "motion_det", params
|
||||
"setDetectionConfig", self.QUERY_MODULE_NAME, "motion_det", params
|
||||
)
|
47
kasa/smartcam/modules/persondetection.py
Normal file
47
kasa/smartcam/modules/persondetection.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Implementation of person detection module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PersonDetection(SmartCamModule):
|
||||
"""Implementation of person detection module."""
|
||||
|
||||
REQUIRED_COMPONENT = "personDetection"
|
||||
|
||||
QUERY_GETTER_NAME = "getPersonDetectionConfig"
|
||||
QUERY_MODULE_NAME = "people_detection"
|
||||
QUERY_SECTION_NAMES = "detection"
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
id="person_detection",
|
||||
name="Person detection",
|
||||
container=self,
|
||||
attribute_getter="enabled",
|
||||
attribute_setter="set_enabled",
|
||||
type=Feature.Type.Switch,
|
||||
category=Feature.Category.Primary,
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
"""Return the person detection enabled state."""
|
||||
return self.data["detection"]["enabled"] == "on"
|
||||
|
||||
async def set_enabled(self, enable: bool) -> dict:
|
||||
"""Set the person detection enabled state."""
|
||||
params = {"enabled": "on" if enable else "off"}
|
||||
return await self._device._query_setter_helper(
|
||||
"setPersonDetectionConfig", self.QUERY_MODULE_NAME, "detection", params
|
||||
)
|
47
kasa/smartcam/modules/tamperdetection.py
Normal file
47
kasa/smartcam/modules/tamperdetection.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Implementation of tamper detection module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TamperDetection(SmartCamModule):
|
||||
"""Implementation of tamper detection module."""
|
||||
|
||||
REQUIRED_COMPONENT = "tamperDetection"
|
||||
|
||||
QUERY_GETTER_NAME = "getTamperDetectionConfig"
|
||||
QUERY_MODULE_NAME = "tamper_detection"
|
||||
QUERY_SECTION_NAMES = "tamper_det"
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
id="tamper_detection",
|
||||
name="Tamper detection",
|
||||
container=self,
|
||||
attribute_getter="enabled",
|
||||
attribute_setter="set_enabled",
|
||||
type=Feature.Type.Switch,
|
||||
category=Feature.Category.Primary,
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
"""Return the tamper detection enabled state."""
|
||||
return self.data["tamper_det"]["enabled"] == "on"
|
||||
|
||||
async def set_enabled(self, enable: bool) -> dict:
|
||||
"""Set the tamper detection enabled state."""
|
||||
params = {"enabled": "on" if enable else "off"}
|
||||
return await self._device._query_setter_helper(
|
||||
"setTamperDetectionConfig", self.QUERY_MODULE_NAME, "tamper_det", params
|
||||
)
|
@@ -159,7 +159,7 @@ class SmartCamDevice(SmartDevice):
|
||||
if self._device_type is DeviceType.Camera:
|
||||
self._modules[Camera._module_name()] = Camera(self, Camera._module_name())
|
||||
|
||||
if Module.Motion in self._modules:
|
||||
if Module.MotionDetection in self._modules:
|
||||
self._try_add_listen_module()
|
||||
|
||||
async def _initialize_features(self) -> None:
|
||||
@@ -202,6 +202,7 @@ class SmartCamDevice(SmartDevice):
|
||||
initial_query = {
|
||||
"getDeviceInfo": {"device_info": {"name": ["basic_info", "info"]}},
|
||||
"getAppComponentList": {"app_component": {"name": "app_component_list"}},
|
||||
"getConnectionType": {"network": {"get_connection_type": {}}},
|
||||
}
|
||||
resp = await self.protocol.query(initial_query)
|
||||
self._last_update.update(resp)
|
||||
@@ -278,3 +279,8 @@ class SmartCamDevice(SmartDevice):
|
||||
"dev_name": self.alias,
|
||||
"oemId": self._info.get("oem_id"),
|
||||
}
|
||||
|
||||
@property
|
||||
def rssi(self) -> int | None:
|
||||
"""Return the device id."""
|
||||
return self.modules[SmartCamModule.SmartCamDeviceModule].rssi
|
||||
|
@@ -20,6 +20,22 @@ class SmartCamModule(SmartModule):
|
||||
"""Base class for SMARTCAM modules."""
|
||||
|
||||
SmartCamAlarm: Final[ModuleName[modules.Alarm]] = ModuleName("SmartCamAlarm")
|
||||
SmartCamMotionDetection: Final[ModuleName[modules.MotionDetection]] = ModuleName(
|
||||
"MotionDetection"
|
||||
)
|
||||
SmartCamPersonDetection: Final[ModuleName[modules.PersonDetection]] = ModuleName(
|
||||
"PersonDetection"
|
||||
)
|
||||
SmartCamTamperDetection: Final[ModuleName[modules.TamperDetection]] = ModuleName(
|
||||
"TamperDetection"
|
||||
)
|
||||
SmartCamBabyCryDetection: Final[ModuleName[modules.BabyCryDetection]] = ModuleName(
|
||||
"BabyCryDetection"
|
||||
)
|
||||
|
||||
SmartCamDeviceModule: Final[ModuleName[modules.DeviceModule]] = ModuleName(
|
||||
"devicemodule"
|
||||
)
|
||||
|
||||
#: Module name to be queried
|
||||
QUERY_MODULE_NAME: str
|
||||
|
@@ -8,6 +8,7 @@ import hashlib
|
||||
import logging
|
||||
import secrets
|
||||
import ssl
|
||||
from contextlib import suppress
|
||||
from enum import Enum, auto
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
@@ -160,6 +161,19 @@ class SslAesTransport(BaseTransport):
|
||||
error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR
|
||||
return error_code
|
||||
|
||||
def _get_response_inner_error(self, resp_dict: Any) -> SmartErrorCode | None:
|
||||
error_code_raw = resp_dict.get("data", {}).get("code")
|
||||
if error_code_raw is None:
|
||||
return None
|
||||
try:
|
||||
error_code = SmartErrorCode.from_int(error_code_raw)
|
||||
except ValueError:
|
||||
_LOGGER.warning(
|
||||
"Device %s received unknown error code: %s", self._host, error_code_raw
|
||||
)
|
||||
error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR
|
||||
return error_code
|
||||
|
||||
def _handle_response_error_code(self, resp_dict: Any, msg: str) -> None:
|
||||
error_code = self._get_response_error(resp_dict)
|
||||
if error_code is SmartErrorCode.SUCCESS:
|
||||
@@ -216,6 +230,31 @@ class SslAesTransport(BaseTransport):
|
||||
ssl=await self._get_ssl_context(),
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
assert self._encryption_session is not None
|
||||
|
||||
# Devices can respond with 500 if another session is created from
|
||||
# the same host. Decryption may not succeed after that
|
||||
if status_code == 500:
|
||||
msg = (
|
||||
f"Device {self._host} replied with status 500 after handshake, "
|
||||
f"response: "
|
||||
)
|
||||
decrypted = None
|
||||
if isinstance(resp_dict, dict) and (
|
||||
response := resp_dict.get("result", {}).get("response")
|
||||
):
|
||||
with suppress(Exception):
|
||||
decrypted = self._encryption_session.decrypt(response.encode())
|
||||
|
||||
if decrypted:
|
||||
msg += decrypted
|
||||
else:
|
||||
msg += str(resp_dict)
|
||||
|
||||
_LOGGER.debug(msg)
|
||||
raise _RetryableError(msg)
|
||||
|
||||
if status_code != 200:
|
||||
raise KasaException(
|
||||
f"{self._host} responded with an unexpected "
|
||||
@@ -228,7 +267,6 @@ class SslAesTransport(BaseTransport):
|
||||
|
||||
if TYPE_CHECKING:
|
||||
resp_dict = cast(dict[str, Any], resp_dict)
|
||||
assert self._encryption_session is not None
|
||||
|
||||
if "result" in resp_dict and "response" in resp_dict["result"]:
|
||||
raw_response: str = resp_dict["result"]["response"]
|
||||
@@ -383,13 +421,29 @@ class SslAesTransport(BaseTransport):
|
||||
error_code = default_error_code
|
||||
resp_dict = default_resp_dict
|
||||
|
||||
# If the default login worked it's ok not to provide credentials but if
|
||||
# it didn't raise auth error here.
|
||||
if not self._username:
|
||||
raise AuthenticationError(
|
||||
f"Credentials must be supplied to connect to {self._host}"
|
||||
)
|
||||
|
||||
# Device responds with INVALID_NONCE and a "nonce" to indicate ready
|
||||
# for secure login. Otherwise error.
|
||||
if error_code is not SmartErrorCode.INVALID_NONCE or (
|
||||
resp_dict and "nonce" not in resp_dict["result"].get("data", {})
|
||||
resp_dict and "nonce" not in resp_dict.get("result", {}).get("data", {})
|
||||
):
|
||||
if (
|
||||
resp_dict
|
||||
and self._get_response_inner_error(resp_dict)
|
||||
is SmartErrorCode.DEVICE_BLOCKED
|
||||
):
|
||||
sec_left = resp_dict.get("data", {}).get("sec_left")
|
||||
msg = "Device blocked" + (
|
||||
f" for {sec_left} seconds" if sec_left else ""
|
||||
)
|
||||
raise DeviceError(msg, error_code=SmartErrorCode.DEVICE_BLOCKED)
|
||||
|
||||
raise AuthenticationError(f"Error trying handshake1: {resp_dict}")
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
Reference in New Issue
Block a user