Allow https for klaptransport (#1415)
Some checks failed
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Has been cancelled

Later firmware versions on robovacs use `KLAP` over https instead of ssltransport (reported as AES)
This commit is contained in:
Teemu R.
2025-01-22 11:54:32 +01:00
committed by GitHub
parent fa0f7157c6
commit 7b1b14d1e6
19 changed files with 1019 additions and 28 deletions

View File

@@ -261,8 +261,11 @@ async def config(ctx: click.Context) -> DeviceDict:
host_port = host + (f":{port}" if port else "")
def on_attempt(connect_attempt: ConnectAttempt, success: bool) -> None:
prot, tran, dev = connect_attempt
key_str = f"{prot.__name__} + {tran.__name__} + {dev.__name__}"
prot, tran, dev, https = connect_attempt
key_str = (
f"{prot.__name__} + {tran.__name__} + {dev.__name__}"
f" + {'https' if https else 'http'}"
)
result = "succeeded" if success else "failed"
msg = f"Attempt to connect to {host_port} with {key_str} {result}"
echo(msg)

View File

@@ -189,6 +189,7 @@ def get_protocol(config: DeviceConfig, *, strict: bool = False) -> BaseProtocol
:param config: Device config to derive protocol
:param strict: Require exact match on encrypt type
"""
_LOGGER.debug("Finding protocol for %s", config.host)
ctype = config.connection_type
protocol_name = ctype.device_family.value.split(".")[0]
_LOGGER.debug("Finding protocol for %s", ctype.device_family)
@@ -203,9 +204,11 @@ def get_protocol(config: DeviceConfig, *, strict: bool = False) -> BaseProtocol
return None
return IotProtocol(transport=LinkieTransportV2(config=config))
if ctype.device_family is DeviceFamily.SmartTapoRobovac:
if strict and ctype.encryption_type is not DeviceEncryptionType.Aes:
return None
# Older FW used a different transport
if (
ctype.device_family is DeviceFamily.SmartTapoRobovac
and ctype.encryption_type is DeviceEncryptionType.Aes
):
return SmartProtocol(transport=SslTransport(config=config))
protocol_transport_key = (
@@ -223,6 +226,7 @@ def get_protocol(config: DeviceConfig, *, strict: bool = False) -> BaseProtocol
"IOT.KLAP": (IotProtocol, KlapTransport),
"SMART.AES": (SmartProtocol, AesTransport),
"SMART.KLAP": (SmartProtocol, KlapTransportV2),
"SMART.KLAP.HTTPS": (SmartProtocol, KlapTransportV2),
# H200 is device family SMART.TAPOHUB and uses SmartCamProtocol so use
# https to distuingish from SmartProtocol devices
"SMART.AES.HTTPS": (SmartCamProtocol, SslAesTransport),

View File

@@ -20,7 +20,7 @@ None
{'host': '127.0.0.3', 'timeout': 5, 'credentials': {'username': 'user@example.com', \
'password': 'great_password'}, 'connection_type'\
: {'device_family': 'SMART.TAPOBULB', 'encryption_type': 'KLAP', 'login_version': 2, \
'https': False}}
'https': False, 'http_port': 80}}
>>> later_device = await Device.connect(config=Device.Config.from_dict(config_dict))
>>> print(later_device.alias) # Alias is available as connect() calls update()
@@ -98,13 +98,16 @@ class DeviceConnectionParameters(_DeviceConfigBaseMixin):
encryption_type: DeviceEncryptionType
login_version: int | None = None
https: bool = False
http_port: int | None = None
@staticmethod
def from_values(
device_family: str,
encryption_type: str,
*,
login_version: int | None = None,
https: bool | None = None,
http_port: int | None = None,
) -> DeviceConnectionParameters:
"""Return connection parameters from string values."""
try:
@@ -115,6 +118,7 @@ class DeviceConnectionParameters(_DeviceConfigBaseMixin):
DeviceEncryptionType(encryption_type),
login_version,
https,
http_port=http_port,
)
except (ValueError, TypeError) as ex:
raise KasaException(

View File

@@ -146,6 +146,7 @@ class ConnectAttempt(NamedTuple):
protocol: type
transport: type
device: type
https: bool
class DiscoveredMeta(TypedDict):
@@ -637,10 +638,10 @@ class Discover:
Device.Family.IotIpCamera,
}
candidates: dict[
tuple[type[BaseProtocol], type[BaseTransport], type[Device]],
tuple[type[BaseProtocol], type[BaseTransport], type[Device], bool],
tuple[BaseProtocol, DeviceConfig],
] = {
(type(protocol), type(protocol._transport), device_class): (
(type(protocol), type(protocol._transport), device_class, https): (
protocol,
config,
)
@@ -870,8 +871,9 @@ class Discover:
config.connection_type = DeviceConnectionParameters.from_values(
type_,
encrypt_type,
login_version,
encrypt_schm.is_support_https,
login_version=login_version,
https=encrypt_schm.is_support_https,
http_port=encrypt_schm.http_port,
)
except KasaException as ex:
raise UnsupportedDeviceError(

View File

@@ -36,6 +36,18 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
def _mask_area_list(area_list: list[dict[str, Any]]) -> list[dict[str, Any]]:
def mask_area(area: dict[str, Any]) -> dict[str, Any]:
result = {**area}
# Will leave empty names as blank
if area.get("name"):
result["name"] = "I01BU0tFRF9OQU1FIw==" # #MASKED_NAME#
return result
return [mask_area(area) for area in area_list]
REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"latitude": lambda x: 0,
"longitude": lambda x: 0,
@@ -71,6 +83,10 @@ REDACTORS: dict[str, Callable[[Any], Any] | None] = {
"custom_sn": lambda _: "000000000000",
"location": lambda x: "#MASKED_NAME#" if x else "",
"map_data": lambda x: "#SCRUBBED_MAPDATA#" if x else "",
"map_name": lambda x: "I01BU0tFRF9OQU1FIw==", # #MASKED_NAME#
"area_list": _mask_area_list,
# unknown robovac binary blob in get_device_info
"cd": lambda x: "I01BU0tFRF9CSU5BUlkj", # #MASKED_BINARY#
}
# Queries that are known not to work properly when sent as a

View File

@@ -120,6 +120,8 @@ class AesTransport(BaseTransport):
@property
def default_port(self) -> int:
"""Default port for the transport."""
if port := self._config.connection_type.http_port:
return port
return self.DEFAULT_PORT
@property

View File

@@ -48,6 +48,7 @@ import datetime
import hashlib
import logging
import secrets
import ssl
import struct
import time
from asyncio import Future
@@ -92,8 +93,21 @@ class KlapTransport(BaseTransport):
"""
DEFAULT_PORT: int = 80
DEFAULT_HTTPS_PORT: int = 4433
SESSION_COOKIE_NAME = "TP_SESSIONID"
TIMEOUT_COOKIE_NAME = "TIMEOUT"
# Copy & paste from sslaestransport
CIPHERS = ":".join(
[
"AES256-GCM-SHA384",
"AES256-SHA256",
"AES128-GCM-SHA256",
"AES128-SHA256",
"AES256-SHA",
]
)
_ssl_context: ssl.SSLContext | None = None
def __init__(
self,
@@ -125,12 +139,20 @@ class KlapTransport(BaseTransport):
self._session_cookie: dict[str, Any] | None = None
_LOGGER.debug("Created KLAP transport for %s", self._host)
self._app_url = URL(f"http://{self._host}:{self._port}/app")
protocol = "https" if config.connection_type.https else "http"
self._app_url = URL(f"{protocol}://{self._host}:{self._port}/app")
self._request_url = self._app_url / "request"
@property
def default_port(self) -> int:
"""Default port for the transport."""
config = self._config
if port := config.connection_type.http_port:
return port
if config.connection_type.https:
return self.DEFAULT_HTTPS_PORT
return self.DEFAULT_PORT
@property
@@ -152,7 +174,9 @@ class KlapTransport(BaseTransport):
url = self._app_url / "handshake1"
response_status, response_data = await self._http_client.post(url, data=payload)
response_status, response_data = await self._http_client.post(
url, data=payload, ssl=await self._get_ssl_context()
)
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
@@ -263,6 +287,7 @@ class KlapTransport(BaseTransport):
url,
data=payload,
cookies_dict=self._session_cookie,
ssl=await self._get_ssl_context(),
)
if _LOGGER.isEnabledFor(logging.DEBUG):
@@ -337,6 +362,7 @@ class KlapTransport(BaseTransport):
params={"seq": seq},
data=payload,
cookies_dict=self._session_cookie,
ssl=await self._get_ssl_context(),
)
msg = (
@@ -413,6 +439,23 @@ class KlapTransport(BaseTransport):
un = creds.username
return md5(un.encode())
# Copy & paste from sslaestransport.
def _create_ssl_context(self) -> ssl.SSLContext:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.set_ciphers(self.CIPHERS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
# Copy & paste from sslaestransport.
async def _get_ssl_context(self) -> ssl.SSLContext:
if not self._ssl_context:
loop = asyncio.get_running_loop()
self._ssl_context = await loop.run_in_executor(
None, self._create_ssl_context
)
return self._ssl_context
class KlapTransportV2(KlapTransport):
"""Implementation of the KLAP encryption protocol with v2 hanshake hashes."""

View File

@@ -55,6 +55,8 @@ class LinkieTransportV2(BaseTransport):
@property
def default_port(self) -> int:
"""Default port for the transport."""
if port := self._config.connection_type.http_port:
return port
return self.DEFAULT_PORT
@property

View File

@@ -133,6 +133,8 @@ class SslAesTransport(BaseTransport):
@property
def default_port(self) -> int:
"""Default port for the transport."""
if port := self._config.connection_type.http_port:
return port
return self.DEFAULT_PORT
@staticmethod

View File

@@ -94,6 +94,8 @@ class SslTransport(BaseTransport):
@property
def default_port(self) -> int:
"""Default port for the transport."""
if port := self._config.connection_type.http_port:
return port
return self.DEFAULT_PORT
@property