From c19389f23640f891f6941ea057b81b24fb819217 Mon Sep 17 00:00:00 2001 From: "Steven B." <51370195+sdb9696@users.noreply.github.com> Date: Wed, 17 Jul 2024 08:34:12 +0100 Subject: [PATCH] Fix parse_pcap_klap on windows and support default credentials (#1068) - Fixes issue running pyshark on new thread in windows - Fixes bug if handshake repeated during capture - Tries the default tplink hardcoded credentials as per the library --- devtools/parse_pcap_klap.py | 95 +++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 26 deletions(-) diff --git a/devtools/parse_pcap_klap.py b/devtools/parse_pcap_klap.py index d8be6573..36384631 100755 --- a/devtools/parse_pcap_klap.py +++ b/devtools/parse_pcap_klap.py @@ -6,6 +6,9 @@ It will output the decrypted data to a file. This was designed and tested with a Tapo light strip setup using a cloud account. """ +from __future__ import annotations + +import asyncio import codecs import json import re @@ -23,6 +26,7 @@ from kasa.deviceconfig import ( DeviceFamily, ) from kasa.klaptransport import KlapEncryptionSession, KlapTransportV2 +from kasa.protocol import DEFAULT_CREDENTIALS, get_default_credentials class MyEncryptionSession(KlapEncryptionSession): @@ -42,16 +46,34 @@ class Operator: """A class that handles the data decryption, and the encryption session updating.""" def __init__(self, klap, creds): - self._local_seed: bytes = None - self._remote_seed: bytes = None - self._session: MyEncryptionSession = None + self._local_seed: bytes | None = None + self._remote_seed: bytes | None = None + self._session: MyEncryptionSession | None = None self._creds = creds self._klap: KlapTransportV2 = klap self._auth_hash = self._klap.generate_auth_hash(self._creds) - self._local_auth_hash = None - self._remote_auth_hash = None + self._local_seed_auth_hash = None + self._remote_seed_auth_hash = None self._seq = 0 + def check_default_credentials(self): + """Check whether default credentials were used. + + Devices sometimes randomly accept the hardcoded default credentials + and the library handles that. + """ + for value in DEFAULT_CREDENTIALS.values(): + default_credentials = get_default_credentials(value) + default_auth_hash = self._klap.generate_auth_hash(default_credentials) + default_credentials_seed_auth_hash = self._klap.handshake1_seed_auth_hash( + self._local_seed, + self._remote_seed, + default_auth_hash, # type: ignore + ) + if self._remote_seed_auth_hash == default_credentials_seed_auth_hash: + return default_auth_hash + return None + def update_encryption_session(self): """Update the encryption session used for decrypting data. @@ -66,21 +88,25 @@ class Operator: if self._local_seed is None or self._remote_seed is None: self._session = None else: - self._local_auth_hash = self._klap.handshake1_seed_auth_hash( + self._local_seed_auth_hash = self._klap.handshake1_seed_auth_hash( self._local_seed, self._remote_seed, self._auth_hash ) - if (self._remote_auth_hash is not None) and ( - self._local_auth_hash != self._remote_auth_hash - ): - raise ValueError( - "Local and remote auth hashes do not match.\ -This could mean an incorrect username and/or password." + auth_hash = None + if self._remote_seed_auth_hash is not None: + if self._local_seed_auth_hash == self._remote_seed_auth_hash: + auth_hash = self._auth_hash + else: + auth_hash = self.check_default_credentials() + if not auth_hash: + raise ValueError( + "Local and remote auth hashes do not match. " + "This could mean an incorrect username and/or password." + ) + self._session = MyEncryptionSession( + self._local_seed, self._remote_seed, auth_hash ) - self._session = MyEncryptionSession( - self._local_seed, self._remote_seed, self._auth_hash - ) - self._session._seq = self._seq - self._session._generate_cipher() + self._session._seq = self._seq + self._session._generate_cipher() @property def seq(self) -> int: @@ -95,24 +121,27 @@ This could mean an incorrect username and/or password." self.update_encryption_session() @property - def local_seed(self) -> bytes: + def local_seed(self) -> bytes | None: """Get the local seed.""" return self._local_seed @local_seed.setter def local_seed(self, value: bytes): + print("setting local_seed") if not isinstance(value, bytes): raise ValueError("local_seed must be bytes") elif len(value) != 16: raise ValueError("local_seed must be 16 bytes") else: self._local_seed = value + self._remote_seed_auth_hash = None + self._remote_seed = None self.update_encryption_session() @property - def remote_auth_hash(self) -> bytes: + def remote_auth_hash(self) -> bytes | None: """Get the remote auth hash.""" - return self._remote_auth_hash + return self._remote_seed_auth_hash @remote_auth_hash.setter def remote_auth_hash(self, value: bytes): @@ -122,11 +151,11 @@ This could mean an incorrect username and/or password." elif len(value) != 32: raise ValueError("remote_auth_hash must be 32 bytes") else: - self._remote_auth_hash = value + self._remote_seed_auth_hash = value self.update_encryption_session() @property - def remote_seed(self) -> bytes: + def remote_seed(self) -> bytes | None: """Get the remote seed.""" return self._remote_seed @@ -160,9 +189,17 @@ def bad_chars_replacement(exception): codecs.register_error("bad_chars_replacement", bad_chars_replacement) -def main(username, password, device_ip, pcap_file_path, output_json_name=None): +def main( + loop: asyncio.AbstractEventLoop, + username, + password, + device_ip, + pcap_file_path, + output_json_name=None, +): """Run the main function.""" - capture = pyshark.FileCapture(pcap_file_path, display_filter="http") + asyncio.set_event_loop(loop) + capture = pyshark.FileCapture(pcap_file_path, display_filter="http", eventloop=loop) # In an effort to keep this code tied into the original code # (so that this can hopefully leverage any future codebase updates inheriently), @@ -262,6 +299,9 @@ def main(username, password, device_ip, pcap_file_path, output_json_name=None): f.write("\n" * 1) f.close() + # Call close method which cleans up event loop + capture.close() + @click.command() @click.option( @@ -292,12 +332,15 @@ def main(username, password, device_ip, pcap_file_path, output_json_name=None): required=False, help="The name of the output file, relative to the current directory.", ) -def cli(username, password, host, pcap_file_path, output): +async def cli(username, password, host, pcap_file_path, output): """Export KLAP data in JSON format from a PCAP file.""" # pyshark does not work within a running event loop and we don't want to # install click as well as asyncclick so run in a new thread. + loop = asyncio.new_event_loop() thread = Thread( - target=main, args=[username, password, host, pcap_file_path, output] + target=main, + args=[loop, username, password, host, pcap_file_path, output], + daemon=True, ) thread.start() thread.join()