mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-04-26 16:46:23 +00:00
Improve test coverage (#625)
This commit is contained in:
parent
3e0cd07b7c
commit
d5a6fd8e73
16
kasa/cli.py
16
kasa/cli.py
@ -427,8 +427,8 @@ async def discover(ctx):
|
||||
echo()
|
||||
else:
|
||||
discovered[dev.host] = dev.internal_state
|
||||
ctx.obj = dev
|
||||
await ctx.invoke(state)
|
||||
ctx.parent.obj = dev
|
||||
await ctx.parent.invoke(state)
|
||||
if verbose:
|
||||
echo()
|
||||
_echo_discovery_info(dev._discovery_info)
|
||||
@ -513,12 +513,14 @@ async def sysinfo(dev):
|
||||
|
||||
@cli.command()
|
||||
@pass_dev
|
||||
async def state(dev: SmartDevice):
|
||||
@click.pass_context
|
||||
async def state(ctx, dev: SmartDevice):
|
||||
"""Print out device state and versions."""
|
||||
verbose = ctx.parent.params.get("verbose", False) if ctx.parent else False
|
||||
|
||||
echo(f"[bold]== {dev.alias} - {dev.model} ==[/bold]")
|
||||
echo(f"\tHost: {dev.host}")
|
||||
echo(f"\tPort: {dev.port}")
|
||||
echo(f"\tCredentials hash: {dev.credentials_hash}")
|
||||
echo(f"\tDevice state: {dev.is_on}")
|
||||
if dev.is_strip:
|
||||
echo("\t[bold]== Plugs ==[/bold]")
|
||||
@ -554,6 +556,12 @@ async def state(dev: SmartDevice):
|
||||
else:
|
||||
echo(f"\t[red]- {module}[/red]")
|
||||
|
||||
if verbose:
|
||||
echo("\n\t[bold]== Verbose information ==[/bold]")
|
||||
echo(f"\tCredentials hash: {dev.credentials_hash}")
|
||||
echo(f"\tDevice ID: {dev.device_id}")
|
||||
for feature in dev.features:
|
||||
echo(f"\tFeature: {feature}")
|
||||
return dev.internal_state
|
||||
|
||||
|
||||
|
@ -275,7 +275,7 @@ class TapoDevice(SmartDevice):
|
||||
]
|
||||
networks.extend(network_list)
|
||||
|
||||
if resp["get_wireless_scan_info"]["sum"] > start_index + 10:
|
||||
if resp["get_wireless_scan_info"].get("sum", 0) > start_index + 10:
|
||||
return await _query_networks(networks, start_index=start_index + 10)
|
||||
|
||||
return networks
|
||||
|
@ -18,6 +18,7 @@ from voluptuous import (
|
||||
|
||||
from ..credentials import Credentials
|
||||
from ..deviceconfig import DeviceConfig
|
||||
from ..exceptions import SmartDeviceException
|
||||
from ..protocol import BaseTransport, TPLinkSmartHomeProtocol, _XorTransport
|
||||
from ..smartprotocol import SmartProtocol
|
||||
|
||||
@ -315,6 +316,10 @@ class FakeSmartTransport(BaseTransport):
|
||||
),
|
||||
)
|
||||
self.info = info
|
||||
self.components = {
|
||||
comp["id"]: comp["ver_code"]
|
||||
for comp in self.info["component_nego"]["component_list"]
|
||||
}
|
||||
|
||||
@property
|
||||
def default_port(self):
|
||||
@ -326,6 +331,10 @@ class FakeSmartTransport(BaseTransport):
|
||||
"""The hashed credentials used by the transport."""
|
||||
return self._credentials.username + self._credentials.password + "hash"
|
||||
|
||||
FIXTURE_MISSING_MAP = {
|
||||
"get_wireless_scan_info": ("wireless", {"ap_list": [], "wep_supported": False}),
|
||||
}
|
||||
|
||||
async def send(self, request: str):
|
||||
request_dict = json_loads(request)
|
||||
method = request_dict["method"]
|
||||
@ -346,14 +355,20 @@ class FakeSmartTransport(BaseTransport):
|
||||
if method == "component_nego" or method[:4] == "get_":
|
||||
if method in self.info:
|
||||
return {"result": self.info[method], "error_code": 0}
|
||||
else:
|
||||
elif (
|
||||
missing_result := self.FIXTURE_MISSING_MAP.get(method)
|
||||
) and missing_result[0] in self.components:
|
||||
warnings.warn(
|
||||
UserWarning(
|
||||
f"Fixture missing expected method {method}, try to regenerate"
|
||||
),
|
||||
stacklevel=1,
|
||||
)
|
||||
return {"result": {}, "error_code": 0}
|
||||
return {"result": missing_result[1], "error_code": 0}
|
||||
else:
|
||||
raise SmartDeviceException(f"Fixture doesn't support {method}")
|
||||
elif method == "set_qs_info":
|
||||
return {"error_code": 0}
|
||||
elif method[:4] == "set_":
|
||||
target_method = f"get_{method[4:]}"
|
||||
self.info[target_method].update(params)
|
||||
|
@ -1,4 +1,5 @@
|
||||
import json
|
||||
import re
|
||||
|
||||
import asyncclick as click
|
||||
import pytest
|
||||
@ -9,6 +10,7 @@ from kasa import (
|
||||
Credentials,
|
||||
EmeterStatus,
|
||||
SmartDevice,
|
||||
SmartDeviceException,
|
||||
TPLinkSmartHomeProtocol,
|
||||
UnsupportedDeviceException,
|
||||
)
|
||||
@ -22,11 +24,13 @@ from kasa.cli import (
|
||||
state,
|
||||
sysinfo,
|
||||
toggle,
|
||||
update_credentials,
|
||||
wifi,
|
||||
)
|
||||
from kasa.discover import Discover, DiscoveryResult
|
||||
from kasa.smartprotocol import SmartProtocol
|
||||
|
||||
from .conftest import device_iot, handle_turn_on, new_discovery, turn_on
|
||||
from .conftest import device_iot, device_smart, handle_turn_on, new_discovery, turn_on
|
||||
|
||||
|
||||
@device_iot
|
||||
@ -81,20 +85,93 @@ async def test_alias(dev):
|
||||
await dev.set_alias(old_alias)
|
||||
|
||||
|
||||
@device_iot
|
||||
async def test_raw_command(dev):
|
||||
runner = CliRunner()
|
||||
res = await runner.invoke(raw_command, ["system", "get_sysinfo"], obj=dev)
|
||||
from kasa.tapo import TapoDevice
|
||||
|
||||
if isinstance(dev, TapoDevice):
|
||||
params = ["na", "get_device_info"]
|
||||
else:
|
||||
params = ["system", "get_sysinfo"]
|
||||
res = await runner.invoke(raw_command, params, obj=dev)
|
||||
|
||||
assert res.exit_code == 0
|
||||
assert dev.alias in res.output
|
||||
assert dev.model in res.output
|
||||
|
||||
res = await runner.invoke(raw_command, obj=dev)
|
||||
assert res.exit_code != 0
|
||||
assert "Usage" in res.output
|
||||
|
||||
|
||||
@device_iot
|
||||
@device_smart
|
||||
async def test_wifi_scan(dev):
|
||||
runner = CliRunner()
|
||||
res = await runner.invoke(wifi, ["scan"], obj=dev)
|
||||
|
||||
assert res.exit_code == 0
|
||||
assert re.search(r"Found \d wifi networks!", res.output)
|
||||
|
||||
|
||||
@device_smart
|
||||
async def test_wifi_join(dev):
|
||||
runner = CliRunner()
|
||||
res = await runner.invoke(
|
||||
wifi,
|
||||
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
|
||||
obj=dev,
|
||||
)
|
||||
|
||||
assert res.exit_code == 0
|
||||
assert "Asking the device to connect to FOOBAR" in res.output
|
||||
|
||||
|
||||
@device_smart
|
||||
async def test_wifi_join_no_creds(dev):
|
||||
runner = CliRunner()
|
||||
dev.protocol._transport._credentials = None
|
||||
res = await runner.invoke(
|
||||
wifi,
|
||||
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
|
||||
obj=dev,
|
||||
)
|
||||
|
||||
assert res.exit_code != 0
|
||||
assert isinstance(res.exception, AuthenticationException)
|
||||
|
||||
|
||||
@device_smart
|
||||
async def test_wifi_join_exception(dev, mocker):
|
||||
runner = CliRunner()
|
||||
mocker.patch.object(
|
||||
dev.protocol, "query", side_effect=SmartDeviceException(error_code=9999)
|
||||
)
|
||||
res = await runner.invoke(
|
||||
wifi,
|
||||
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
|
||||
obj=dev,
|
||||
)
|
||||
|
||||
assert res.exit_code != 0
|
||||
assert isinstance(res.exception, SmartDeviceException)
|
||||
|
||||
|
||||
@device_smart
|
||||
async def test_update_credentials(dev):
|
||||
runner = CliRunner()
|
||||
res = await runner.invoke(
|
||||
update_credentials,
|
||||
["--username", "foo", "--password", "bar"],
|
||||
input="y\n",
|
||||
obj=dev,
|
||||
)
|
||||
|
||||
assert res.exit_code == 0
|
||||
assert (
|
||||
"Do you really want to replace the existing credentials? [y/N]: y\n"
|
||||
in res.output
|
||||
)
|
||||
|
||||
|
||||
async def test_emeter(dev: SmartDevice, mocker):
|
||||
runner = CliRunner()
|
||||
|
||||
|
@ -24,7 +24,7 @@ async def test_no_emeter(dev):
|
||||
await dev.erase_emeter_stats()
|
||||
|
||||
|
||||
@has_emeter_iot
|
||||
@has_emeter
|
||||
async def test_get_emeter_realtime(dev):
|
||||
assert dev.has_emeter
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user