mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-24 21:57:07 +00:00
Try variations of hashing and passwords
This commit is contained in:
parent
4a5bc20ee2
commit
36a9823b63
@ -43,6 +43,9 @@ def _sha256(payload: bytes) -> bytes:
|
|||||||
def _md5_hash(payload: bytes) -> str:
|
def _md5_hash(payload: bytes) -> str:
|
||||||
return hashlib.md5(payload).hexdigest().upper() # noqa: S324
|
return hashlib.md5(payload).hexdigest().upper() # noqa: S324
|
||||||
|
|
||||||
|
def _sha256_hash32(payload: bytes) -> str:
|
||||||
|
digest = hashlib.sha256(payload).digest() # noqa: S324
|
||||||
|
return base64.b32hexencode(digest).decode().upper()
|
||||||
|
|
||||||
def _sha256_hash(payload: bytes) -> str:
|
def _sha256_hash(payload: bytes) -> str:
|
||||||
return hashlib.sha256(payload).hexdigest().upper() # noqa: S324
|
return hashlib.sha256(payload).hexdigest().upper() # noqa: S324
|
||||||
@ -71,6 +74,7 @@ class SslAesTransport(BaseTransport):
|
|||||||
"Content-Type": "application/json; charset=UTF-8",
|
"Content-Type": "application/json; charset=UTF-8",
|
||||||
"requestByApp": "true",
|
"requestByApp": "true",
|
||||||
"Accept": "application/json",
|
"Accept": "application/json",
|
||||||
|
"Connection": "close",
|
||||||
"Accept-Encoding": "gzip, deflate",
|
"Accept-Encoding": "gzip, deflate",
|
||||||
"User-Agent": "Tapo CameraClient Android",
|
"User-Agent": "Tapo CameraClient Android",
|
||||||
}
|
}
|
||||||
@ -114,7 +118,7 @@ class SslAesTransport(BaseTransport):
|
|||||||
self._headers = {
|
self._headers = {
|
||||||
**self.COMMON_HEADERS,
|
**self.COMMON_HEADERS,
|
||||||
"Host": self._host,
|
"Host": self._host,
|
||||||
"Referer": f"https://{self._host}",
|
"Referer": f"https://{self._host_port}",
|
||||||
}
|
}
|
||||||
self._seq: int | None = None
|
self._seq: int | None = None
|
||||||
self._pwd_hash: str | None = None
|
self._pwd_hash: str | None = None
|
||||||
@ -346,18 +350,24 @@ class SslAesTransport(BaseTransport):
|
|||||||
local_nonce, server_nonce, pwd_hash = result
|
local_nonce, server_nonce, pwd_hash = result
|
||||||
await self.perform_handshake2(local_nonce, server_nonce, pwd_hash)
|
await self.perform_handshake2(local_nonce, server_nonce, pwd_hash)
|
||||||
|
|
||||||
async def try_perform_login(self) -> bool:
|
async def try_perform_login(self, server_nonce, local_nonce) -> bool:
|
||||||
"""Perform the md5 login."""
|
"""Perform the md5 login."""
|
||||||
_LOGGER.debug("Performing insecure login ...")
|
_LOGGER.debug("Performing insecure login ...")
|
||||||
|
|
||||||
pwd_hash = _md5_hash(self._pwd_to_hash().encode())
|
pwd = self._pwd_to_hash()
|
||||||
username = self._username
|
#to_hash = server_nonce + ":" + pwd
|
||||||
|
pwd_hash = _md5_hash(pwd.encode())
|
||||||
|
#pwd_hash = _sha256_hash(pwd.encode())
|
||||||
|
#to_hash = self._username + ":" + server_nonce
|
||||||
|
#un_hash = _md5_hash(to_hash.encode())
|
||||||
|
#username = self._username
|
||||||
body = {
|
body = {
|
||||||
"method": "login",
|
"method": "login",
|
||||||
"params": {
|
"params": {
|
||||||
|
#"cnonce": local_nonce,
|
||||||
"hashed": True,
|
"hashed": True,
|
||||||
"password": pwd_hash,
|
"password": pwd_hash,
|
||||||
"username": username,
|
"username": self._username,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,7 +381,7 @@ class SslAesTransport(BaseTransport):
|
|||||||
if status_code != 200:
|
if status_code != 200:
|
||||||
raise KasaException(
|
raise KasaException(
|
||||||
f"{self._host} responded with an unexpected "
|
f"{self._host} responded with an unexpected "
|
||||||
+ f"status code {status_code} to handshake2"
|
+ f"status code {status_code} to login"
|
||||||
)
|
)
|
||||||
resp_dict = cast(dict, resp_dict)
|
resp_dict = cast(dict, resp_dict)
|
||||||
if resp_dict.get("error_code") == 0 and (
|
if resp_dict.get("error_code") == 0 and (
|
||||||
@ -459,10 +469,9 @@ class SslAesTransport(BaseTransport):
|
|||||||
resp_dict
|
resp_dict
|
||||||
and (error_code := self._get_response_error(resp_dict))
|
and (error_code := self._get_response_error(resp_dict))
|
||||||
is SmartErrorCode.SESSION_EXPIRED
|
is SmartErrorCode.SESSION_EXPIRED
|
||||||
|
and (data := resp_dict.get("result", {}).get("data", {}))
|
||||||
and (
|
and (
|
||||||
encrypt_type := resp_dict.get("result", {})
|
encrypt_type := data.get("encrypt_type")
|
||||||
.get("data", {})
|
|
||||||
.get("encrypt_type")
|
|
||||||
)
|
)
|
||||||
and (encrypt_type != ["3"])
|
and (encrypt_type != ["3"])
|
||||||
):
|
):
|
||||||
@ -471,7 +480,7 @@ class SslAesTransport(BaseTransport):
|
|||||||
encrypt_type,
|
encrypt_type,
|
||||||
self._host,
|
self._host,
|
||||||
)
|
)
|
||||||
if await self.try_perform_login():
|
if await self.try_perform_login(data.get("nonce"), local_nonce):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Try the default username. If it fails raise the original error_code
|
# Try the default username. If it fails raise the original error_code
|
||||||
@ -482,7 +491,7 @@ class SslAesTransport(BaseTransport):
|
|||||||
or "nonce" not in resp_dict["result"].get("data", {})
|
or "nonce" not in resp_dict["result"].get("data", {})
|
||||||
):
|
):
|
||||||
_LOGGER.debug("Trying default credentials to %s", self._host)
|
_LOGGER.debug("Trying default credentials to %s", self._host)
|
||||||
local_nonce = secrets.token_bytes(8).hex().upper()
|
#local_nonce = secrets.token_bytes(8).hex().upper()
|
||||||
default_resp_dict = await self.try_send_handshake1(
|
default_resp_dict = await self.try_send_handshake1(
|
||||||
self._default_credentials.username, local_nonce
|
self._default_credentials.username, local_nonce
|
||||||
)
|
)
|
||||||
@ -533,8 +542,22 @@ class SslAesTransport(BaseTransport):
|
|||||||
_LOGGER.debug("Credentials match")
|
_LOGGER.debug("Credentials match")
|
||||||
return local_nonce, server_nonce, pwd_hash
|
return local_nonce, server_nonce, pwd_hash
|
||||||
|
|
||||||
for val in {"admin", "tpadmin", "slprealtek"}:
|
# For testing purposes only.
|
||||||
for func in {_sha256_hash, _md5_hash, _sha1_hash, lambda x: x.decode()}:
|
from ..credentials import DEFAULT_CREDENTIALS, get_default_credentials
|
||||||
|
device_or_wifi_mac = "12:34:56:AB:CD:EF"
|
||||||
|
default_passes = {get_default_credentials(cred).password for cred in DEFAULT_CREDENTIALS.values()}
|
||||||
|
vals = {
|
||||||
|
"admin",
|
||||||
|
"tpadmin",
|
||||||
|
"slprealtek",
|
||||||
|
self._password,
|
||||||
|
self._credentials.username,
|
||||||
|
f"{self._credentials.username}_{device_or_wifi_mac}",
|
||||||
|
f"{self._credentials.username}_{device_or_wifi_mac.lower()}",
|
||||||
|
}
|
||||||
|
vals.update(default_passes)
|
||||||
|
for val in vals:
|
||||||
|
for func in {_sha256_hash, _md5_hash, _sha1_hash, _sha256_hash32, lambda x: x.decode()}:
|
||||||
pwd_hash = func(val.encode())
|
pwd_hash = func(val.encode())
|
||||||
ec = self.generate_confirm_hash(local_nonce, server_nonce, pwd_hash)
|
ec = self.generate_confirm_hash(local_nonce, server_nonce, pwd_hash)
|
||||||
if device_confirm == ec:
|
if device_confirm == ec:
|
||||||
|
Loading…
Reference in New Issue
Block a user