mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-08 22:07:06 +00:00
Reduce AuthenticationExceptions raising from transports (#740)
* Reduce AuthenticationExceptions raising from transports * Make auth failed test ids easier to read * Test invalid klap response length
This commit is contained in:
parent
215b8d4e4f
commit
6ab17d823c
@ -208,9 +208,9 @@ class AesTransport(BaseTransport):
|
||||
except AuthenticationException:
|
||||
raise
|
||||
except Exception as ex:
|
||||
raise AuthenticationException(
|
||||
raise SmartDeviceException(
|
||||
"Unable to login and trying default "
|
||||
+ "login raised another exception: %s",
|
||||
+ f"login raised another exception: {ex}",
|
||||
ex,
|
||||
) from ex
|
||||
|
||||
|
@ -159,7 +159,7 @@ class KlapTransport(BaseTransport):
|
||||
)
|
||||
|
||||
if response_status != 200:
|
||||
raise AuthenticationException(
|
||||
raise SmartDeviceException(
|
||||
f"Device {self._host} responded with {response_status} to handshake1"
|
||||
)
|
||||
|
||||
@ -167,6 +167,12 @@ class KlapTransport(BaseTransport):
|
||||
remote_seed: bytes = response_data[0:16]
|
||||
server_hash = response_data[16:]
|
||||
|
||||
if len(server_hash) != 32:
|
||||
raise SmartDeviceException(
|
||||
f"Device {self._host} responded with unexpected klap response "
|
||||
+ f"{response_data!r} to handshake1"
|
||||
)
|
||||
|
||||
if _LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_LOGGER.debug(
|
||||
"Handshake1 success at %s. Host is %s, "
|
||||
@ -260,7 +266,9 @@ class KlapTransport(BaseTransport):
|
||||
)
|
||||
|
||||
if response_status != 200:
|
||||
raise AuthenticationException(
|
||||
# This shouldn't be caused by incorrect
|
||||
# credentials so don't raise AuthenticationException
|
||||
raise SmartDeviceException(
|
||||
f"Device {self._host} responded with {response_status} to handshake2"
|
||||
)
|
||||
|
||||
|
@ -350,7 +350,7 @@ async def test_handshake(
|
||||
assert protocol._transport._handshake_done is True
|
||||
|
||||
response_status = 403
|
||||
with pytest.raises(AuthenticationException):
|
||||
with pytest.raises(SmartDeviceException):
|
||||
await protocol._transport.perform_handshake()
|
||||
assert protocol._transport._handshake_done is False
|
||||
await protocol.close()
|
||||
@ -400,34 +400,80 @@ async def test_query(mocker):
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"response_status, expectation",
|
||||
"response_status, credentials_match, expectation",
|
||||
[
|
||||
((403, 403, 403), pytest.raises(AuthenticationException)),
|
||||
((200, 403, 403), pytest.raises(AuthenticationException)),
|
||||
((200, 200, 403), pytest.raises(AuthenticationException)),
|
||||
((200, 200, 400), pytest.raises(SmartDeviceException)),
|
||||
pytest.param(
|
||||
(403, 403, 403),
|
||||
True,
|
||||
pytest.raises(SmartDeviceException),
|
||||
id="handshake1-403-status",
|
||||
),
|
||||
pytest.param(
|
||||
(200, 403, 403),
|
||||
True,
|
||||
pytest.raises(SmartDeviceException),
|
||||
id="handshake2-403-status",
|
||||
),
|
||||
pytest.param(
|
||||
(200, 200, 403),
|
||||
True,
|
||||
pytest.raises(AuthenticationException),
|
||||
id="request-403-status",
|
||||
),
|
||||
pytest.param(
|
||||
(200, 200, 400),
|
||||
True,
|
||||
pytest.raises(SmartDeviceException),
|
||||
id="request-400-status",
|
||||
),
|
||||
pytest.param(
|
||||
(200, 200, 200),
|
||||
False,
|
||||
pytest.raises(AuthenticationException),
|
||||
id="handshake1-wrong-auth",
|
||||
),
|
||||
pytest.param(
|
||||
(200, 200, 200),
|
||||
secrets.token_bytes(16),
|
||||
pytest.raises(SmartDeviceException),
|
||||
id="handshake1-bad-auth-length",
|
||||
),
|
||||
],
|
||||
ids=("handshake1", "handshake2", "request", "non_auth_error"),
|
||||
)
|
||||
async def test_authentication_failures(mocker, response_status, expectation):
|
||||
async def test_authentication_failures(
|
||||
mocker, response_status, credentials_match, expectation
|
||||
):
|
||||
client_seed = None
|
||||
|
||||
server_seed = secrets.token_bytes(16)
|
||||
client_credentials = Credentials("foo", "bar")
|
||||
device_auth_hash = KlapTransport.generate_auth_hash(client_credentials)
|
||||
device_credentials = (
|
||||
client_credentials if credentials_match else Credentials("bar", "foo")
|
||||
)
|
||||
device_auth_hash = KlapTransport.generate_auth_hash(device_credentials)
|
||||
|
||||
async def _return_response(url: URL, params=None, data=None, *_, **__):
|
||||
nonlocal client_seed, server_seed, device_auth_hash, response_status
|
||||
nonlocal \
|
||||
client_seed, \
|
||||
server_seed, \
|
||||
device_auth_hash, \
|
||||
response_status, \
|
||||
credentials_match
|
||||
|
||||
if str(url) == "http://127.0.0.1:80/app/handshake1":
|
||||
client_seed = data
|
||||
client_seed_auth_hash = _sha256(data + device_auth_hash)
|
||||
|
||||
if credentials_match is not False and credentials_match is not True:
|
||||
client_seed_auth_hash += credentials_match
|
||||
return _mock_response(
|
||||
response_status[0], server_seed + client_seed_auth_hash
|
||||
)
|
||||
elif str(url) == "http://127.0.0.1:80/app/handshake2":
|
||||
return _mock_response(response_status[1], b"")
|
||||
client_seed = data
|
||||
client_seed_auth_hash = _sha256(data + device_auth_hash)
|
||||
return _mock_response(
|
||||
response_status[1], server_seed + client_seed_auth_hash
|
||||
)
|
||||
elif str(url) == "http://127.0.0.1:80/app/request":
|
||||
return _mock_response(response_status[2], b"")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user