Merge remote-tracking branch 'upstream/master' into feat/light_module_feats

This commit is contained in:
Steven B
2024-12-23 09:46:11 +00:00
216 changed files with 11945 additions and 2156 deletions

View File

@@ -38,7 +38,7 @@ from kasa.feature import Feature
from kasa.interfaces.light import HSV, ColorTempRange, Light, LightState
from kasa.interfaces.thermostat import Thermostat, ThermostatState
from kasa.module import Module
from kasa.protocols import BaseProtocol, IotProtocol, SmartProtocol
from kasa.protocols import BaseProtocol, IotProtocol, SmartCamProtocol, SmartProtocol
from kasa.protocols.iotprotocol import _deprecated_TPLinkSmartHomeProtocol # noqa: F401
from kasa.smartcam.modules.camera import StreamResolution
from kasa.transports import BaseTransport
@@ -52,6 +52,7 @@ __all__ = [
"BaseTransport",
"IotProtocol",
"SmartProtocol",
"SmartCamProtocol",
"LightState",
"TurnOnBehaviors",
"TurnOnBehavior",

View File

@@ -2,12 +2,14 @@
from __future__ import annotations
import asyncio
import json
import re
import sys
from collections.abc import Callable
from contextlib import contextmanager
from functools import singledispatch, update_wrapper, wraps
from gettext import gettext
from typing import TYPE_CHECKING, Any, Final
import asyncclick as click
@@ -238,4 +240,19 @@ def CatchAllExceptions(cls):
except Exception as exc:
_handle_exception(self._debug, exc)
def __call__(self, *args, **kwargs):
"""Run the coroutine in the event loop and print any exceptions.
python click catches KeyboardInterrupt in main, raises Abort()
and does sys.exit. asyncclick doesn't properly handle a coroutine
receiving CancelledError on a KeyboardInterrupt, so we catch the
KeyboardInterrupt here once asyncio.run has re-raised it. This
avoids large stacktraces when a user presses Ctrl-C.
"""
try:
asyncio.run(self.main(*args, **kwargs))
except KeyboardInterrupt:
click.echo(gettext("\nAborted!"), file=sys.stderr)
sys.exit(1)
return _CommandCls

View File

@@ -41,8 +41,14 @@ async def state(ctx, dev: Device):
echo(f"Device state: {dev.is_on}")
echo(f"Time: {dev.time} (tz: {dev.timezone})")
echo(f"Hardware: {dev.hw_info['hw_ver']}")
echo(f"Software: {dev.hw_info['sw_ver']}")
echo(
f"Hardware: {dev.device_info.hardware_version}"
f"{' (' + dev.region + ')' if dev.region else ''}"
)
echo(
f"Firmware: {dev.device_info.firmware_version}"
f" {dev.device_info.firmware_build}"
)
echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
if verbose:
echo(f"Location: {dev.location}")

View File

@@ -123,14 +123,19 @@ async def list(ctx):
async def print_discovered(dev: Device):
cparams = dev.config.connection_type
infostr = (
f"{dev.host:<15} {cparams.device_family.value:<20} "
f"{cparams.encryption_type.value:<7}"
f"{dev.host:<15} {dev.model:<9} {cparams.device_family.value:<20} "
f"{cparams.encryption_type.value:<7} {cparams.https:<5} "
f"{cparams.login_version or '-':<3}"
)
async with sem:
try:
await dev.update()
except AuthenticationError:
echo(f"{infostr} - Authentication failed")
except TimeoutError:
echo(f"{infostr} - Timed out")
except Exception as ex:
echo(f"{infostr} - Error: {ex}")
else:
echo(f"{infostr} {dev.alias}")
@@ -138,7 +143,10 @@ async def list(ctx):
if host := unsupported_exception.host:
echo(f"{host:<15} UNSUPPORTED DEVICE")
echo(f"{'HOST':<15} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} {'ALIAS'}")
echo(
f"{'HOST':<15} {'MODEL':<9} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} "
f"{'HTTPS':<5} {'LV':<3} {'ALIAS'}"
)
return await _discover(
ctx,
print_discovered=print_discovered,

View File

@@ -25,7 +25,9 @@ def light(dev) -> None:
@pass_dev_or_child
async def brightness(dev: Device, brightness: int, transition: int):
"""Get or set brightness."""
if not (light := dev.modules.get(Module.Light)) or not light.is_dimmable:
if not (light := dev.modules.get(Module.Light)) or not light.has_feature(
"brightness"
):
error("This device does not support brightness.")
return
@@ -45,13 +47,15 @@ async def brightness(dev: Device, brightness: int, transition: int):
@pass_dev_or_child
async def temperature(dev: Device, temperature: int, transition: int):
"""Get or set color temperature."""
if not (light := dev.modules.get(Module.Light)) or not light.is_variable_color_temp:
if not (light := dev.modules.get(Module.Light)) or not (
color_temp_feat := light.get_feature("color_temp")
):
error("Device does not support color temperature")
return
if temperature is None:
echo(f"Color temperature: {light.color_temp}")
valid_temperature_range = light.valid_temperature_range
valid_temperature_range = color_temp_feat.range
if valid_temperature_range != (0, 0):
echo("(min: {}, max: {})".format(*valid_temperature_range))
else:
@@ -59,7 +63,7 @@ async def temperature(dev: Device, temperature: int, transition: int):
"Temperature range unknown, please open a github issue"
f" or a pull request for model '{dev.model}'"
)
return light.valid_temperature_range
return color_temp_feat.range
else:
echo(f"Setting color temperature to {temperature}")
return await light.set_color_temp(temperature, transition=transition)
@@ -99,7 +103,7 @@ async def effect(dev: Device, ctx, effect):
@pass_dev_or_child
async def hsv(dev: Device, ctx, h, s, v, transition):
"""Get or set color in HSV."""
if not (light := dev.modules.get(Module.Light)) or not light.is_color:
if not (light := dev.modules.get(Module.Light)) or not light.has_feature("hsv"):
error("Device does not support colors")
return

View File

@@ -29,7 +29,7 @@ All devices provide several informational properties:
>>> dev.alias
Bedroom Lamp Plug
>>> dev.model
HS110(EU)
HS110
>>> dev.rssi
-71
>>> dev.mac
@@ -151,7 +151,7 @@ _LOGGER = logging.getLogger(__name__)
@dataclass
class _DeviceInfo:
class DeviceInfo:
"""Device Model Information."""
short_name: str
@@ -208,7 +208,7 @@ class Device(ABC):
self.protocol: BaseProtocol = protocol or IotProtocol(
transport=XorTransport(config=config or DeviceConfig(host=host)),
)
self._last_update: Any = None
self._last_update: dict[str, Any] = {}
_LOGGER.debug("Initializing %s of type %s", host, type(self))
self._device_type = DeviceType.Unknown
# TODO: typing Any is just as using dict | None would require separate
@@ -334,9 +334,21 @@ class Device(ABC):
"""Returns the device model."""
@property
def region(self) -> str | None:
"""Returns the device region."""
return self.device_info.region
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
return self._get_device_info(self._last_update, self._discovery_info)
@staticmethod
@abstractmethod
def _model_region(self) -> str:
"""Return device full model name and region."""
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> DeviceInfo:
"""Get device info."""
@property
@abstractmethod

View File

@@ -8,7 +8,7 @@ from typing import Any
from .device import Device
from .device_type import DeviceType
from .deviceconfig import DeviceConfig
from .deviceconfig import DeviceConfig, DeviceFamily
from .exceptions import KasaException, UnsupportedDeviceError
from .iot import (
IotBulb,
@@ -179,20 +179,29 @@ def get_device_class_from_family(
def get_protocol(
config: DeviceConfig,
) -> BaseProtocol | None:
"""Return the protocol from the connection name."""
protocol_name = config.connection_type.device_family.value.split(".")[0]
"""Return the protocol from the connection name.
For cameras and vacuums the device family is a simple mapping to
the protocol/transport. For other device types the transport varies
based on the discovery information.
"""
ctype = config.connection_type
protocol_name = ctype.device_family.value.split(".")[0]
if ctype.device_family is DeviceFamily.SmartIpCamera:
return SmartCamProtocol(transport=SslAesTransport(config=config))
if ctype.device_family is DeviceFamily.IotIpCamera:
return IotProtocol(transport=LinkieTransportV2(config=config))
if ctype.device_family is DeviceFamily.SmartTapoRobovac:
return SmartProtocol(transport=SslTransport(config=config))
protocol_transport_key = (
protocol_name
+ "."
+ ctype.encryption_type.value
+ (".HTTPS" if ctype.https else "")
+ (
f".{ctype.login_version}"
if ctype.login_version and ctype.login_version > 1
else ""
)
)
_LOGGER.debug("Finding transport for %s", protocol_transport_key)
@@ -201,12 +210,11 @@ def get_protocol(
] = {
"IOT.XOR": (IotProtocol, XorTransport),
"IOT.KLAP": (IotProtocol, KlapTransport),
"IOT.XOR.HTTPS.2": (IotProtocol, LinkieTransportV2),
"SMART.AES": (SmartProtocol, AesTransport),
"SMART.AES.2": (SmartProtocol, AesTransport),
"SMART.KLAP.2": (SmartProtocol, KlapTransportV2),
"SMART.AES.HTTPS.2": (SmartCamProtocol, SslAesTransport),
"SMART.AES.HTTPS": (SmartProtocol, SslTransport),
"SMART.KLAP": (SmartProtocol, KlapTransportV2),
# H200 is device family SMART.TAPOHUB and uses SmartCamProtocol so use
# https to distuingish from SmartProtocol devices
"SMART.AES.HTTPS": (SmartCamProtocol, SslAesTransport),
}
if not (prot_tran_cls := supported_device_protocols.get(protocol_transport_key)):
return None

View File

@@ -22,7 +22,7 @@ Discovery returns a dict of {ip: discovered devices}:
>>>
>>> found_devices = await Discover.discover()
>>> [dev.model for dev in found_devices.values()]
['KP303(UK)', 'HS110(EU)', 'L530E', 'KL430(US)', 'HS220(US)']
['KP303', 'HS110', 'L530E', 'KL430', 'HS220']
You can pass username and password for devices requiring authentication
@@ -65,17 +65,17 @@ It is also possible to pass a coroutine to be executed for each found device:
>>> print(f"Discovered {dev.alias} (model: {dev.model})")
>>>
>>> devices = await Discover.discover(on_discovered=print_dev_info, credentials=creds)
Discovered Bedroom Power Strip (model: KP303(UK))
Discovered Bedroom Lamp Plug (model: HS110(EU))
Discovered Bedroom Power Strip (model: KP303)
Discovered Bedroom Lamp Plug (model: HS110)
Discovered Living Room Bulb (model: L530)
Discovered Bedroom Lightstrip (model: KL430(US))
Discovered Living Room Dimmer Switch (model: HS220(US))
Discovered Bedroom Lightstrip (model: KL430)
Discovered Living Room Dimmer Switch (model: HS220)
Discovering a single device returns a kasa.Device object.
>>> device = await Discover.discover_single("127.0.0.1", credentials=creds)
>>> device.model
'KP303(UK)'
'KP303'
"""
@@ -168,6 +168,12 @@ OnUnsupportedCallable = Callable[[UnsupportedDeviceError], Coroutine]
OnConnectAttemptCallable = Callable[[ConnectAttempt, bool], None]
DeviceDict = dict[str, Device]
DECRYPTED_REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"connect_ssid": lambda x: "#MASKED_SSID#" if x else "",
"device_id": lambda x: "REDACTED_" + x[9::],
"owner": lambda x: "REDACTED_" + x[9::],
}
NEW_DISCOVERY_REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"device_id": lambda x: "REDACTED_" + x[9::],
"device_name": lambda x: "#MASKED_NAME#" if x else "",
@@ -177,6 +183,8 @@ NEW_DISCOVERY_REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"group_id": lambda x: "REDACTED_" + x[9::],
"group_name": lambda x: "I01BU0tFRF9TU0lEIw==",
"encrypt_info": lambda x: {**x, "key": "", "data": ""},
"ip": lambda x: x, # don't redact but keep listed here for dump_devinfo
"decrypted_data": lambda x: redact_data(x, DECRYPTED_REDACTORS),
}
@@ -490,7 +498,7 @@ class Discover:
try:
_LOGGER.debug("Waiting %s seconds for responses...", discovery_timeout)
await protocol.wait_for_discovery_to_complete()
except KasaException as ex:
except (KasaException, asyncio.CancelledError) as ex:
for device in protocol.discovered_devices.values():
await device.protocol.close()
raise ex
@@ -742,6 +750,7 @@ class Discover:
@staticmethod
def _decrypt_discovery_data(discovery_result: DiscoveryResult) -> None:
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if TYPE_CHECKING:
assert discovery_result.encrypt_info
assert _AesDiscoveryQuery.keypair
@@ -757,7 +766,19 @@ class Discover:
session = AesEncyptionSession(key, iv)
decrypted_data = session.decrypt(encrypted_data)
discovery_result.decrypted_data = json_loads(decrypted_data)
result = json_loads(decrypted_data)
if debug_enabled:
data = (
redact_data(result, DECRYPTED_REDACTORS)
if Discover._redact_data
else result
)
_LOGGER.debug(
"Decrypted encrypt_info for %s: %s",
discovery_result.ip,
pf(data),
)
discovery_result.decrypted_data = result
@staticmethod
def _get_discovery_json(data: bytes, ip: str) -> dict:
@@ -826,12 +847,12 @@ class Discover:
):
encrypt_type = encrypt_info.sym_schm
if (
not (login_version := encrypt_schm.lv)
and (et := discovery_result.encrypt_type)
and et == ["3"]
if not (login_version := encrypt_schm.lv) and (
et := discovery_result.encrypt_type
):
login_version = 2
# Known encrypt types are ["1","2"] and ["3"]
# Reuse the login_version attribute to pass the max to transport
login_version = max([int(i) for i in et])
if not encrypt_type:
raise UnsupportedDeviceError(

View File

@@ -113,10 +113,23 @@ class HttpClient:
ssl=ssl,
)
async with resp:
if resp.status == 200:
response_data = await resp.read()
if return_json:
response_data = await resp.read()
if resp.status == 200:
if return_json:
response_data = json_loads(response_data.decode())
else:
_LOGGER.debug(
"Device %s received status code %s with response %s",
self._config.host,
resp.status,
str(response_data),
)
if response_data and return_json:
try:
response_data = json_loads(response_data.decode())
except Exception:
_LOGGER.debug("Device %s response could not be parsed as json")
except (aiohttp.ServerDisconnectedError, aiohttp.ClientOSError) as ex:
if not self._wait_between_requests:

View File

@@ -23,7 +23,7 @@ Get the light module to interact:
>>> light = dev.modules[Module.Light]
You can use the ``is_``-prefixed properties to check for supported features:
You can use the ``has_feature()`` method to check for supported features:
>>> light.has_feature("brightness")
True

View File

@@ -22,7 +22,7 @@ from datetime import datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, cast
from warnings import warn
from ..device import Device, WifiNetwork, _DeviceInfo
from ..device import Device, DeviceInfo, WifiNetwork
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..exceptions import KasaException
@@ -43,7 +43,7 @@ def requires_update(f: Callable) -> Any:
@functools.wraps(f)
async def wrapped(*args: Any, **kwargs: Any) -> Any:
self = args[0]
if self._last_update is None and (
if not self._last_update and (
self._sys_info is None or f.__name__ not in self._sys_info
):
raise KasaException("You need to await update() to access the data")
@@ -54,7 +54,7 @@ def requires_update(f: Callable) -> Any:
@functools.wraps(f)
def wrapped(*args: Any, **kwargs: Any) -> Any:
self = args[0]
if self._last_update is None and (
if not self._last_update and (
self._sys_info is None or f.__name__ not in self._sys_info
):
raise KasaException("You need to await update() to access the data")
@@ -112,7 +112,7 @@ class IotDevice(Device):
>>> dev.alias
Bedroom Lamp Plug
>>> dev.model
HS110(EU)
HS110
>>> dev.rssi
-71
>>> dev.mac
@@ -310,7 +310,7 @@ class IotDevice(Device):
# If this is the initial update, check only for the sysinfo
# This is necessary as some devices crash on unexpected modules
# See #105, #120, #161
if self._last_update is None:
if not self._last_update:
_LOGGER.debug("Performing the initial update to obtain sysinfo")
response = await self.protocol.query(req)
self._last_update = response
@@ -452,7 +452,9 @@ class IotDevice(Device):
# This allows setting of some info properties directly
# from partial discovery info that will then be found
# by the requires_update decorator
self._set_sys_info(info)
discovery_model = info["device_model"]
no_region_model, _, _ = discovery_model.partition("(")
self._set_sys_info({**info, "model": no_region_model})
def _set_sys_info(self, sys_info: dict[str, Any]) -> None:
"""Set sys_info."""
@@ -471,18 +473,13 @@ class IotDevice(Device):
"""
return self._sys_info # type: ignore
@property # type: ignore
@requires_update
def model(self) -> str:
"""Return device model."""
sys_info = self._sys_info
return str(sys_info["model"])
@property
@requires_update
def _model_region(self) -> str:
"""Return device full model name and region."""
return self.model
def model(self) -> str:
"""Returns the device model."""
if self._last_update:
return self.device_info.short_name
return self._sys_info["model"]
@property # type: ignore
def alias(self) -> str | None:
@@ -748,7 +745,7 @@ class IotDevice(Device):
@staticmethod
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
) -> DeviceInfo:
"""Get model information for a device."""
sys_info = _extract_sys_info(info)
@@ -766,7 +763,7 @@ class IotDevice(Device):
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
auth = bool(discovery_info and ("mgt_encrypt_schm" in discovery_info))
return _DeviceInfo(
return DeviceInfo(
short_name=long_name,
long_name=long_name,
brand="kasa",

View File

@@ -207,17 +207,18 @@ class Light(IotModule, LightInterface):
return self._light_state
async def _post_update_hook(self) -> None:
if self._device.is_on is False:
device = self._device
if device.is_on is False:
state = LightState(light_on=False)
else:
state = LightState(light_on=True)
if self._device._is_dimmable:
if device._is_dimmable:
state.brightness = self.brightness
if self._device._is_color:
if device._is_color:
hsv = self.hsv
state.hue = hsv.hue
state.saturation = hsv.saturation
if self._device._is_variable_color_temp:
if device._is_variable_color_temp:
state.color_temp = self.color_temp
self._light_state = state

View File

@@ -85,17 +85,19 @@ class LightPreset(IotModule, LightPresetInterface):
def preset(self) -> str:
"""Return current preset name."""
light = self._device.modules[Module.Light]
is_color = light.has_feature("hsv")
is_variable_color_temp = light.has_feature("color_temp")
brightness = light.brightness
color_temp = light.color_temp if light.is_variable_color_temp else None
h, s = (light.hsv.hue, light.hsv.saturation) if light.is_color else (None, None)
color_temp = light.color_temp if is_variable_color_temp else None
h, s = (light.hsv.hue, light.hsv.saturation) if is_color else (None, None)
for preset_name, preset in self._presets.items():
if (
preset.brightness == brightness
and (
preset.color_temp == color_temp or not light.is_variable_color_temp
)
and (preset.hue == h or not light.is_color)
and (preset.saturation == s or not light.is_color)
and (preset.color_temp == color_temp or not is_variable_color_temp)
and (preset.hue == h or not is_color)
and (preset.saturation == s or not is_color)
):
return preset_name
return self.PRESET_NOT_SET
@@ -107,7 +109,7 @@ class LightPreset(IotModule, LightPresetInterface):
"""Set a light preset for the device."""
light = self._device.modules[Module.Light]
if preset_name == self.PRESET_NOT_SET:
if light.is_color:
if light.has_feature("hsv"):
preset = LightState(hue=0, saturation=0, brightness=100)
else:
preset = LightState(brightness=100)

View File

@@ -21,6 +21,9 @@ check for the existence of the module:
>>> print(light.brightness)
100
.. include:: ../featureattributes.md
:parser: myst_parser.sphinx_
To see whether a device supports specific functionality, you can check whether the
module has that feature:
@@ -151,8 +154,12 @@ class Module(ABC):
)
TriggerLogs: Final[ModuleName[smart.TriggerLogs]] = ModuleName("TriggerLogs")
HomeKit: Final[ModuleName[smart.HomeKit]] = ModuleName("HomeKit")
Matter: Final[ModuleName[smart.Matter]] = ModuleName("Matter")
# SMARTCAM only modules
Camera: Final[ModuleName[smartcam.Camera]] = ModuleName("Camera")
LensMask: Final[ModuleName[smartcam.LensMask]] = ModuleName("LensMask")
def __init__(self, device: Device, module: str) -> None:
self._device = device

View File

@@ -2,6 +2,7 @@
from .iotprotocol import IotProtocol
from .protocol import BaseProtocol
from .smartcamprotocol import SmartCamProtocol
from .smartprotocol import SmartErrorCode, SmartProtocol
__all__ = [
@@ -9,4 +10,5 @@ __all__ = [
"IotProtocol",
"SmartErrorCode",
"SmartProtocol",
"SmartCamProtocol",
]

View File

@@ -25,19 +25,35 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
def _mask_children(children: list[dict[str, Any]]) -> list[dict[str, Any]]:
def mask_child(child: dict[str, Any], index: int) -> dict[str, Any]:
result = {
**child,
"id": f"SCRUBBED_CHILD_DEVICE_ID_{index+1}",
}
# Will leave empty aliases as blank
if child.get("alias"):
result["alias"] = f"#MASKED_NAME# {index + 1}"
return result
return [mask_child(child, index) for index, child in enumerate(children)]
REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"latitude": lambda x: 0,
"longitude": lambda x: 0,
"latitude_i": lambda x: 0,
"longitude_i": lambda x: 0,
"deviceId": lambda x: "REDACTED_" + x[9::],
"id": lambda x: "REDACTED_" + x[9::],
"children": _mask_children,
"alias": lambda x: "#MASKED_NAME#" if x else "",
"mac": mask_mac,
"mic_mac": mask_mac,
"ssid": lambda x: "#MASKED_SSID#" if x else "",
"oemId": lambda x: "REDACTED_" + x[9::],
"username": lambda _: "user@example.com", # cnCloud
"hwId": lambda x: "REDACTED_" + x[9::],
}

View File

@@ -66,6 +66,8 @@ def redact_data(data: _T, redactors: dict[str, Callable[[Any], Any] | None]) ->
def mask_mac(mac: str) -> str:
"""Return mac address with last two octects blanked."""
if len(mac) == 12:
return f"{mac[:6]}000000"
delim = ":" if ":" in mac else "-"
rest = delim.join(format(s, "02x") for s in bytes.fromhex("000000"))
return f"{mac[:8]}{delim}{rest}"

View File

@@ -19,7 +19,7 @@ from ..transports.sslaestransport import (
SMART_RETRYABLE_ERRORS,
SmartErrorCode,
)
from . import SmartProtocol
from .smartprotocol import SmartProtocol
_LOGGER = logging.getLogger(__name__)

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import asyncio
import base64
import logging
import re
import time
import uuid
from collections.abc import Callable
@@ -45,15 +46,36 @@ REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"original_device_id": lambda x: "REDACTED_" + x[9::], # Strip children
"nickname": lambda x: "I01BU0tFRF9OQU1FIw==" if x else "",
"mac": mask_mac,
"ssid": lambda x: "I01BU0tFRF9TU0lEIw=" if x else "",
"ssid": lambda x: "I01BU0tFRF9TU0lEIw==" if x else "",
"bssid": lambda _: "000000000000",
"channel": lambda _: 0,
"oem_id": lambda x: "REDACTED_" + x[9::],
"setup_code": None, # matter
"setup_payload": None, # matter
"mfi_setup_code": None, # mfi_ for homekit
"mfi_setup_id": None,
"mfi_token_token": None,
"mfi_token_uuid": None,
"hw_id": lambda x: "REDACTED_" + x[9::],
"fw_id": lambda x: "REDACTED_" + x[9::],
"setup_code": lambda x: re.sub(r"\w", "0", x), # matter
"setup_payload": lambda x: re.sub(r"\w", "0", x), # matter
"mfi_setup_code": lambda x: re.sub(r"\w", "0", x), # mfi_ for homekit
"mfi_setup_id": lambda x: re.sub(r"\w", "0", x),
"mfi_token_token": lambda x: re.sub(r"\w", "0", x),
"mfi_token_uuid": lambda x: re.sub(r"\w", "0", x),
"ip": lambda x: x, # don't redact but keep listed here for dump_devinfo
# smartcam
"dev_id": lambda x: "REDACTED_" + x[9::],
"device_name": lambda x: "#MASKED_NAME#" if x else "",
"device_alias": lambda x: "#MASKED_NAME#" if x else "",
"local_ip": lambda x: x, # don't redact but keep listed here for dump_devinfo
# robovac
"board_sn": lambda _: "000000000000",
"custom_sn": lambda _: "000000000000",
"location": lambda x: "#MASKED_NAME#" if x else "",
"map_data": lambda x: "#SCRUBBED_MAPDATA#" if x else "",
}
# Queries that are known not to work properly when sent as a
# multiRequest. They will not return the `method` key.
FORCE_SINGLE_REQUEST = {
"getConnectStatus",
"scanApList",
}
@@ -76,6 +98,7 @@ class SmartProtocol(BaseProtocol):
self._transport._config.batch_size or self.DEFAULT_MULTI_REQUEST_BATCH_SIZE
)
self._redact_data = True
self._method_missing_logged = False
def get_smart_request(self, method: str, params: dict | None = None) -> str:
"""Get a request message as a string."""
@@ -162,17 +185,18 @@ class SmartProtocol(BaseProtocol):
multi_result: dict[str, Any] = {}
smart_method = "multipleRequest"
multi_requests = [
{"method": method, "params": params} if params else {"method": method}
for method, params in requests.items()
]
end = len(multi_requests)
end = len(requests)
# The SmartCamProtocol sends requests with a length 1 as a
# multipleRequest. The SmartProtocol doesn't so will never
# raise_on_error
raise_on_error = end == 1
multi_requests = [
{"method": method, "params": params} if params else {"method": method}
for method, params in requests.items()
if method not in FORCE_SINGLE_REQUEST
]
# Break the requests down as there can be a size limit
step = self._multi_request_batch_size
if step == 1:
@@ -233,7 +257,20 @@ class SmartProtocol(BaseProtocol):
responses = response_step["result"]["responses"]
for response in responses:
method = response["method"]
# some smartcam devices calls do not populate the method key
# these should be defined in DO_NOT_SEND_AS_MULTI_REQUEST.
if not (method := response.get("method")):
if not self._method_missing_logged:
# Avoid spamming the logs
self._method_missing_logged = True
_LOGGER.error(
"No method key in response for %s, skipping: %s",
self._host,
response_step,
)
# These will end up being queried individually
continue
self._handle_response_error_code(
response, method, raise_on_error=raise_on_error
)
@@ -242,13 +279,17 @@ class SmartProtocol(BaseProtocol):
result, method, retry_count=retry_count
)
multi_result[method] = result
# Multi requests don't continue after errors so requery any missing
# Multi requests don't continue after errors so requery any missing.
# Will also query individually any DO_NOT_SEND_AS_MULTI_REQUEST.
for method, params in requests.items():
if method not in multi_result:
resp = await self._transport.send(
self.get_smart_request(method, params)
)
self._handle_response_error_code(resp, method, raise_on_error=False)
self._handle_response_error_code(
resp, method, raise_on_error=raise_on_error
)
multi_result[method] = resp.get("result")
return multi_result

