mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-08 22:07:06 +00:00
a01247d48f
Python 3.11 ships with latest Debian Bookworm. pypy is not that widely used with this library based on statistics. It could be added back when pypy supports python 3.11.
1255 lines
38 KiB
Python
1255 lines
38 KiB
Python
import json
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from unittest.mock import ANY
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import asyncclick as click
|
|
import pytest
|
|
from asyncclick.testing import CliRunner
|
|
from pytest_mock import MockerFixture
|
|
|
|
from kasa import (
|
|
AuthenticationError,
|
|
Credentials,
|
|
Device,
|
|
DeviceError,
|
|
DeviceType,
|
|
EmeterStatus,
|
|
KasaException,
|
|
Module,
|
|
)
|
|
from kasa.cli.device import (
|
|
alias,
|
|
factory_reset,
|
|
led,
|
|
reboot,
|
|
state,
|
|
sysinfo,
|
|
toggle,
|
|
update_credentials,
|
|
)
|
|
from kasa.cli.light import (
|
|
brightness,
|
|
effect,
|
|
hsv,
|
|
temperature,
|
|
)
|
|
from kasa.cli.main import TYPES, _legacy_type_to_class, cli, cmd_command, raw_command
|
|
from kasa.cli.time import time
|
|
from kasa.cli.usage import emeter, energy
|
|
from kasa.cli.wifi import wifi
|
|
from kasa.discover import Discover, DiscoveryResult
|
|
from kasa.iot import IotDevice
|
|
from kasa.smart import SmartDevice
|
|
from kasa.smartcamera import SmartCamera
|
|
|
|
from .conftest import (
|
|
device_smart,
|
|
get_device_for_fixture_protocol,
|
|
handle_turn_on,
|
|
new_discovery,
|
|
turn_on,
|
|
)
|
|
|
|
# The cli tests should be testing the cli logic rather than a physical device
|
|
# so mark the whole file for skipping with real devices.
|
|
pytestmark = [pytest.mark.requires_dummy]
|
|
|
|
|
|
@pytest.fixture
|
|
def runner():
|
|
"""Runner fixture that unsets the KASA_ environment variables for tests."""
|
|
KASA_VARS = {k: None for k, v in os.environ.items() if k.startswith("KASA_")}
|
|
runner = CliRunner(env=KASA_VARS)
|
|
|
|
return runner
|
|
|
|
|
|
async def test_help(runner):
|
|
"""Test that all the lazy modules are correctly names."""
|
|
res = await runner.invoke(cli, ["--help"])
|
|
assert res.exit_code == 0, "--help failed, check lazy module names"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("device_family", "encrypt_type"),
|
|
[
|
|
pytest.param(None, None, id="No connect params"),
|
|
pytest.param("SMART.TAPOPLUG", None, id="Only device_family"),
|
|
],
|
|
)
|
|
async def test_update_called_by_cli(dev, mocker, runner, device_family, encrypt_type):
|
|
"""Test that device update is called on main."""
|
|
update = mocker.patch.object(dev, "update")
|
|
|
|
# These will mock the features to avoid accessing non-existing
|
|
mocker.patch("kasa.device.Device.features", return_value={})
|
|
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
|
|
|
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dev)
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
"127.0.0.1",
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--device-family",
|
|
device_family,
|
|
"--encrypt-type",
|
|
encrypt_type,
|
|
],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 0
|
|
update.assert_called()
|
|
|
|
|
|
async def test_list_devices(discovery_mock, runner):
|
|
"""Test that device update is called on main."""
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--username", "foo", "--password", "bar", "discover", "list"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 0
|
|
header = f"{'HOST':<15} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} {'ALIAS'}"
|
|
row = f"{discovery_mock.ip:<15} {discovery_mock.device_type:<20} {discovery_mock.encrypt_type:<7}"
|
|
assert header in res.output
|
|
assert row in res.output
|
|
|
|
|
|
@new_discovery
|
|
async def test_list_auth_failed(discovery_mock, mocker, runner):
|
|
"""Test that device update is called on main."""
|
|
device_class = Discover._get_device_class(discovery_mock.discovery_data)
|
|
mocker.patch.object(
|
|
device_class,
|
|
"update",
|
|
side_effect=AuthenticationError("Failed to authenticate"),
|
|
)
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--username", "foo", "--password", "bar", "discover", "list"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 0
|
|
header = f"{'HOST':<15} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} {'ALIAS'}"
|
|
row = f"{discovery_mock.ip:<15} {discovery_mock.device_type:<20} {discovery_mock.encrypt_type:<7} - Authentication failed"
|
|
assert header in res.output
|
|
assert row in res.output
|
|
|
|
|
|
async def test_list_unsupported(unsupported_device_info, runner):
|
|
"""Test that device update is called on main."""
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--username", "foo", "--password", "bar", "discover", "list"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 0
|
|
header = f"{'HOST':<15} {'DEVICE FAMILY':<20} {'ENCRYPT':<7} {'ALIAS'}"
|
|
row = f"{'127.0.0.1':<15} UNSUPPORTED DEVICE"
|
|
assert header in res.output
|
|
assert row in res.output
|
|
|
|
|
|
async def test_sysinfo(dev: Device, runner):
|
|
res = await runner.invoke(sysinfo, obj=dev)
|
|
assert "System info" in res.output
|
|
assert dev.model in res.output
|
|
|
|
|
|
@turn_on
|
|
async def test_state(dev, turn_on, runner):
|
|
await handle_turn_on(dev, turn_on)
|
|
await dev.update()
|
|
res = await runner.invoke(state, obj=dev)
|
|
|
|
if dev.is_on:
|
|
assert "Device state: True" in res.output
|
|
else:
|
|
assert "Device state: False" in res.output
|
|
|
|
|
|
@turn_on
|
|
async def test_toggle(dev, turn_on, runner):
|
|
if isinstance(dev, SmartCamera) and dev.device_type == DeviceType.Hub:
|
|
pytest.skip(reason="Hub cannot toggle state")
|
|
|
|
await handle_turn_on(dev, turn_on)
|
|
await dev.update()
|
|
assert dev.is_on == turn_on
|
|
|
|
await runner.invoke(toggle, obj=dev)
|
|
await dev.update()
|
|
assert dev.is_on != turn_on
|
|
|
|
|
|
async def test_alias(dev, runner):
|
|
res = await runner.invoke(alias, obj=dev)
|
|
assert f"Alias: {dev.alias}" in res.output
|
|
|
|
old_alias = dev.alias
|
|
|
|
new_alias = "new alias"
|
|
res = await runner.invoke(alias, [new_alias], obj=dev)
|
|
assert f"Setting alias to {new_alias}" in res.output
|
|
await dev.update()
|
|
|
|
res = await runner.invoke(alias, obj=dev)
|
|
assert f"Alias: {new_alias}" in res.output
|
|
|
|
await dev.set_alias(old_alias)
|
|
|
|
|
|
async def test_raw_command(dev, mocker, runner):
|
|
update = mocker.patch.object(dev, "update")
|
|
from kasa.smart import SmartDevice
|
|
|
|
if isinstance(dev, SmartCamera):
|
|
params = ["na", "getDeviceInfo"]
|
|
elif isinstance(dev, SmartDevice):
|
|
params = ["na", "get_device_info"]
|
|
else:
|
|
params = ["system", "get_sysinfo"]
|
|
res = await runner.invoke(raw_command, params, obj=dev)
|
|
|
|
# Make sure that update was not called for wifi
|
|
with pytest.raises(AssertionError):
|
|
update.assert_called()
|
|
|
|
assert res.exit_code == 0
|
|
assert dev.model in res.output
|
|
|
|
res = await runner.invoke(raw_command, obj=dev)
|
|
assert res.exit_code != 0
|
|
assert "Usage" in res.output
|
|
|
|
|
|
async def test_command_with_child(dev, mocker, runner):
|
|
"""Test 'command' command with --child."""
|
|
update_mock = mocker.patch.object(dev, "update")
|
|
|
|
# create_autospec for device slows tests way too much, so we use a dummy here
|
|
class DummyDevice(dev.__class__):
|
|
def __init__(self):
|
|
super().__init__("127.0.0.1")
|
|
# device_type and _info initialised for repr
|
|
self._device_type = Device.Type.StripSocket
|
|
self._info = {}
|
|
|
|
async def _query_helper(*_, **__):
|
|
return {"dummy": "response"}
|
|
|
|
dummy_child = DummyDevice()
|
|
|
|
mocker.patch.object(dev, "_children", {"XYZ": [dummy_child]})
|
|
mocker.patch.object(dev, "get_child_device", return_value=dummy_child)
|
|
|
|
res = await runner.invoke(
|
|
cmd_command,
|
|
["--child", "XYZ", "command", "'params'"],
|
|
obj=dev,
|
|
catch_exceptions=False,
|
|
)
|
|
|
|
update_mock.assert_called()
|
|
assert '{"dummy": "response"}' in res.output
|
|
assert res.exit_code == 0
|
|
|
|
|
|
@device_smart
|
|
async def test_reboot(dev, mocker, runner):
|
|
"""Test that reboot works on SMART devices."""
|
|
query_mock = mocker.patch.object(dev.protocol, "query")
|
|
|
|
res = await runner.invoke(
|
|
reboot,
|
|
obj=dev,
|
|
)
|
|
|
|
query_mock.assert_called()
|
|
assert res.exit_code == 0
|
|
|
|
|
|
@device_smart
|
|
async def test_factory_reset(dev, mocker, runner):
|
|
"""Test that factory reset works on SMART devices."""
|
|
query_mock = mocker.patch.object(dev.protocol, "query")
|
|
|
|
res = await runner.invoke(
|
|
factory_reset,
|
|
obj=dev,
|
|
input="y\n",
|
|
)
|
|
|
|
query_mock.assert_called()
|
|
assert res.exit_code == 0
|
|
|
|
|
|
@device_smart
|
|
async def test_wifi_scan(dev, runner):
|
|
res = await runner.invoke(wifi, ["scan"], obj=dev)
|
|
|
|
assert res.exit_code == 0
|
|
assert re.search(r"Found [\d]+ wifi networks!", res.output)
|
|
|
|
|
|
@device_smart
|
|
async def test_wifi_join(dev, mocker, runner):
|
|
update = mocker.patch.object(dev, "update")
|
|
res = await runner.invoke(
|
|
wifi,
|
|
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
|
|
obj=dev,
|
|
)
|
|
|
|
# Make sure that update was not called for wifi
|
|
with pytest.raises(AssertionError):
|
|
update.assert_called()
|
|
|
|
assert res.exit_code == 0
|
|
assert "Asking the device to connect to FOOBAR" in res.output
|
|
|
|
|
|
@device_smart
|
|
async def test_wifi_join_no_creds(dev, runner):
|
|
dev.protocol._transport._credentials = None
|
|
res = await runner.invoke(
|
|
wifi,
|
|
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
|
|
obj=dev,
|
|
)
|
|
|
|
assert res.exit_code != 0
|
|
assert isinstance(res.exception, AuthenticationError)
|
|
|
|
|
|
@device_smart
|
|
async def test_wifi_join_exception(dev, mocker, runner):
|
|
mocker.patch.object(dev.protocol, "query", side_effect=DeviceError(error_code=9999))
|
|
res = await runner.invoke(
|
|
wifi,
|
|
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
|
|
obj=dev,
|
|
)
|
|
|
|
assert res.exit_code != 0
|
|
assert isinstance(res.exception, KasaException)
|
|
|
|
|
|
@device_smart
|
|
async def test_update_credentials(dev, runner):
|
|
res = await runner.invoke(
|
|
update_credentials,
|
|
["--username", "foo", "--password", "bar"],
|
|
input="y\n",
|
|
obj=dev,
|
|
)
|
|
|
|
assert res.exit_code == 0
|
|
assert (
|
|
"Do you really want to replace the existing credentials? [y/N]: y\n"
|
|
in res.output
|
|
)
|
|
|
|
|
|
async def test_time_get(dev, runner):
|
|
"""Test time get command."""
|
|
res = await runner.invoke(
|
|
time,
|
|
obj=dev,
|
|
)
|
|
assert res.exit_code == 0
|
|
assert "Current time: " in res.output
|
|
|
|
|
|
async def test_time_sync(dev, mocker, runner):
|
|
"""Test time sync command."""
|
|
update = mocker.patch.object(dev, "update")
|
|
set_time_mock = mocker.spy(dev.modules[Module.Time], "set_time")
|
|
res = await runner.invoke(
|
|
time,
|
|
["sync"],
|
|
obj=dev,
|
|
)
|
|
set_time_mock.assert_called()
|
|
update.assert_called()
|
|
|
|
assert res.exit_code == 0
|
|
assert "Old time: " in res.output
|
|
assert "New time: " in res.output
|
|
|
|
|
|
async def test_time_set(dev: Device, mocker, runner):
|
|
"""Test time set command."""
|
|
time_mod = dev.modules[Module.Time]
|
|
set_time_mock = mocker.spy(time_mod, "set_time")
|
|
dt = datetime(2024, 10, 15, 8, 15)
|
|
res = await runner.invoke(
|
|
time,
|
|
["set", str(dt.year), str(dt.month), str(dt.day), str(dt.hour), str(dt.minute)],
|
|
obj=dev,
|
|
)
|
|
set_time_mock.assert_called()
|
|
assert time_mod.time == dt.replace(tzinfo=time_mod.timezone)
|
|
|
|
assert res.exit_code == 0
|
|
assert "Old time: " in res.output
|
|
assert "New time: " in res.output
|
|
|
|
zone = ZoneInfo("Europe/Berlin")
|
|
dt = dt.replace(tzinfo=zone)
|
|
res = await runner.invoke(
|
|
time,
|
|
[
|
|
"set",
|
|
str(dt.year),
|
|
str(dt.month),
|
|
str(dt.day),
|
|
str(dt.hour),
|
|
str(dt.minute),
|
|
"--timezone",
|
|
zone.key,
|
|
],
|
|
input="y\n",
|
|
obj=dev,
|
|
)
|
|
|
|
assert time_mod.time == dt
|
|
|
|
assert res.exit_code == 0
|
|
assert "Old time: " in res.output
|
|
assert "New time: " in res.output
|
|
|
|
|
|
async def test_emeter(dev: Device, mocker, runner):
|
|
res = await runner.invoke(emeter, obj=dev)
|
|
if not (energy := dev.modules.get(Module.Energy)):
|
|
assert "Device has no energy module." in res.output
|
|
return
|
|
|
|
assert "== Emeter ==" in res.output
|
|
|
|
if dev.device_type is not DeviceType.Strip:
|
|
res = await runner.invoke(emeter, ["--index", "0"], obj=dev)
|
|
assert f"Device: {dev.host} does not have children" in res.output
|
|
res = await runner.invoke(emeter, ["--name", "mock"], obj=dev)
|
|
assert f"Device: {dev.host} does not have children" in res.output
|
|
|
|
if dev.device_type is DeviceType.Strip and len(dev.children) > 0:
|
|
child_energy = dev.children[0].modules.get(Module.Energy)
|
|
assert child_energy
|
|
realtime_emeter = mocker.patch.object(child_energy, "get_status")
|
|
realtime_emeter.return_value = EmeterStatus({"voltage_mv": 122066})
|
|
|
|
res = await runner.invoke(emeter, ["--index", "0"], obj=dev)
|
|
assert "Voltage: 122.066 V" in res.output
|
|
realtime_emeter.assert_called()
|
|
assert realtime_emeter.call_count == 1
|
|
|
|
res = await runner.invoke(emeter, ["--name", dev.children[0].alias], obj=dev)
|
|
assert "Voltage: 122.066 V" in res.output
|
|
assert realtime_emeter.call_count == 2
|
|
|
|
if isinstance(dev, IotDevice):
|
|
monthly = mocker.patch.object(energy, "get_monthly_stats")
|
|
monthly.return_value = {1: 1234}
|
|
res = await runner.invoke(emeter, ["--year", "1900"], obj=dev)
|
|
if not isinstance(dev, IotDevice):
|
|
assert "Device does not support historical statistics" in res.output
|
|
return
|
|
assert "For year" in res.output
|
|
assert "1, 1234" in res.output
|
|
monthly.assert_called_with(year=1900)
|
|
|
|
if isinstance(dev, IotDevice):
|
|
daily = mocker.patch.object(energy, "get_daily_stats")
|
|
daily.return_value = {1: 1234}
|
|
res = await runner.invoke(emeter, ["--month", "1900-12"], obj=dev)
|
|
if not isinstance(dev, IotDevice):
|
|
assert "Device has no historical statistics" in res.output
|
|
return
|
|
assert "For month" in res.output
|
|
assert "1, 1234" in res.output
|
|
daily.assert_called_with(year=1900, month=12)
|
|
|
|
|
|
async def test_brightness(dev: Device, runner):
|
|
res = await runner.invoke(brightness, obj=dev)
|
|
if not (light := dev.modules.get(Module.Light)) or not light.is_dimmable:
|
|
assert "This device does not support brightness." in res.output
|
|
return
|
|
|
|
res = await runner.invoke(brightness, obj=dev)
|
|
assert f"Brightness: {light.brightness}" in res.output
|
|
|
|
res = await runner.invoke(brightness, ["12"], obj=dev)
|
|
assert "Setting brightness" in res.output
|
|
await dev.update()
|
|
|
|
res = await runner.invoke(brightness, obj=dev)
|
|
assert "Brightness: 12" in res.output
|
|
|
|
|
|
async def test_color_temperature(dev: Device, runner):
|
|
res = await runner.invoke(temperature, obj=dev)
|
|
if not (light := dev.modules.get(Module.Light)) or not light.is_variable_color_temp:
|
|
assert "Device does not support color temperature" in res.output
|
|
return
|
|
|
|
res = await runner.invoke(temperature, obj=dev)
|
|
assert f"Color temperature: {light.color_temp}" in res.output
|
|
valid_range = light.valid_temperature_range
|
|
assert f"(min: {valid_range.min}, max: {valid_range.max})" in res.output
|
|
|
|
val = int((valid_range.min + valid_range.max) / 2)
|
|
res = await runner.invoke(temperature, [str(val)], obj=dev)
|
|
assert "Setting color temperature to " in res.output
|
|
await dev.update()
|
|
|
|
res = await runner.invoke(temperature, obj=dev)
|
|
assert f"Color temperature: {val}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
invalid_max = valid_range.max + 100
|
|
# Lights that support the maximum range will not get past the click cli range check
|
|
# So can't be tested for the internal range check.
|
|
if invalid_max < 9000:
|
|
res = await runner.invoke(temperature, [str(invalid_max)], obj=dev)
|
|
assert res.exit_code == 1
|
|
assert isinstance(res.exception, ValueError)
|
|
|
|
res = await runner.invoke(temperature, [str(9100)], obj=dev)
|
|
assert res.exit_code == 2
|
|
|
|
|
|
async def test_color_hsv(dev: Device, runner: CliRunner):
|
|
res = await runner.invoke(hsv, obj=dev)
|
|
if not (light := dev.modules.get(Module.Light)) or not light.is_color:
|
|
assert "Device does not support colors" in res.output
|
|
return
|
|
|
|
res = await runner.invoke(hsv, obj=dev)
|
|
assert f"Current HSV: {light.hsv}" in res.output
|
|
|
|
res = await runner.invoke(hsv, ["180", "50", "50"], obj=dev)
|
|
assert "Setting HSV: 180 50 50" in res.output
|
|
assert res.exit_code == 0
|
|
await dev.update()
|
|
|
|
res = await runner.invoke(hsv, ["180", "50"], obj=dev)
|
|
assert "Setting a color requires 3 values." in res.output
|
|
assert res.exit_code == 2
|
|
|
|
|
|
async def test_light_effect(dev: Device, runner: CliRunner):
|
|
res = await runner.invoke(effect, obj=dev)
|
|
if not (light_effect := dev.modules.get(Module.LightEffect)):
|
|
assert "Device does not support effects" in res.output
|
|
return
|
|
|
|
# Start off with a known state of off
|
|
await light_effect.set_effect(light_effect.LIGHT_EFFECTS_OFF)
|
|
await dev.update()
|
|
assert light_effect.effect == light_effect.LIGHT_EFFECTS_OFF
|
|
|
|
res = await runner.invoke(effect, obj=dev)
|
|
assert f"Light effect: {light_effect.effect}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(effect, [light_effect.effect_list[1]], obj=dev)
|
|
assert f"Setting Effect: {light_effect.effect_list[1]}" in res.output
|
|
assert res.exit_code == 0
|
|
await dev.update()
|
|
assert light_effect.effect == light_effect.effect_list[1]
|
|
|
|
res = await runner.invoke(effect, ["foobar"], obj=dev)
|
|
assert f"Effect must be one of: {light_effect.effect_list}" in res.output
|
|
assert res.exit_code == 2
|
|
|
|
|
|
async def test_led(dev: Device, runner: CliRunner):
|
|
res = await runner.invoke(led, obj=dev)
|
|
if not (led_module := dev.modules.get(Module.Led)):
|
|
assert "Device does not support led" in res.output
|
|
return
|
|
|
|
res = await runner.invoke(led, obj=dev)
|
|
assert f"LED state: {led_module.led}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(led, ["on"], obj=dev)
|
|
assert "Turning led to True" in res.output
|
|
assert res.exit_code == 0
|
|
await dev.update()
|
|
assert led_module.led is True
|
|
|
|
res = await runner.invoke(led, ["off"], obj=dev)
|
|
assert "Turning led to False" in res.output
|
|
assert res.exit_code == 0
|
|
await dev.update()
|
|
assert led_module.led is False
|
|
|
|
|
|
async def test_json_output(dev: Device, mocker, runner):
|
|
"""Test that the json output produces correct output."""
|
|
mocker.patch("kasa.Discover.discover_single", return_value=dev)
|
|
# These will mock the features to avoid accessing non-existing ones
|
|
mocker.patch("kasa.device.Device.features", return_value={})
|
|
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
|
|
|
|
res = await runner.invoke(cli, ["--host", "127.0.0.1", "--json", "state"], obj=dev)
|
|
assert res.exit_code == 0
|
|
assert json.loads(res.output) == dev.internal_state
|
|
|
|
|
|
@new_discovery
|
|
async def test_credentials(discovery_mock, mocker, runner):
|
|
"""Test credentials are passed correctly from cli to device."""
|
|
# Patch state to echo username and password
|
|
pass_dev = click.make_pass_decorator(Device)
|
|
|
|
@pass_dev
|
|
async def _state(dev: Device):
|
|
if dev.credentials:
|
|
click.echo(
|
|
f"Username:{dev.credentials.username} Password:{dev.credentials.password}"
|
|
)
|
|
|
|
mocker.patch("kasa.cli.device.state", new=_state)
|
|
|
|
dr = DiscoveryResult.from_dict(discovery_mock.discovery_data["result"])
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
"127.0.0.123",
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--device-family",
|
|
dr.device_type,
|
|
"--encrypt-type",
|
|
dr.mgt_encrypt_schm.encrypt_type,
|
|
],
|
|
)
|
|
assert res.exit_code == 0
|
|
|
|
assert "Username:foo Password:bar\n" in res.output
|
|
|
|
|
|
async def test_without_device_type(dev, mocker, runner):
|
|
"""Test connecting without the device type."""
|
|
discovery_mock = mocker.patch(
|
|
"kasa.discover.Discover.discover_single", return_value=dev
|
|
)
|
|
# These will mock the features to avoid accessing non-existing
|
|
mocker.patch("kasa.device.Device.features", return_value={})
|
|
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
"127.0.0.1",
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--discovery-timeout",
|
|
"7",
|
|
],
|
|
)
|
|
assert res.exit_code == 0
|
|
discovery_mock.assert_called_once_with(
|
|
"127.0.0.1",
|
|
port=None,
|
|
credentials=Credentials("foo", "bar"),
|
|
timeout=5,
|
|
discovery_timeout=7,
|
|
on_unsupported=ANY,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("auth_param", ["--username", "--password"])
|
|
async def test_invalid_credential_params(auth_param, runner):
|
|
"""Test for handling only one of username or password supplied."""
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
"127.0.0.1",
|
|
"--type",
|
|
"plug",
|
|
auth_param,
|
|
"foo",
|
|
],
|
|
)
|
|
assert res.exit_code == 2
|
|
assert (
|
|
"Error: Using authentication requires both --username and --password"
|
|
in res.output
|
|
)
|
|
|
|
|
|
async def test_duplicate_target_device(runner):
|
|
"""Test that defining both --host or --alias gives an error."""
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
"127.0.0.1",
|
|
"--alias",
|
|
"foo",
|
|
],
|
|
)
|
|
assert res.exit_code == 2
|
|
assert "Error: Use either --alias or --host, not both." in res.output
|
|
|
|
|
|
async def test_discover(discovery_mock, mocker, runner):
|
|
"""Test discovery output."""
|
|
# These will mock the features to avoid accessing non-existing
|
|
mocker.patch("kasa.device.Device.features", return_value={})
|
|
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--discovery-timeout",
|
|
0,
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--verbose",
|
|
"discover",
|
|
],
|
|
)
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_discover_host(discovery_mock, mocker, runner):
|
|
"""Test discovery output."""
|
|
# These will mock the features to avoid accessing non-existing
|
|
mocker.patch("kasa.device.Device.features", return_value={})
|
|
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--discovery-timeout",
|
|
0,
|
|
"--host",
|
|
"127.0.0.123",
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--verbose",
|
|
],
|
|
)
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_discover_unsupported(unsupported_device_info, runner):
|
|
"""Test discovery output."""
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--discovery-timeout",
|
|
0,
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--verbose",
|
|
"discover",
|
|
],
|
|
)
|
|
assert res.exit_code == 0
|
|
assert "== Unsupported device ==" in res.output
|
|
|
|
|
|
async def test_host_unsupported(unsupported_device_info, runner):
|
|
"""Test discovery output."""
|
|
host = "127.0.0.1"
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
host,
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--debug",
|
|
],
|
|
)
|
|
|
|
assert res.exit_code != 0
|
|
assert "== Unsupported device ==" in res.output
|
|
|
|
|
|
@new_discovery
|
|
async def test_discover_auth_failed(discovery_mock, mocker, runner):
|
|
"""Test discovery output."""
|
|
host = "127.0.0.1"
|
|
discovery_mock.ip = host
|
|
device_class = Discover._get_device_class(discovery_mock.discovery_data)
|
|
mocker.patch.object(
|
|
device_class,
|
|
"update",
|
|
side_effect=AuthenticationError("Failed to authenticate"),
|
|
)
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--discovery-timeout",
|
|
0,
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--verbose",
|
|
"discover",
|
|
],
|
|
)
|
|
|
|
assert res.exit_code == 0
|
|
assert "== Authentication failed for device ==" in res.output
|
|
assert "== Discovery Result ==" in res.output
|
|
|
|
|
|
@new_discovery
|
|
async def test_host_auth_failed(discovery_mock, mocker, runner):
|
|
"""Test discovery output."""
|
|
host = "127.0.0.1"
|
|
discovery_mock.ip = host
|
|
device_class = Discover._get_device_class(discovery_mock.discovery_data)
|
|
mocker.patch.object(
|
|
device_class,
|
|
"update",
|
|
side_effect=AuthenticationError("Failed to authenticate"),
|
|
)
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
host,
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--debug",
|
|
],
|
|
)
|
|
|
|
assert res.exit_code != 0
|
|
assert isinstance(res.exception, AuthenticationError)
|
|
|
|
|
|
@pytest.mark.parametrize("device_type", TYPES)
|
|
async def test_type_param(device_type, mocker, runner):
|
|
"""Test for handling only one of username or password supplied."""
|
|
result_device = FileNotFoundError
|
|
pass_dev = click.make_pass_decorator(Device)
|
|
|
|
@pass_dev
|
|
async def _state(dev: Device):
|
|
nonlocal result_device
|
|
result_device = dev
|
|
|
|
mocker.patch("kasa.cli.device.state", new=_state)
|
|
if device_type == "camera":
|
|
expected_type = SmartCamera
|
|
elif device_type == "smart":
|
|
expected_type = SmartDevice
|
|
else:
|
|
expected_type = _legacy_type_to_class(device_type)
|
|
mocker.patch.object(expected_type, "update")
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--type", device_type, "--host", "127.0.0.1"],
|
|
)
|
|
assert res.exit_code == 0
|
|
assert isinstance(result_device, expected_type)
|
|
|
|
|
|
@pytest.mark.skip(
|
|
"Skip until pytest-asyncio supports pytest 8.0, https://github.com/pytest-dev/pytest-asyncio/issues/737"
|
|
)
|
|
async def test_shell(dev: Device, mocker, runner):
|
|
"""Test that the shell commands tries to embed a shell."""
|
|
mocker.patch("kasa.Discover.discover", return_value=[dev])
|
|
# repl = mocker.patch("ptpython.repl")
|
|
mocker.patch.dict(
|
|
"sys.modules",
|
|
{"ptpython": mocker.MagicMock(), "ptpython.repl": mocker.MagicMock()},
|
|
)
|
|
embed = mocker.patch("ptpython.repl.embed")
|
|
res = await runner.invoke(cli, ["shell"], obj=dev)
|
|
assert res.exit_code == 0
|
|
embed.assert_called()
|
|
|
|
|
|
async def test_errors(mocker, runner):
|
|
err = KasaException("Foobar")
|
|
|
|
# Test masking
|
|
mocker.patch("kasa.Discover.discover", side_effect=err)
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--username", "foo", "--password", "bar"],
|
|
)
|
|
assert res.exit_code == 1
|
|
assert "Raised error: Foobar" in res.output
|
|
assert "Run with --debug enabled to see stacktrace" in res.output
|
|
assert isinstance(res.exception, SystemExit)
|
|
|
|
# Test --debug
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--debug"],
|
|
)
|
|
assert res.exit_code == 1
|
|
assert "Raised error: Foobar" in res.output
|
|
assert res.exception == err
|
|
|
|
# Test no device passed to subcommand
|
|
mocker.patch("kasa.Discover.discover", return_value={})
|
|
res = await runner.invoke(
|
|
cli,
|
|
["sysinfo"],
|
|
)
|
|
assert res.exit_code == 1
|
|
assert (
|
|
"Only discover is available without --host or --alias"
|
|
in res.output.replace("\n", "") # Remove newlines from rich formatting
|
|
)
|
|
assert isinstance(res.exception, SystemExit)
|
|
|
|
# Test click error
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--foobar"],
|
|
)
|
|
assert res.exit_code == 2
|
|
assert "Raised error:" not in res.output
|
|
|
|
|
|
async def test_feature(mocker, runner):
|
|
"""Test feature command."""
|
|
dummy_device = await get_device_for_fixture_protocol(
|
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
|
)
|
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--host", "127.0.0.123", "--debug", "feature"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert "LED" in res.output
|
|
assert "== Child " in res.output # child listing
|
|
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_features_all(discovery_mock, mocker, runner):
|
|
"""Test feature command on all fixtures."""
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--host", "127.0.0.123", "--debug", "feature"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert "== Primary features ==" in res.output
|
|
assert "== Information ==" in res.output
|
|
assert "== Configuration ==" in res.output
|
|
assert "== Debug ==" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_feature_single(mocker, runner):
|
|
"""Test feature command returning single value."""
|
|
dummy_device = await get_device_for_fixture_protocol(
|
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
|
)
|
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--host", "127.0.0.123", "--debug", "feature", "led"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert "LED" in res.output
|
|
assert "== Features ==" not in res.output
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_feature_missing(mocker, runner):
|
|
"""Test feature command returning single value."""
|
|
dummy_device = await get_device_for_fixture_protocol(
|
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
|
)
|
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--host", "127.0.0.123", "--debug", "feature", "missing"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert "No feature by name 'missing'" in res.output
|
|
assert "== Features ==" not in res.output
|
|
assert res.exit_code == 1
|
|
|
|
|
|
async def test_feature_set(mocker, runner):
|
|
"""Test feature command's set value."""
|
|
dummy_device = await get_device_for_fixture_protocol(
|
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
|
)
|
|
led_setter = mocker.patch("kasa.smart.modules.led.Led.set_led")
|
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--host", "127.0.0.123", "--debug", "feature", "led", "True"],
|
|
catch_exceptions=False,
|
|
)
|
|
|
|
led_setter.assert_called_with(True)
|
|
assert "Changing led from True to True" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_feature_set_child(mocker, runner):
|
|
"""Test feature command's set value."""
|
|
dummy_device = await get_device_for_fixture_protocol(
|
|
"P300(EU)_1.0_1.0.13.json", "SMART"
|
|
)
|
|
setter = mocker.patch("kasa.smart.smartdevice.SmartDevice.set_state")
|
|
|
|
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
|
|
get_child_device = mocker.spy(dummy_device, "get_child_device")
|
|
|
|
child_id = "000000000000000000000000000000000000000001"
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--host",
|
|
"127.0.0.123",
|
|
"--debug",
|
|
"feature",
|
|
"--child",
|
|
child_id,
|
|
"state",
|
|
"True",
|
|
],
|
|
catch_exceptions=False,
|
|
)
|
|
|
|
get_child_device.assert_called()
|
|
setter.assert_called_with(True)
|
|
|
|
assert f"Targeting child device {child_id}"
|
|
assert "Changing state from False to True" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
|
|
async def test_cli_child_commands(
|
|
dev: Device, runner: CliRunner, mocker: MockerFixture
|
|
):
|
|
if not dev.children:
|
|
res = await runner.invoke(alias, ["--child-index", "0"], obj=dev)
|
|
assert f"Device: {dev.host} does not have children" in res.output
|
|
assert res.exit_code == 1
|
|
|
|
res = await runner.invoke(alias, ["--index", "0"], obj=dev)
|
|
assert f"Device: {dev.host} does not have children" in res.output
|
|
assert res.exit_code == 1
|
|
|
|
res = await runner.invoke(alias, ["--child", "Plug 2"], obj=dev)
|
|
assert f"Device: {dev.host} does not have children" in res.output
|
|
assert res.exit_code == 1
|
|
|
|
res = await runner.invoke(alias, ["--name", "Plug 2"], obj=dev)
|
|
assert f"Device: {dev.host} does not have children" in res.output
|
|
assert res.exit_code == 1
|
|
|
|
if dev.children:
|
|
child_alias = dev.children[0].alias
|
|
assert child_alias
|
|
child_device_id = dev.children[0].device_id
|
|
child_count = len(dev.children)
|
|
child_update_method = dev.children[0].update
|
|
|
|
# Test child retrieval
|
|
res = await runner.invoke(alias, ["--child-index", "0"], obj=dev)
|
|
assert f"Targeting child device {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(alias, ["--index", "0"], obj=dev)
|
|
assert f"Targeting child device {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(alias, ["--child", child_alias], obj=dev)
|
|
assert f"Targeting child device {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(alias, ["--name", child_alias], obj=dev)
|
|
assert f"Targeting child device {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(alias, ["--child", child_device_id], obj=dev)
|
|
assert f"Targeting child device {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
res = await runner.invoke(alias, ["--name", child_device_id], obj=dev)
|
|
assert f"Targeting child device {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
# Test invalid name and index
|
|
res = await runner.invoke(alias, ["--child-index", "-1"], obj=dev)
|
|
assert f"Invalid index -1, device has {child_count} children" in res.output
|
|
assert res.exit_code == 1
|
|
|
|
res = await runner.invoke(alias, ["--child-index", str(child_count)], obj=dev)
|
|
assert (
|
|
f"Invalid index {child_count}, device has {child_count} children"
|
|
in res.output
|
|
)
|
|
assert res.exit_code == 1
|
|
|
|
res = await runner.invoke(alias, ["--child", "foobar"], obj=dev)
|
|
assert "No child device found with device_id or name: foobar" in res.output
|
|
assert res.exit_code == 1
|
|
|
|
# Test using both options:
|
|
|
|
res = await runner.invoke(
|
|
alias, ["--child", child_alias, "--child-index", "0"], obj=dev
|
|
)
|
|
assert "Use either --child or --child-index, not both." in res.output
|
|
assert res.exit_code == 2
|
|
|
|
# Test child with no parameter interactive prompt
|
|
|
|
res = await runner.invoke(alias, ["--child"], obj=dev, input="0\n")
|
|
assert "Enter the index number of the child device:" in res.output
|
|
assert f"Alias: {child_alias}" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
# Test values and updates
|
|
|
|
res = await runner.invoke(alias, ["foo", "--child", child_device_id], obj=dev)
|
|
assert "Alias set to: foo" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
# Test help has command options plus child options
|
|
|
|
res = await runner.invoke(energy, ["--help"], obj=dev)
|
|
assert "--year" in res.output
|
|
assert "--child" in res.output
|
|
assert "--child-index" in res.output
|
|
assert res.exit_code == 0
|
|
|
|
# Test child update patching calls parent and is undone on exit
|
|
|
|
parent_update_spy = mocker.spy(dev, "update")
|
|
res = await runner.invoke(alias, ["bar", "--child", child_device_id], obj=dev)
|
|
assert "Alias set to: bar" in res.output
|
|
assert res.exit_code == 0
|
|
parent_update_spy.assert_called_once()
|
|
assert dev.children[0].update == child_update_method
|
|
|
|
|
|
async def test_discover_config(dev: Device, mocker, runner):
|
|
"""Test that device config is returned."""
|
|
host = "127.0.0.1"
|
|
mocker.patch("kasa.device_factory._connect", side_effect=[Exception, dev])
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--host",
|
|
host,
|
|
"discover",
|
|
"config",
|
|
],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 0
|
|
cparam = dev.config.connection_type
|
|
expected = f"--device-family {cparam.device_family.value} --encrypt-type {cparam.encryption_type.value} {'--https' if cparam.https else '--no-https'}"
|
|
assert expected in res.output
|
|
assert re.search(
|
|
r"Attempt to connect to 127\.0\.0\.1 with \w+ \+ \w+ \+ \w+ failed",
|
|
res.output.replace("\n", ""),
|
|
)
|
|
assert re.search(
|
|
r"Attempt to connect to 127\.0\.0\.1 with \w+ \+ \w+ \+ \w+ succeeded",
|
|
res.output.replace("\n", ""),
|
|
)
|
|
|
|
|
|
async def test_discover_config_invalid(mocker, runner):
|
|
"""Test the device config command with invalids."""
|
|
host = "127.0.0.1"
|
|
mocker.patch("kasa.discover.Discover.try_connect_all", return_value=None)
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--host",
|
|
host,
|
|
"discover",
|
|
"config",
|
|
],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 1
|
|
assert f"Unable to connect to {host}" in res.output
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
["--username", "foo", "--password", "bar", "discover", "config"],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 1
|
|
assert "--host option must be supplied to discover config" in res.output
|
|
|
|
res = await runner.invoke(
|
|
cli,
|
|
[
|
|
"--username",
|
|
"foo",
|
|
"--password",
|
|
"bar",
|
|
"--host",
|
|
host,
|
|
"--target",
|
|
"127.0.0.2",
|
|
"discover",
|
|
"config",
|
|
],
|
|
catch_exceptions=False,
|
|
)
|
|
assert res.exit_code == 1
|
|
assert "--target is not a valid option for single host discovery" in res.output
|