From dcc36e1dfe3fc9955698cdaf976a68ddfc2e3b6f Mon Sep 17 00:00:00 2001 From: "Steven B." <51370195+sdb9696@users.noreply.github.com> Date: Wed, 16 Oct 2024 16:53:52 +0100 Subject: [PATCH] Initial TapoCamera support (#1165) Adds experimental support for the Tapo Camera protocol also used by the H200 hub. Creates a new SslAesTransport and a derived SmartCamera and SmartCameraProtocol. --- kasa/cli/main.py | 39 +- kasa/device_factory.py | 23 +- kasa/device_type.py | 1 + kasa/deviceconfig.py | 6 + kasa/discover.py | 1 + kasa/experimental/__init__.py | 1 + kasa/experimental/enabled.py | 12 + kasa/experimental/smartcamera.py | 84 ++++ kasa/experimental/smartcameraprotocol.py | 109 +++++ kasa/experimental/sslaestransport.py | 494 +++++++++++++++++++++++ kasa/httpclient.py | 3 +- kasa/tests/test_cli.py | 3 + pyproject.toml | 5 +- 13 files changed, 770 insertions(+), 11 deletions(-) create mode 100644 kasa/experimental/__init__.py create mode 100644 kasa/experimental/enabled.py create mode 100644 kasa/experimental/smartcamera.py create mode 100644 kasa/experimental/smartcameraprotocol.py create mode 100644 kasa/experimental/sslaestransport.py diff --git a/kasa/cli/main.py b/kasa/cli/main.py index 1550b7af..7ba65155 100755 --- a/kasa/cli/main.py +++ b/kasa/cli/main.py @@ -35,6 +35,7 @@ TYPES = [ "strip", "lightstrip", "smart", + "camera", ] ENCRYPT_TYPES = [encrypt_type.value for encrypt_type in DeviceEncryptionType] @@ -172,6 +173,14 @@ def _legacy_type_to_class(_type): type=int, help="The login version for device authentication. Defaults to 2", ) +@click.option( + "--https/--no-https", + envvar="KASA_HTTPS", + default=False, + is_flag=True, + type=bool, + help="Set flag if the device encryption uses https.", +) @click.option( "--timeout", envvar="KASA_TIMEOUT", @@ -209,6 +218,14 @@ def _legacy_type_to_class(_type): envvar="KASA_CREDENTIALS_HASH", help="Hashed credentials used to authenticate to the device.", ) +@click.option( + "--experimental", + default=False, + is_flag=True, + type=bool, + envvar="KASA_EXPERIMENTAL", + help="Enable experimental mode for devices not yet fully supported.", +) @click.version_option(package_name="python-kasa") @click.pass_context async def cli( @@ -221,6 +238,7 @@ async def cli( debug, type, encrypt_type, + https, device_family, login_version, json, @@ -229,6 +247,7 @@ async def cli( username, password, credentials_hash, + experimental, ): """A tool for controlling TP-Link smart home devices.""" # noqa # no need to perform any checks if we are just displaying the help @@ -237,6 +256,11 @@ async def cli( ctx.obj = object() return + if experimental: + from kasa.experimental.enabled import Enabled + + Enabled.set(True) + logging_config: dict[str, Any] = { "level": logging.DEBUG if debug > 0 else logging.INFO } @@ -295,12 +319,21 @@ async def cli( return await ctx.invoke(discover) device_updated = False - if type is not None and type != "smart": + if type is not None and type not in {"smart", "camera"}: from kasa.deviceconfig import DeviceConfig config = DeviceConfig(host=host, port_override=port, timeout=timeout) dev = _legacy_type_to_class(type)(host, config=config) - elif type == "smart" or (device_family and encrypt_type): + elif type in {"smart", "camera"} or (device_family and encrypt_type): + if type == "camera": + if not experimental: + error( + "Camera is an experimental type, please enable with --experimental" + ) + encrypt_type = "AES" + https = True + device_family = "SMART.IPCAMERA" + from kasa.device import Device from kasa.deviceconfig import ( DeviceConfig, @@ -311,10 +344,12 @@ async def cli( if not encrypt_type: encrypt_type = "KLAP" + ctype = DeviceConnectionParameters( DeviceFamily(device_family), DeviceEncryptionType(encrypt_type), login_version, + https, ) config = DeviceConfig( host=host, diff --git a/kasa/device_factory.py b/kasa/device_factory.py index a124bb4c..01b2c8e7 100755 --- a/kasa/device_factory.py +++ b/kasa/device_factory.py @@ -11,6 +11,9 @@ from .device import Device from .device_type import DeviceType from .deviceconfig import DeviceConfig from .exceptions import KasaException, UnsupportedDeviceError +from .experimental.smartcamera import SmartCamera +from .experimental.smartcameraprotocol import SmartCameraProtocol +from .experimental.sslaestransport import SslAesTransport from .iot import ( IotBulb, IotDevice, @@ -171,6 +174,7 @@ def get_device_class_from_family(device_type: str) -> type[Device] | None: "SMART.TAPOHUB": SmartDevice, "SMART.KASAHUB": SmartDevice, "SMART.KASASWITCH": SmartDevice, + "SMART.IPCAMERA": SmartCamera, "IOT.SMARTPLUGSWITCH": IotPlug, "IOT.SMARTBULB": IotBulb, } @@ -188,8 +192,12 @@ def get_protocol( ) -> BaseProtocol | None: """Return the protocol from the connection name.""" protocol_name = config.connection_type.device_family.value.split(".")[0] + ctype = config.connection_type protocol_transport_key = ( - protocol_name + "." + config.connection_type.encryption_type.value + protocol_name + + "." + + ctype.encryption_type.value + + (".HTTPS" if ctype.https else "") ) supported_device_protocols: dict[ str, tuple[type[BaseProtocol], type[BaseTransport]] @@ -199,10 +207,11 @@ def get_protocol( "SMART.AES": (SmartProtocol, AesTransport), "SMART.KLAP": (SmartProtocol, KlapTransportV2), } - if protocol_transport_key not in supported_device_protocols: - return None + if not (prot_tran_cls := supported_device_protocols.get(protocol_transport_key)): + from .experimental.enabled import Enabled - protocol_class, transport_class = supported_device_protocols.get( - protocol_transport_key - ) # type: ignore - return protocol_class(transport=transport_class(config=config)) + if Enabled.value and protocol_transport_key == "SMART.AES.HTTPS": + prot_tran_cls = (SmartCameraProtocol, SslAesTransport) + else: + return None + return prot_tran_cls[0](transport=prot_tran_cls[1](config=config)) diff --git a/kasa/device_type.py b/kasa/device_type.py index 3d3b828d..b690f1f1 100755 --- a/kasa/device_type.py +++ b/kasa/device_type.py @@ -12,6 +12,7 @@ class DeviceType(Enum): Plug = "plug" Bulb = "bulb" Strip = "strip" + Camera = "camera" WallSwitch = "wallswitch" StripSocket = "stripsocket" Dimmer = "dimmer" diff --git a/kasa/deviceconfig.py b/kasa/deviceconfig.py index 0833c079..1bd806f0 100644 --- a/kasa/deviceconfig.py +++ b/kasa/deviceconfig.py @@ -72,6 +72,7 @@ class DeviceFamily(Enum): SmartTapoSwitch = "SMART.TAPOSWITCH" SmartTapoHub = "SMART.TAPOHUB" SmartKasaHub = "SMART.KASAHUB" + SmartIpCamera = "SMART.IPCAMERA" def _dataclass_from_dict(klass, in_val): @@ -118,19 +119,24 @@ class DeviceConnectionParameters: device_family: DeviceFamily encryption_type: DeviceEncryptionType login_version: Optional[int] = None + https: bool = False @staticmethod def from_values( device_family: str, encryption_type: str, login_version: Optional[int] = None, + https: Optional[bool] = None, ) -> "DeviceConnectionParameters": """Return connection parameters from string values.""" try: + if https is None: + https = False return DeviceConnectionParameters( DeviceFamily(device_family), DeviceEncryptionType(encryption_type), login_version, + https, ) except (ValueError, TypeError) as ex: raise KasaException( diff --git a/kasa/discover.py b/kasa/discover.py index 9d615398..79c16216 100755 --- a/kasa/discover.py +++ b/kasa/discover.py @@ -637,6 +637,7 @@ class Discover: type_, encrypt_type, discovery_result.mgt_encrypt_schm.lv, + discovery_result.mgt_encrypt_schm.is_support_https, ) except KasaException as ex: raise UnsupportedDeviceError( diff --git a/kasa/experimental/__init__.py b/kasa/experimental/__init__.py new file mode 100644 index 00000000..60462246 --- /dev/null +++ b/kasa/experimental/__init__.py @@ -0,0 +1 @@ +"""Package for experimental.""" diff --git a/kasa/experimental/enabled.py b/kasa/experimental/enabled.py new file mode 100644 index 00000000..7679f97c --- /dev/null +++ b/kasa/experimental/enabled.py @@ -0,0 +1,12 @@ +"""Package for experimental enabled.""" + + +class Enabled: + """Class for enabling experimental functionality.""" + + value = False + + @classmethod + def set(cls, value): + """Set the enabled value.""" + cls.value = value diff --git a/kasa/experimental/smartcamera.py b/kasa/experimental/smartcamera.py new file mode 100644 index 00000000..809ac74a --- /dev/null +++ b/kasa/experimental/smartcamera.py @@ -0,0 +1,84 @@ +"""Module for smartcamera.""" + +from __future__ import annotations + +from ..device_type import DeviceType +from ..smart import SmartDevice +from .sslaestransport import SmartErrorCode + + +class SmartCamera(SmartDevice): + """Class for smart cameras.""" + + async def update(self, update_children: bool = False): + """Update the device.""" + initial_query = { + "getDeviceInfo": {"device_info": {"name": ["basic_info", "info"]}}, + "getLensMaskConfig": {"lens_mask": {"name": ["lens_mask_info"]}}, + } + resp = await self.protocol.query(initial_query) + self._last_update.update(resp) + info = self._try_get_response(resp, "getDeviceInfo") + self._info = self._map_info(info["device_info"]) + self._last_update = resp + + def _map_info(self, device_info: dict) -> dict: + basic_info = device_info["basic_info"] + return { + "model": basic_info["device_model"], + "type": basic_info["device_type"], + "alias": basic_info["device_alias"], + "fw_ver": basic_info["sw_version"], + "hw_ver": basic_info["hw_version"], + "mac": basic_info["mac"], + "hwId": basic_info["hw_id"], + "oem_id": basic_info["oem_id"], + } + + @property + def is_on(self) -> bool: + """Return true if the device is on.""" + if isinstance(self._last_update["getLensMaskConfig"], SmartErrorCode): + return True + return ( + self._last_update["getLensMaskConfig"]["lens_mask"]["lens_mask_info"][ + "enabled" + ] + == "on" + ) + + async def set_state(self, on: bool): + """Set the device state.""" + if isinstance(self._last_update["getLensMaskConfig"], SmartErrorCode): + return + query = { + "setLensMaskConfig": { + "lens_mask": {"lens_mask_info": {"enabled": "on" if on else "off"}} + }, + } + return await self.protocol.query(query) + + @property + def device_type(self) -> DeviceType: + """Return the device type.""" + return DeviceType.Camera + + @property + def alias(self) -> str | None: + """Returns the device alias or nickname.""" + if self._info: + return self._info.get("alias") + return None + + @property + def hw_info(self) -> dict: + """Return hardware info for the device.""" + return { + "sw_ver": self._info.get("hw_ver"), + "hw_ver": self._info.get("fw_ver"), + "mac": self._info.get("mac"), + "type": self._info.get("type"), + "hwId": self._info.get("hwId"), + "dev_name": self.alias, + "oemId": self._info.get("oem_id"), + } diff --git a/kasa/experimental/smartcameraprotocol.py b/kasa/experimental/smartcameraprotocol.py new file mode 100644 index 00000000..384b76e9 --- /dev/null +++ b/kasa/experimental/smartcameraprotocol.py @@ -0,0 +1,109 @@ +"""Module for SmartCamera Protocol.""" + +from __future__ import annotations + +import logging +from pprint import pformat as pf +from typing import Any + +from ..exceptions import AuthenticationError, DeviceError, _RetryableError +from ..json import dumps as json_dumps +from ..smartprotocol import SmartProtocol +from .sslaestransport import ( + SMART_AUTHENTICATION_ERRORS, + SMART_RETRYABLE_ERRORS, + SmartErrorCode, +) + +_LOGGER = logging.getLogger(__name__) + + +class SmartCameraProtocol(SmartProtocol): + """Class for SmartCamera Protocol.""" + + async def _handle_response_lists( + self, response_result: dict[str, Any], method, retry_count + ): + pass + + def _handle_response_error_code(self, resp_dict: dict, method, raise_on_error=True): + error_code_raw = resp_dict.get("error_code") + try: + error_code = SmartErrorCode.from_int(error_code_raw) + except ValueError: + _LOGGER.warning( + "Device %s received unknown error code: %s", self._host, error_code_raw + ) + error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR + + if error_code is SmartErrorCode.SUCCESS: + return + + if not raise_on_error: + resp_dict["result"] = error_code + return + + msg = ( + f"Error querying device: {self._host}: " + + f"{error_code.name}({error_code.value})" + + f" for method: {method}" + ) + if error_code in SMART_RETRYABLE_ERRORS: + raise _RetryableError(msg, error_code=error_code) + if error_code in SMART_AUTHENTICATION_ERRORS: + raise AuthenticationError(msg, error_code=error_code) + raise DeviceError(msg, error_code=error_code) + + async def close(self) -> None: + """Close the underlying transport.""" + await self._transport.close() + + async def _execute_query( + self, request: str | dict, *, retry_count: int, iterate_list_pages: bool = True + ) -> dict: + debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) + + if isinstance(request, dict): + if len(request) == 1: + multi_method = next(iter(request)) + module = next(iter(request[multi_method])) + req = { + "method": multi_method[:3], + module: request[multi_method][module], + } + else: + return await self._execute_multiple_query(request, retry_count) + else: + # If method like getSomeThing then module will be some_thing + multi_method = request + snake_name = "".join( + ["_" + i.lower() if i.isupper() else i for i in multi_method] + ).lstrip("_") + module = snake_name[4:] + req = {"method": snake_name[:3], module: {}} + + smart_request = json_dumps(req) + if debug_enabled: + _LOGGER.debug( + "%s >> %s", + self._host, + pf(smart_request), + ) + response_data = await self._transport.send(smart_request) + + if debug_enabled: + _LOGGER.debug( + "%s << %s", + self._host, + pf(response_data), + ) + + if "error_code" in response_data: + # H200 does not return an error code + self._handle_response_error_code(response_data, multi_method) + + # TODO need to update handle response lists + + if multi_method[:3] == "set": + return {} + return {multi_method: {module: response_data[module]}} diff --git a/kasa/experimental/sslaestransport.py b/kasa/experimental/sslaestransport.py new file mode 100644 index 00000000..8936db8d --- /dev/null +++ b/kasa/experimental/sslaestransport.py @@ -0,0 +1,494 @@ +"""Implementation of the TP-Link SSL AES transport.""" + +from __future__ import annotations + +import base64 +import hashlib +import logging +import secrets +import ssl +import time +from enum import Enum, IntEnum, auto +from functools import cache +from typing import TYPE_CHECKING, Any, Dict, cast + +from urllib3.util import create_urllib3_context +from yarl import URL + +from ..aestransport import AesEncyptionSession +from ..credentials import Credentials +from ..deviceconfig import DeviceConfig +from ..exceptions import ( + AuthenticationError, + DeviceError, + KasaException, + _RetryableError, +) +from ..httpclient import HttpClient +from ..json import dumps as json_dumps +from ..json import loads as json_loads +from ..protocol import BaseTransport + +_LOGGER = logging.getLogger(__name__) + + +ONE_DAY_SECONDS = 86400 +SESSION_EXPIRE_BUFFER_SECONDS = 60 * 20 + + +def _sha256(payload: bytes) -> bytes: + return hashlib.sha256(payload).digest() # noqa: S324 + + +def _md5_hash(payload: bytes) -> str: + return hashlib.md5(payload).hexdigest().upper() # noqa: S324 + + +def _sha256_hash(payload: bytes) -> str: + return hashlib.sha256(payload).hexdigest().upper() # noqa: S324 + + +class TransportState(Enum): + """Enum for AES state.""" + + HANDSHAKE_REQUIRED = auto() # Handshake needed + ESTABLISHED = auto() # Ready to send requests + + +class SslAesTransport(BaseTransport): + """Implementation of the AES encryption protocol. + + AES is the name used in device discovery for TP-Link's TAPO encryption + protocol, sometimes used by newer firmware versions on kasa devices. + """ + + DEFAULT_PORT: int = 443 + COMMON_HEADERS = { + "Content-Type": "application/json; charset=UTF-8", + "requestByApp": "true", + "Accept": "application/json", + "Accept-Encoding": "gzip, deflate", + "User-Agent": "Tapo CameraClient Android", + "Connection": "close", + } + CIPHERS = ":".join( + [ + "AES256-GCM-SHA384", + "AES256-SHA256", + "AES128-GCM-SHA256", + "AES128-SHA256", + "AES256-SHA", + ] + ) + DEFAULT_TIMEOUT = 10 + + def __init__( + self, + *, + config: DeviceConfig, + ) -> None: + super().__init__(config=config) + + self._login_version = config.connection_type.login_version + if ( + not self._credentials or self._credentials.username is None + ) and not self._credentials_hash: + self._credentials = Credentials() + self._default_credentials: Credentials | None = None + + if not config.timeout: + config.timeout = self.DEFAULT_TIMEOUT + self._http_client: HttpClient = HttpClient(config) + + self._state = TransportState.HANDSHAKE_REQUIRED + + self._encryption_session: AesEncyptionSession | None = None + self._session_expire_at: float | None = None + + self._host_port = f"{self._host}:{self._port}" + self._app_url = URL(f"https://{self._host_port}") + self._token_url: URL | None = None + self._ssl_context = create_urllib3_context( + ciphers=self.CIPHERS, + cert_reqs=ssl.CERT_NONE, + options=0, + ) + ref = str(self._token_url) if self._token_url else str(self._app_url) + self._headers = { + **self.COMMON_HEADERS, + "Host": self._host_port, + "Referer": ref, + } + self._seq: int | None = None + self._pwd_hash: str | None = None + self._username: str | None = None + if self._credentials != Credentials() and self._credentials: + self._username = self._credentials.username + elif self._credentials_hash: + ch = json_loads(base64.b64decode(self._credentials_hash.encode())) + self._pwd_hash = ch["pwd"] + self._username = ch["un"] + self._local_nonce: str | None = None + + _LOGGER.debug("Created AES transport for %s", self._host) + + @property + def default_port(self) -> int: + """Default port for the transport.""" + return self.DEFAULT_PORT + + @property + def credentials_hash(self) -> str | None: + """The hashed credentials used by the transport.""" + if self._credentials == Credentials(): + return None + if self._credentials_hash: + return self._credentials_hash + if self._pwd_hash and self._credentials: + ch = {"un": self._credentials.username, "pwd": self._pwd_hash} + return base64.b64encode(json_dumps(ch).encode()).decode() + return None + + def _handle_response_error_code(self, resp_dict: Any, msg: str) -> None: + error_code_raw = resp_dict.get("error_code") + try: + error_code = SmartErrorCode.from_int(error_code_raw) + except ValueError: + _LOGGER.warning( + "Device %s received unknown error code: %s", self._host, error_code_raw + ) + error_code = SmartErrorCode.INTERNAL_UNKNOWN_ERROR + if error_code is SmartErrorCode.SUCCESS: + return + msg = f"{msg}: {self._host}: {error_code.name}({error_code.value})" + if error_code in SMART_RETRYABLE_ERRORS: + raise _RetryableError(msg, error_code=error_code) + if error_code in SMART_AUTHENTICATION_ERRORS: + self._state = TransportState.HANDSHAKE_REQUIRED + raise AuthenticationError(msg, error_code=error_code) + raise DeviceError(msg, error_code=error_code) + + async def send_secure_passthrough(self, request: str) -> dict[str, Any]: + """Send encrypted message as passthrough.""" + if self._state is TransportState.ESTABLISHED and self._token_url: + url = self._token_url + else: + url = self._app_url + + encrypted_payload = self._encryption_session.encrypt(request.encode()) # type: ignore + passthrough_request = { + "method": "securePassthrough", + "params": {"request": encrypted_payload.decode()}, + } + passthrough_request_str = json_dumps(passthrough_request) + if TYPE_CHECKING: + assert self._pwd_hash + assert self._local_nonce + assert self._seq + tag = self.generate_tag( + passthrough_request_str, self._local_nonce, self._pwd_hash, self._seq + ) + headers = {**self._headers, "Seq": str(self._seq), "Tapo_tag": tag} + self._seq += 1 + status_code, resp_dict = await self._http_client.post( + url, + json=passthrough_request_str, + headers=headers, + ssl=self._ssl_context, + ) + + if status_code != 200: + raise KasaException( + f"{self._host} responded with an unexpected " + + f"status code {status_code} to passthrough" + ) + + self._handle_response_error_code( + resp_dict, "Error sending secure_passthrough message" + ) + + if TYPE_CHECKING: + resp_dict = cast(Dict[str, Any], resp_dict) + assert self._encryption_session is not None + + if "result" in resp_dict and "response" in resp_dict["result"]: + raw_response: str = resp_dict["result"]["response"] + else: + # Tapo Cameras respond unencrypted to single requests. + return resp_dict + + try: + response = self._encryption_session.decrypt(raw_response.encode()) + ret_val = json_loads(response) + except Exception as ex: + try: + ret_val = json_loads(raw_response) + _LOGGER.debug( + "Received unencrypted response over secure passthrough from %s", + self._host, + ) + except Exception: + raise KasaException( + f"Unable to decrypt response from {self._host}, " + + f"error: {ex}, response: {raw_response}", + ex, + ) from ex + return ret_val # type: ignore[return-value] + + @staticmethod + def generate_confirm_hash(local_nonce, server_nonce, pwd_hash): + """Generate an auth hash for the protocol on the supplied credentials.""" + expected_confirm_bytes = _sha256_hash( + local_nonce.encode() + pwd_hash.encode() + server_nonce.encode() + ) + return expected_confirm_bytes + server_nonce + local_nonce + + @staticmethod + def generate_digest_password(local_nonce, server_nonce, pwd_hash): + """Generate an auth hash for the protocol on the supplied credentials.""" + digest_password_hash = _sha256_hash( + pwd_hash.encode() + local_nonce.encode() + server_nonce.encode() + ) + return ( + digest_password_hash.encode() + local_nonce.encode() + server_nonce.encode() + ).decode() + + @staticmethod + def generate_encryption_token( + token_type, local_nonce, server_nonce, pwd_hash + ) -> bytes: + """Generate encryption token.""" + hashedKey = _sha256_hash( + local_nonce.encode() + pwd_hash.encode() + server_nonce.encode() + ) + return _sha256( + token_type.encode() + + local_nonce.encode() + + server_nonce.encode() + + hashedKey.encode() + )[:16] + + @staticmethod + def generate_tag(request: str, local_nonce: str, pwd_hash: str, seq: int) -> str: + """Generate the tag header from the request for the header.""" + pwd_nonce_hash = _sha256_hash(pwd_hash.encode() + local_nonce.encode()) + tag = _sha256_hash( + pwd_nonce_hash.encode() + request.encode() + str(seq).encode() + ) + return tag + + async def perform_handshake(self) -> None: + """Perform the handshake.""" + local_nonce, server_nonce, pwd_hash = await self.perform_handshake1() + await self.perform_handshake2(local_nonce, server_nonce, pwd_hash) + + async def perform_handshake2(self, local_nonce, server_nonce, pwd_hash) -> None: + """Perform the handshake.""" + _LOGGER.debug("Performing handshake2 ...") + digest_password = self.generate_digest_password( + local_nonce, server_nonce, pwd_hash + ) + body = { + "method": "login", + "params": { + "cnonce": local_nonce, + "encrypt_type": "3", + "digest_passwd": digest_password, + "username": self._username, + }, + } + http_client = self._http_client + status_code, resp_dict = await http_client.post( + self._app_url, json=body, headers=self._headers, ssl=self._ssl_context + ) + if status_code != 200: + raise KasaException( + f"{self._host} responded with an unexpected " + + f"status code {status_code} to handshake2" + ) + resp_dict = cast(dict, resp_dict) + self._seq = resp_dict["result"]["start_seq"] + stok = resp_dict["result"]["stok"] + self._token_url = URL(f"{str(self._app_url)}/stok={stok}/ds") + self._pwd_hash = pwd_hash + self._local_nonce = local_nonce + lsk = self.generate_encryption_token("lsk", local_nonce, server_nonce, pwd_hash) + ivb = self.generate_encryption_token("ivb", local_nonce, server_nonce, pwd_hash) + self._encryption_session = AesEncyptionSession(lsk, ivb) + self._state = TransportState.ESTABLISHED + _LOGGER.debug("Handshake2 complete ...") + + async def perform_handshake1(self) -> tuple[str, str, str]: + """Perform the handshake.""" + _LOGGER.debug("Will perform handshaking...") + + if not self._username: + raise KasaException("Cannot connect to device with no credentials") + local_nonce = secrets.token_bytes(8).hex().upper() + # Device needs the content length or it will response with 500 + body = { + "method": "login", + "params": { + "cnonce": local_nonce, + "encrypt_type": "3", + "username": self._username, + }, + } + http_client = self._http_client + + status_code, resp_dict = await http_client.post( + self._app_url, json=body, headers=self._headers, ssl=self._ssl_context + ) + + _LOGGER.debug("Device responded with: %s", resp_dict) + + if status_code != 200: + raise KasaException( + f"{self._host} responded with an unexpected " + + f"status code {status_code} to handshake1" + ) + + resp_dict = cast(dict, resp_dict) + error_code = SmartErrorCode.from_int(resp_dict["error_code"]) + if error_code != SmartErrorCode.INVALID_NONCE: + self._handle_response_error_code(resp_dict, "Unable to complete handshake") + + if TYPE_CHECKING: + resp_dict = cast(Dict[str, Any], resp_dict) + + server_nonce = resp_dict["result"]["data"]["nonce"] + device_confirm = resp_dict["result"]["data"]["device_confirm"] + if self._credentials and self._credentials != Credentials(): + pwd_hash = _sha256_hash(self._credentials.password.encode()) + else: + if TYPE_CHECKING: + assert self._pwd_hash + pwd_hash = self._pwd_hash + + expected_confirm_sha256 = self.generate_confirm_hash( + local_nonce, server_nonce, pwd_hash + ) + if device_confirm == expected_confirm_sha256: + _LOGGER.debug("Credentials match") + return local_nonce, server_nonce, pwd_hash + + if TYPE_CHECKING: + assert self._credentials + assert self._credentials.password + pwd_hash = _md5_hash(self._credentials.password.encode()) + expected_confirm_md5 = self.generate_confirm_hash( + local_nonce, server_nonce, pwd_hash + ) + if device_confirm == expected_confirm_md5: + _LOGGER.debug("Credentials match") + return local_nonce, server_nonce, pwd_hash + + msg = f"Server response doesn't match our challenge on ip {self._host}" + _LOGGER.debug(msg) + raise AuthenticationError(msg) + + def _handshake_session_expired(self): + """Return true if session has expired.""" + return ( + self._session_expire_at is None + or self._session_expire_at - time.time() <= 0 + ) + + async def send(self, request: str) -> dict[str, Any]: + """Send the request.""" + if ( + self._state is TransportState.HANDSHAKE_REQUIRED + or self._handshake_session_expired() + ): + await self.perform_handshake() + + return await self.send_secure_passthrough(request) + + async def close(self) -> None: + """Close the http client and reset internal state.""" + await self.reset() + await self._http_client.close() + + async def reset(self) -> None: + """Reset internal handshake state.""" + self._state = TransportState.HANDSHAKE_REQUIRED + self._encryption_session = None + self._seq = 0 + self._pwd_hash = None + self._local_nonce = None + + +class SmartErrorCode(IntEnum): + """Smart error codes for this transport.""" + + def __str__(self): + return f"{self.name}({self.value})" + + @staticmethod + @cache + def from_int(value: int) -> SmartErrorCode: + """Convert an integer to a SmartErrorCode.""" + return SmartErrorCode(value) + + SUCCESS = 0 + + SYSTEM_ERROR = -40101 + INVALID_ARGUMENTS = -40209 + + # Camera error codes + SESSION_EXPIRED = -40401 + HOMEKIT_LOGIN_FAIL = -40412 + DEVICE_BLOCKED = -40404 + DEVICE_FACTORY = -40405 + OUT_OF_LIMIT = -40406 + OTHER_ERROR = -40407 + SYSTEM_BLOCKED = -40408 + NONCE_EXPIRED = -40409 + FFS_NONE_PWD = -90000 + TIMEOUT_ERROR = 40108 + UNSUPPORTED_METHOD = -40106 + ONE_SECOND_REPEAT_REQUEST = -40109 + INVALID_NONCE = -40413 + PROTOCOL_FORMAT_ERROR = -40210 + IP_CONFLICT = -40321 + DIAGNOSE_TYPE_NOT_SUPPORT = -69051 + DIAGNOSE_TASK_FULL = -69052 + DIAGNOSE_TASK_BUSY = -69053 + DIAGNOSE_INTERNAL_ERROR = -69055 + DIAGNOSE_ID_NOT_FOUND = -69056 + DIAGNOSE_TASK_NULL = -69057 + CLOUD_LINK_DOWN = -69060 + ONVIF_SET_WRONG_TIME = -69061 + CLOUD_NTP_NO_RESPONSE = -69062 + CLOUD_GET_WRONG_TIME = -69063 + SNTP_SRV_NO_RESPONSE = -69064 + SNTP_GET_WRONG_TIME = -69065 + LINK_UNCONNECTED = -69076 + WIFI_SIGNAL_WEAK = -69077 + LOCAL_NETWORK_POOR = -69078 + CLOUD_NETWORK_POOR = -69079 + INTER_NETWORK_POOR = -69080 + DNS_TIMEOUT = -69081 + DNS_ERROR = -69082 + PING_NO_RESPONSE = -69083 + DHCP_MULTI_SERVER = -69084 + DHCP_ERROR = -69085 + STREAM_SESSION_CLOSE = -69094 + STREAM_BITRATE_EXCEPTION = -69095 + STREAM_FULL = -69096 + STREAM_NO_INTERNET = -69097 + HARDWIRED_NOT_FOUND = -72101 + + # Library internal for unknown error codes + INTERNAL_UNKNOWN_ERROR = -100_000 + # Library internal for query errors + INTERNAL_QUERY_ERROR = -100_001 + + +SMART_RETRYABLE_ERRORS = [ + SmartErrorCode.SESSION_EXPIRED, +] + +SMART_AUTHENTICATION_ERRORS = [ + SmartErrorCode.INVALID_ARGUMENTS, +] diff --git a/kasa/httpclient.py b/kasa/httpclient.py index ec80ad61..9904b17b 100644 --- a/kasa/httpclient.py +++ b/kasa/httpclient.py @@ -64,6 +64,7 @@ class HttpClient: json: dict | Any | None = None, headers: dict[str, str] | None = None, cookies_dict: dict[str, str] | None = None, + ssl=False, ) -> tuple[int, dict | bytes | None]: """Send an http post request to the device. @@ -106,7 +107,7 @@ class HttpClient: timeout=client_timeout, cookies=cookies_dict, headers=headers, - ssl=False, + ssl=ssl, ) async with resp: if resp.status == 200: diff --git a/kasa/tests/test_cli.py b/kasa/tests/test_cli.py index 553f93d3..f22286e5 100644 --- a/kasa/tests/test_cli.py +++ b/kasa/tests/test_cli.py @@ -800,6 +800,9 @@ async def test_host_auth_failed(discovery_mock, mocker, runner): @pytest.mark.parametrize("device_type", TYPES) async def test_type_param(device_type, mocker, runner): """Test for handling only one of username or password supplied.""" + if device_type == "camera": + pytest.skip(reason="camera is experimental") + result_device = FileNotFoundError pass_dev = click.make_pass_decorator(Device) diff --git a/pyproject.toml b/pyproject.toml index 8d2d58b9..f92130ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,10 @@ exclude = [ [tool.coverage.run] source = ["kasa"] branch = true -omit = ["kasa/tests/*"] +omit = [ + "kasa/tests/*", + "kasa/experimental/*" +] [tool.coverage.report] exclude_lines = [