Cleanup experimental code

This commit is contained in:
Steven B 2024-12-13 16:29:07 +00:00
parent 6d753efb8f
commit 841cb3aed4
No known key found for this signature in database
GPG Key ID: 6D5B46B3679F2A43

View File

@ -7,10 +7,11 @@ import base64
import hashlib
import logging
import secrets
import socket
import ssl
from enum import Enum, auto
from typing import TYPE_CHECKING, Any, cast
import uuid
from yarl import URL
from ..credentials import DEFAULT_CREDENTIALS, Credentials, get_default_credentials
@ -44,19 +45,10 @@ def _md5_hash(payload: bytes) -> str:
return hashlib.md5(payload).hexdigest().upper() # noqa: S324
def _sha256_hash32(payload: bytes) -> str:
digest = hashlib.sha256(payload).digest() # noqa: S324
return base64.b32hexencode(digest).decode().upper()
def _sha256_hash(payload: bytes) -> str:
return hashlib.sha256(payload).hexdigest().upper() # noqa: S324
def _sha1_hash(payload: bytes) -> str:
return hashlib.sha1(payload).hexdigest().upper() # noqa: S324
class TransportState(Enum):
"""Enum for AES state."""
@ -78,8 +70,7 @@ class SslAesTransport(BaseTransport):
"Accept": "application/json",
"Connection": "close",
"Accept-Encoding": "gzip, deflate",
# "User-Agent": "Tapo CameraClient Android",
"User-Agent": _md5_hash(uuid.uuid4().bytes)
"User-Agent": "Tapo CameraClient Android",
}
CIPHERS = ":".join(
[
@ -118,11 +109,7 @@ class SslAesTransport(BaseTransport):
self._app_url = URL(f"https://{self._host_port}")
self._token_url: URL | None = None
self._ssl_context: ssl.SSLContext | None = None
self._headers = {
**self.COMMON_HEADERS,
"Host": self._host,
"Referer": "https://127.0.1.1",
}
self._headers: dict | None = None
self._seq: int | None = None
self._pwd_hash: str | None = None
self._username: str | None = None
@ -211,6 +198,23 @@ class SslAesTransport(BaseTransport):
)
return self._ssl_context
async def _get_headers(self) -> dict:
if not self._headers:
loop = asyncio.get_event_loop()
adrrinfo = await loop.getaddrinfo(
socket.gethostname(), 0, type=socket.SOCK_DGRAM, family=socket.AF_INET
)
# getaddrinfo returns a list of 5 tuples with the following structure:
# (family, type, proto, canonname, sockaddr)
# where sockaddr is 2 tuple (ip, port).
this_ip = adrrinfo[0][4][0]
self._headers = {
**self.COMMON_HEADERS,
"Referer": f"https://{this_ip}",
"Host": self._host_port,
}
return self._headers
async def send_secure_passthrough(self, request: str) -> dict[str, Any]:
"""Send encrypted message as passthrough."""
if self._state is TransportState.ESTABLISHED and self._token_url:
@ -235,7 +239,7 @@ class SslAesTransport(BaseTransport):
tag = self.generate_tag(
passthrough_request_str, self._local_nonce, self._pwd_hash, self._seq
)
headers = {**self._headers, "Seq": str(self._seq), "Tapo_tag": tag}
headers = {**await self._get_headers(), "Seq": str(self._seq), "Tapo_tag": tag}
self._seq += 1
status_code, resp_dict = await self._http_client.post(
url,
@ -297,7 +301,7 @@ class SslAesTransport(BaseTransport):
status_code, resp_dict = await self._http_client.post(
url,
json=request,
headers=self._headers,
headers=await self._get_headers(),
ssl=await self._get_ssl_context(),
)
@ -366,21 +370,15 @@ class SslAesTransport(BaseTransport):
local_nonce, server_nonce, pwd_hash = result
await self.perform_handshake2(local_nonce, server_nonce, pwd_hash)
async def try_perform_login(self, server_nonce, local_nonce) -> bool:
async def try_perform_login(self) -> bool:
"""Perform the md5 login."""
_LOGGER.debug("Performing insecure login ...")
pwd = self._pwd_to_hash()
# to_hash = server_nonce + ":" + pwd
pwd_hash = _md5_hash(pwd.encode())
# pwd_hash = _sha256_hash(pwd.encode())
# to_hash = self._username + ":" + server_nonce
# un_hash = _md5_hash(to_hash.encode())
# username = self._username
body = {
"method": "login",
"params": {
# "cnonce": local_nonce,
"hashed": True,
"password": pwd_hash,
"username": self._username,
@ -391,7 +389,7 @@ class SslAesTransport(BaseTransport):
status_code, resp_dict = await http_client.post(
self._app_url,
json=body,
headers=self._headers,
headers=await self._get_headers(),
ssl=await self._get_ssl_context(),
)
if status_code != 200:
@ -435,7 +433,7 @@ class SslAesTransport(BaseTransport):
status_code, resp_dict = await http_client.post(
self._app_url,
json=body,
headers=self._headers,
headers=await self._get_headers(),
ssl=await self._get_ssl_context(),
)
if status_code != 200:
@ -499,9 +497,7 @@ class SslAesTransport(BaseTransport):
if (
resp_dict
and self._is_less_secure_login(resp_dict)
and await self.try_perform_login(
resp_dict.get("data", {}).get("nonce"), local_nonce
)
and await self.try_perform_login()
):
return None
@ -528,10 +524,9 @@ class SslAesTransport(BaseTransport):
error_code = default_error_code
resp_dict = default_resp_dict
# Otherwise could be less secure login
elif self._is_less_secure_login(
default_resp_dict
) and await self.try_perform_login(
default_resp_dict.get("data", {}).get("nonce"), local_nonce
elif (
self._is_less_secure_login(default_resp_dict)
and await self.try_perform_login()
):
return None
@ -584,40 +579,6 @@ class SslAesTransport(BaseTransport):
_LOGGER.debug("Credentials match")
return local_nonce, server_nonce, pwd_hash
# For testing purposes only.
from ..credentials import DEFAULT_CREDENTIALS, get_default_credentials
device_or_wifi_mac = "12:34:56:AB:CD:EF"
default_passes = {
get_default_credentials(cred).password
for cred in DEFAULT_CREDENTIALS.values()
}
vals = {
"admin",
"tpadmin",
"slprealtek",
self._password,
self._credentials.username,
f"{self._credentials.username}_{device_or_wifi_mac}",
f"{self._credentials.username}_{device_or_wifi_mac.lower()}",
}
vals.update(default_passes)
for val in vals:
for func in {
_sha256_hash,
_md5_hash,
_sha1_hash,
_sha256_hash32,
lambda x: x.decode(),
}:
if not val:
continue
pwd_hash = func(val.encode())
ec = self.generate_confirm_hash(local_nonce, server_nonce, pwd_hash)
if device_confirm == ec:
_LOGGER.debug("Credentials match with %s %s", val, func.__name__)
return local_nonce, server_nonce, pwd_hash
msg = f"Server response doesn't match our challenge on ip {self._host}"
_LOGGER.debug(msg)
@ -625,7 +586,7 @@ class SslAesTransport(BaseTransport):
async def try_send_handshake1(self, username: str, local_nonce: str) -> dict:
"""Perform the handshake."""
_LOGGER.debug("Will to send handshake1...")
_LOGGER.debug("Sending handshake1...")
body = {
"method": "login",
@ -640,7 +601,7 @@ class SslAesTransport(BaseTransport):
status_code, resp_dict = await http_client.post(
self._app_url,
json=body,
headers=self._headers,
headers=await self._get_headers(),
ssl=await self._get_ssl_context(),
)