Add LinkieTransportV2 and basic IOT.IPCAMERA support (#1270)

Add LinkieTransportV2 transport used by kasa cameras and a basic
implementation for IOT.IPCAMERA (kasacam) devices.

---------

Co-authored-by: Zach Price <pricezt@ornl.gov>
Co-authored-by: Steven B <51370195+sdb9696@users.noreply.github.com>
Co-authored-by: Teemu Rytilahti <tpr@iki.fi>
This commit is contained in:
Puxtril 2024-12-06 18:06:58 -05:00 committed by GitHub
parent 6d9b4421fe
commit cb89342be1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 461 additions and 13 deletions

View File

@ -15,6 +15,7 @@ from kasa import (
UnsupportedDeviceError,
)
from kasa.discover import ConnectAttempt, DiscoveryResult
from kasa.iot.iotdevice import _extract_sys_info
from .common import echo, error
@ -201,8 +202,8 @@ def _echo_discovery_info(discovery_info) -> None:
if discovery_info is None:
return
if "system" in discovery_info and "get_sysinfo" in discovery_info["system"]:
_echo_dictionary(discovery_info["system"]["get_sysinfo"])
if sysinfo := _extract_sys_info(discovery_info):
_echo_dictionary(sysinfo)
return
try:

View File

@ -25,6 +25,7 @@ def get_default_credentials(tuple: tuple[str, str]) -> Credentials:
DEFAULT_CREDENTIALS = {
"KASA": ("a2FzYUB0cC1saW5rLm5ldA==", "a2FzYVNldHVw"),
"KASACAMERA": ("YWRtaW4=", "MjEyMzJmMjk3YTU3YTVhNzQzODk0YTBlNGE4MDFmYzM="),
"TAPO": ("dGVzdEB0cC1saW5rLm5ldA==", "dGVzdA=="),
"TAPOCAMERA": ("YWRtaW4=", "YWRtaW4="),
}

5
kasa/device_factory.py Executable file → Normal file
View File

@ -12,6 +12,7 @@ from .deviceconfig import DeviceConfig
from .exceptions import KasaException, UnsupportedDeviceError
from .iot import (
IotBulb,
IotCamera,
IotDevice,
IotDimmer,
IotLightStrip,
@ -32,6 +33,7 @@ from .transports import (
BaseTransport,
KlapTransport,
KlapTransportV2,
LinkieTransportV2,
SslTransport,
XorTransport,
)
@ -138,6 +140,7 @@ def get_device_class_from_sys_info(sysinfo: dict[str, Any]) -> type[IotDevice]:
DeviceType.Strip: IotStrip,
DeviceType.WallSwitch: IotWallSwitch,
DeviceType.LightStrip: IotLightStrip,
DeviceType.Camera: IotCamera,
}
return TYPE_TO_CLASS[IotDevice._get_device_type_from_sys_info(sysinfo)]
@ -159,6 +162,7 @@ def get_device_class_from_family(
"SMART.TAPOROBOVAC": SmartDevice,
"IOT.SMARTPLUGSWITCH": IotPlug,
"IOT.SMARTBULB": IotBulb,
"IOT.IPCAMERA": IotCamera,
}
lookup_key = f"{device_type}{'.HTTPS' if https else ''}"
if (
@ -197,6 +201,7 @@ def get_protocol(
] = {
"IOT.XOR": (IotProtocol, XorTransport),
"IOT.KLAP": (IotProtocol, KlapTransport),
"IOT.XOR.HTTPS.2": (IotProtocol, LinkieTransportV2),
"SMART.AES": (SmartProtocol, AesTransport),
"SMART.AES.2": (SmartProtocol, AesTransport),
"SMART.KLAP.2": (SmartProtocol, KlapTransportV2),

View File

@ -69,6 +69,7 @@ class DeviceFamily(Enum):
IotSmartPlugSwitch = "IOT.SMARTPLUGSWITCH"
IotSmartBulb = "IOT.SMARTBULB"
IotIpCamera = "IOT.IPCAMERA"
SmartKasaPlug = "SMART.KASAPLUG"
SmartKasaSwitch = "SMART.KASASWITCH"
SmartTapoPlug = "SMART.TAPOPLUG"

View File

@ -123,7 +123,7 @@ from kasa.exceptions import (
TimeoutError,
UnsupportedDeviceError,
)
from kasa.iot.iotdevice import IotDevice
from kasa.iot.iotdevice import IotDevice, _extract_sys_info
from kasa.json import DataClassJSONMixin
from kasa.json import dumps as json_dumps
from kasa.json import loads as json_loads
@ -681,11 +681,16 @@ class Discover:
device_class = cast(type[IotDevice], Discover._get_device_class(info))
device = device_class(config.host, config=config)
sys_info = info["system"]["get_sysinfo"]
if device_type := sys_info.get("mic_type", sys_info.get("type")):
sys_info = _extract_sys_info(info)
device_type = sys_info.get("mic_type", sys_info.get("type"))
login_version = (
sys_info.get("stream_version") if device_type == "IOT.IPCAMERA" else None
)
config.connection_type = DeviceConnectionParameters.from_values(
device_family=device_type,
encryption_type=DeviceEncryptionType.Xor.value,
https=device_type == "IOT.IPCAMERA",
login_version=login_version,
)
device.protocol = get_protocol(config) # type: ignore[assignment]
device.update_from_discover_info(info)

View File

@ -1,6 +1,7 @@
"""Package for supporting legacy kasa devices."""
from .iotbulb import IotBulb
from .iotcamera import IotCamera
from .iotdevice import IotDevice
from .iotdimmer import IotDimmer
from .iotlightstrip import IotLightStrip
@ -15,4 +16,5 @@ __all__ = [
"IotDimmer",
"IotLightStrip",
"IotWallSwitch",
"IotCamera",
]

42
kasa/iot/iotcamera.py Normal file
View File

@ -0,0 +1,42 @@
"""Module for cameras."""
from __future__ import annotations
import logging
from datetime import datetime, tzinfo
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..protocols import BaseProtocol
from .iotdevice import IotDevice
_LOGGER = logging.getLogger(__name__)
class IotCamera(IotDevice):
"""Representation of a TP-Link Camera."""
def __init__(
self,
host: str,
*,
config: DeviceConfig | None = None,
protocol: BaseProtocol | None = None,
) -> None:
super().__init__(host=host, config=config, protocol=protocol)
self._device_type = DeviceType.Camera
@property
def time(self) -> datetime:
"""Get the camera's time."""
return datetime.fromtimestamp(self.sys_info["system_time"])
@property
def timezone(self) -> tzinfo:
"""Get the camera's timezone."""
return None # type: ignore
@property # type: ignore
def is_on(self) -> bool:
"""Return whether device is on."""
return True

View File

@ -70,6 +70,16 @@ def _parse_features(features: str) -> set[str]:
return set(features.split(":"))
def _extract_sys_info(info: dict[str, Any]) -> dict[str, Any]:
"""Return the system info structure."""
sysinfo_default = info.get("system", {}).get("get_sysinfo", {})
sysinfo_nest = sysinfo_default.get("system", {})
if len(sysinfo_nest) > len(sysinfo_default) and isinstance(sysinfo_nest, dict):
return sysinfo_nest
return sysinfo_default
class IotDevice(Device):
"""Base class for all supported device types.
@ -304,14 +314,14 @@ class IotDevice(Device):
_LOGGER.debug("Performing the initial update to obtain sysinfo")
response = await self.protocol.query(req)
self._last_update = response
self._set_sys_info(response["system"]["get_sysinfo"])
self._set_sys_info(_extract_sys_info(response))
if not self._modules:
await self._initialize_modules()
await self._modular_update(req)
self._set_sys_info(self._last_update["system"]["get_sysinfo"])
self._set_sys_info(_extract_sys_info(self._last_update))
for module in self._modules.values():
await module._post_update_hook()
@ -705,10 +715,13 @@ class IotDevice(Device):
@staticmethod
def _get_device_type_from_sys_info(info: dict[str, Any]) -> DeviceType:
"""Find SmartDevice subclass for device described by passed data."""
if "system" in info.get("system", {}).get("get_sysinfo", {}):
return DeviceType.Camera
if "system" not in info or "get_sysinfo" not in info["system"]:
raise KasaException("No 'system' or 'get_sysinfo' in response")
sysinfo: dict[str, Any] = info["system"]["get_sysinfo"]
sysinfo: dict[str, Any] = _extract_sys_info(info)
type_: str | None = sysinfo.get("type", sysinfo.get("mic_type"))
if type_ is None:
raise KasaException("Unable to find the device type field!")
@ -728,6 +741,7 @@ class IotDevice(Device):
return DeviceType.LightStrip
return DeviceType.Bulb
_LOGGER.warning("Unknown device type %s, falling back to plug", type_)
return DeviceType.Plug
@ -736,7 +750,7 @@ class IotDevice(Device):
info: dict[str, Any], discovery_info: dict[str, Any] | None
) -> _DeviceInfo:
"""Get model information for a device."""
sys_info = info["system"]["get_sysinfo"]
sys_info = _extract_sys_info(info)
# Get model and region info
region = None

View File

@ -3,6 +3,7 @@
from .aestransport import AesEncyptionSession, AesTransport
from .basetransport import BaseTransport
from .klaptransport import KlapTransport, KlapTransportV2
from .linkietransport import LinkieTransportV2
from .ssltransport import SslTransport
from .xortransport import XorEncryption, XorTransport
@ -13,6 +14,7 @@ __all__ = [
"BaseTransport",
"KlapTransport",
"KlapTransportV2",
"LinkieTransportV2",
"XorTransport",
"XorEncryption",
]

View File

@ -0,0 +1,143 @@
"""Implementation of the linkie kasa camera transport."""
from __future__ import annotations
import asyncio
import base64
import logging
import ssl
from typing import TYPE_CHECKING, cast
from urllib.parse import quote
from yarl import URL
from kasa.credentials import DEFAULT_CREDENTIALS, get_default_credentials
from kasa.deviceconfig import DeviceConfig
from kasa.exceptions import KasaException, _RetryableError
from kasa.httpclient import HttpClient
from kasa.json import loads as json_loads
from kasa.transports.xortransport import XorEncryption
from .basetransport import BaseTransport
_LOGGER = logging.getLogger(__name__)
class LinkieTransportV2(BaseTransport):
"""Implementation of the Linkie encryption protocol.
Linkie is used as the endpoint for TP-Link's camera encryption
protocol, used by newer firmware versions.
"""
DEFAULT_PORT: int = 10443
CIPHERS = ":".join(
[
"AES256-GCM-SHA384",
"AES256-SHA256",
"AES128-GCM-SHA256",
"AES128-SHA256",
"AES256-SHA",
]
)
def __init__(self, *, config: DeviceConfig) -> None:
super().__init__(config=config)
self._http_client = HttpClient(config)
self._ssl_context: ssl.SSLContext | None = None
self._app_url = URL(f"https://{self._host}:{self._port}/data/LINKIE2.json")
self._headers = {
"Authorization": f"Basic {self.credentials_hash}",
"Content-Type": "application/x-www-form-urlencoded",
}
@property
def default_port(self) -> int:
"""Default port for the transport."""
return self.DEFAULT_PORT
@property
def credentials_hash(self) -> str | None:
"""The hashed credentials used by the transport."""
creds = get_default_credentials(DEFAULT_CREDENTIALS["KASACAMERA"])
creds_combined = f"{creds.username}:{creds.password}"
return base64.b64encode(creds_combined.encode()).decode()
async def _execute_send(self, request: str) -> dict:
"""Execute a query on the device and wait for the response."""
_LOGGER.debug("%s >> %s", self._host, request)
encrypted_cmd = XorEncryption.encrypt(request)[4:]
b64_cmd = base64.b64encode(encrypted_cmd).decode()
url_safe_cmd = quote(b64_cmd, safe="!~*'()")
status_code, response = await self._http_client.post(
self._app_url,
headers=self._headers,
data=f"content={url_safe_cmd}".encode(),
ssl=await self._get_ssl_context(),
)
if TYPE_CHECKING:
response = cast(bytes, response)
if status_code != 200:
raise KasaException(
f"{self._host} responded with an unexpected "
+ f"status code {status_code} to passthrough"
)
# Expected response
try:
json_payload: dict = json_loads(
XorEncryption.decrypt(base64.b64decode(response))
)
_LOGGER.debug("%s << %s", self._host, json_payload)
return json_payload
except Exception: # noqa: S110
pass
# Device returned error as json plaintext
to_raise: KasaException | None = None
try:
error_payload: dict = json_loads(response)
to_raise = KasaException(f"Device {self._host} send error: {error_payload}")
except Exception as ex:
raise KasaException("Unable to read response") from ex
raise to_raise
async def close(self) -> None:
"""Close the http client and reset internal state."""
await self._http_client.close()
async def reset(self) -> None:
"""Reset the transport.
NOOP for this transport.
"""
async def send(self, request: str) -> dict:
"""Send a message to the device and return a response."""
try:
return await self._execute_send(request)
except Exception as ex:
await self.reset()
raise _RetryableError(
f"Unable to query the device {self._host}:{self._port}: {ex}"
) from ex
async def _get_ssl_context(self) -> ssl.SSLContext:
if not self._ssl_context:
loop = asyncio.get_running_loop()
self._ssl_context = await loop.run_in_executor(
None, self._create_ssl_context
)
return self._ssl_context
def _create_ssl_context(self) -> ssl.SSLContext:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.set_ciphers(self.CIPHERS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context

View File

@ -0,0 +1,86 @@
{
"emeter": {
"get_realtime": {
"err_code": -10008,
"err_msg": "Unsupported API call."
}
},
"smartlife.iot.LAS": {
"get_current_brt": {
"err_code": -10008,
"err_msg": "Unsupported API call."
}
},
"smartlife.iot.PIR": {
"get_config": {
"err_code": -10008,
"err_msg": "Unsupported API call."
}
},
"smartlife.iot.common.emeter": {
"get_realtime": {
"err_code": -10008,
"err_msg": "Unsupported API call."
}
},
"smartlife.iot.dimmer": {
"get_dimmer_parameters": {
"err_code": -10008,
"err_msg": "Unsupported API call."
}
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": {
"err_code": -10008,
"err_msg": "Unsupported API call."
}
},
"system": {
"get_sysinfo": {
"err_code": 0,
"system": {
"a_type": 2,
"alias": "#MASKED_NAME#",
"bind_status": false,
"c_opt": [
0,
1
],
"camera_switch": "on",
"dev_name": "Kasa Spot, 24/7 Recording",
"deviceId": "0000000000000000000000000000000000000000",
"f_list": [
1,
2
],
"hwId": "00000000000000000000000000000000",
"hw_ver": "4.0",
"is_cal": 1,
"last_activity_timestamp": 0,
"latitude": 0,
"led_status": "on",
"longitude": 0,
"mac": "74:FE:CE:00:00:00",
"mic_mac": "74FECE000000",
"model": "EC60(US)",
"new_feature": [
2,
3,
4,
5,
7,
9
],
"oemId": "00000000000000000000000000000000",
"resolution": "720P",
"rssi": -28,
"status": "new",
"stream_version": 2,
"sw_ver": "2.3.22 Build 20230731 rel.69808",
"system_time": 1690827820,
"type": "IOT.IPCAMERA",
"updating": false
}
}
}
}

View File

@ -16,6 +16,7 @@ import kasa
from kasa import Credentials, Device, DeviceConfig, DeviceType, KasaException, Module
from kasa.iot import (
IotBulb,
IotCamera,
IotDevice,
IotDimmer,
IotLightStrip,
@ -118,6 +119,7 @@ async def test_device_class_repr(device_class_name_obj):
IotStrip: DeviceType.Strip,
IotWallSwitch: DeviceType.WallSwitch,
IotLightStrip: DeviceType.LightStrip,
IotCamera: DeviceType.Camera,
SmartChildDevice: DeviceType.Unknown,
SmartDevice: DeviceType.Unknown,
SmartCamDevice: DeviceType.Camera,

View File

@ -0,0 +1,144 @@
import base64
from unittest.mock import ANY
import aiohttp
import pytest
from yarl import URL
from kasa.credentials import DEFAULT_CREDENTIALS, Credentials, get_default_credentials
from kasa.deviceconfig import DeviceConfig
from kasa.exceptions import KasaException
from kasa.httpclient import HttpClient
from kasa.json import dumps as json_dumps
from kasa.transports.linkietransport import LinkieTransportV2
KASACAM_REQUEST_PLAINTEXT = '{"smartlife.cam.ipcamera.dateTime":{"get_status":{}}}'
KASACAM_RESPONSE_ENCRYPTED = "0PKG74LnnfKc+dvhw5bCgaycqZOjk7Gdv96syaiKsJLTvtupwKPC7aPGse632KrB48/tiPiX9JzDsNW2lK6fqZCgmKuZoZGh3A=="
KASACAM_RESPONSE_ERROR = '{"smartlife.cam.ipcamera.cloud": {"get_inf": {"err_code": -10008, "err_msg": "Unsupported API call."}}}'
KASA_DEFAULT_CREDENTIALS_HASH = "YWRtaW46MjEyMzJmMjk3YTU3YTVhNzQzODk0YTBlNGE4MDFmYzM="
async def test_working(mocker):
"""No errors with an expected request/response."""
host = "127.0.0.1"
mock_linkie_device = MockLinkieDevice(host)
mocker.patch.object(
aiohttp.ClientSession, "post", side_effect=mock_linkie_device.post
)
transport_no_creds = LinkieTransportV2(config=DeviceConfig(host))
response = await transport_no_creds.send(KASACAM_REQUEST_PLAINTEXT)
assert response == {
"timezone": "UTC-05:00",
"area": "America/New_York",
"epoch_sec": 1690832800,
}
async def test_credentials_hash(mocker):
"""Ensure the default credentials are always passed as Basic Auth."""
# Test without credentials input
host = "127.0.0.1"
mock_linkie_device = MockLinkieDevice(host)
mock_post = mocker.patch.object(
aiohttp.ClientSession, "post", side_effect=mock_linkie_device.post
)
transport_no_creds = LinkieTransportV2(config=DeviceConfig(host))
await transport_no_creds.send(KASACAM_REQUEST_PLAINTEXT)
mock_post.assert_called_once_with(
URL(f"https://{host}:10443/data/LINKIE2.json"),
params=None,
data=ANY,
json=None,
timeout=ANY,
cookies=None,
headers={
"Authorization": "Basic " + _generate_kascam_basic_auth(),
"Content-Type": "application/x-www-form-urlencoded",
},
ssl=ANY,
)
assert transport_no_creds.credentials_hash == KASA_DEFAULT_CREDENTIALS_HASH
# Test with credentials input
transport_with_creds = LinkieTransportV2(
config=DeviceConfig(host, credentials=Credentials("Admin", "password"))
)
mock_post.reset_mock()
await transport_with_creds.send(KASACAM_REQUEST_PLAINTEXT)
mock_post.assert_called_once_with(
URL(f"https://{host}:10443/data/LINKIE2.json"),
params=None,
data=ANY,
json=None,
timeout=ANY,
cookies=None,
headers={
"Authorization": "Basic " + _generate_kascam_basic_auth(),
"Content-Type": "application/x-www-form-urlencoded",
},
ssl=ANY,
)
@pytest.mark.parametrize(
("return_status", "return_data", "expected"),
[
(500, KASACAM_RESPONSE_ENCRYPTED, "500"),
(200, "AAAAAAAAAAAAAAAAAAAAAAAA", "Unable to read response"),
(200, KASACAM_RESPONSE_ERROR, "Unsupported API call"),
],
)
async def test_exceptions(mocker, return_status, return_data, expected):
"""Test a variety of possible responses from the device."""
host = "127.0.0.1"
transport = LinkieTransportV2(config=DeviceConfig(host))
mock_linkie_device = MockLinkieDevice(
host, status_code=return_status, response=return_data
)
mocker.patch.object(
aiohttp.ClientSession, "post", side_effect=mock_linkie_device.post
)
with pytest.raises(KasaException, match=expected):
await transport.send(KASACAM_REQUEST_PLAINTEXT)
def _generate_kascam_basic_auth():
creds = get_default_credentials(DEFAULT_CREDENTIALS["KASACAMERA"])
creds_combined = f"{creds.username}:{creds.password}"
return base64.b64encode(creds_combined.encode()).decode()
class MockLinkieDevice:
"""Based on MockSslDevice."""
class _mock_response:
def __init__(self, status, request: dict):
self.status = status
self._json = request
async def __aenter__(self):
return self
async def __aexit__(self, exc_t, exc_v, exc_tb):
pass
async def read(self):
if isinstance(self._json, dict):
return json_dumps(self._json).encode()
return self._json
def __init__(self, host, *, status_code=200, response=KASACAM_RESPONSE_ENCRYPTED):
self.host = host
self.http_client = HttpClient(DeviceConfig(self.host))
self.status_code = status_code
self.response = response
async def post(
self, url: URL, *, headers=None, params=None, json=None, data=None, **__
):
return self._mock_response(self.status_code, self.response)