2020-07-12 21:07:24 +00:00
|
|
|
"""Parse pcaps for TP-Link communications."""
|
|
|
|
|
|
|
|
import json
|
|
|
|
from collections import Counter, defaultdict
|
|
|
|
from pprint import pformat as pf
|
|
|
|
|
|
|
|
import click
|
|
|
|
import dpkt
|
|
|
|
from dpkt.ethernet import ETH_TYPE_IP, Ethernet
|
2021-03-18 18:22:10 +00:00
|
|
|
|
2024-07-04 12:52:01 +00:00
|
|
|
from kasa.cli.main import echo
|
2024-11-12 13:40:44 +00:00
|
|
|
from kasa.transports.xortransport import XorEncryption
|
2020-07-12 21:07:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
def read_payloads_from_file(file):
|
|
|
|
"""Read the given pcap file and yield json payloads."""
|
|
|
|
pcap = dpkt.pcap.Reader(file)
|
2023-10-29 22:15:42 +00:00
|
|
|
for _ts, pkt in pcap:
|
2020-07-12 21:07:24 +00:00
|
|
|
eth = Ethernet(pkt)
|
|
|
|
if eth.type != ETH_TYPE_IP:
|
|
|
|
continue
|
|
|
|
|
|
|
|
ip = eth.ip
|
|
|
|
if ip.p == 6:
|
|
|
|
transport = ip.tcp
|
|
|
|
elif ip == 17:
|
|
|
|
transport = ip.udp
|
|
|
|
else:
|
|
|
|
continue
|
|
|
|
|
|
|
|
if transport.sport != 9999 and transport.dport != 9999:
|
|
|
|
continue
|
|
|
|
|
|
|
|
data = transport.data
|
|
|
|
|
|
|
|
try:
|
2024-01-26 09:11:31 +00:00
|
|
|
decrypted = XorEncryption.decrypt(data[4:])
|
2020-07-12 21:07:24 +00:00
|
|
|
except Exception as ex:
|
2023-02-18 16:31:06 +00:00
|
|
|
echo(f"[red]Unable to decrypt the data, ignoring: {ex}[/red]")
|
|
|
|
continue
|
|
|
|
|
|
|
|
if not decrypted: # skip empty payloads
|
2020-07-12 21:07:24 +00:00
|
|
|
continue
|
|
|
|
|
|
|
|
try:
|
|
|
|
json_payload = json.loads(decrypted)
|
2023-10-29 22:15:42 +00:00
|
|
|
except Exception as ex:
|
|
|
|
# this can happen when the response is split into multiple tcp segments
|
2023-02-18 16:31:06 +00:00
|
|
|
echo(f"[red]Unable to parse payload '{decrypted}', ignoring: {ex}[/red]")
|
2020-07-12 21:07:24 +00:00
|
|
|
continue
|
|
|
|
|
|
|
|
if not json_payload: # ignore empty payloads
|
2023-02-18 16:31:06 +00:00
|
|
|
echo("[red]Got empty payload, ignoring[/red]")
|
2020-07-12 21:07:24 +00:00
|
|
|
continue
|
|
|
|
|
|
|
|
yield json_payload
|
|
|
|
|
|
|
|
|
|
|
|
@click.command()
|
|
|
|
@click.argument("file", type=click.File("rb"))
|
|
|
|
def parse_pcap(file):
|
|
|
|
"""Parse pcap file and pretty print the communications and some statistics."""
|
|
|
|
seen_items = defaultdict(Counter)
|
|
|
|
|
|
|
|
for json_payload in read_payloads_from_file(file):
|
|
|
|
context = json_payload.pop("context", "")
|
|
|
|
for module, cmds in json_payload.items():
|
|
|
|
seen_items["modules"][module] += 1
|
|
|
|
if "err_code" in cmds:
|
2024-11-18 18:46:36 +00:00
|
|
|
echo(f"[red]Got error for module: {cmds}[/red]")
|
2020-07-12 21:07:24 +00:00
|
|
|
continue
|
|
|
|
|
|
|
|
for cmd, response in cmds.items():
|
|
|
|
seen_items["commands"][cmd] += 1
|
|
|
|
seen_items["full_command"][f"{module}.{cmd}"] += 1
|
|
|
|
if response is None:
|
|
|
|
continue
|
|
|
|
direction = ">>"
|
|
|
|
if response is None:
|
2023-02-18 16:31:06 +00:00
|
|
|
echo(f"got none as response for {cmd} %s, weird?")
|
2020-07-12 21:07:24 +00:00
|
|
|
continue
|
2023-02-18 16:31:06 +00:00
|
|
|
is_success = "[green]+[/green]"
|
2020-07-12 21:07:24 +00:00
|
|
|
if "err_code" in response:
|
|
|
|
direction = "<<"
|
|
|
|
if response["err_code"] != 0:
|
|
|
|
seen_items["errorcodes"][response["err_code"]] += 1
|
|
|
|
seen_items["errors"][response["err_msg"]] += 1
|
2023-02-18 16:31:06 +00:00
|
|
|
is_success = "[red]![/red]"
|
2020-07-12 21:07:24 +00:00
|
|
|
|
|
|
|
context_str = f" [ctx: {context}]" if context else ""
|
|
|
|
|
2023-02-18 16:31:06 +00:00
|
|
|
echo(
|
2023-10-29 22:15:42 +00:00
|
|
|
f"[{is_success}] {direction}{context_str} {module}.{cmd}:"
|
|
|
|
f" {pf(response)}"
|
2020-07-12 21:07:24 +00:00
|
|
|
)
|
|
|
|
|
2023-02-18 16:31:06 +00:00
|
|
|
echo(pf(seen_items))
|
2020-07-12 21:07:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
parse_pcap()
|