Pass sslcontext for post requests, direct c&p from sslaestransport

This commit is contained in:
Teemu Rytilahti 2025-01-08 22:18:07 +01:00
parent d356ab1d60
commit c1ea380703

View File

@ -48,6 +48,7 @@ import datetime
import hashlib import hashlib
import logging import logging
import secrets import secrets
import ssl
import struct import struct
import time import time
from asyncio import Future from asyncio import Future
@ -94,6 +95,17 @@ class KlapTransport(BaseTransport):
DEFAULT_PORT: int = 80 DEFAULT_PORT: int = 80
SESSION_COOKIE_NAME = "TP_SESSIONID" SESSION_COOKIE_NAME = "TP_SESSIONID"
TIMEOUT_COOKIE_NAME = "TIMEOUT" TIMEOUT_COOKIE_NAME = "TIMEOUT"
# Copy & paste from sslaestransport
CIPHERS = ":".join(
[
"AES256-GCM-SHA384",
"AES256-SHA256",
"AES128-GCM-SHA256",
"AES128-SHA256",
"AES256-SHA",
]
)
_ssl_context: ssl.SSLContext | None = None
def __init__( def __init__(
self, self,
@ -153,7 +165,9 @@ class KlapTransport(BaseTransport):
url = self._app_url / "handshake1" url = self._app_url / "handshake1"
response_status, response_data = await self._http_client.post(url, data=payload) response_status, response_data = await self._http_client.post(
url, data=payload, ssl=await self._get_ssl_context()
)
if _LOGGER.isEnabledFor(logging.DEBUG): if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug( _LOGGER.debug(
@ -264,6 +278,7 @@ class KlapTransport(BaseTransport):
url, url,
data=payload, data=payload,
cookies_dict=self._session_cookie, cookies_dict=self._session_cookie,
ssl=await self._get_ssl_context(),
) )
if _LOGGER.isEnabledFor(logging.DEBUG): if _LOGGER.isEnabledFor(logging.DEBUG):
@ -338,6 +353,7 @@ class KlapTransport(BaseTransport):
params={"seq": seq}, params={"seq": seq},
data=payload, data=payload,
cookies_dict=self._session_cookie, cookies_dict=self._session_cookie,
ssl=await self._get_ssl_context(),
) )
msg = ( msg = (
@ -414,6 +430,23 @@ class KlapTransport(BaseTransport):
un = creds.username un = creds.username
return md5(un.encode()) return md5(un.encode())
# Copy & paste from sslaestransport.
def _create_ssl_context(self) -> ssl.SSLContext:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.set_ciphers(self.CIPHERS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
# Copy & paste from sslaestransport.
async def _get_ssl_context(self) -> ssl.SSLContext:
if not self._ssl_context:
loop = asyncio.get_running_loop()
self._ssl_context = await loop.run_in_executor(
None, self._create_ssl_context
)
return self._ssl_context
class KlapTransportV2(KlapTransport): class KlapTransportV2(KlapTransport):
"""Implementation of the KLAP encryption protocol with v2 hanshake hashes.""" """Implementation of the KLAP encryption protocol with v2 hanshake hashes."""