diff --git a/kasa/transports/sslaestransport.py b/kasa/transports/sslaestransport.py index cebf2e3d..bcae5570 100644 --- a/kasa/transports/sslaestransport.py +++ b/kasa/transports/sslaestransport.py @@ -7,10 +7,11 @@ import base64 import hashlib import logging import secrets +import socket import ssl from enum import Enum, auto from typing import TYPE_CHECKING, Any, cast -import uuid + from yarl import URL from ..credentials import DEFAULT_CREDENTIALS, Credentials, get_default_credentials @@ -44,19 +45,10 @@ def _md5_hash(payload: bytes) -> str: return hashlib.md5(payload).hexdigest().upper() # noqa: S324 -def _sha256_hash32(payload: bytes) -> str: - digest = hashlib.sha256(payload).digest() # noqa: S324 - return base64.b32hexencode(digest).decode().upper() - - def _sha256_hash(payload: bytes) -> str: return hashlib.sha256(payload).hexdigest().upper() # noqa: S324 -def _sha1_hash(payload: bytes) -> str: - return hashlib.sha1(payload).hexdigest().upper() # noqa: S324 - - class TransportState(Enum): """Enum for AES state.""" @@ -78,8 +70,7 @@ class SslAesTransport(BaseTransport): "Accept": "application/json", "Connection": "close", "Accept-Encoding": "gzip, deflate", -# "User-Agent": "Tapo CameraClient Android", - "User-Agent": _md5_hash(uuid.uuid4().bytes) + "User-Agent": "Tapo CameraClient Android", } CIPHERS = ":".join( [ @@ -118,11 +109,7 @@ class SslAesTransport(BaseTransport): self._app_url = URL(f"https://{self._host_port}") self._token_url: URL | None = None self._ssl_context: ssl.SSLContext | None = None - self._headers = { - **self.COMMON_HEADERS, - "Host": self._host, - "Referer": "https://127.0.1.1", - } + self._headers: dict | None = None self._seq: int | None = None self._pwd_hash: str | None = None self._username: str | None = None @@ -211,6 +198,23 @@ class SslAesTransport(BaseTransport): ) return self._ssl_context + async def _get_headers(self) -> dict: + if not self._headers: + loop = asyncio.get_event_loop() + adrrinfo = await loop.getaddrinfo( + socket.gethostname(), 0, type=socket.SOCK_DGRAM, family=socket.AF_INET + ) + # getaddrinfo returns a list of 5 tuples with the following structure: + # (family, type, proto, canonname, sockaddr) + # where sockaddr is 2 tuple (ip, port). + this_ip = adrrinfo[0][4][0] + self._headers = { + **self.COMMON_HEADERS, + "Referer": f"https://{this_ip}", + "Host": self._host_port, + } + return self._headers + async def send_secure_passthrough(self, request: str) -> dict[str, Any]: """Send encrypted message as passthrough.""" if self._state is TransportState.ESTABLISHED and self._token_url: @@ -235,7 +239,7 @@ class SslAesTransport(BaseTransport): tag = self.generate_tag( passthrough_request_str, self._local_nonce, self._pwd_hash, self._seq ) - headers = {**self._headers, "Seq": str(self._seq), "Tapo_tag": tag} + headers = {**await self._get_headers(), "Seq": str(self._seq), "Tapo_tag": tag} self._seq += 1 status_code, resp_dict = await self._http_client.post( url, @@ -297,7 +301,7 @@ class SslAesTransport(BaseTransport): status_code, resp_dict = await self._http_client.post( url, json=request, - headers=self._headers, + headers=await self._get_headers(), ssl=await self._get_ssl_context(), ) @@ -366,21 +370,15 @@ class SslAesTransport(BaseTransport): local_nonce, server_nonce, pwd_hash = result await self.perform_handshake2(local_nonce, server_nonce, pwd_hash) - async def try_perform_login(self, server_nonce, local_nonce) -> bool: + async def try_perform_login(self) -> bool: """Perform the md5 login.""" _LOGGER.debug("Performing insecure login ...") pwd = self._pwd_to_hash() - # to_hash = server_nonce + ":" + pwd pwd_hash = _md5_hash(pwd.encode()) - # pwd_hash = _sha256_hash(pwd.encode()) - # to_hash = self._username + ":" + server_nonce - # un_hash = _md5_hash(to_hash.encode()) - # username = self._username body = { "method": "login", "params": { - # "cnonce": local_nonce, "hashed": True, "password": pwd_hash, "username": self._username, @@ -391,7 +389,7 @@ class SslAesTransport(BaseTransport): status_code, resp_dict = await http_client.post( self._app_url, json=body, - headers=self._headers, + headers=await self._get_headers(), ssl=await self._get_ssl_context(), ) if status_code != 200: @@ -435,7 +433,7 @@ class SslAesTransport(BaseTransport): status_code, resp_dict = await http_client.post( self._app_url, json=body, - headers=self._headers, + headers=await self._get_headers(), ssl=await self._get_ssl_context(), ) if status_code != 200: @@ -499,9 +497,7 @@ class SslAesTransport(BaseTransport): if ( resp_dict and self._is_less_secure_login(resp_dict) - and await self.try_perform_login( - resp_dict.get("data", {}).get("nonce"), local_nonce - ) + and await self.try_perform_login() ): return None @@ -528,10 +524,9 @@ class SslAesTransport(BaseTransport): error_code = default_error_code resp_dict = default_resp_dict # Otherwise could be less secure login - elif self._is_less_secure_login( - default_resp_dict - ) and await self.try_perform_login( - default_resp_dict.get("data", {}).get("nonce"), local_nonce + elif ( + self._is_less_secure_login(default_resp_dict) + and await self.try_perform_login() ): return None @@ -584,40 +579,6 @@ class SslAesTransport(BaseTransport): _LOGGER.debug("Credentials match") return local_nonce, server_nonce, pwd_hash - # For testing purposes only. - from ..credentials import DEFAULT_CREDENTIALS, get_default_credentials - - device_or_wifi_mac = "12:34:56:AB:CD:EF" - default_passes = { - get_default_credentials(cred).password - for cred in DEFAULT_CREDENTIALS.values() - } - vals = { - "admin", - "tpadmin", - "slprealtek", - self._password, - self._credentials.username, - f"{self._credentials.username}_{device_or_wifi_mac}", - f"{self._credentials.username}_{device_or_wifi_mac.lower()}", - } - vals.update(default_passes) - for val in vals: - for func in { - _sha256_hash, - _md5_hash, - _sha1_hash, - _sha256_hash32, - lambda x: x.decode(), - }: - if not val: - continue - pwd_hash = func(val.encode()) - ec = self.generate_confirm_hash(local_nonce, server_nonce, pwd_hash) - if device_confirm == ec: - _LOGGER.debug("Credentials match with %s %s", val, func.__name__) - return local_nonce, server_nonce, pwd_hash - msg = f"Server response doesn't match our challenge on ip {self._host}" _LOGGER.debug(msg) @@ -625,7 +586,7 @@ class SslAesTransport(BaseTransport): async def try_send_handshake1(self, username: str, local_nonce: str) -> dict: """Perform the handshake.""" - _LOGGER.debug("Will to send handshake1...") + _LOGGER.debug("Sending handshake1...") body = { "method": "login", @@ -640,7 +601,7 @@ class SslAesTransport(BaseTransport): status_code, resp_dict = await http_client.post( self._app_url, json=body, - headers=self._headers, + headers=await self._get_headers(), ssl=await self._get_ssl_context(), )