Merge branch 'master' into janitor/smartcam_referer

This commit is contained in:
Steven B.
2024-12-23 09:27:56 +00:00
committed by GitHub
17 changed files with 5584 additions and 296 deletions

View File

@@ -15,8 +15,10 @@ from kasa.credentials import DEFAULT_CREDENTIALS, Credentials, get_default_crede
from kasa.deviceconfig import DeviceConfig
from kasa.exceptions import (
AuthenticationError,
DeviceError,
KasaException,
SmartErrorCode,
_RetryableError,
)
from kasa.httpclient import HttpClient
from kasa.transports.aestransport import AesEncyptionSession
@@ -201,6 +203,64 @@ async def test_unencrypted_response(mocker, caplog):
)
async def test_device_blocked_response(mocker):
host = "127.0.0.1"
mock_ssl_aes_device = MockSslAesDevice(host, device_blocked=True)
mocker.patch.object(
aiohttp.ClientSession, "post", side_effect=mock_ssl_aes_device.post
)
transport = SslAesTransport(
config=DeviceConfig(host, credentials=Credentials(MOCK_USER, MOCK_PWD))
)
msg = "Device blocked for 1685 seconds"
with pytest.raises(DeviceError, match=msg):
await transport.perform_handshake()
@pytest.mark.parametrize(
("response", "expected_msg"),
[
pytest.param(
{"error_code": -1, "msg": "Check tapo tag failed"},
'{"error_code": -1, "msg": "Check tapo tag failed"}',
id="can-decrypt",
),
pytest.param(
b"12345678",
str({"result": {"response": "12345678"}, "error_code": 0}),
id="cannot-decrypt",
),
],
)
async def test_device_500_error(mocker, response, expected_msg):
"""Test 500 error raises retryable exception."""
host = "127.0.0.1"
mock_ssl_aes_device = MockSslAesDevice(host)
mocker.patch.object(
aiohttp.ClientSession, "post", side_effect=mock_ssl_aes_device.post
)
transport = SslAesTransport(
config=DeviceConfig(host, credentials=Credentials(MOCK_USER, MOCK_PWD))
)
request = {
"method": "getDeviceInfo",
"params": None,
}
await transport.perform_handshake()
mock_ssl_aes_device.put_next_response(response)
mock_ssl_aes_device.status_code = 500
msg = f"Device 127.0.0.1 replied with status 500 after handshake, response: {expected_msg}"
with pytest.raises(_RetryableError, match=msg):
await transport.send(json_dumps(request))
async def test_port_override():
"""Test that port override sets the app_url."""
host = "127.0.0.1"
@@ -236,6 +296,11 @@ class MockSslAesDevice:
},
}
DEVICE_BLOCKED_RESP = {
"data": {"code": SmartErrorCode.DEVICE_BLOCKED.value, "sec_left": 1685},
"error_code": SmartErrorCode.SESSION_EXPIRED.value,
}
class _mock_response:
def __init__(self, status, request: dict):
self.status = status
@@ -264,6 +329,7 @@ class MockSslAesDevice:
send_error_code=0,
secure_passthrough_error_code=0,
digest_password_fail=False,
device_blocked=False,
):
self.host = host
self.http_client = HttpClient(DeviceConfig(self.host))
@@ -278,6 +344,9 @@ class MockSslAesDevice:
self.do_not_encrypt_response = do_not_encrypt_response
self.want_default_username = want_default_username
self.digest_password_fail = digest_password_fail
self.device_blocked = device_blocked
self._next_responses: list[dict | bytes] = []
async def post(self, url: URL, params=None, json=None, data=None, *_, **__):
if data:
@@ -304,6 +373,9 @@ class MockSslAesDevice:
request_nonce = request["params"].get("cnonce")
request_username = request["params"].get("username")
if self.device_blocked:
return self._mock_response(self.status_code, self.DEVICE_BLOCKED_RESP)
if (self.want_default_username and request_username != MOCK_ADMIN_USER) or (
not self.want_default_username and request_username != MOCK_USER
):
@@ -360,11 +432,24 @@ class MockSslAesDevice:
assert self.encryption_session
decrypted_request = self.encryption_session.decrypt(encrypted_request.encode())
decrypted_request_dict = json_loads(decrypted_request)
decrypted_response = await self._post(url, decrypted_request_dict)
async with decrypted_response:
decrypted_response_data = await decrypted_response.read()
encrypted_response = self.encryption_session.encrypt(decrypted_response_data)
if self._next_responses:
next_response = self._next_responses.pop(0)
if isinstance(next_response, dict):
decrypted_response_data = json_dumps(next_response).encode()
encrypted_response = self.encryption_session.encrypt(
decrypted_response_data
)
else:
encrypted_response = next_response
else:
decrypted_response = await self._post(url, decrypted_request_dict)
async with decrypted_response:
decrypted_response_data = await decrypted_response.read()
encrypted_response = self.encryption_session.encrypt(
decrypted_response_data
)
response = (
decrypted_response_data
if self.do_not_encrypt_response
@@ -379,3 +464,6 @@ class MockSslAesDevice:
async def _return_send_response(self, url: URL, json: dict[str, Any]):
result = {"result": {"method": None}, "error_code": self.send_error_code}
return self._mock_response(self.status_code, result)
def put_next_response(self, request: dict | bytes) -> None:
self._next_responses.append(request)