mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-10-20 22:38:02 +00:00
Rename smartcamera to smartcam (#1300)
This commit is contained in:
5
kasa/smartcam/__init__.py
Normal file
5
kasa/smartcam/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Package for supporting tapo-branded cameras."""
|
||||
|
||||
from .smartcamdevice import SmartCamDevice
|
||||
|
||||
__all__ = ["SmartCamDevice"]
|
19
kasa/smartcam/modules/__init__.py
Normal file
19
kasa/smartcam/modules/__init__.py
Normal file
@@ -0,0 +1,19 @@
|
||||
"""Modules for SMARTCAM devices."""
|
||||
|
||||
from .alarm import Alarm
|
||||
from .camera import Camera
|
||||
from .childdevice import ChildDevice
|
||||
from .device import DeviceModule
|
||||
from .led import Led
|
||||
from .pantilt import PanTilt
|
||||
from .time import Time
|
||||
|
||||
__all__ = [
|
||||
"Alarm",
|
||||
"Camera",
|
||||
"ChildDevice",
|
||||
"DeviceModule",
|
||||
"Led",
|
||||
"PanTilt",
|
||||
"Time",
|
||||
]
|
166
kasa/smartcam/modules/alarm.py
Normal file
166
kasa/smartcam/modules/alarm.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""Implementation of alarm module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
DURATION_MIN = 0
|
||||
DURATION_MAX = 6000
|
||||
|
||||
VOLUME_MIN = 0
|
||||
VOLUME_MAX = 10
|
||||
|
||||
|
||||
class Alarm(SmartCamModule):
|
||||
"""Implementation of alarm module."""
|
||||
|
||||
# Needs a different name to avoid clashing with SmartAlarm
|
||||
NAME = "SmartCamAlarm"
|
||||
|
||||
REQUIRED_COMPONENT = "siren"
|
||||
QUERY_GETTER_NAME = "getSirenStatus"
|
||||
QUERY_MODULE_NAME = "siren"
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
q = super().query()
|
||||
q["getSirenConfig"] = {self.QUERY_MODULE_NAME: {}}
|
||||
q["getSirenTypeList"] = {self.QUERY_MODULE_NAME: {}}
|
||||
|
||||
return q
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features."""
|
||||
device = self._device
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
id="alarm",
|
||||
name="Alarm",
|
||||
container=self,
|
||||
attribute_getter="active",
|
||||
icon="mdi:bell",
|
||||
category=Feature.Category.Debug,
|
||||
type=Feature.Type.BinarySensor,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
id="alarm_sound",
|
||||
name="Alarm sound",
|
||||
container=self,
|
||||
attribute_getter="alarm_sound",
|
||||
attribute_setter="set_alarm_sound",
|
||||
category=Feature.Category.Config,
|
||||
type=Feature.Type.Choice,
|
||||
choices_getter="alarm_sounds",
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
id="alarm_volume",
|
||||
name="Alarm volume",
|
||||
container=self,
|
||||
attribute_getter="alarm_volume",
|
||||
attribute_setter="set_alarm_volume",
|
||||
category=Feature.Category.Config,
|
||||
type=Feature.Type.Number,
|
||||
range_getter=lambda: (VOLUME_MIN, VOLUME_MAX),
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
id="alarm_duration",
|
||||
name="Alarm duration",
|
||||
container=self,
|
||||
attribute_getter="alarm_duration",
|
||||
attribute_setter="set_alarm_duration",
|
||||
category=Feature.Category.Config,
|
||||
type=Feature.Type.Number,
|
||||
range_getter=lambda: (DURATION_MIN, DURATION_MAX),
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
id="test_alarm",
|
||||
name="Test alarm",
|
||||
container=self,
|
||||
attribute_setter="play",
|
||||
type=Feature.Type.Action,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device,
|
||||
id="stop_alarm",
|
||||
name="Stop alarm",
|
||||
container=self,
|
||||
attribute_setter="stop",
|
||||
type=Feature.Type.Action,
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def alarm_sound(self) -> str:
|
||||
"""Return current alarm sound."""
|
||||
return self.data["getSirenConfig"]["siren_type"]
|
||||
|
||||
async def set_alarm_sound(self, sound: str) -> dict:
|
||||
"""Set alarm sound.
|
||||
|
||||
See *alarm_sounds* for list of available sounds.
|
||||
"""
|
||||
if sound not in self.alarm_sounds:
|
||||
raise ValueError(
|
||||
f"sound must be one of {', '.join(self.alarm_sounds)}: {sound}"
|
||||
)
|
||||
return await self.call("setSirenConfig", {"siren": {"siren_type": sound}})
|
||||
|
||||
@property
|
||||
def alarm_sounds(self) -> list[str]:
|
||||
"""Return list of available alarm sounds."""
|
||||
return self.data["getSirenTypeList"]["siren_type_list"]
|
||||
|
||||
@property
|
||||
def alarm_volume(self) -> int:
|
||||
"""Return alarm volume.
|
||||
|
||||
Unlike duration the device expects/returns a string for volume.
|
||||
"""
|
||||
return int(self.data["getSirenConfig"]["volume"])
|
||||
|
||||
async def set_alarm_volume(self, volume: int) -> dict:
|
||||
"""Set alarm volume."""
|
||||
if volume < VOLUME_MIN or volume > VOLUME_MAX:
|
||||
raise ValueError(f"volume must be between {VOLUME_MIN} and {VOLUME_MAX}")
|
||||
return await self.call("setSirenConfig", {"siren": {"volume": str(volume)}})
|
||||
|
||||
@property
|
||||
def alarm_duration(self) -> int:
|
||||
"""Return alarm duration."""
|
||||
return self.data["getSirenConfig"]["duration"]
|
||||
|
||||
async def set_alarm_duration(self, duration: int) -> dict:
|
||||
"""Set alarm volume."""
|
||||
if duration < DURATION_MIN or duration > DURATION_MAX:
|
||||
msg = f"duration must be between {DURATION_MIN} and {DURATION_MAX}"
|
||||
raise ValueError(msg)
|
||||
return await self.call("setSirenConfig", {"siren": {"duration": duration}})
|
||||
|
||||
@property
|
||||
def active(self) -> bool:
|
||||
"""Return true if alarm is active."""
|
||||
return self.data["getSirenStatus"]["status"] != "off"
|
||||
|
||||
async def play(self) -> dict:
|
||||
"""Play alarm."""
|
||||
return await self.call("setSirenStatus", {"siren": {"status": "on"}})
|
||||
|
||||
async def stop(self) -> dict:
|
||||
"""Stop alarm."""
|
||||
return await self.call("setSirenStatus", {"siren": {"status": "off"}})
|
98
kasa/smartcam/modules/camera.py
Normal file
98
kasa/smartcam/modules/camera.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""Implementation of device module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from ...credentials import Credentials
|
||||
from ...device_type import DeviceType
|
||||
from ...feature import Feature
|
||||
from ...json import loads as json_loads
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
LOCAL_STREAMING_PORT = 554
|
||||
|
||||
|
||||
class Camera(SmartCamModule):
|
||||
"""Implementation of device module."""
|
||||
|
||||
QUERY_GETTER_NAME = "getLensMaskConfig"
|
||||
QUERY_MODULE_NAME = "lens_mask"
|
||||
QUERY_SECTION_NAMES = "lens_mask_info"
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
id="state",
|
||||
name="State",
|
||||
attribute_getter="is_on",
|
||||
attribute_setter="set_state",
|
||||
type=Feature.Type.Switch,
|
||||
category=Feature.Category.Primary,
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
"""Return the device id."""
|
||||
return self.data["lens_mask_info"]["enabled"] == "off"
|
||||
|
||||
def _get_credentials(self) -> Credentials | None:
|
||||
"""Get credentials from ."""
|
||||
config = self._device.config
|
||||
if credentials := config.credentials:
|
||||
return credentials
|
||||
|
||||
if credentials_hash := config.credentials_hash:
|
||||
try:
|
||||
decoded = json_loads(
|
||||
base64.b64decode(credentials_hash.encode()).decode()
|
||||
)
|
||||
except Exception:
|
||||
_LOGGER.warning(
|
||||
"Unable to deserialize credentials_hash: %s", credentials_hash
|
||||
)
|
||||
return None
|
||||
if (username := decoded.get("un")) and (password := decoded.get("pwd")):
|
||||
return Credentials(username, password)
|
||||
|
||||
return None
|
||||
|
||||
def stream_rtsp_url(self, credentials: Credentials | None = None) -> str | None:
|
||||
"""Return the local rtsp streaming url.
|
||||
|
||||
:param credentials: Credentials for camera account.
|
||||
These could be different credentials to tplink cloud credentials.
|
||||
If not provided will use tplink credentials if available
|
||||
:return: rtsp url with escaped credentials or None if no credentials or
|
||||
camera is off.
|
||||
"""
|
||||
if not self.is_on:
|
||||
return None
|
||||
dev = self._device
|
||||
if not credentials:
|
||||
credentials = self._get_credentials()
|
||||
|
||||
if not credentials or not credentials.username or not credentials.password:
|
||||
return None
|
||||
username = quote_plus(credentials.username)
|
||||
password = quote_plus(credentials.password)
|
||||
return f"rtsp://{username}:{password}@{dev.host}:{LOCAL_STREAMING_PORT}/stream1"
|
||||
|
||||
async def set_state(self, on: bool) -> dict:
|
||||
"""Set the device state."""
|
||||
# Turning off enables the privacy mask which is why value is reversed.
|
||||
params = {"enabled": "off" if on else "on"}
|
||||
return await self._device._query_setter_helper(
|
||||
"setLensMaskConfig", self.QUERY_MODULE_NAME, "lens_mask_info", params
|
||||
)
|
||||
|
||||
async def _check_supported(self) -> bool:
|
||||
"""Additional check to see if the module is supported by the device."""
|
||||
return self._device.device_type is DeviceType.Camera
|
26
kasa/smartcam/modules/childdevice.py
Normal file
26
kasa/smartcam/modules/childdevice.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""Module for child devices."""
|
||||
|
||||
from ...device_type import DeviceType
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
|
||||
class ChildDevice(SmartCamModule):
|
||||
"""Implementation for child devices."""
|
||||
|
||||
REQUIRED_COMPONENT = "childControl"
|
||||
NAME = "childdevice"
|
||||
QUERY_GETTER_NAME = "getChildDeviceList"
|
||||
# This module is unusual in that QUERY_MODULE_NAME in the response is not
|
||||
# the same one used in the request.
|
||||
QUERY_MODULE_NAME = "child_device_list"
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle.
|
||||
|
||||
Default implementation uses the raw query getter w/o parameters.
|
||||
"""
|
||||
return {self.QUERY_GETTER_NAME: {"childControl": {"start_index": 0}}}
|
||||
|
||||
async def _check_supported(self) -> bool:
|
||||
"""Additional check to see if the module is supported by the device."""
|
||||
return self._device.device_type is DeviceType.Hub
|
40
kasa/smartcam/modules/device.py
Normal file
40
kasa/smartcam/modules/device.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Implementation of device module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
|
||||
class DeviceModule(SmartCamModule):
|
||||
"""Implementation of device module."""
|
||||
|
||||
NAME = "devicemodule"
|
||||
QUERY_GETTER_NAME = "getDeviceInfo"
|
||||
QUERY_MODULE_NAME = "device_info"
|
||||
QUERY_SECTION_NAMES = ["basic_info", "info"]
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
id="device_id",
|
||||
name="Device ID",
|
||||
attribute_getter="device_id",
|
||||
category=Feature.Category.Debug,
|
||||
type=Feature.Type.Sensor,
|
||||
)
|
||||
)
|
||||
|
||||
async def _post_update_hook(self) -> None:
|
||||
"""Overriden to prevent module disabling.
|
||||
|
||||
Overrides the default behaviour to disable a module if the query returns
|
||||
an error because this module is critical.
|
||||
"""
|
||||
|
||||
@property
|
||||
def device_id(self) -> str:
|
||||
"""Return the device id."""
|
||||
return self.data["basic_info"]["dev_id"]
|
28
kasa/smartcam/modules/led.py
Normal file
28
kasa/smartcam/modules/led.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""Module for led controls."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ...interfaces.led import Led as LedInterface
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
|
||||
class Led(SmartCamModule, LedInterface):
|
||||
"""Implementation of led controls."""
|
||||
|
||||
REQUIRED_COMPONENT = "led"
|
||||
QUERY_GETTER_NAME = "getLedStatus"
|
||||
QUERY_MODULE_NAME = "led"
|
||||
QUERY_SECTION_NAMES = "config"
|
||||
|
||||
@property
|
||||
def led(self) -> bool:
|
||||
"""Return current led status."""
|
||||
return self.data["config"]["enabled"] == "on"
|
||||
|
||||
async def set_led(self, enable: bool) -> dict:
|
||||
"""Set led.
|
||||
|
||||
This should probably be a select with always/never/nightmode.
|
||||
"""
|
||||
params = {"enabled": "on"} if enable else {"enabled": "off"}
|
||||
return await self.call("setLedStatus", {"led": {"config": params}})
|
107
kasa/smartcam/modules/pantilt.py
Normal file
107
kasa/smartcam/modules/pantilt.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Implementation of time module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ...feature import Feature
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
DEFAULT_PAN_STEP = 30
|
||||
DEFAULT_TILT_STEP = 10
|
||||
|
||||
|
||||
class PanTilt(SmartCamModule):
|
||||
"""Implementation of device_local_time."""
|
||||
|
||||
REQUIRED_COMPONENT = "ptz"
|
||||
_pan_step = DEFAULT_PAN_STEP
|
||||
_tilt_step = DEFAULT_TILT_STEP
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
|
||||
async def set_pan_step(value: int) -> None:
|
||||
self._pan_step = value
|
||||
|
||||
async def set_tilt_step(value: int) -> None:
|
||||
self._tilt_step = value
|
||||
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
"pan_right",
|
||||
"Pan right",
|
||||
container=self,
|
||||
attribute_setter=lambda: self.pan(self._pan_step * -1),
|
||||
type=Feature.Type.Action,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
"pan_left",
|
||||
"Pan left",
|
||||
container=self,
|
||||
attribute_setter=lambda: self.pan(self._pan_step),
|
||||
type=Feature.Type.Action,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
"pan_step",
|
||||
"Pan step",
|
||||
container=self,
|
||||
attribute_getter="_pan_step",
|
||||
attribute_setter=set_pan_step,
|
||||
type=Feature.Type.Number,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
"tilt_up",
|
||||
"Tilt up",
|
||||
container=self,
|
||||
attribute_setter=lambda: self.tilt(self._tilt_step),
|
||||
type=Feature.Type.Action,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
"tilt_down",
|
||||
"Tilt down",
|
||||
container=self,
|
||||
attribute_setter=lambda: self.tilt(self._tilt_step * -1),
|
||||
type=Feature.Type.Action,
|
||||
)
|
||||
)
|
||||
self._add_feature(
|
||||
Feature(
|
||||
self._device,
|
||||
"tilt_step",
|
||||
"Tilt step",
|
||||
container=self,
|
||||
attribute_getter="_tilt_step",
|
||||
attribute_setter=set_tilt_step,
|
||||
type=Feature.Type.Number,
|
||||
)
|
||||
)
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
return {}
|
||||
|
||||
async def pan(self, pan: int) -> dict:
|
||||
"""Pan horizontally."""
|
||||
return await self.move(pan=pan, tilt=0)
|
||||
|
||||
async def tilt(self, tilt: int) -> dict:
|
||||
"""Tilt vertically."""
|
||||
return await self.move(pan=0, tilt=tilt)
|
||||
|
||||
async def move(self, *, pan: int, tilt: int) -> dict:
|
||||
"""Pan and tilt camera."""
|
||||
return await self._device._raw_query(
|
||||
{"do": {"motor": {"move": {"x_coord": str(pan), "y_coord": str(tilt)}}}}
|
||||
)
|
90
kasa/smartcam/modules/time.py
Normal file
90
kasa/smartcam/modules/time.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""Implementation of time module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime, tzinfo
|
||||
from typing import cast
|
||||
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
||||
|
||||
from ...cachedzoneinfo import CachedZoneInfo
|
||||
from ...feature import Feature
|
||||
from ...interfaces import Time as TimeInterface
|
||||
from ..smartcammodule import SmartCamModule
|
||||
|
||||
|
||||
class Time(SmartCamModule, TimeInterface):
|
||||
"""Implementation of device_local_time."""
|
||||
|
||||
QUERY_GETTER_NAME = "getTimezone"
|
||||
QUERY_MODULE_NAME = "system"
|
||||
QUERY_SECTION_NAMES = "basic"
|
||||
|
||||
_timezone: tzinfo = UTC
|
||||
_time: datetime
|
||||
|
||||
def _initialize_features(self) -> None:
|
||||
"""Initialize features after the initial update."""
|
||||
self._add_feature(
|
||||
Feature(
|
||||
device=self._device,
|
||||
id="device_time",
|
||||
name="Device time",
|
||||
attribute_getter="time",
|
||||
container=self,
|
||||
category=Feature.Category.Debug,
|
||||
type=Feature.Type.Sensor,
|
||||
)
|
||||
)
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle."""
|
||||
q = super().query()
|
||||
q["getClockStatus"] = {self.QUERY_MODULE_NAME: {"name": "clock_status"}}
|
||||
|
||||
return q
|
||||
|
||||
async def _post_update_hook(self) -> None:
|
||||
"""Perform actions after a device update."""
|
||||
time_data = self.data["getClockStatus"]["system"]["clock_status"]
|
||||
timezone_data = self.data["getTimezone"]["system"]["basic"]
|
||||
zone_id = timezone_data["zone_id"]
|
||||
timestamp = time_data["seconds_from_1970"]
|
||||
try:
|
||||
# Zoneinfo will return a DST aware object
|
||||
tz: tzinfo = await CachedZoneInfo.get_cached_zone_info(zone_id)
|
||||
except ZoneInfoNotFoundError:
|
||||
# timezone string like: UTC+10:00
|
||||
timezone_str = timezone_data["timezone"]
|
||||
tz = cast(tzinfo, datetime.strptime(timezone_str[-6:], "%z").tzinfo)
|
||||
|
||||
self._timezone = tz
|
||||
self._time = datetime.fromtimestamp(
|
||||
cast(float, timestamp),
|
||||
tz=tz,
|
||||
)
|
||||
|
||||
@property
|
||||
def timezone(self) -> tzinfo:
|
||||
"""Return current timezone."""
|
||||
return self._timezone
|
||||
|
||||
@property
|
||||
def time(self) -> datetime:
|
||||
"""Return device's current datetime."""
|
||||
return self._time
|
||||
|
||||
async def set_time(self, dt: datetime) -> dict:
|
||||
"""Set device time."""
|
||||
if not dt.tzinfo:
|
||||
timestamp = dt.replace(tzinfo=self.timezone).timestamp()
|
||||
else:
|
||||
timestamp = dt.timestamp()
|
||||
|
||||
lt = datetime.fromtimestamp(timestamp).isoformat().replace("T", " ")
|
||||
params = {"seconds_from_1970": int(timestamp), "local_time": lt}
|
||||
# Doesn't seem to update the time, perhaps because timing_mode is ntp
|
||||
res = await self.call("setTimezone", {"system": {"clock_status": params}})
|
||||
if (zinfo := dt.tzinfo) and isinstance(zinfo, ZoneInfo):
|
||||
tz_params = {"zone_id": zinfo.key}
|
||||
res = await self.call("setTimezone", {"system": {"basic": tz_params}})
|
||||
return res
|
244
kasa/smartcam/smartcamdevice.py
Normal file
244
kasa/smartcam/smartcamdevice.py
Normal file
@@ -0,0 +1,244 @@
|
||||
"""Module for SmartCamDevice."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from ..device import _DeviceInfo
|
||||
from ..device_type import DeviceType
|
||||
from ..module import Module
|
||||
from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper
|
||||
from ..smart import SmartChildDevice, SmartDevice
|
||||
from .modules import ChildDevice, DeviceModule
|
||||
from .smartcammodule import SmartCamModule
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SmartCamDevice(SmartDevice):
|
||||
"""Class for smart cameras."""
|
||||
|
||||
# Modules that are called as part of the init procedure on first update
|
||||
FIRST_UPDATE_MODULES = {DeviceModule, ChildDevice}
|
||||
|
||||
@staticmethod
|
||||
def _get_device_type_from_sysinfo(sysinfo: dict[str, Any]) -> DeviceType:
|
||||
"""Find type to be displayed as a supported device category."""
|
||||
if (
|
||||
sysinfo
|
||||
and (device_type := sysinfo.get("device_type"))
|
||||
and device_type.endswith("HUB")
|
||||
):
|
||||
return DeviceType.Hub
|
||||
return DeviceType.Camera
|
||||
|
||||
@staticmethod
|
||||
def _get_device_info(
|
||||
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
||||
) -> _DeviceInfo:
|
||||
"""Get model information for a device."""
|
||||
basic_info = info["getDeviceInfo"]["device_info"]["basic_info"]
|
||||
short_name = basic_info["device_model"]
|
||||
long_name = discovery_info["device_model"] if discovery_info else short_name
|
||||
device_type = SmartCamDevice._get_device_type_from_sysinfo(basic_info)
|
||||
fw_version_full = basic_info["sw_version"]
|
||||
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
|
||||
return _DeviceInfo(
|
||||
short_name=basic_info["device_model"],
|
||||
long_name=long_name,
|
||||
brand="tapo",
|
||||
device_family=basic_info["device_type"],
|
||||
device_type=device_type,
|
||||
hardware_version=basic_info["hw_version"],
|
||||
firmware_version=firmware_version,
|
||||
firmware_build=firmware_build,
|
||||
requires_auth=True,
|
||||
region=basic_info.get("region"),
|
||||
)
|
||||
|
||||
def _update_internal_info(self, info_resp: dict) -> None:
|
||||
"""Update the internal device info."""
|
||||
info = self._try_get_response(info_resp, "getDeviceInfo")
|
||||
self._info = self._map_info(info["device_info"])
|
||||
|
||||
def _update_children_info(self) -> None:
|
||||
"""Update the internal child device info from the parent info."""
|
||||
if child_info := self._try_get_response(
|
||||
self._last_update, "getChildDeviceList", {}
|
||||
):
|
||||
for info in child_info["child_device_list"]:
|
||||
self._children[info["device_id"]]._update_internal_state(info)
|
||||
|
||||
async def _initialize_smart_child(
|
||||
self, info: dict, child_components: dict
|
||||
) -> SmartDevice:
|
||||
"""Initialize a smart child device attached to a smartcam device."""
|
||||
child_id = info["device_id"]
|
||||
child_protocol = _ChildCameraProtocolWrapper(child_id, self.protocol)
|
||||
try:
|
||||
initial_response = await child_protocol.query(
|
||||
{"get_connect_cloud_state": None}
|
||||
)
|
||||
except Exception as ex:
|
||||
_LOGGER.exception("Error initialising child %s: %s", child_id, ex)
|
||||
|
||||
return await SmartChildDevice.create(
|
||||
parent=self,
|
||||
child_info=info,
|
||||
child_components=child_components,
|
||||
protocol=child_protocol,
|
||||
last_update=initial_response,
|
||||
)
|
||||
|
||||
async def _initialize_children(self) -> None:
|
||||
"""Initialize children for hubs."""
|
||||
child_info_query = {
|
||||
"getChildDeviceList": {"childControl": {"start_index": 0}},
|
||||
"getChildDeviceComponentList": {"childControl": {"start_index": 0}},
|
||||
}
|
||||
resp = await self.protocol.query(child_info_query)
|
||||
self.internal_state.update(resp)
|
||||
|
||||
children_components = {
|
||||
child["device_id"]: {
|
||||
comp["id"]: int(comp["ver_code"]) for comp in child["component_list"]
|
||||
}
|
||||
for child in resp["getChildDeviceComponentList"]["child_component_list"]
|
||||
}
|
||||
children = {}
|
||||
for info in resp["getChildDeviceList"]["child_device_list"]:
|
||||
if (
|
||||
category := info.get("category")
|
||||
) and category in SmartChildDevice.CHILD_DEVICE_TYPE_MAP:
|
||||
child_id = info["device_id"]
|
||||
children[child_id] = await self._initialize_smart_child(
|
||||
info, children_components[child_id]
|
||||
)
|
||||
else:
|
||||
_LOGGER.debug("Child device type not supported: %s", info)
|
||||
|
||||
self._children = children
|
||||
|
||||
async def _initialize_modules(self) -> None:
|
||||
"""Initialize modules based on component negotiation response."""
|
||||
for mod in SmartCamModule.REGISTERED_MODULES.values():
|
||||
if (
|
||||
mod.REQUIRED_COMPONENT
|
||||
and mod.REQUIRED_COMPONENT not in self._components
|
||||
):
|
||||
continue
|
||||
module = mod(self, mod._module_name())
|
||||
if await module._check_supported():
|
||||
self._modules[module.name] = module
|
||||
|
||||
async def _initialize_features(self) -> None:
|
||||
"""Initialize device features."""
|
||||
for module in self.modules.values():
|
||||
module._initialize_features()
|
||||
for feat in module._module_features.values():
|
||||
self._add_feature(feat)
|
||||
|
||||
for child in self._children.values():
|
||||
await child._initialize_features()
|
||||
|
||||
async def _query_setter_helper(
|
||||
self, method: str, module: str, section: str, params: dict | None = None
|
||||
) -> dict:
|
||||
res = await self.protocol.query({method: {module: {section: params}}})
|
||||
|
||||
return res
|
||||
|
||||
async def _query_getter_helper(
|
||||
self, method: str, module: str, sections: str | list[str]
|
||||
) -> Any:
|
||||
res = await self.protocol.query({method: {module: {"name": sections}}})
|
||||
|
||||
return res
|
||||
|
||||
async def _negotiate(self) -> None:
|
||||
"""Perform initialization.
|
||||
|
||||
We fetch the device info and the available components as early as possible.
|
||||
If the device reports supporting child devices, they are also initialized.
|
||||
"""
|
||||
initial_query = {
|
||||
"getDeviceInfo": {"device_info": {"name": ["basic_info", "info"]}},
|
||||
"getAppComponentList": {"app_component": {"name": "app_component_list"}},
|
||||
}
|
||||
resp = await self.protocol.query(initial_query)
|
||||
self._last_update.update(resp)
|
||||
self._update_internal_info(resp)
|
||||
|
||||
self._components = {
|
||||
comp["name"]: int(comp["version"])
|
||||
for comp in resp["getAppComponentList"]["app_component"][
|
||||
"app_component_list"
|
||||
]
|
||||
}
|
||||
|
||||
if "childControl" in self._components and not self.children:
|
||||
await self._initialize_children()
|
||||
|
||||
def _map_info(self, device_info: dict) -> dict:
|
||||
basic_info = device_info["basic_info"]
|
||||
return {
|
||||
"model": basic_info["device_model"],
|
||||
"device_type": basic_info["device_type"],
|
||||
"alias": basic_info["device_alias"],
|
||||
"fw_ver": basic_info["sw_version"],
|
||||
"hw_ver": basic_info["hw_version"],
|
||||
"mac": basic_info["mac"],
|
||||
"hwId": basic_info.get("hw_id"),
|
||||
"oem_id": basic_info["oem_id"],
|
||||
}
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
"""Return true if the device is on."""
|
||||
if (camera := self.modules.get(Module.Camera)) and not camera.disabled:
|
||||
return camera.is_on
|
||||
|
||||
return True
|
||||
|
||||
async def set_state(self, on: bool) -> dict:
|
||||
"""Set the device state."""
|
||||
if (camera := self.modules.get(Module.Camera)) and not camera.disabled:
|
||||
return await camera.set_state(on)
|
||||
|
||||
return {}
|
||||
|
||||
@property
|
||||
def device_type(self) -> DeviceType:
|
||||
"""Return the device type."""
|
||||
if self._device_type == DeviceType.Unknown:
|
||||
self._device_type = self._get_device_type_from_sysinfo(self._info)
|
||||
return self._device_type
|
||||
|
||||
@property
|
||||
def alias(self) -> str | None:
|
||||
"""Returns the device alias or nickname."""
|
||||
if self._info:
|
||||
return self._info.get("alias")
|
||||
return None
|
||||
|
||||
async def set_alias(self, alias: str) -> dict:
|
||||
"""Set the device name (alias)."""
|
||||
return await self.protocol.query(
|
||||
{
|
||||
"setDeviceAlias": {"system": {"sys": {"dev_alias": alias}}},
|
||||
}
|
||||
)
|
||||
|
||||
@property
|
||||
def hw_info(self) -> dict:
|
||||
"""Return hardware info for the device."""
|
||||
return {
|
||||
"sw_ver": self._info.get("hw_ver"),
|
||||
"hw_ver": self._info.get("fw_ver"),
|
||||
"mac": self._info.get("mac"),
|
||||
"type": self._info.get("type"),
|
||||
"hwId": self._info.get("hwId"),
|
||||
"dev_name": self.alias,
|
||||
"oemId": self._info.get("oem_id"),
|
||||
}
|
103
kasa/smartcam/smartcammodule.py
Normal file
103
kasa/smartcam/smartcammodule.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Base implementation for SMART modules."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Final, cast
|
||||
|
||||
from ..exceptions import DeviceError, KasaException, SmartErrorCode
|
||||
from ..modulemapping import ModuleName
|
||||
from ..smart.smartmodule import SmartModule
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from . import modules
|
||||
from .smartcamdevice import SmartCamDevice
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SmartCamModule(SmartModule):
|
||||
"""Base class for SMARTCAM modules."""
|
||||
|
||||
SmartCamAlarm: Final[ModuleName[modules.Alarm]] = ModuleName("SmartCamAlarm")
|
||||
|
||||
#: Query to execute during the main update cycle
|
||||
QUERY_GETTER_NAME: str
|
||||
#: Module name to be queried
|
||||
QUERY_MODULE_NAME: str
|
||||
#: Section name or names to be queried
|
||||
QUERY_SECTION_NAMES: str | list[str] | None = None
|
||||
|
||||
REGISTERED_MODULES = {}
|
||||
|
||||
_device: SmartCamDevice
|
||||
|
||||
def query(self) -> dict:
|
||||
"""Query to execute during the update cycle.
|
||||
|
||||
Default implementation uses the raw query getter w/o parameters.
|
||||
"""
|
||||
section_names = (
|
||||
{"name": self.QUERY_SECTION_NAMES} if self.QUERY_SECTION_NAMES else {}
|
||||
)
|
||||
return {self.QUERY_GETTER_NAME: {self.QUERY_MODULE_NAME: section_names}}
|
||||
|
||||
async def call(self, method: str, params: dict | None = None) -> dict:
|
||||
"""Call a method.
|
||||
|
||||
Just a helper method.
|
||||
"""
|
||||
if params:
|
||||
module = next(iter(params))
|
||||
section = next(iter(params[module]))
|
||||
else:
|
||||
module = "system"
|
||||
section = "null"
|
||||
|
||||
if method[:3] == "get":
|
||||
return await self._device._query_getter_helper(method, module, section)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
params = cast(dict[str, dict[str, Any]], params)
|
||||
return await self._device._query_setter_helper(
|
||||
method, module, section, params[module][section]
|
||||
)
|
||||
|
||||
@property
|
||||
def data(self) -> dict:
|
||||
"""Return response data for the module."""
|
||||
dev = self._device
|
||||
q = self.query()
|
||||
|
||||
if not q:
|
||||
return dev.sys_info
|
||||
|
||||
if len(q) == 1:
|
||||
query_resp = dev._last_update.get(self.QUERY_GETTER_NAME, {})
|
||||
if isinstance(query_resp, SmartErrorCode):
|
||||
raise DeviceError(
|
||||
f"Error accessing module data in {self._module}",
|
||||
error_code=query_resp,
|
||||
)
|
||||
|
||||
if not query_resp:
|
||||
raise KasaException(
|
||||
f"You need to call update() prior accessing module data"
|
||||
f" for '{self._module}'"
|
||||
)
|
||||
|
||||
return query_resp.get(self.QUERY_MODULE_NAME)
|
||||
else:
|
||||
found = {key: val for key, val in dev._last_update.items() if key in q}
|
||||
for key in q:
|
||||
if key not in found:
|
||||
raise KasaException(
|
||||
f"{key} not found, you need to call update() prior accessing"
|
||||
f" module data for '{self._module}'"
|
||||
)
|
||||
if isinstance(found[key], SmartErrorCode):
|
||||
raise DeviceError(
|
||||
f"Error accessing module data {key} in {self._module}",
|
||||
error_code=found[key],
|
||||
)
|
||||
return found
|
Reference in New Issue
Block a user