python-kasa/kasa/tests/test_cli.py
Teemu R 51a972542f
Disallow non-targeted device commands (#982)
Prevent the cli from allowing sub commands unless host or alias is specified.
It is unwise to allow commands to be run on an arbitrary set of discovered
devices so this PR shows an error if attempted.
Also consolidates other invalid cli operations to use a single error function
to display the error to the user.
2024-06-17 10:04:46 +01:00

918 lines
26 KiB
Python

import json
import os
import re
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
from kasa import (
AuthenticationError,
Credentials,
Device,
DeviceError,
EmeterStatus,
KasaException,
Module,
UnsupportedDeviceError,
)
from kasa.cli import (
TYPE_TO_CLASS,
alias,
brightness,
cli,
cmd_command,
effect,
emeter,
hsv,
led,
raw_command,
reboot,
state,
sysinfo,
temperature,
time,
toggle,
update_credentials,
wifi,
)
from kasa.discover import Discover, DiscoveryResult
from kasa.iot import IotDevice
from .conftest import (
device_smart,
get_device_for_fixture_protocol,
handle_turn_on,
new_discovery,
turn_on,
)
@pytest.fixture()
def runner():
"""Runner fixture that unsets the KASA_ environment variables for tests."""
KASA_VARS = {k: None for k, v in os.environ.items() if k.startswith("KASA_")}
runner = CliRunner(env=KASA_VARS)
return runner
async def test_update_called_by_cli(dev, mocker, runner):
"""Test that device update is called on main."""
update = mocker.patch.object(dev, "update")
# These will mock the features to avoid accessing non-existing
mocker.patch("kasa.device.Device.features", return_value={})
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
mocker.patch("kasa.discover.Discover.discover_single", return_value=dev)
res = await runner.invoke(
cli,
[
"--host",
"127.0.0.1",
"--username",
"foo",
"--password",
"bar",
],
catch_exceptions=False,
)
assert res.exit_code == 0
update.assert_called()
async def test_sysinfo(dev: Device, runner):
res = await runner.invoke(sysinfo, obj=dev)
assert "System info" in res.output
assert dev.model in res.output
@turn_on
async def test_state(dev, turn_on, runner):
await handle_turn_on(dev, turn_on)
await dev.update()
res = await runner.invoke(state, obj=dev)
if dev.is_on:
assert "Device state: True" in res.output
else:
assert "Device state: False" in res.output
@turn_on
async def test_toggle(dev, turn_on, runner):
await handle_turn_on(dev, turn_on)
await dev.update()
assert dev.is_on == turn_on
await runner.invoke(toggle, obj=dev)
await dev.update()
assert dev.is_on != turn_on
async def test_alias(dev, runner):
res = await runner.invoke(alias, obj=dev)
assert f"Alias: {dev.alias}" in res.output
old_alias = dev.alias
new_alias = "new alias"
res = await runner.invoke(alias, [new_alias], obj=dev)
assert f"Setting alias to {new_alias}" in res.output
await dev.update()
res = await runner.invoke(alias, obj=dev)
assert f"Alias: {new_alias}" in res.output
await dev.set_alias(old_alias)
async def test_raw_command(dev, mocker, runner):
update = mocker.patch.object(dev, "update")
from kasa.smart import SmartDevice
if isinstance(dev, SmartDevice):
params = ["na", "get_device_info"]
else:
params = ["system", "get_sysinfo"]
res = await runner.invoke(raw_command, params, obj=dev)
# Make sure that update was not called for wifi
with pytest.raises(AssertionError):
update.assert_called()
assert res.exit_code == 0
assert dev.model in res.output
res = await runner.invoke(raw_command, obj=dev)
assert res.exit_code != 0
assert "Usage" in res.output
async def test_command_with_child(dev, mocker, runner):
"""Test 'command' command with --child."""
update_mock = mocker.patch.object(dev, "update")
# create_autospec for device slows tests way too much, so we use a dummy here
class DummyDevice(dev.__class__):
def __init__(self):
super().__init__("127.0.0.1")
async def _query_helper(*_, **__):
return {"dummy": "response"}
dummy_child = DummyDevice()
mocker.patch.object(dev, "_children", {"XYZ": dummy_child})
mocker.patch.object(dev, "get_child_device", return_value=dummy_child)
res = await runner.invoke(
cmd_command,
["--child", "XYZ", "command", "'params'"],
obj=dev,
catch_exceptions=False,
)
update_mock.assert_called()
assert '{"dummy": "response"}' in res.output
assert res.exit_code == 0
@device_smart
async def test_reboot(dev, mocker, runner):
"""Test that reboot works on SMART devices."""
query_mock = mocker.patch.object(dev.protocol, "query")
res = await runner.invoke(
reboot,
obj=dev,
)
query_mock.assert_called()
assert res.exit_code == 0
@device_smart
async def test_wifi_scan(dev, runner):
res = await runner.invoke(wifi, ["scan"], obj=dev)
assert res.exit_code == 0
assert re.search(r"Found [\d]+ wifi networks!", res.output)
@device_smart
async def test_wifi_join(dev, mocker, runner):
update = mocker.patch.object(dev, "update")
res = await runner.invoke(
wifi,
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
obj=dev,
)
# Make sure that update was not called for wifi
with pytest.raises(AssertionError):
update.assert_called()
assert res.exit_code == 0
assert "Asking the device to connect to FOOBAR" in res.output
@device_smart
async def test_wifi_join_no_creds(dev, runner):
dev.protocol._transport._credentials = None
res = await runner.invoke(
wifi,
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
obj=dev,
)
assert res.exit_code != 0
assert isinstance(res.exception, AuthenticationError)
@device_smart
async def test_wifi_join_exception(dev, mocker, runner):
mocker.patch.object(dev.protocol, "query", side_effect=DeviceError(error_code=9999))
res = await runner.invoke(
wifi,
["join", "FOOBAR", "--keytype", "wpa_psk", "--password", "foobar"],
obj=dev,
)
assert res.exit_code != 0
assert isinstance(res.exception, KasaException)
@device_smart
async def test_update_credentials(dev, runner):
res = await runner.invoke(
update_credentials,
["--username", "foo", "--password", "bar"],
input="y\n",
obj=dev,
)
assert res.exit_code == 0
assert (
"Do you really want to replace the existing credentials? [y/N]: y\n"
in res.output
)
async def test_time_get(dev, runner):
"""Test time get command."""
res = await runner.invoke(
time,
obj=dev,
)
assert res.exit_code == 0
assert "Current time: " in res.output
@device_smart
async def test_time_sync(dev, mocker, runner):
"""Test time sync command.
Currently implemented only for SMART.
"""
update = mocker.patch.object(dev, "update")
set_time_mock = mocker.spy(dev.modules[Module.Time], "set_time")
res = await runner.invoke(
time,
["sync"],
obj=dev,
)
set_time_mock.assert_called()
update.assert_called()
assert res.exit_code == 0
assert "Old time: " in res.output
assert "New time: " in res.output
async def test_emeter(dev: Device, mocker, runner):
res = await runner.invoke(emeter, obj=dev)
if not dev.has_emeter:
assert "Device has no emeter" in res.output
return
assert "== Emeter ==" in res.output
if not dev.is_strip:
res = await runner.invoke(emeter, ["--index", "0"], obj=dev)
assert "Index and name are only for power strips!" in res.output
res = await runner.invoke(emeter, ["--name", "mock"], obj=dev)
assert "Index and name are only for power strips!" in res.output
if dev.is_strip and len(dev.children) > 0:
realtime_emeter = mocker.patch.object(dev.children[0], "get_emeter_realtime")
realtime_emeter.return_value = EmeterStatus({"voltage_mv": 122066})
res = await runner.invoke(emeter, ["--index", "0"], obj=dev)
assert "Voltage: 122.066 V" in res.output
realtime_emeter.assert_called()
assert realtime_emeter.call_count == 1
res = await runner.invoke(emeter, ["--name", dev.children[0].alias], obj=dev)
assert "Voltage: 122.066 V" in res.output
assert realtime_emeter.call_count == 2
if isinstance(dev, IotDevice):
monthly = mocker.patch.object(dev, "get_emeter_monthly")
monthly.return_value = {1: 1234}
res = await runner.invoke(emeter, ["--year", "1900"], obj=dev)
if not isinstance(dev, IotDevice):
assert "Device has no historical statistics" in res.output
return
assert "For year" in res.output
assert "1, 1234" in res.output
monthly.assert_called_with(year=1900)
if isinstance(dev, IotDevice):
daily = mocker.patch.object(dev, "get_emeter_daily")
daily.return_value = {1: 1234}
res = await runner.invoke(emeter, ["--month", "1900-12"], obj=dev)
if not isinstance(dev, IotDevice):
assert "Device has no historical statistics" in res.output
return
assert "For month" in res.output
assert "1, 1234" in res.output
daily.assert_called_with(year=1900, month=12)
async def test_brightness(dev: Device, runner):
res = await runner.invoke(brightness, obj=dev)
if not (light := dev.modules.get(Module.Light)) or not light.is_dimmable:
assert "This device does not support brightness." in res.output
return
res = await runner.invoke(brightness, obj=dev)
assert f"Brightness: {light.brightness}" in res.output
res = await runner.invoke(brightness, ["12"], obj=dev)
assert "Setting brightness" in res.output
await dev.update()
res = await runner.invoke(brightness, obj=dev)
assert "Brightness: 12" in res.output
async def test_color_temperature(dev: Device, runner):
res = await runner.invoke(temperature, obj=dev)
if not (light := dev.modules.get(Module.Light)) or not light.is_variable_color_temp:
assert "Device does not support color temperature" in res.output
return
res = await runner.invoke(temperature, obj=dev)
assert f"Color temperature: {light.color_temp}" in res.output
valid_range = light.valid_temperature_range
assert f"(min: {valid_range.min}, max: {valid_range.max})" in res.output
val = int((valid_range.min + valid_range.max) / 2)
res = await runner.invoke(temperature, [str(val)], obj=dev)
assert "Setting color temperature to " in res.output
await dev.update()
res = await runner.invoke(temperature, obj=dev)
assert f"Color temperature: {val}" in res.output
assert res.exit_code == 0
invalid_max = valid_range.max + 100
# Lights that support the maximum range will not get past the click cli range check
# So can't be tested for the internal range check.
if invalid_max < 9000:
res = await runner.invoke(temperature, [str(invalid_max)], obj=dev)
assert res.exit_code == 1
assert isinstance(res.exception, ValueError)
res = await runner.invoke(temperature, [str(9100)], obj=dev)
assert res.exit_code == 2
async def test_color_hsv(dev: Device, runner: CliRunner):
res = await runner.invoke(hsv, obj=dev)
if not (light := dev.modules.get(Module.Light)) or not light.is_color:
assert "Device does not support colors" in res.output
return
res = await runner.invoke(hsv, obj=dev)
assert f"Current HSV: {light.hsv}" in res.output
res = await runner.invoke(hsv, ["180", "50", "50"], obj=dev)
assert "Setting HSV: 180 50 50" in res.output
assert res.exit_code == 0
await dev.update()
res = await runner.invoke(hsv, ["180", "50"], obj=dev)
assert "Setting a color requires 3 values." in res.output
assert res.exit_code == 2
async def test_light_effect(dev: Device, runner: CliRunner):
res = await runner.invoke(effect, obj=dev)
if not (light_effect := dev.modules.get(Module.LightEffect)):
assert "Device does not support effects" in res.output
return
# Start off with a known state of off
await light_effect.set_effect(light_effect.LIGHT_EFFECTS_OFF)
await dev.update()
assert light_effect.effect == light_effect.LIGHT_EFFECTS_OFF
res = await runner.invoke(effect, obj=dev)
assert f"Light effect: {light_effect.effect}" in res.output
assert res.exit_code == 0
res = await runner.invoke(effect, [light_effect.effect_list[1]], obj=dev)
assert f"Setting Effect: {light_effect.effect_list[1]}" in res.output
assert res.exit_code == 0
await dev.update()
assert light_effect.effect == light_effect.effect_list[1]
res = await runner.invoke(effect, ["foobar"], obj=dev)
assert f"Effect must be one of: {light_effect.effect_list}" in res.output
assert res.exit_code == 2
async def test_led(dev: Device, runner: CliRunner):
res = await runner.invoke(led, obj=dev)
if not (led_module := dev.modules.get(Module.Led)):
assert "Device does not support led" in res.output
return
res = await runner.invoke(led, obj=dev)
assert f"LED state: {led_module.led}" in res.output
assert res.exit_code == 0
res = await runner.invoke(led, ["on"], obj=dev)
assert "Turning led to True" in res.output
assert res.exit_code == 0
await dev.update()
assert led_module.led is True
res = await runner.invoke(led, ["off"], obj=dev)
assert "Turning led to False" in res.output
assert res.exit_code == 0
await dev.update()
assert led_module.led is False
async def test_json_output(dev: Device, mocker, runner):
"""Test that the json output produces correct output."""
mocker.patch("kasa.Discover.discover_single", return_value=dev)
# These will mock the features to avoid accessing non-existing ones
mocker.patch("kasa.device.Device.features", return_value={})
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
res = await runner.invoke(cli, ["--host", "127.0.0.1", "--json", "state"], obj=dev)
assert res.exit_code == 0
assert json.loads(res.output) == dev.internal_state
@new_discovery
async def test_credentials(discovery_mock, mocker, runner):
"""Test credentials are passed correctly from cli to device."""
# Patch state to echo username and password
pass_dev = click.make_pass_decorator(Device)
@pass_dev
async def _state(dev: Device):
if dev.credentials:
click.echo(
f"Username:{dev.credentials.username} Password:{dev.credentials.password}"
)
mocker.patch("kasa.cli.state", new=_state)
dr = DiscoveryResult(**discovery_mock.discovery_data["result"])
res = await runner.invoke(
cli,
[
"--host",
"127.0.0.123",
"--username",
"foo",
"--password",
"bar",
"--device-family",
dr.device_type,
"--encrypt-type",
dr.mgt_encrypt_schm.encrypt_type,
],
)
assert res.exit_code == 0
assert "Username:foo Password:bar\n" in res.output
async def test_without_device_type(dev, mocker, runner):
"""Test connecting without the device type."""
discovery_mock = mocker.patch(
"kasa.discover.Discover.discover_single", return_value=dev
)
# These will mock the features to avoid accessing non-existing
mocker.patch("kasa.device.Device.features", return_value={})
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
res = await runner.invoke(
cli,
[
"--host",
"127.0.0.1",
"--username",
"foo",
"--password",
"bar",
"--discovery-timeout",
"7",
],
)
assert res.exit_code == 0
discovery_mock.assert_called_once_with(
"127.0.0.1",
port=None,
credentials=Credentials("foo", "bar"),
timeout=5,
discovery_timeout=7,
)
@pytest.mark.parametrize("auth_param", ["--username", "--password"])
async def test_invalid_credential_params(auth_param, runner):
"""Test for handling only one of username or password supplied."""
res = await runner.invoke(
cli,
[
"--host",
"127.0.0.1",
"--type",
"plug",
auth_param,
"foo",
],
)
assert res.exit_code == 2
assert (
"Error: Using authentication requires both --username and --password"
in res.output
)
async def test_duplicate_target_device(runner):
"""Test that defining both --host or --alias gives an error."""
res = await runner.invoke(
cli,
[
"--host",
"127.0.0.1",
"--alias",
"foo",
],
)
assert res.exit_code == 2
assert "Error: Use either --alias or --host, not both." in res.output
async def test_discover(discovery_mock, mocker, runner):
"""Test discovery output."""
# These will mock the features to avoid accessing non-existing
mocker.patch("kasa.device.Device.features", return_value={})
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
res = await runner.invoke(
cli,
[
"--discovery-timeout",
0,
"--username",
"foo",
"--password",
"bar",
"--verbose",
"discover",
],
)
assert res.exit_code == 0
async def test_discover_host(discovery_mock, mocker, runner):
"""Test discovery output."""
# These will mock the features to avoid accessing non-existing
mocker.patch("kasa.device.Device.features", return_value={})
mocker.patch("kasa.iot.iotdevice.IotDevice.features", return_value={})
res = await runner.invoke(
cli,
[
"--discovery-timeout",
0,
"--host",
"127.0.0.123",
"--username",
"foo",
"--password",
"bar",
"--verbose",
],
)
assert res.exit_code == 0
async def test_discover_unsupported(unsupported_device_info, runner):
"""Test discovery output."""
res = await runner.invoke(
cli,
[
"--discovery-timeout",
0,
"--username",
"foo",
"--password",
"bar",
"--verbose",
"discover",
],
)
assert res.exit_code == 0
assert "== Unsupported device ==" in res.output
assert "== Discovery Result ==" in res.output
async def test_host_unsupported(unsupported_device_info, runner):
"""Test discovery output."""
host = "127.0.0.1"
res = await runner.invoke(
cli,
[
"--host",
host,
"--username",
"foo",
"--password",
"bar",
"--debug",
],
)
assert res.exit_code != 0
assert isinstance(res.exception, UnsupportedDeviceError)
@new_discovery
async def test_discover_auth_failed(discovery_mock, mocker, runner):
"""Test discovery output."""
host = "127.0.0.1"
discovery_mock.ip = host
device_class = Discover._get_device_class(discovery_mock.discovery_data)
mocker.patch.object(
device_class,
"update",
side_effect=AuthenticationError("Failed to authenticate"),
)
res = await runner.invoke(
cli,
[
"--discovery-timeout",
0,
"--username",
"foo",
"--password",
"bar",
"--verbose",
"discover",
],
)
assert res.exit_code == 0
assert "== Authentication failed for device ==" in res.output
assert "== Discovery Result ==" in res.output
@new_discovery
async def test_host_auth_failed(discovery_mock, mocker, runner):
"""Test discovery output."""
host = "127.0.0.1"
discovery_mock.ip = host
device_class = Discover._get_device_class(discovery_mock.discovery_data)
mocker.patch.object(
device_class,
"update",
side_effect=AuthenticationError("Failed to authenticate"),
)
res = await runner.invoke(
cli,
[
"--host",
host,
"--username",
"foo",
"--password",
"bar",
"--debug",
],
)
assert res.exit_code != 0
assert isinstance(res.exception, AuthenticationError)
@pytest.mark.parametrize("device_type", list(TYPE_TO_CLASS))
async def test_type_param(device_type, mocker, runner):
"""Test for handling only one of username or password supplied."""
result_device = FileNotFoundError
pass_dev = click.make_pass_decorator(Device)
@pass_dev
async def _state(dev: Device):
nonlocal result_device
result_device = dev
mocker.patch("kasa.cli.state", new=_state)
expected_type = TYPE_TO_CLASS[device_type]
mocker.patch.object(expected_type, "update")
res = await runner.invoke(
cli,
["--type", device_type, "--host", "127.0.0.1"],
)
assert res.exit_code == 0
assert isinstance(result_device, expected_type)
@pytest.mark.skip(
"Skip until pytest-asyncio supports pytest 8.0, https://github.com/pytest-dev/pytest-asyncio/issues/737"
)
async def test_shell(dev: Device, mocker, runner):
"""Test that the shell commands tries to embed a shell."""
mocker.patch("kasa.Discover.discover", return_value=[dev])
# repl = mocker.patch("ptpython.repl")
mocker.patch.dict(
"sys.modules",
{"ptpython": mocker.MagicMock(), "ptpython.repl": mocker.MagicMock()},
)
embed = mocker.patch("ptpython.repl.embed")
res = await runner.invoke(cli, ["shell"], obj=dev)
assert res.exit_code == 0
embed.assert_called()
async def test_errors(mocker, runner):
err = KasaException("Foobar")
# Test masking
mocker.patch("kasa.Discover.discover", side_effect=err)
res = await runner.invoke(
cli,
["--username", "foo", "--password", "bar"],
)
assert res.exit_code == 1
assert "Raised error: Foobar" in res.output
assert "Run with --debug enabled to see stacktrace" in res.output
assert isinstance(res.exception, SystemExit)
# Test --debug
res = await runner.invoke(
cli,
["--debug"],
)
assert res.exit_code == 1
assert "Raised error: Foobar" in res.output
assert res.exception == err
# Test no device passed to subcommand
mocker.patch("kasa.Discover.discover", return_value={})
res = await runner.invoke(
cli,
["sysinfo"],
)
assert res.exit_code == 1
assert (
"Only discover is available without --host or --alias"
in res.output.replace("\n", "") # Remove newlines from rich formatting
)
assert isinstance(res.exception, SystemExit)
# Test click error
res = await runner.invoke(
cli,
["--foobar"],
)
assert res.exit_code == 2
assert "Raised error:" not in res.output
async def test_feature(mocker, runner):
"""Test feature command."""
dummy_device = await get_device_for_fixture_protocol(
"P300(EU)_1.0_1.0.13.json", "SMART"
)
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature"],
catch_exceptions=False,
)
assert "LED" in res.output
assert "== Child " in res.output # child listing
assert res.exit_code == 0
async def test_features_all(discovery_mock, mocker, runner):
"""Test feature command on all fixtures."""
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature"],
catch_exceptions=False,
)
assert "== Features ==" in res.output
assert res.exit_code == 0
async def test_feature_single(mocker, runner):
"""Test feature command returning single value."""
dummy_device = await get_device_for_fixture_protocol(
"P300(EU)_1.0_1.0.13.json", "SMART"
)
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "led"],
catch_exceptions=False,
)
assert "LED" in res.output
assert "== Features ==" not in res.output
assert res.exit_code == 0
async def test_feature_missing(mocker, runner):
"""Test feature command returning single value."""
dummy_device = await get_device_for_fixture_protocol(
"P300(EU)_1.0_1.0.13.json", "SMART"
)
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "missing"],
catch_exceptions=False,
)
assert "No feature by name 'missing'" in res.output
assert "== Features ==" not in res.output
assert res.exit_code == 1
async def test_feature_set(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"P300(EU)_1.0_1.0.13.json", "SMART"
)
led_setter = mocker.patch("kasa.smart.modules.led.Led.set_led")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
res = await runner.invoke(
cli,
["--host", "127.0.0.123", "--debug", "feature", "led", "True"],
catch_exceptions=False,
)
led_setter.assert_called_with(True)
assert "Changing led from False to True" in res.output
assert res.exit_code == 0
async def test_feature_set_child(mocker, runner):
"""Test feature command's set value."""
dummy_device = await get_device_for_fixture_protocol(
"P300(EU)_1.0_1.0.13.json", "SMART"
)
setter = mocker.patch("kasa.smart.smartdevice.SmartDevice.set_state")
mocker.patch("kasa.discover.Discover.discover_single", return_value=dummy_device)
get_child_device = mocker.spy(dummy_device, "get_child_device")
child_id = "000000000000000000000000000000000000000001"
res = await runner.invoke(
cli,
[
"--host",
"127.0.0.123",
"--debug",
"feature",
"--child",
child_id,
"state",
"True",
],
catch_exceptions=False,
)
get_child_device.assert_called()
setter.assert_called_with(True)
assert f"Targeting child device {child_id}"
assert "Changing state from False to True" in res.output
assert res.exit_code == 0