mirror of
https://github.com/python-kasa/python-kasa.git
synced 2024-12-22 11:13:34 +00:00
Decrypt KLAP data from PCAP files (#1041)
Allows for decryption of pcap files capturing klap communication with devices.
This commit is contained in:
parent
a2b7daa069
commit
7e9b1687d0
@ -99,3 +99,30 @@ id
|
|||||||
New parser, parsing 100000 messages took 0.6339647499989951 seconds
|
New parser, parsing 100000 messages took 0.6339647499989951 seconds
|
||||||
Old parser, parsing 100000 messages took 9.473990250000497 seconds
|
Old parser, parsing 100000 messages took 9.473990250000497 seconds
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
## parse_pcap_klap
|
||||||
|
|
||||||
|
* A tool to allow KLAP data to be exported, in JSON, from a PCAP file of encrypted requests.
|
||||||
|
|
||||||
|
* NOTE: must install pyshark (`pip install pyshark`).
|
||||||
|
* pyshark requires Wireshark or tshark to be installed on windows and tshark to be installed
|
||||||
|
on linux (`apt get tshark`)
|
||||||
|
|
||||||
|
```shell
|
||||||
|
Usage: parse_pcap_klap.py [OPTIONS]
|
||||||
|
|
||||||
|
Export KLAP data in JSON format from a PCAP file.
|
||||||
|
|
||||||
|
Options:
|
||||||
|
--host TEXT the IP of the smart device as it appears in the pcap
|
||||||
|
file. [required]
|
||||||
|
--username TEXT Username/email address to authenticate to device.
|
||||||
|
[required]
|
||||||
|
--password TEXT Password to use to authenticate to device.
|
||||||
|
[required]
|
||||||
|
--pcap-file-path TEXT The path to the pcap file to parse. [required]
|
||||||
|
-o, --output TEXT The name of the output file, relative to the current
|
||||||
|
directory.
|
||||||
|
--help Show this message and exit.
|
||||||
|
```
|
||||||
|
307
devtools/parse_pcap_klap.py
Executable file
307
devtools/parse_pcap_klap.py
Executable file
@ -0,0 +1,307 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
"""
|
||||||
|
This code allow for the decryption of KlapV2 data from a pcap file.
|
||||||
|
|
||||||
|
It will output the decrypted data to a file.
|
||||||
|
This was designed and tested with a Tapo light strip setup using a cloud account.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import codecs
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
import asyncclick as click
|
||||||
|
import pyshark
|
||||||
|
from cryptography.hazmat.primitives import padding
|
||||||
|
|
||||||
|
from kasa.credentials import Credentials
|
||||||
|
from kasa.deviceconfig import (
|
||||||
|
DeviceConfig,
|
||||||
|
DeviceConnectionParameters,
|
||||||
|
DeviceEncryptionType,
|
||||||
|
DeviceFamily,
|
||||||
|
)
|
||||||
|
from kasa.klaptransport import KlapEncryptionSession, KlapTransportV2
|
||||||
|
|
||||||
|
|
||||||
|
class MyEncryptionSession(KlapEncryptionSession):
|
||||||
|
"""A custom KlapEncryptionSession class that allows for decryption."""
|
||||||
|
|
||||||
|
def decrypt(self, msg):
|
||||||
|
"""Decrypt the data."""
|
||||||
|
decryptor = self._cipher.decryptor()
|
||||||
|
dp = decryptor.update(msg[32:]) + decryptor.finalize()
|
||||||
|
unpadder = padding.PKCS7(128).unpadder()
|
||||||
|
plaintextbytes = unpadder.update(dp) + unpadder.finalize()
|
||||||
|
|
||||||
|
return plaintextbytes.decode("utf-8", "bad_chars_replacement")
|
||||||
|
|
||||||
|
|
||||||
|
class Operator:
|
||||||
|
"""A class that handles the data decryption, and the encryption session updating."""
|
||||||
|
|
||||||
|
def __init__(self, klap, creds):
|
||||||
|
self._local_seed: bytes = None
|
||||||
|
self._remote_seed: bytes = None
|
||||||
|
self._session: MyEncryptionSession = None
|
||||||
|
self._creds = creds
|
||||||
|
self._klap: KlapTransportV2 = klap
|
||||||
|
self._auth_hash = self._klap.generate_auth_hash(self._creds)
|
||||||
|
self._local_auth_hash = None
|
||||||
|
self._remote_auth_hash = None
|
||||||
|
self._seq = 0
|
||||||
|
|
||||||
|
def update_encryption_session(self):
|
||||||
|
"""Update the encryption session used for decrypting data.
|
||||||
|
|
||||||
|
It is called whenever the local_seed, remote_seed,
|
||||||
|
or remote_auth_hash is updated.
|
||||||
|
|
||||||
|
It checks if the seeds are set and, if they are, creates a new session.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the auth hashes do not match.
|
||||||
|
"""
|
||||||
|
if self._local_seed is None or self._remote_seed is None:
|
||||||
|
self._session = None
|
||||||
|
else:
|
||||||
|
self._local_auth_hash = self._klap.handshake1_seed_auth_hash(
|
||||||
|
self._local_seed, self._remote_seed, self._auth_hash
|
||||||
|
)
|
||||||
|
if (self._remote_auth_hash is not None) and (
|
||||||
|
self._local_auth_hash != self._remote_auth_hash
|
||||||
|
):
|
||||||
|
raise ValueError(
|
||||||
|
"Local and remote auth hashes do not match.\
|
||||||
|
This could mean an incorrect username and/or password."
|
||||||
|
)
|
||||||
|
self._session = MyEncryptionSession(
|
||||||
|
self._local_seed, self._remote_seed, self._auth_hash
|
||||||
|
)
|
||||||
|
self._session._seq = self._seq
|
||||||
|
self._session._generate_cipher()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def seq(self) -> int:
|
||||||
|
"""Get the sequence number."""
|
||||||
|
return self._seq
|
||||||
|
|
||||||
|
@seq.setter
|
||||||
|
def seq(self, value: int):
|
||||||
|
if not isinstance(value, int):
|
||||||
|
raise ValueError("seq must be an integer")
|
||||||
|
self._seq = value
|
||||||
|
self.update_encryption_session()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def local_seed(self) -> bytes:
|
||||||
|
"""Get the local seed."""
|
||||||
|
return self._local_seed
|
||||||
|
|
||||||
|
@local_seed.setter
|
||||||
|
def local_seed(self, value: bytes):
|
||||||
|
if not isinstance(value, bytes):
|
||||||
|
raise ValueError("local_seed must be bytes")
|
||||||
|
elif len(value) != 16:
|
||||||
|
raise ValueError("local_seed must be 16 bytes")
|
||||||
|
else:
|
||||||
|
self._local_seed = value
|
||||||
|
self.update_encryption_session()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def remote_auth_hash(self) -> bytes:
|
||||||
|
"""Get the remote auth hash."""
|
||||||
|
return self._remote_auth_hash
|
||||||
|
|
||||||
|
@remote_auth_hash.setter
|
||||||
|
def remote_auth_hash(self, value: bytes):
|
||||||
|
print("setting remote_auth_hash")
|
||||||
|
if not isinstance(value, bytes):
|
||||||
|
raise ValueError("remote_auth_hash must be bytes")
|
||||||
|
elif len(value) != 32:
|
||||||
|
raise ValueError("remote_auth_hash must be 32 bytes")
|
||||||
|
else:
|
||||||
|
self._remote_auth_hash = value
|
||||||
|
self.update_encryption_session()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def remote_seed(self) -> bytes:
|
||||||
|
"""Get the remote seed."""
|
||||||
|
return self._remote_seed
|
||||||
|
|
||||||
|
@remote_seed.setter
|
||||||
|
def remote_seed(self, value: bytes):
|
||||||
|
print("setting remote_seed")
|
||||||
|
if not isinstance(value, bytes):
|
||||||
|
raise ValueError("remote_seed must be bytes")
|
||||||
|
elif len(value) != 16:
|
||||||
|
raise ValueError("remote_seed must be 16 bytes")
|
||||||
|
else:
|
||||||
|
self._remote_seed = value
|
||||||
|
self.update_encryption_session()
|
||||||
|
|
||||||
|
# This function decrypts the data using the encryption session.
|
||||||
|
def decrypt(self, *args, **kwargs):
|
||||||
|
"""Decrypt the data using the encryption session."""
|
||||||
|
if self._session is None:
|
||||||
|
raise ValueError("No session available")
|
||||||
|
return self._session.decrypt(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# This is a custom error handler that replaces bad characters with '*',
|
||||||
|
# in case something goes wrong in decryption.
|
||||||
|
# Without this, the decryption could yield an error.
|
||||||
|
def bad_chars_replacement(exception):
|
||||||
|
"""Replace bad characters with '*'."""
|
||||||
|
return ("*", exception.start + 1)
|
||||||
|
|
||||||
|
|
||||||
|
codecs.register_error("bad_chars_replacement", bad_chars_replacement)
|
||||||
|
|
||||||
|
|
||||||
|
def main(username, password, device_ip, pcap_file_path, output_json_name=None):
|
||||||
|
"""Run the main function."""
|
||||||
|
capture = pyshark.FileCapture(pcap_file_path, display_filter="http")
|
||||||
|
|
||||||
|
# In an effort to keep this code tied into the original code
|
||||||
|
# (so that this can hopefully leverage any future codebase updates inheriently),
|
||||||
|
# some weird initialization is done here
|
||||||
|
creds = Credentials(username, password)
|
||||||
|
|
||||||
|
fake_connection = DeviceConnectionParameters(
|
||||||
|
DeviceFamily.SmartTapoBulb, DeviceEncryptionType.Klap
|
||||||
|
)
|
||||||
|
fake_device = DeviceConfig(
|
||||||
|
device_ip, connection_type=fake_connection, credentials=creds
|
||||||
|
)
|
||||||
|
|
||||||
|
operator = Operator(KlapTransportV2(config=fake_device), creds)
|
||||||
|
|
||||||
|
packets = []
|
||||||
|
|
||||||
|
# pyshark is a little weird in how it handles iteration,
|
||||||
|
# so this is a workaround to allow for (advanced) iteration over the packets.
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
packet = capture.next()
|
||||||
|
# packet_number = capture._current_packet
|
||||||
|
# we only care about http packets
|
||||||
|
if hasattr(
|
||||||
|
packet, "http"
|
||||||
|
): # this is redundant, as pyshark is set to only load http packets
|
||||||
|
if hasattr(packet.http, "request_uri_path"):
|
||||||
|
uri = packet.http.get("request_uri_path")
|
||||||
|
elif hasattr(packet.http, "request_uri"):
|
||||||
|
uri = packet.http.get("request_uri")
|
||||||
|
else:
|
||||||
|
uri = None
|
||||||
|
if hasattr(packet.http, "request_uri_query"):
|
||||||
|
query = packet.http.get("request_uri_query")
|
||||||
|
# use regex to get: seq=(\d+)
|
||||||
|
seq = re.search(r"seq=(\d+)", query)
|
||||||
|
if seq is not None:
|
||||||
|
operator.seq = int(
|
||||||
|
seq.group(1)
|
||||||
|
) # grab the sequence number from the query
|
||||||
|
data = (
|
||||||
|
# Windows and linux file_data attribute returns different
|
||||||
|
# pretty format so get the raw field value.
|
||||||
|
packet.http.get_field_value("file_data", raw=True)
|
||||||
|
if hasattr(packet.http, "file_data")
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
match uri:
|
||||||
|
case "/app/request":
|
||||||
|
if packet.ip.dst != device_ip:
|
||||||
|
continue
|
||||||
|
message = bytes.fromhex(data)
|
||||||
|
try:
|
||||||
|
plaintext = operator.decrypt(message)
|
||||||
|
payload = json.loads(plaintext)
|
||||||
|
print(json.dumps(payload, indent=2))
|
||||||
|
packets.append(payload)
|
||||||
|
except ValueError:
|
||||||
|
print("Insufficient data to decrypt thus far")
|
||||||
|
|
||||||
|
case "/app/handshake1":
|
||||||
|
if packet.ip.dst != device_ip:
|
||||||
|
continue
|
||||||
|
message = bytes.fromhex(data)
|
||||||
|
operator.local_seed = message
|
||||||
|
response = None
|
||||||
|
while (
|
||||||
|
True
|
||||||
|
): # we are going to now look for the response to this request
|
||||||
|
response = capture.next()
|
||||||
|
if (
|
||||||
|
hasattr(response, "http")
|
||||||
|
and hasattr(response.http, "response_for_uri")
|
||||||
|
and (
|
||||||
|
response.http.response_for_uri
|
||||||
|
== packet.http.request_full_uri
|
||||||
|
)
|
||||||
|
):
|
||||||
|
break
|
||||||
|
data = response.http.get_field_value("file_data", raw=True)
|
||||||
|
message = bytes.fromhex(data)
|
||||||
|
operator.remote_seed = message[0:16]
|
||||||
|
operator.remote_auth_hash = message[16:]
|
||||||
|
|
||||||
|
case "/app/handshake2":
|
||||||
|
continue # we don't care about this
|
||||||
|
case _:
|
||||||
|
continue
|
||||||
|
except StopIteration:
|
||||||
|
break
|
||||||
|
|
||||||
|
# save the final array to a file
|
||||||
|
if output_json_name is not None:
|
||||||
|
with open(output_json_name, "w") as f:
|
||||||
|
f.write(json.dumps(packets, indent=2))
|
||||||
|
f.write("\n" * 1)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
|
||||||
|
@click.command()
|
||||||
|
@click.option(
|
||||||
|
"--host",
|
||||||
|
required=True,
|
||||||
|
help="the IP of the smart device as it appears in the pcap file.",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--username",
|
||||||
|
required=True,
|
||||||
|
envvar="KASA_USERNAME",
|
||||||
|
help="Username/email address to authenticate to device.",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--password",
|
||||||
|
required=True,
|
||||||
|
envvar="KASA_PASSWORD",
|
||||||
|
help="Password to use to authenticate to device.",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--pcap-file-path",
|
||||||
|
required=True,
|
||||||
|
help="The path to the pcap file to parse.",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"-o",
|
||||||
|
"--output",
|
||||||
|
required=False,
|
||||||
|
help="The name of the output file, relative to the current directory.",
|
||||||
|
)
|
||||||
|
def cli(username, password, host, pcap_file_path, output):
|
||||||
|
"""Export KLAP data in JSON format from a PCAP file."""
|
||||||
|
# pyshark does not work within a running event loop and we don't want to
|
||||||
|
# install click as well as asyncclick so run in a new thread.
|
||||||
|
thread = Thread(
|
||||||
|
target=main, args=[username, password, host, pcap_file_path, output]
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
cli()
|
@ -151,6 +151,7 @@ disable_error_code = "annotation-unchecked"
|
|||||||
module = [
|
module = [
|
||||||
"devtools.bench.benchmark",
|
"devtools.bench.benchmark",
|
||||||
"devtools.parse_pcap",
|
"devtools.parse_pcap",
|
||||||
|
"devtools.parse_pcap_klap",
|
||||||
"devtools.perftest",
|
"devtools.perftest",
|
||||||
"devtools.create_module_fixtures"
|
"devtools.create_module_fixtures"
|
||||||
]
|
]
|
||||||
|
Loading…
Reference in New Issue
Block a user