Update SslAesTransport for older firmware versions (#1362)
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run

Older firmware versions do not encrypt the payload.

Tested to work with C110 hw 2.0 fw 1.3.7 Build 230823 Rel.57279n(5553)

---------

Co-authored-by: Teemu R. <tpr@iki.fi>
This commit is contained in:
Steven B.
2025-01-03 20:00:57 +00:00
committed by GitHub
parent 883d52209e
commit 0a95a41ab6
3 changed files with 363 additions and 23 deletions

View File

@@ -132,6 +132,7 @@ class SmartErrorCode(IntEnum):
# Camera error codes
SESSION_EXPIRED = -40401
BAD_USERNAME = -40411 # determined from testing
HOMEKIT_LOGIN_FAIL = -40412
DEVICE_BLOCKED = -40404
DEVICE_FACTORY = -40405

View File

@@ -126,6 +126,7 @@ class SslAesTransport(BaseTransport):
self._password = ch["pwd"]
self._username = ch["un"]
self._local_nonce: str | None = None
self._send_secure = True
_LOGGER.debug("Created AES transport for %s", self._host)
@@ -162,7 +163,13 @@ class SslAesTransport(BaseTransport):
return error_code
def _get_response_inner_error(self, resp_dict: Any) -> SmartErrorCode | None:
# Device blocked errors have 'data' element at the root level, other inner
# errors are inside 'result'
error_code_raw = resp_dict.get("data", {}).get("code")
if error_code_raw is None:
error_code_raw = resp_dict.get("result", {}).get("data", {}).get("code")
if error_code_raw is None:
return None
try:
@@ -208,6 +215,10 @@ class SslAesTransport(BaseTransport):
else:
url = self._app_url
_LOGGER.debug(
"Sending secure passthrough from %s",
self._host,
)
encrypted_payload = self._encryption_session.encrypt(request.encode()) # type: ignore
passthrough_request = {
"method": "securePassthrough",
@@ -292,6 +303,34 @@ class SslAesTransport(BaseTransport):
) from ex
return ret_val # type: ignore[return-value]
async def send_unencrypted(self, request: str) -> dict[str, Any]:
"""Send encrypted message as passthrough."""
url = cast(URL, self._token_url)
_LOGGER.debug(
"Sending unencrypted to %s",
self._host,
)
status_code, resp_dict = await self._http_client.post(
url,
json=request,
headers=self._headers,
ssl=await self._get_ssl_context(),
)
if status_code != 200:
raise KasaException(
f"{self._host} responded with an unexpected "
+ f"status code {status_code} to unencrypted send"
)
self._handle_response_error_code(resp_dict, "Error sending message")
if TYPE_CHECKING:
resp_dict = cast(dict[str, Any], resp_dict)
return resp_dict
@staticmethod
def generate_confirm_hash(
local_nonce: str, server_nonce: str, pwd_hash: str
@@ -340,8 +379,50 @@ class SslAesTransport(BaseTransport):
async def perform_handshake(self) -> None:
"""Perform the handshake."""
local_nonce, server_nonce, pwd_hash = await self.perform_handshake1()
await self.perform_handshake2(local_nonce, server_nonce, pwd_hash)
result = await self.perform_handshake1()
if result:
local_nonce, server_nonce, pwd_hash = result
await self.perform_handshake2(local_nonce, server_nonce, pwd_hash)
async def try_perform_less_secure_login(self, username: str, password: str) -> bool:
"""Perform the md5 login."""
_LOGGER.debug("Performing less secure login...")
pwd_hash = _md5_hash(password.encode())
body = {
"method": "login",
"params": {
"hashed": True,
"password": pwd_hash,
"username": username,
},
}
status_code, resp_dict = await self._http_client.post(
self._app_url,
json=body,
headers=self._headers,
ssl=await self._get_ssl_context(),
)
if status_code != 200:
raise KasaException(
f"{self._host} responded with an unexpected "
+ f"status code {status_code} to login"
)
resp_dict = cast(dict, resp_dict)
if resp_dict.get("error_code") == 0 and (
stok := resp_dict.get("result", {}).get("stok")
):
_LOGGER.debug(
"Succesfully logged in to %s with less secure passthrough", self._host
)
self._send_secure = False
self._token_url = URL(f"{str(self._app_url)}/stok={stok}/ds")
self._pwd_hash = pwd_hash
return True
_LOGGER.debug("Unable to log in to %s with less secure login", self._host)
return False
async def perform_handshake2(
self, local_nonce: str, server_nonce: str, pwd_hash: str
@@ -393,13 +474,50 @@ class SslAesTransport(BaseTransport):
self._state = TransportState.ESTABLISHED
_LOGGER.debug("Handshake2 complete ...")
async def perform_handshake1(self) -> tuple[str, str, str]:
def _pwd_to_hash(self) -> str:
"""Return the password to hash."""
if self._credentials and self._credentials != Credentials():
return self._credentials.password
if self._username and self._password:
return self._password
return self._default_credentials.password
def _is_less_secure_login(self, resp_dict: dict[str, Any]) -> bool:
result = (
self._get_response_error(resp_dict) is SmartErrorCode.SESSION_EXPIRED
and (data := resp_dict.get("result", {}).get("data", {}))
and (encrypt_type := data.get("encrypt_type"))
and (encrypt_type != ["3"])
)
if result:
_LOGGER.debug(
"Received encrypt_type %s for %s, trying less secure login",
encrypt_type,
self._host,
)
return result
async def perform_handshake1(self) -> tuple[str, str, str] | None:
"""Perform the handshake1."""
resp_dict = None
if self._username:
local_nonce = secrets.token_bytes(8).hex().upper()
resp_dict = await self.try_send_handshake1(self._username, local_nonce)
if (
resp_dict
and self._is_less_secure_login(resp_dict)
and self._get_response_inner_error(resp_dict)
is not SmartErrorCode.BAD_USERNAME
and await self.try_perform_less_secure_login(
cast(str, self._username), self._pwd_to_hash()
)
):
self._state = TransportState.ESTABLISHED
return None
# Try the default username. If it fails raise the original error_code
if (
not resp_dict
@@ -407,19 +525,30 @@ class SslAesTransport(BaseTransport):
is not SmartErrorCode.INVALID_NONCE
or "nonce" not in resp_dict["result"].get("data", {})
):
_LOGGER.debug("Trying default credentials to %s", self._host)
local_nonce = secrets.token_bytes(8).hex().upper()
default_resp_dict = await self.try_send_handshake1(
self._default_credentials.username, local_nonce
)
# INVALID_NONCE means device should perform secure login
if (
default_error_code := self._get_response_error(default_resp_dict)
) is SmartErrorCode.INVALID_NONCE and "nonce" in default_resp_dict[
"result"
].get("data", {}):
_LOGGER.debug("Connected to {self._host} with default username")
_LOGGER.debug("Connected to %s with default username", self._host)
self._username = self._default_credentials.username
error_code = default_error_code
resp_dict = default_resp_dict
# Otherwise could be less secure login
elif self._is_less_secure_login(
default_resp_dict
) and await self.try_perform_less_secure_login(
self._default_credentials.username, self._pwd_to_hash()
):
self._username = self._default_credentials.username
self._state = TransportState.ESTABLISHED
return None
# If the default login worked it's ok not to provide credentials but if
# it didn't raise auth error here.
@@ -451,12 +580,8 @@ class SslAesTransport(BaseTransport):
server_nonce = resp_dict["result"]["data"]["nonce"]
device_confirm = resp_dict["result"]["data"]["device_confirm"]
if self._credentials and self._credentials != Credentials():
pwd_hash = _sha256_hash(self._credentials.password.encode())
elif self._username and self._password:
pwd_hash = _sha256_hash(self._password.encode())
else:
pwd_hash = _sha256_hash(self._default_credentials.password.encode())
pwd_hash = _sha256_hash(self._pwd_to_hash().encode())
expected_confirm_sha256 = self.generate_confirm_hash(
local_nonce, server_nonce, pwd_hash
@@ -468,7 +593,9 @@ class SslAesTransport(BaseTransport):
if TYPE_CHECKING:
assert self._credentials
assert self._credentials.password
pwd_hash = _md5_hash(self._credentials.password.encode())
pwd_hash = _md5_hash(self._pwd_to_hash().encode())
expected_confirm_md5 = self.generate_confirm_hash(
local_nonce, server_nonce, pwd_hash
)
@@ -478,11 +605,12 @@ class SslAesTransport(BaseTransport):
msg = f"Server response doesn't match our challenge on ip {self._host}"
_LOGGER.debug(msg)
raise AuthenticationError(msg)
async def try_send_handshake1(self, username: str, local_nonce: str) -> dict:
"""Perform the handshake."""
_LOGGER.debug("Will to send handshake1...")
_LOGGER.debug("Sending handshake1...")
body = {
"method": "login",
@@ -501,7 +629,7 @@ class SslAesTransport(BaseTransport):
ssl=await self._get_ssl_context(),
)
_LOGGER.debug("Device responded with: %s", resp_dict)
_LOGGER.debug("Device responded with status %s: %s", status_code, resp_dict)
if status_code != 200:
raise KasaException(
@@ -516,7 +644,10 @@ class SslAesTransport(BaseTransport):
if self._state is TransportState.HANDSHAKE_REQUIRED:
await self.perform_handshake()
return await self.send_secure_passthrough(request)
if self._send_secure:
return await self.send_secure_passthrough(request)
return await self.send_unencrypted(request)
async def close(self) -> None:
"""Close the http client and reset internal state."""