From 7e9b1687d0fcc724779d2432d59f09a81537b923 Mon Sep 17 00:00:00 2001 From: Carter Strickland <50763236+clstrickland@users.noreply.github.com> Date: Mon, 15 Jul 2024 07:18:43 -0500 Subject: [PATCH] Decrypt KLAP data from PCAP files (#1041) Allows for decryption of pcap files capturing klap communication with devices. --- devtools/README.md | 27 ++++ devtools/parse_pcap_klap.py | 307 ++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 335 insertions(+) create mode 100755 devtools/parse_pcap_klap.py diff --git a/devtools/README.md b/devtools/README.md index 99d5ec5a..f59ea374 100644 --- a/devtools/README.md +++ b/devtools/README.md @@ -99,3 +99,30 @@ id New parser, parsing 100000 messages took 0.6339647499989951 seconds Old parser, parsing 100000 messages took 9.473990250000497 seconds ``` + + +## parse_pcap_klap + +* A tool to allow KLAP data to be exported, in JSON, from a PCAP file of encrypted requests. + +* NOTE: must install pyshark (`pip install pyshark`). +* pyshark requires Wireshark or tshark to be installed on windows and tshark to be installed +on linux (`apt get tshark`) + +```shell +Usage: parse_pcap_klap.py [OPTIONS] + + Export KLAP data in JSON format from a PCAP file. + +Options: + --host TEXT the IP of the smart device as it appears in the pcap + file. [required] + --username TEXT Username/email address to authenticate to device. + [required] + --password TEXT Password to use to authenticate to device. + [required] + --pcap-file-path TEXT The path to the pcap file to parse. [required] + -o, --output TEXT The name of the output file, relative to the current + directory. + --help Show this message and exit. +``` diff --git a/devtools/parse_pcap_klap.py b/devtools/parse_pcap_klap.py new file mode 100755 index 00000000..d8be6573 --- /dev/null +++ b/devtools/parse_pcap_klap.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python +""" +This code allow for the decryption of KlapV2 data from a pcap file. + +It will output the decrypted data to a file. +This was designed and tested with a Tapo light strip setup using a cloud account. +""" + +import codecs +import json +import re +from threading import Thread + +import asyncclick as click +import pyshark +from cryptography.hazmat.primitives import padding + +from kasa.credentials import Credentials +from kasa.deviceconfig import ( + DeviceConfig, + DeviceConnectionParameters, + DeviceEncryptionType, + DeviceFamily, +) +from kasa.klaptransport import KlapEncryptionSession, KlapTransportV2 + + +class MyEncryptionSession(KlapEncryptionSession): + """A custom KlapEncryptionSession class that allows for decryption.""" + + def decrypt(self, msg): + """Decrypt the data.""" + decryptor = self._cipher.decryptor() + dp = decryptor.update(msg[32:]) + decryptor.finalize() + unpadder = padding.PKCS7(128).unpadder() + plaintextbytes = unpadder.update(dp) + unpadder.finalize() + + return plaintextbytes.decode("utf-8", "bad_chars_replacement") + + +class Operator: + """A class that handles the data decryption, and the encryption session updating.""" + + def __init__(self, klap, creds): + self._local_seed: bytes = None + self._remote_seed: bytes = None + self._session: MyEncryptionSession = None + self._creds = creds + self._klap: KlapTransportV2 = klap + self._auth_hash = self._klap.generate_auth_hash(self._creds) + self._local_auth_hash = None + self._remote_auth_hash = None + self._seq = 0 + + def update_encryption_session(self): + """Update the encryption session used for decrypting data. + + It is called whenever the local_seed, remote_seed, + or remote_auth_hash is updated. + + It checks if the seeds are set and, if they are, creates a new session. + + Raises: + ValueError: If the auth hashes do not match. + """ + if self._local_seed is None or self._remote_seed is None: + self._session = None + else: + self._local_auth_hash = self._klap.handshake1_seed_auth_hash( + self._local_seed, self._remote_seed, self._auth_hash + ) + if (self._remote_auth_hash is not None) and ( + self._local_auth_hash != self._remote_auth_hash + ): + raise ValueError( + "Local and remote auth hashes do not match.\ +This could mean an incorrect username and/or password." + ) + self._session = MyEncryptionSession( + self._local_seed, self._remote_seed, self._auth_hash + ) + self._session._seq = self._seq + self._session._generate_cipher() + + @property + def seq(self) -> int: + """Get the sequence number.""" + return self._seq + + @seq.setter + def seq(self, value: int): + if not isinstance(value, int): + raise ValueError("seq must be an integer") + self._seq = value + self.update_encryption_session() + + @property + def local_seed(self) -> bytes: + """Get the local seed.""" + return self._local_seed + + @local_seed.setter + def local_seed(self, value: bytes): + if not isinstance(value, bytes): + raise ValueError("local_seed must be bytes") + elif len(value) != 16: + raise ValueError("local_seed must be 16 bytes") + else: + self._local_seed = value + self.update_encryption_session() + + @property + def remote_auth_hash(self) -> bytes: + """Get the remote auth hash.""" + return self._remote_auth_hash + + @remote_auth_hash.setter + def remote_auth_hash(self, value: bytes): + print("setting remote_auth_hash") + if not isinstance(value, bytes): + raise ValueError("remote_auth_hash must be bytes") + elif len(value) != 32: + raise ValueError("remote_auth_hash must be 32 bytes") + else: + self._remote_auth_hash = value + self.update_encryption_session() + + @property + def remote_seed(self) -> bytes: + """Get the remote seed.""" + return self._remote_seed + + @remote_seed.setter + def remote_seed(self, value: bytes): + print("setting remote_seed") + if not isinstance(value, bytes): + raise ValueError("remote_seed must be bytes") + elif len(value) != 16: + raise ValueError("remote_seed must be 16 bytes") + else: + self._remote_seed = value + self.update_encryption_session() + + # This function decrypts the data using the encryption session. + def decrypt(self, *args, **kwargs): + """Decrypt the data using the encryption session.""" + if self._session is None: + raise ValueError("No session available") + return self._session.decrypt(*args, **kwargs) + + +# This is a custom error handler that replaces bad characters with '*', +# in case something goes wrong in decryption. +# Without this, the decryption could yield an error. +def bad_chars_replacement(exception): + """Replace bad characters with '*'.""" + return ("*", exception.start + 1) + + +codecs.register_error("bad_chars_replacement", bad_chars_replacement) + + +def main(username, password, device_ip, pcap_file_path, output_json_name=None): + """Run the main function.""" + capture = pyshark.FileCapture(pcap_file_path, display_filter="http") + + # In an effort to keep this code tied into the original code + # (so that this can hopefully leverage any future codebase updates inheriently), + # some weird initialization is done here + creds = Credentials(username, password) + + fake_connection = DeviceConnectionParameters( + DeviceFamily.SmartTapoBulb, DeviceEncryptionType.Klap + ) + fake_device = DeviceConfig( + device_ip, connection_type=fake_connection, credentials=creds + ) + + operator = Operator(KlapTransportV2(config=fake_device), creds) + + packets = [] + + # pyshark is a little weird in how it handles iteration, + # so this is a workaround to allow for (advanced) iteration over the packets. + while True: + try: + packet = capture.next() + # packet_number = capture._current_packet + # we only care about http packets + if hasattr( + packet, "http" + ): # this is redundant, as pyshark is set to only load http packets + if hasattr(packet.http, "request_uri_path"): + uri = packet.http.get("request_uri_path") + elif hasattr(packet.http, "request_uri"): + uri = packet.http.get("request_uri") + else: + uri = None + if hasattr(packet.http, "request_uri_query"): + query = packet.http.get("request_uri_query") + # use regex to get: seq=(\d+) + seq = re.search(r"seq=(\d+)", query) + if seq is not None: + operator.seq = int( + seq.group(1) + ) # grab the sequence number from the query + data = ( + # Windows and linux file_data attribute returns different + # pretty format so get the raw field value. + packet.http.get_field_value("file_data", raw=True) + if hasattr(packet.http, "file_data") + else None + ) + match uri: + case "/app/request": + if packet.ip.dst != device_ip: + continue + message = bytes.fromhex(data) + try: + plaintext = operator.decrypt(message) + payload = json.loads(plaintext) + print(json.dumps(payload, indent=2)) + packets.append(payload) + except ValueError: + print("Insufficient data to decrypt thus far") + + case "/app/handshake1": + if packet.ip.dst != device_ip: + continue + message = bytes.fromhex(data) + operator.local_seed = message + response = None + while ( + True + ): # we are going to now look for the response to this request + response = capture.next() + if ( + hasattr(response, "http") + and hasattr(response.http, "response_for_uri") + and ( + response.http.response_for_uri + == packet.http.request_full_uri + ) + ): + break + data = response.http.get_field_value("file_data", raw=True) + message = bytes.fromhex(data) + operator.remote_seed = message[0:16] + operator.remote_auth_hash = message[16:] + + case "/app/handshake2": + continue # we don't care about this + case _: + continue + except StopIteration: + break + + # save the final array to a file + if output_json_name is not None: + with open(output_json_name, "w") as f: + f.write(json.dumps(packets, indent=2)) + f.write("\n" * 1) + f.close() + + +@click.command() +@click.option( + "--host", + required=True, + help="the IP of the smart device as it appears in the pcap file.", +) +@click.option( + "--username", + required=True, + envvar="KASA_USERNAME", + help="Username/email address to authenticate to device.", +) +@click.option( + "--password", + required=True, + envvar="KASA_PASSWORD", + help="Password to use to authenticate to device.", +) +@click.option( + "--pcap-file-path", + required=True, + help="The path to the pcap file to parse.", +) +@click.option( + "-o", + "--output", + required=False, + help="The name of the output file, relative to the current directory.", +) +def cli(username, password, host, pcap_file_path, output): + """Export KLAP data in JSON format from a PCAP file.""" + # pyshark does not work within a running event loop and we don't want to + # install click as well as asyncclick so run in a new thread. + thread = Thread( + target=main, args=[username, password, host, pcap_file_path, output] + ) + thread.start() + thread.join() + + +if __name__ == "__main__": + cli() diff --git a/pyproject.toml b/pyproject.toml index b9e8c578..91317f48 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -151,6 +151,7 @@ disable_error_code = "annotation-unchecked" module = [ "devtools.bench.benchmark", "devtools.parse_pcap", + "devtools.parse_pcap_klap", "devtools.perftest", "devtools.create_module_fixtures" ]