parse_pcap_klap: various code cleanups (#1138)

Co-authored-by: Steven B <51370195+sdb9696@users.noreply.github.com>
This commit is contained in:
Teemu R. 2024-11-04 11:24:58 +01:00 committed by GitHub
parent b2f3971a4c
commit 4640dfaedc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -29,6 +29,18 @@ from kasa.klaptransport import KlapEncryptionSession, KlapTransportV2
from kasa.protocol import DEFAULT_CREDENTIALS, get_default_credentials
def _get_seq_from_query(packet):
"""Return sequence number for the query."""
query = packet.http.get("request_uri_query")
if query is None:
raise Exception("No request_uri_query found")
# use regex to get: seq=(\d+)
seq = re.search(r"seq=(\d+)", query)
if seq is not None:
return int(seq.group(1))
raise Exception("Unable to find sequence number")
def _is_http_response_for_packet(response, packet):
"""Return True if the *response* contains a response for request in *packet*.
@ -41,10 +53,7 @@ def _is_http_response_for_packet(response, packet):
):
return True
# tshark 4.4.0
if response.http.request_uri == packet.http.request_uri:
return True
return False
return response.http.request_uri == packet.http.request_uri
class MyEncryptionSession(KlapEncryptionSession):
@ -244,71 +253,59 @@ def main(
if packet.ip.src != source_host:
continue
# we only care about http packets
if hasattr(
packet, "http"
): # this is redundant, as pyshark is set to only load http packets
if hasattr(packet.http, "request_uri_path"):
uri = packet.http.get("request_uri_path")
elif hasattr(packet.http, "request_uri"):
uri = packet.http.get("request_uri")
else:
uri = None
if hasattr(packet.http, "request_uri_query"):
query = packet.http.get("request_uri_query")
# use regex to get: seq=(\d+)
seq = re.search(r"seq=(\d+)", query)
if seq is not None:
operator.seq = int(
seq.group(1)
) # grab the sequence number from the query
data = (
# Windows and linux file_data attribute returns different
# pretty format so get the raw field value.
packet.http.get_field_value("file_data", raw=True)
if hasattr(packet.http, "file_data")
else None
)
match uri:
case "/app/request":
if packet.ip.dst != device_ip:
continue
assert isinstance(data, str) # noqa: S101
message = bytes.fromhex(data)
try:
plaintext = operator.decrypt(message)
payload = json.loads(plaintext)
print(json.dumps(payload, indent=2))
packets.append(payload)
except ValueError:
print("Insufficient data to decrypt thus far")
# this is redundant, as pyshark is set to only load http packets
if not hasattr(packet, "http"):
continue
case "/app/handshake1":
if packet.ip.dst != device_ip:
continue
assert isinstance(data, str) # noqa: S101
message = bytes.fromhex(data)
operator.local_seed = message
response = None
print(
f"got handshake1 in {packet_number}, "
f"looking for the response"
)
while (
True
): # we are going to now look for the response to this request
response = capture.next()
if _is_http_response_for_packet(response, packet):
print(f"found response in {packet_number}")
break
data = response.http.get_field_value("file_data", raw=True)
message = bytes.fromhex(data)
operator.remote_seed = message[0:16]
operator.remote_auth_hash = message[16:]
uri = packet.http.get("request_uri_path", packet.http.get("request_uri"))
if uri is None:
continue
case "/app/handshake2":
continue # we don't care about this
case _:
operator.seq = _get_seq_from_query(packet)
# Windows and linux file_data attribute returns different
# pretty format so get the raw field value.
data = packet.http.get_field_value("file_data", raw=True)
match uri:
case "/app/request":
if packet.ip.dst != device_ip:
continue
message = bytes.fromhex(data)
try:
plaintext = operator.decrypt(message)
payload = json.loads(plaintext)
print(json.dumps(payload, indent=2))
packets.append(payload)
except ValueError:
print("Insufficient data to decrypt thus far")
case "/app/handshake1":
if packet.ip.dst != device_ip:
continue
message = bytes.fromhex(data)
operator.local_seed = message
response = None
print(
f"got handshake1 in {packet_number}, "
f"looking for the response"
)
while (
True
): # we are going to now look for the response to this request
response = capture.next()
if _is_http_response_for_packet(response, packet):
print(f"found response in {packet_number}")
break
data = response.http.get_field_value("file_data", raw=True)
message = bytes.fromhex(data)
operator.remote_seed = message[0:16]
operator.remote_auth_hash = message[16:]
case "/app/handshake2":
continue # we don't care about this
case _:
continue
except StopIteration:
break