View File

@@ -16,6 +16,7 @@ from .energy import Energy
from .fan import Fan
from .firmware import Firmware
from .frostprotection import FrostProtection
from .homekit import HomeKit
from .humiditysensor import HumiditySensor
from .led import Led
from .light import Light
@@ -23,6 +24,7 @@ from .lighteffect import LightEffect
from .lightpreset import LightPreset
from .lightstripeffect import LightStripEffect
from .lighttransition import LightTransition
from .matter import Matter
from .motionsensor import MotionSensor
from .overheatprotection import OverheatProtection
from .reportmode import ReportMode
@@ -66,4 +68,6 @@ __all__ = [
"Thermostat",
"SmartLightEffect",
"OverheatProtection",
"HomeKit",
"Matter",
]

View File

@@ -2,10 +2,10 @@
from __future__ import annotations
from typing import NoReturn
from typing import Any, NoReturn
from ...emeterstatus import EmeterStatus
from ...exceptions import KasaException
from ...exceptions import DeviceError, KasaException
from ...interfaces.energy import Energy as EnergyInterface
from ..smartmodule import SmartModule, raise_if_update_error
@@ -15,12 +15,39 @@ class Energy(SmartModule, EnergyInterface):
REQUIRED_COMPONENT = "energy_monitoring"
_energy: dict[str, Any]
_current_consumption: float | None
async def _post_update_hook(self) -> None:
if "voltage_mv" in self.data.get("get_emeter_data", {}):
try:
data = self.data
except DeviceError as de:
self._energy = {}
self._current_consumption = None
raise de
# If version is 1 then data is get_energy_usage
self._energy = data.get("get_energy_usage", data)
if "voltage_mv" in data.get("get_emeter_data", {}):
self._supported = (
self._supported | EnergyInterface.ModuleFeature.VOLTAGE_CURRENT
)
if (power := self._energy.get("current_power")) is not None or (
power := data.get("get_emeter_data", {}).get("power_mw")
) is not None:
self._current_consumption = power / 1_000
# Fallback if get_energy_usage does not provide current_power,
# which can happen on some newer devices (e.g. P304M).
# This may not be valid scenario as it pre-dates trying get_emeter_data
elif (
power := self.data.get("get_current_power", {}).get("current_power")
) is not None:
self._current_consumption = power
else:
self._current_consumption = None
def query(self) -> dict:
"""Query to execute during the update cycle."""
req = {
@@ -33,28 +60,21 @@ class Energy(SmartModule, EnergyInterface):
return req
@property
@raise_if_update_error
def current_consumption(self) -> float | None:
"""Current power in watts."""
if (power := self.energy.get("current_power")) is not None or (
power := self.data.get("get_emeter_data", {}).get("power_mw")
) is not None:
return power / 1_000
# Fallback if get_energy_usage does not provide current_power,
# which can happen on some newer devices (e.g. P304M).
elif (
power := self.data.get("get_current_power", {}).get("current_power")
) is not None:
return power
return None
def optional_response_keys(self) -> list[str]:
"""Return optional response keys for the module."""
if self.supported_version > 1:
return ["get_energy_usage"]
return []
@property
def current_consumption(self) -> float | None:
"""Current power in watts."""
return self._current_consumption
@property
@raise_if_update_error
def energy(self) -> dict:
"""Return get_energy_usage results."""
if en := self.data.get("get_energy_usage"):
return en
return self.data
return self._energy
def _get_status_from_energy(self, energy: dict) -> EmeterStatus:
return EmeterStatus(
@@ -83,16 +103,18 @@ class Energy(SmartModule, EnergyInterface):
return self._get_status_from_energy(res["get_energy_usage"])
@property
@raise_if_update_error
def consumption_this_month(self) -> float | None:
"""Get the emeter value for this month in kWh."""
return self.energy.get("month_energy", 0) / 1_000
if (month := self.energy.get("month_energy")) is not None:
return month / 1_000
return None
@property
@raise_if_update_error
def consumption_today(self) -> float | None:
"""Get the emeter value for today in kWh."""
return self.energy.get("today_energy", 0) / 1_000
if (today := self.energy.get("today_energy")) is not None:
return today / 1_000
return None
@property
@raise_if_update_error

View File

@@ -0,0 +1,32 @@
"""Implementation of homekit module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class HomeKit(SmartModule):
"""Implementation of homekit module."""
QUERY_GETTER_NAME: str = "get_homekit_info"
REQUIRED_COMPONENT = "homekit"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="homekit_setup_code",
name="Homekit setup code",
container=self,
attribute_getter=lambda x: x.info["mfi_setup_code"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
@property
def info(self) -> dict[str, str]:
"""Homekit mfi setup info."""
return self.data

View File

@@ -136,16 +136,17 @@ class Light(SmartModule, LightInterface):
return self._light_state
async def _post_update_hook(self) -> None:
if self._device.is_on is False:
device = self._device
if device.is_on is False:
state = LightState(light_on=False)
else:
state = LightState(light_on=True)
if Module.Brightness in self._device.modules:
if Module.Brightness in device.modules:
state.brightness = self.brightness
if Module.Color in self._device.modules:
if Module.Color in device.modules:
hsv = self.hsv
state.hue = hsv.hue
state.saturation = hsv.saturation
if Module.ColorTemperature in self._device.modules:
if Module.ColorTemperature in device.modules:
state.color_temp = self.color_temp
self._light_state = state

View File

@@ -0,0 +1,43 @@
"""Implementation of matter module."""
from __future__ import annotations
from ...feature import Feature
from ..smartmodule import SmartModule
class Matter(SmartModule):
"""Implementation of matter module."""
QUERY_GETTER_NAME: str = "get_matter_setup_info"
REQUIRED_COMPONENT = "matter"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="matter_setup_code",
name="Matter setup code",
container=self,
attribute_getter=lambda x: x.info["setup_code"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
self._device,
id="matter_setup_payload",
name="Matter setup payload",
container=self,
attribute_getter=lambda x: x.info["setup_payload"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
@property
def info(self) -> dict[str, str]:
"""Matter setup info."""
return self.data

View File

@@ -6,10 +6,11 @@ import logging
import time
from typing import Any
from ..device import DeviceInfo
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper
from .smartdevice import SmartDevice
from .smartdevice import ComponentsRaw, SmartDevice
from .smartmodule import SmartModule
_LOGGER = logging.getLogger(__name__)
@@ -37,7 +38,7 @@ class SmartChildDevice(SmartDevice):
self,
parent: SmartDevice,
info: dict,
component_info: dict,
component_info_raw: ComponentsRaw,
*,
config: DeviceConfig | None = None,
protocol: SmartProtocol | None = None,
@@ -47,7 +48,24 @@ class SmartChildDevice(SmartDevice):
super().__init__(parent.host, config=parent.config, protocol=_protocol)
self._parent = parent
self._update_internal_state(info)
self._components = component_info
self._components_raw = component_info_raw
self._components = self._parse_components(self._components_raw)
@property
def device_info(self) -> DeviceInfo:
"""Return device info.
Child device does not have it info and components in _last_update so
this overrides the base implementation to call _get_device_info with
info and components combined as they would be in _last_update.
"""
return self._get_device_info(
{
"get_device_info": self._info,
"component_nego": self._components_raw,
},
None,
)
async def update(self, update_children: bool = True) -> None:
"""Update child module info.
@@ -84,7 +102,7 @@ class SmartChildDevice(SmartDevice):
cls,
parent: SmartDevice,
child_info: dict,
child_components: dict,
child_components_raw: ComponentsRaw,
protocol: SmartProtocol | None = None,
*,
last_update: dict | None = None,
@@ -97,7 +115,7 @@ class SmartChildDevice(SmartDevice):
derived from the parent.
"""
child: SmartChildDevice = cls(
parent, child_info, child_components, protocol=protocol
parent, child_info, child_components_raw, protocol=protocol
)
if last_update:
child._last_update = last_update

View File

@@ -7,9 +7,9 @@ import logging
import time
from collections.abc import Mapping, Sequence
from datetime import UTC, datetime, timedelta, tzinfo
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, TypeAlias, cast
from ..device import Device, WifiNetwork, _DeviceInfo
from ..device import Device, DeviceInfo, WifiNetwork
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
@@ -40,6 +40,8 @@ _LOGGER = logging.getLogger(__name__)
# same issue, homekit perhaps?
NON_HUB_PARENT_ONLY_MODULES = [DeviceModule, Time, Firmware, Cloud]
ComponentsRaw: TypeAlias = dict[str, list[dict[str, int | str]]]
# Device must go last as the other interfaces also inherit Device
# and python needs a consistent method resolution order.
@@ -61,13 +63,12 @@ class SmartDevice(Device):
)
super().__init__(host=host, config=config, protocol=_protocol)
self.protocol: SmartProtocol
self._components_raw: dict[str, Any] | None = None
self._components_raw: ComponentsRaw | None = None
self._components: dict[str, int] = {}
self._state_information: dict[str, Any] = {}
self._modules: dict[str | ModuleName[Module], SmartModule] = {}
self._parent: SmartDevice | None = None
self._children: Mapping[str, SmartDevice] = {}
self._last_update = {}
self._last_update_time: float | None = None
self._on_since: datetime | None = None
self._info: dict[str, Any] = {}
@@ -82,10 +83,8 @@ class SmartDevice(Device):
self.internal_state.update(resp)
children = self.internal_state["get_child_device_list"]["child_device_list"]
children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in child["component_list"]
}
children_components_raw = {
child["device_id"]: child
for child in self.internal_state["get_child_device_component_list"][
"child_component_list"
]
@@ -96,7 +95,7 @@ class SmartDevice(Device):
child_info["device_id"]: await SmartChildDevice.create(
parent=self,
child_info=child_info,
child_components=children_components[child_info["device_id"]],
child_components_raw=children_components_raw[child_info["device_id"]],
)
for child_info in children
}
@@ -131,6 +130,13 @@ class SmartDevice(Device):
f"{request} not found in {responses} for device {self.host}"
)
@staticmethod
def _parse_components(components_raw: ComponentsRaw) -> dict[str, int]:
return {
str(comp["id"]): int(comp["ver_code"])
for comp in components_raw["component_list"]
}
async def _negotiate(self) -> None:
"""Perform initialization.
@@ -151,12 +157,9 @@ class SmartDevice(Device):
self._info = self._try_get_response(resp, "get_device_info")
# Create our internal presentation of available components
self._components_raw = cast(dict, resp["component_nego"])
self._components_raw = cast(ComponentsRaw, resp["component_nego"])
self._components = {
comp["id"]: int(comp["ver_code"])
for comp in self._components_raw["component_list"]
}
self._components = self._parse_components(self._components_raw)
if "child_device" in self._components and not self.children:
await self._initialize_children()
@@ -493,18 +496,13 @@ class SmartDevice(Device):
@property
def model(self) -> str:
"""Returns the device model."""
return str(self._info.get("model"))
# If update hasn't been called self._device_info can't be used
if self._last_update:
return self.device_info.short_name
@property
def _model_region(self) -> str:
"""Return device full model name and region."""
if (disco := self._discovery_info) and (
disco_model := disco.get("device_model")
):
return disco_model
# Some devices have the region in the specs element.
region = f"({specs})" if (specs := self._info.get("specs")) else ""
return f"{self.model}{region}"
disco_model = str(self._info.get("device_model"))
long_name, _, _ = disco_model.partition("(")
return long_name
@property
def alias(self) -> str | None:
@@ -804,7 +802,7 @@ class SmartDevice(Device):
@staticmethod
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
) -> DeviceInfo:
"""Get model information for a device."""
di = info["get_device_info"]
components = [comp["id"] for comp in info["component_nego"]["component_list"]]
@@ -833,7 +831,7 @@ class SmartDevice(Device):
# Brand inferred from SMART.KASAPLUG/SMART.TAPOPLUG etc.
brand = devicetype[:4].lower()
return _DeviceInfo(
return DeviceInfo(
short_name=short_name,
long_name=long_name,
brand=brand,

View File

@@ -57,7 +57,7 @@ class SmartModule(Module):
#: Module is initialized, if any of the given keys exists in the sysinfo
SYSINFO_LOOKUP_KEYS: list[str] = []
#: Query to execute during the main update cycle
QUERY_GETTER_NAME: str
QUERY_GETTER_NAME: str = ""
REGISTERED_MODULES: dict[str, type[SmartModule]] = {}
@@ -72,6 +72,7 @@ class SmartModule(Module):
self._last_update_time: float | None = None
self._last_update_error: KasaException | None = None
self._error_count = 0
self._logged_remove_keys: list[str] = []
def __init_subclass__(cls, **kwargs) -> None:
# We only want to register submodules in a modules package so that
@@ -138,7 +139,9 @@ class SmartModule(Module):
Default implementation uses the raw query getter w/o parameters.
"""
return {self.QUERY_GETTER_NAME: None}
if self.QUERY_GETTER_NAME:
return {self.QUERY_GETTER_NAME: None}
return {}
async def call(self, method: str, params: dict | None = None) -> dict:
"""Call a method.
@@ -147,6 +150,15 @@ class SmartModule(Module):
"""
return await self._device._query_helper(method, params)
@property
def optional_response_keys(self) -> list[str]:
"""Return optional response keys for the module.
Defaults to no keys. Overriding this and providing keys will remove
instead of raise on error.
"""
return []
@property
def data(self) -> dict[str, Any]:
"""Return response data for the module.
@@ -179,12 +191,31 @@ class SmartModule(Module):
filtered_data = {k: v for k, v in dev._last_update.items() if k in q_keys}
remove_keys: list[str] = []
for data_item in filtered_data:
if isinstance(filtered_data[data_item], SmartErrorCode):
raise DeviceError(
f"{data_item} for {self.name}", error_code=filtered_data[data_item]
if data_item in self.optional_response_keys:
remove_keys.append(data_item)
else:
raise DeviceError(
f"{data_item} for {self.name}",
error_code=filtered_data[data_item],
)
for key in remove_keys:
if key not in self._logged_remove_keys:
self._logged_remove_keys.append(key)
_LOGGER.debug(
"Removed key %s from response for device %s as it returned "
"error: %s. This message will only be logged once per key.",
key,
self._device.host,
filtered_data[key],
)
if len(filtered_data) == 1:
filtered_data.pop(key)
if len(filtered_data) == 1 and not remove_keys:
return next(iter(filtered_data.values()))
return filtered_data

View File

@@ -1,19 +1,33 @@
"""Modules for SMARTCAM devices."""
from .alarm import Alarm
from .babycrydetection import BabyCryDetection
from .camera import Camera
from .childdevice import ChildDevice
from .device import DeviceModule
from .homekit import HomeKit
from .led import Led
from .lensmask import LensMask
from .matter import Matter
from .motiondetection import MotionDetection
from .pantilt import PanTilt
from .persondetection import PersonDetection
from .tamperdetection import TamperDetection
from .time import Time
__all__ = [
"Alarm",
"BabyCryDetection",
"Camera",
"ChildDevice",
"DeviceModule",
"Led",
"PanTilt",
"PersonDetection",
"Time",
"HomeKit",
"Matter",
"MotionDetection",
"LensMask",
"TamperDetection",
]

View File

@@ -0,0 +1,47 @@
"""Implementation of baby cry detection module."""
from __future__ import annotations
import logging
from ...feature import Feature
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class BabyCryDetection(SmartCamModule):
"""Implementation of baby cry detection module."""
REQUIRED_COMPONENT = "babyCryDetection"
QUERY_GETTER_NAME = "getBCDConfig"
QUERY_MODULE_NAME = "sound_detection"
QUERY_SECTION_NAMES = "bcd"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="baby_cry_detection",
name="Baby cry detection",
container=self,
attribute_getter="enabled",
attribute_setter="set_enabled",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
)
)
@property
def enabled(self) -> bool:
"""Return the baby cry detection enabled state."""
return self.data["bcd"]["enabled"] == "on"
async def set_enabled(self, enable: bool) -> dict:
"""Set the baby cry detection enabled state."""
params = {"enabled": "on" if enable else "off"}
return await self._device._query_setter_helper(
"setBCDConfig", self.QUERY_MODULE_NAME, "bcd", params
)

View File

@@ -1,16 +1,18 @@
"""Implementation of device module."""
"""Implementation of camera module."""
from __future__ import annotations
import base64
import logging
from enum import StrEnum
from typing import Annotated
from urllib.parse import quote_plus
from ...credentials import Credentials
from ...device_type import DeviceType
from ...feature import Feature
from ...json import loads as json_loads
from ...module import FeatureAttribute, Module
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
@@ -29,28 +31,38 @@ class StreamResolution(StrEnum):
class Camera(SmartCamModule):
"""Implementation of device module."""
QUERY_GETTER_NAME = "getLensMaskConfig"
QUERY_MODULE_NAME = "lens_mask"
QUERY_SECTION_NAMES = "lens_mask_info"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="state",
name="State",
attribute_getter="is_on",
attribute_setter="set_state",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
if Module.LensMask in self._device.modules:
self._add_feature(
Feature(
self._device,
id="state",
name="State",
container=self,
attribute_getter="is_on",
attribute_setter="set_state",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
)
)
)
@property
def is_on(self) -> bool:
"""Return the device id."""
return self.data["lens_mask_info"]["enabled"] == "off"
"""Return the device on state."""
if lens_mask := self._device.modules.get(Module.LensMask):
return not lens_mask.enabled
return True
async def set_state(self, on: bool) -> Annotated[dict, FeatureAttribute()]:
"""Set the device on state.
If the device does not support setting state will do nothing.
"""
if lens_mask := self._device.modules.get(Module.LensMask):
# Turning off enables the privacy mask which is why value is reversed.
return await lens_mask.set_enabled(not on)
return {}
def _get_credentials(self) -> Credentials | None:
"""Get credentials from ."""
@@ -109,14 +121,6 @@ class Camera(SmartCamModule):
"""Return the onvif url."""
return f"http://{self._device.host}:{ONVIF_PORT}/onvif/device_service"
async def set_state(self, on: bool) -> dict:
"""Set the device state."""
# Turning off enables the privacy mask which is why value is reversed.
params = {"enabled": "off" if on else "on"}
return await self._device._query_setter_helper(
"setLensMaskConfig", self.QUERY_MODULE_NAME, "lens_mask_info", params
)
async def _check_supported(self) -> bool:
"""Additional check to see if the module is supported by the device."""
return self._device.device_type is DeviceType.Camera

View File

@@ -14,6 +14,13 @@ class DeviceModule(SmartCamModule):
QUERY_MODULE_NAME = "device_info"
QUERY_SECTION_NAMES = ["basic_info", "info"]
def query(self) -> dict:
"""Query to execute during the update cycle."""
q = super().query()
q["getConnectionType"] = {"network": {"get_connection_type": []}}
return q
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
@@ -26,6 +33,32 @@ class DeviceModule(SmartCamModule):
type=Feature.Type.Sensor,
)
)
if self.rssi is not None:
self._add_feature(
Feature(
self._device,
container=self,
id="rssi",
name="RSSI",
attribute_getter="rssi",
icon="mdi:signal",
unit_getter=lambda: "dBm",
category=Feature.Category.Debug,
type=Feature.Type.Sensor,
)
)
self._add_feature(
Feature(
self._device,
container=self,
id="signal_level",
name="Signal Level",
attribute_getter="signal_level",
icon="mdi:signal",
category=Feature.Category.Info,
type=Feature.Type.Sensor,
)
)
async def _post_update_hook(self) -> None:
"""Overriden to prevent module disabling.
@@ -37,4 +70,14 @@ class DeviceModule(SmartCamModule):
@property
def device_id(self) -> str:
"""Return the device id."""
return self.data["basic_info"]["dev_id"]
return self.data[self.QUERY_GETTER_NAME]["basic_info"]["dev_id"]
@property
def rssi(self) -> int | None:
"""Return the device id."""
return self.data["getConnectionType"].get("rssiValue")
@property
def signal_level(self) -> int | None:
"""Return the device id."""
return self.data["getConnectionType"].get("rssi")

View File

@@ -0,0 +1,16 @@
"""Implementation of homekit module."""
from __future__ import annotations
from ..smartcammodule import SmartCamModule
class HomeKit(SmartCamModule):
"""Implementation of homekit module."""
REQUIRED_COMPONENT = "homekit"
@property
def info(self) -> dict[str, str]:
"""Not supported, return empty dict."""
return {}

View File

@@ -0,0 +1,31 @@
"""Implementation of lens mask privacy module."""
from __future__ import annotations
import logging
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class LensMask(SmartCamModule):
"""Implementation of lens mask module."""
REQUIRED_COMPONENT = "lensMask"
QUERY_GETTER_NAME = "getLensMaskConfig"
QUERY_MODULE_NAME = "lens_mask"
QUERY_SECTION_NAMES = "lens_mask_info"
@property
def enabled(self) -> bool:
"""Return the lens mask state."""
return self.data["lens_mask_info"]["enabled"] == "on"
async def set_enabled(self, enable: bool) -> dict:
"""Set the lens mask state."""
params = {"enabled": "on" if enable else "off"}
return await self._device._query_setter_helper(
"setLensMaskConfig", self.QUERY_MODULE_NAME, "lens_mask_info", params
)

View File

@@ -0,0 +1,44 @@
"""Implementation of matter module."""
from __future__ import annotations
from ...feature import Feature
from ..smartcammodule import SmartCamModule
class Matter(SmartCamModule):
"""Implementation of matter module."""
QUERY_GETTER_NAME = "getMatterSetupInfo"
QUERY_MODULE_NAME = "matter"
REQUIRED_COMPONENT = "matter"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="matter_setup_code",
name="Matter setup code",
container=self,
attribute_getter=lambda x: x.info["setup_code"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
self._add_feature(
Feature(
self._device,
id="matter_setup_payload",
name="Matter setup payload",
container=self,
attribute_getter=lambda x: x.info["setup_payload"],
type=Feature.Type.Sensor,
category=Feature.Category.Debug,
)
)
@property
def info(self) -> dict[str, str]:
"""Matter setup info."""
return self.data

View File

@@ -0,0 +1,47 @@
"""Implementation of motion detection module."""
from __future__ import annotations
import logging
from ...feature import Feature
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class MotionDetection(SmartCamModule):
"""Implementation of motion detection module."""
REQUIRED_COMPONENT = "detection"
QUERY_GETTER_NAME = "getDetectionConfig"
QUERY_MODULE_NAME = "motion_detection"
QUERY_SECTION_NAMES = "motion_det"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="motion_detection",
name="Motion detection",
container=self,
attribute_getter="enabled",
attribute_setter="set_enabled",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
)
)
@property
def enabled(self) -> bool:
"""Return the motion detection enabled state."""
return self.data["motion_det"]["enabled"] == "on"
async def set_enabled(self, enable: bool) -> dict:
"""Set the motion detection enabled state."""
params = {"enabled": "on" if enable else "off"}
return await self._device._query_setter_helper(
"setDetectionConfig", self.QUERY_MODULE_NAME, "motion_det", params
)

View File

@@ -0,0 +1,47 @@
"""Implementation of person detection module."""
from __future__ import annotations
import logging
from ...feature import Feature
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class PersonDetection(SmartCamModule):
"""Implementation of person detection module."""
REQUIRED_COMPONENT = "personDetection"
QUERY_GETTER_NAME = "getPersonDetectionConfig"
QUERY_MODULE_NAME = "people_detection"
QUERY_SECTION_NAMES = "detection"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="person_detection",
name="Person detection",
container=self,
attribute_getter="enabled",
attribute_setter="set_enabled",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
)
)
@property
def enabled(self) -> bool:
"""Return the person detection enabled state."""
return self.data["detection"]["enabled"] == "on"
async def set_enabled(self, enable: bool) -> dict:
"""Set the person detection enabled state."""
params = {"enabled": "on" if enable else "off"}
return await self._device._query_setter_helper(
"setPersonDetectionConfig", self.QUERY_MODULE_NAME, "detection", params
)

View File

@@ -0,0 +1,47 @@
"""Implementation of tamper detection module."""
from __future__ import annotations
import logging
from ...feature import Feature
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class TamperDetection(SmartCamModule):
"""Implementation of tamper detection module."""
REQUIRED_COMPONENT = "tamperDetection"
QUERY_GETTER_NAME = "getTamperDetectionConfig"
QUERY_MODULE_NAME = "tamper_detection"
QUERY_SECTION_NAMES = "tamper_det"
def _initialize_features(self) -> None:
"""Initialize features after the initial update."""
self._add_feature(
Feature(
self._device,
id="tamper_detection",
name="Tamper detection",
container=self,
attribute_getter="enabled",
attribute_setter="set_enabled",
type=Feature.Type.Switch,
category=Feature.Category.Primary,
)
)
@property
def enabled(self) -> bool:
"""Return the tamper detection enabled state."""
return self.data["tamper_det"]["enabled"] == "on"
async def set_enabled(self, enable: bool) -> dict:
"""Set the tamper detection enabled state."""
params = {"enabled": "on" if enable else "off"}
return await self._device._query_setter_helper(
"setTamperDetectionConfig", self.QUERY_MODULE_NAME, "tamper_det", params
)

View File

@@ -3,13 +3,14 @@
from __future__ import annotations
import logging
from typing import Any
from typing import Any, cast
from ..device import _DeviceInfo
from ..device import DeviceInfo
from ..device_type import DeviceType
from ..module import Module
from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper
from ..smart import SmartChildDevice, SmartDevice
from ..smart.smartdevice import ComponentsRaw
from .modules import ChildDevice, DeviceModule
from .smartcammodule import SmartCamModule
@@ -36,7 +37,7 @@ class SmartCamDevice(SmartDevice):
@staticmethod
def _get_device_info(
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
) -> DeviceInfo:
"""Get model information for a device."""
basic_info = info["getDeviceInfo"]["device_info"]["basic_info"]
short_name = basic_info["device_model"]
@@ -44,7 +45,7 @@ class SmartCamDevice(SmartDevice):
device_type = SmartCamDevice._get_device_type_from_sysinfo(basic_info)
fw_version_full = basic_info["sw_version"]
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
return _DeviceInfo(
return DeviceInfo(
short_name=basic_info["device_model"],
long_name=long_name,
brand="tapo",
@@ -78,7 +79,7 @@ class SmartCamDevice(SmartDevice):
self._children[child_id]._update_internal_state(info)
async def _initialize_smart_child(
self, info: dict, child_components: dict
self, info: dict, child_components_raw: ComponentsRaw
) -> SmartDevice:
"""Initialize a smart child device attached to a smartcam device."""
child_id = info["device_id"]
@@ -93,7 +94,7 @@ class SmartCamDevice(SmartDevice):
return await SmartChildDevice.create(
parent=self,
child_info=info,
child_components=child_components,
child_components_raw=child_components_raw,
protocol=child_protocol,
last_update=initial_response,
)
@@ -108,17 +109,8 @@ class SmartCamDevice(SmartDevice):
self.internal_state.update(resp)
smart_children_components = {
child["device_id"]: {
comp["id"]: int(comp["ver_code"]) for comp in component_list
}
child["device_id"]: child
for child in resp["getChildDeviceComponentList"]["child_component_list"]
if (component_list := child.get("component_list"))
# Child camera devices will have a different component schema so only
# extract smart values.
and (first_comp := next(iter(component_list), None))
and isinstance(first_comp, dict)
and "id" in first_comp
and "ver_code" in first_comp
}
children = {}
for info in resp["getChildDeviceList"]["child_device_list"]:
@@ -142,6 +134,11 @@ class SmartCamDevice(SmartDevice):
if (
mod.REQUIRED_COMPONENT
and mod.REQUIRED_COMPONENT not in self._components
# Always add Camera module to cameras
and (
mod._module_name() != Module.Camera
or self._device_type is not DeviceType.Camera
)
):
continue
module = mod(self, mod._module_name())
@@ -172,6 +169,13 @@ class SmartCamDevice(SmartDevice):
return res
@staticmethod
def _parse_components(components_raw: ComponentsRaw) -> dict[str, int]:
return {
str(comp["name"]): int(comp["version"])
for comp in components_raw["app_component_list"]
}
async def _negotiate(self) -> None:
"""Perform initialization.
@@ -181,17 +185,16 @@ class SmartCamDevice(SmartDevice):
initial_query = {
"getDeviceInfo": {"device_info": {"name": ["basic_info", "info"]}},
"getAppComponentList": {"app_component": {"name": "app_component_list"}},
"getConnectionType": {"network": {"get_connection_type": {}}},
}
resp = await self.protocol.query(initial_query)
self._last_update.update(resp)
self._update_internal_info(resp)
self._components = {
comp["name"]: int(comp["version"])
for comp in resp["getAppComponentList"]["app_component"][
"app_component_list"
]
}
self._components_raw = cast(
ComponentsRaw, resp["getAppComponentList"]["app_component"]
)
self._components = self._parse_components(self._components_raw)
if "childControl" in self._components and not self.children:
await self._initialize_children()
@@ -251,11 +254,16 @@ class SmartCamDevice(SmartDevice):
def hw_info(self) -> dict:
"""Return hardware info for the device."""
return {
"sw_ver": self._info.get("hw_ver"),
"hw_ver": self._info.get("fw_ver"),
"sw_ver": self._info.get("fw_ver"),
"hw_ver": self._info.get("hw_ver"),
"mac": self._info.get("mac"),
"type": self._info.get("type"),
"hwId": self._info.get("hwId"),
"dev_name": self.alias,
"oemId": self._info.get("oem_id"),
}
@property
def rssi(self) -> int | None:
"""Return the device id."""
return self.modules[SmartCamModule.SmartCamDeviceModule].rssi

