mirror of
				https://github.com/python-kasa/python-kasa.git
				synced 2025-11-04 14:42:09 +00:00 
			
		
		
		
	Add parse_pcap to devtools, improve readme on contributing (#84)
* Add parse_pcap to devtools, improve readme on contributing * simplify context extraction
This commit is contained in:
		
							
								
								
									
										31
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								README.md
									
									
									
									
									
								
							@@ -79,23 +79,26 @@ You can find several code examples in [the API documentation](broken link).
 | 
			
		||||
 | 
			
		||||
Contributions are very welcome! To simplify the process, we are leveraging automated checks and tests for contributions.
 | 
			
		||||
 | 
			
		||||
### Resources
 | 
			
		||||
 | 
			
		||||
* [softScheck's github contains lot of information and wireshark dissector](https://github.com/softScheck/tplink-smartplug#wireshark-dissector)
 | 
			
		||||
* [https://github.com/plasticrake/tplink-smarthome-simulator](tplink-smarthome-simulator)
 | 
			
		||||
 | 
			
		||||
### Setting up development environment
 | 
			
		||||
 | 
			
		||||
```bash
 | 
			
		||||
poetry install
 | 
			
		||||
pre-commit install
 | 
			
		||||
```
 | 
			
		||||
To get started, simply clone this repository and initialize the development environment.
 | 
			
		||||
We are using [poetry](https://python-poetry.org) for dependency management, so after cloning the repository simply execute
 | 
			
		||||
`poetry install` which will install all necessary packages and create a virtual environment for you.
 | 
			
		||||
 | 
			
		||||
### Code-style checks
 | 
			
		||||
 | 
			
		||||
We use several tools to automatically check all contributions, which are run automatically when you commit your code.
 | 
			
		||||
We use several tools to automatically check all contributions. The simplest way to verify that everything is formatted properly
 | 
			
		||||
before creating a pull request, consider activating the pre-commit hooks by executing `pre-commit install`.
 | 
			
		||||
This will make sure that the checks are passing when you do a commit.
 | 
			
		||||
 | 
			
		||||
You can also execute the checks by running either `tox -e lint` to only do the linting checks, or `tox` to also execute the tests.
 | 
			
		||||
 | 
			
		||||
### Analyzing network captures
 | 
			
		||||
 | 
			
		||||
The simplest way to add support for a new device or to improve existing ones is to capture traffic between the mobile app and the device.
 | 
			
		||||
After capturing the traffic, you can either use the [softScheck's  wireshark dissector](https://github.com/softScheck/tplink-smartplug#wireshark-dissector)
 | 
			
		||||
or the `parse_pcap.py` script contained inside the `devtools` directory.
 | 
			
		||||
 | 
			
		||||
If you want to manually execute the checks, you can run `tox -e lint` to do the linting checks or `tox` to also execute the tests.
 | 
			
		||||
 | 
			
		||||
## Supported devices
 | 
			
		||||
 | 
			
		||||
@@ -128,3 +131,9 @@ If you want to manually execute the checks, you can run `tox -e lint` to do the
 | 
			
		||||
* KL130
 | 
			
		||||
 | 
			
		||||
**Contributions (be it adding missing features, fixing bugs or improving documentation) are more than welcome, feel free to submit pull requests!**
 | 
			
		||||
 | 
			
		||||
### Resources
 | 
			
		||||
 | 
			
		||||
* [softScheck's github contains lot of information and wireshark dissector](https://github.com/softScheck/tplink-smartplug#wireshark-dissector)
 | 
			
		||||
* [https://github.com/plasticrake/tplink-smarthome-simulator](tplink-smarthome-simulator)
 | 
			
		||||
* [Unofficial API documentation](https://github.com/plasticrake/tplink-smarthome-api/blob/master/API.md)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										105
									
								
								devtools/parse_pcap.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								devtools/parse_pcap.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,105 @@
 | 
			
		||||
"""Parse pcaps for TP-Link communications."""
 | 
			
		||||
 | 
			
		||||
import json
 | 
			
		||||
from collections import Counter, defaultdict
 | 
			
		||||
from pprint import pformat as pf
 | 
			
		||||
from pprint import pprint as pp
 | 
			
		||||
 | 
			
		||||
import click
 | 
			
		||||
import dpkt
 | 
			
		||||
from dpkt.ethernet import ETH_TYPE_IP, Ethernet
 | 
			
		||||
from kasa.protocol import TPLinkSmartHomeProtocol
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def read_payloads_from_file(file):
 | 
			
		||||
    """Read the given pcap file and yield json payloads."""
 | 
			
		||||
    pcap = dpkt.pcap.Reader(file)
 | 
			
		||||
    for ts, pkt in pcap:
 | 
			
		||||
        eth = Ethernet(pkt)
 | 
			
		||||
        if eth.type != ETH_TYPE_IP:
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        ip = eth.ip
 | 
			
		||||
        if ip.p == 6:
 | 
			
		||||
            transport = ip.tcp
 | 
			
		||||
        elif ip == 17:
 | 
			
		||||
            transport = ip.udp
 | 
			
		||||
        else:
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        if transport.sport != 9999 and transport.dport != 9999:
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        data = transport.data
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            decrypted = TPLinkSmartHomeProtocol.decrypt(data[4:])
 | 
			
		||||
        except Exception as ex:
 | 
			
		||||
            click.echo(
 | 
			
		||||
                click.style(f"Unable to decrypt the data, ignoring: {ex}", fg="red")
 | 
			
		||||
            )
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            json_payload = json.loads(decrypted)
 | 
			
		||||
        except Exception as ex:
 | 
			
		||||
            click.echo(
 | 
			
		||||
                click.style(f"Unable to parse payload, ignoring: {ex}", fg="red")
 | 
			
		||||
            )
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        if not json_payload:  # ignore empty payloads
 | 
			
		||||
            click.echo(click.style("Got empty payload, ignoring", fg="red"))
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        yield json_payload
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@click.command()
 | 
			
		||||
@click.argument("file", type=click.File("rb"))
 | 
			
		||||
def parse_pcap(file):
 | 
			
		||||
    """Parse pcap file and pretty print the communications and some statistics."""
 | 
			
		||||
    seen_items = defaultdict(Counter)
 | 
			
		||||
 | 
			
		||||
    for json_payload in read_payloads_from_file(file):
 | 
			
		||||
        context = json_payload.pop("context", "")
 | 
			
		||||
        for module, cmds in json_payload.items():
 | 
			
		||||
            seen_items["modules"][module] += 1
 | 
			
		||||
            if "err_code" in cmds:
 | 
			
		||||
                click.echo(click.style("Got error for module: %s" % cmds, fg="red"))
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            for cmd, response in cmds.items():
 | 
			
		||||
                seen_items["commands"][cmd] += 1
 | 
			
		||||
                seen_items["full_command"][f"{module}.{cmd}"] += 1
 | 
			
		||||
                if response is None:
 | 
			
		||||
                    continue
 | 
			
		||||
                direction = ">>"
 | 
			
		||||
                style = {}
 | 
			
		||||
                if response is None:
 | 
			
		||||
                    print("got none as response for %s, weird?" % (cmd))
 | 
			
		||||
                    continue
 | 
			
		||||
                if "err_code" in response:
 | 
			
		||||
                    direction = "<<"
 | 
			
		||||
                    if response["err_code"] != 0:
 | 
			
		||||
                        seen_items["errorcodes"][response["err_code"]] += 1
 | 
			
		||||
                        seen_items["errors"][response["err_msg"]] += 1
 | 
			
		||||
                        print(response)
 | 
			
		||||
                        style = {"bold": True, "fg": "red"}
 | 
			
		||||
                    else:
 | 
			
		||||
                        style = {"fg": "green"}
 | 
			
		||||
 | 
			
		||||
                context_str = f" [ctx: {context}]" if context else ""
 | 
			
		||||
 | 
			
		||||
                click.echo(
 | 
			
		||||
                    click.style(
 | 
			
		||||
                        f"{direction}{context_str} {module}.{cmd}: {pf(response)}",
 | 
			
		||||
                        **style,
 | 
			
		||||
                    )
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
    pp(seen_items)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    parse_pcap()
 | 
			
		||||
		Reference in New Issue
	
	Block a user