View File

@@ -20,9 +20,23 @@ class SmartCamModule(SmartModule):
"""Base class for SMARTCAM modules."""
SmartCamAlarm: Final[ModuleName[modules.Alarm]] = ModuleName("SmartCamAlarm")
SmartCamMotionDetection: Final[ModuleName[modules.MotionDetection]] = ModuleName(
"MotionDetection"
)
SmartCamPersonDetection: Final[ModuleName[modules.PersonDetection]] = ModuleName(
"PersonDetection"
)
SmartCamTamperDetection: Final[ModuleName[modules.TamperDetection]] = ModuleName(
"TamperDetection"
)
SmartCamBabyCryDetection: Final[ModuleName[modules.BabyCryDetection]] = ModuleName(
"BabyCryDetection"
)
SmartCamDeviceModule: Final[ModuleName[modules.DeviceModule]] = ModuleName(
"devicemodule"
)
#: Query to execute during the main update cycle
QUERY_GETTER_NAME: str
#: Module name to be queried
QUERY_MODULE_NAME: str
#: Section name or names to be queried
@@ -37,6 +51,8 @@ class SmartCamModule(SmartModule):
Default implementation uses the raw query getter w/o parameters.
"""
if not self.QUERY_GETTER_NAME:
return {}
section_names = (
{"name": self.QUERY_SECTION_NAMES} if self.QUERY_SECTION_NAMES else {}
)
@@ -86,7 +102,8 @@ class SmartCamModule(SmartModule):
f" for '{self._module}'"
)
return query_resp.get(self.QUERY_MODULE_NAME)
# Some calls return the data under the module, others not
return query_resp.get(self.QUERY_MODULE_NAME, query_resp)
else:
found = {key: val for key, val in dev._last_update.items() if key in q}
for key in q:

View File

@@ -4,6 +4,7 @@ from .aestransport import AesEncyptionSession, AesTransport
from .basetransport import BaseTransport
from .klaptransport import KlapTransport, KlapTransportV2
from .linkietransport import LinkieTransportV2
from .sslaestransport import SslAesTransport
from .ssltransport import SslTransport
from .xortransport import XorEncryption, XorTransport
@@ -11,6 +12,7 @@ __all__ = [
"AesTransport",
"AesEncyptionSession",
"SslTransport",
"SslAesTransport",
"BaseTransport",
"KlapTransport",
"KlapTransportV2",

View File

@@ -8,6 +8,7 @@ import hashlib
import logging
import secrets
import ssl
from contextlib import suppress
from enum import Enum, auto
from typing import TYPE_CHECKING, Any, cast
@@ -160,6 +161,19 @@ class SslAesTransport(BaseTransport):
error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR
return error_code
def _get_response_inner_error(self, resp_dict: Any) -> SmartErrorCode | None:
error_code_raw = resp_dict.get("data", {}).get("code")
if error_code_raw is None:
return None
try:
error_code = SmartErrorCode.from_int(error_code_raw)
except ValueError:
_LOGGER.warning(
"Device %s received unknown error code: %s", self._host, error_code_raw
)
error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR
return error_code
def _handle_response_error_code(self, resp_dict: Any, msg: str) -> None:
error_code = self._get_response_error(resp_dict)
if error_code is SmartErrorCode.SUCCESS:
@@ -216,6 +230,31 @@ class SslAesTransport(BaseTransport):
ssl=await self._get_ssl_context(),
)
if TYPE_CHECKING:
assert self._encryption_session is not None
# Devices can respond with 500 if another session is created from
# the same host. Decryption may not succeed after that
if status_code == 500:
msg = (
f"Device {self._host} replied with status 500 after handshake, "
f"response: "
)
decrypted = None
if isinstance(resp_dict, dict) and (
response := resp_dict.get("result", {}).get("response")
):
with suppress(Exception):
decrypted = self._encryption_session.decrypt(response.encode())
if decrypted:
msg += decrypted
else:
msg += str(resp_dict)
_LOGGER.debug(msg)
raise _RetryableError(msg)
if status_code != 200:
raise KasaException(
f"{self._host} responded with an unexpected "
@@ -228,7 +267,6 @@ class SslAesTransport(BaseTransport):
if TYPE_CHECKING:
resp_dict = cast(dict[str, Any], resp_dict)
assert self._encryption_session is not None
if "result" in resp_dict and "response" in resp_dict["result"]:
raw_response: str = resp_dict["result"]["response"]
@@ -383,13 +421,29 @@ class SslAesTransport(BaseTransport):
error_code = default_error_code
resp_dict = default_resp_dict
# If the default login worked it's ok not to provide credentials but if
# it didn't raise auth error here.
if not self._username:
raise AuthenticationError(
f"Credentials must be supplied to connect to {self._host}"
)
# Device responds with INVALID_NONCE and a "nonce" to indicate ready
# for secure login. Otherwise error.
if error_code is not SmartErrorCode.INVALID_NONCE or (
resp_dict and "nonce" not in resp_dict["result"].get("data", {})
resp_dict and "nonce" not in resp_dict.get("result", {}).get("data", {})
):
if (
resp_dict
and self._get_response_inner_error(resp_dict)
is SmartErrorCode.DEVICE_BLOCKED
):
sec_left = resp_dict.get("data", {}).get("sec_left")
msg = "Device blocked" + (
f" for {sec_left} seconds" if sec_left else ""
)
raise DeviceError(msg, error_code=SmartErrorCode.DEVICE_BLOCKED)
raise AuthenticationError(f"Error trying handshake1: {resp_dict}")
if TYPE_CHECKING: