Make device initialisation easier by reducing required imports (#936)

Adds username and password arguments to discovery to remove the need to import Credentials.
Creates TypeAliases in Device for connection configuration classes and DeviceType.
Using the API with these changes will only require importing either Discover or Device
depending on whether using Discover.discover() or Device.connect() to 
initialize and interact with the API.
This commit is contained in:
Steven B 2024-06-03 21:06:54 +03:00 committed by GitHub
parent bfba7a347f
commit be5202ccb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 263 additions and 90 deletions

View File

@ -231,11 +231,11 @@ async def cli(
if host is not None: if host is not None:
if discovery_info: if discovery_info:
click.echo("Host and discovery info given, trying connect on %s." % host) click.echo("Host and discovery info given, trying connect on %s." % host)
from kasa import ConnectionType, DeviceConfig from kasa import DeviceConfig, DeviceConnectionParameters
di = json.loads(discovery_info) di = json.loads(discovery_info)
dr = DiscoveryResult(**di) dr = DiscoveryResult(**di)
connection_type = ConnectionType.from_values( connection_type = DeviceConnectionParameters.from_values(
dr.device_type, dr.device_type,
dr.mgt_encrypt_schm.encrypt_type, dr.mgt_encrypt_schm.encrypt_type,
dr.mgt_encrypt_schm.lv, dr.mgt_encrypt_schm.lv,

View File

@ -6,12 +6,14 @@ This page contains guides of how to perform common actions using the library.
```{eval-rst} ```{eval-rst}
.. automodule:: kasa.discover .. automodule:: kasa.discover
:noindex:
``` ```
## Connect without discovery ## Connect without discovery
```{eval-rst} ```{eval-rst}
.. automodule:: kasa.deviceconfig .. automodule:: kasa.deviceconfig
:noindex:
``` ```
## Get Energy Consumption and Usage Statistics ## Get Energy Consumption and Usage Statistics

View File

@ -1,10 +1,11 @@
# API Reference # API Reference
```{currentmodule} kasa
```
## Discover ## Discover
```{module} kasa.discover
```
```{eval-rst} ```{eval-rst}
.. autoclass:: kasa.Discover .. autoclass:: kasa.Discover
:members: :members:
@ -12,8 +13,51 @@
## Device ## Device
```{module} kasa.device
```
```{eval-rst} ```{eval-rst}
.. autoclass:: kasa.Device .. autoclass:: Device
:members:
:undoc-members:
```
## Device Config
```{module} kasa.credentials
```
```{eval-rst}
.. autoclass:: Credentials
:members:
:undoc-members:
```
```{module} kasa.deviceconfig
```
```{eval-rst}
.. autoclass:: DeviceConfig
:members:
:undoc-members:
```
```{eval-rst}
.. autoclass:: kasa.DeviceFamily
:members:
:undoc-members:
```
```{eval-rst}
.. autoclass:: kasa.DeviceConnection
:members:
:undoc-members:
```
```{eval-rst}
.. autoclass:: kasa.DeviceEncryption
:members: :members:
:undoc-members: :undoc-members:
``` ```

View File

@ -11,11 +11,11 @@ The main entry point for the API is :meth:`~kasa.Discover.discover` and
Most newer devices require your TP-Link cloud username and password, but this can be omitted for older devices. Most newer devices require your TP-Link cloud username and password, but this can be omitted for older devices.
>>> from kasa import Device, Discover, Credentials >>> from kasa import Discover
:func:`~kasa.Discover.discover` returns a dict[str,Device] of devices on your network: :func:`~kasa.Discover.discover` returns a dict[str,Device] of devices on your network:
>>> devices = await Discover.discover(credentials=Credentials("user@example.com", "great_password")) >>> devices = await Discover.discover(username="user@example.com", password="great_password")
>>> for dev in devices.values(): >>> for dev in devices.values():
>>> await dev.update() >>> await dev.update()
>>> print(dev.host) >>> print(dev.host)
@ -27,7 +27,7 @@ Most newer devices require your TP-Link cloud username and password, but this ca
:meth:`~kasa.Discover.discover_single` returns a single device by hostname: :meth:`~kasa.Discover.discover_single` returns a single device by hostname:
>>> dev = await Discover.discover_single("127.0.0.3", credentials=Credentials("user@example.com", "great_password")) >>> dev = await Discover.discover_single("127.0.0.3", username="user@example.com", password="great_password")
>>> await dev.update() >>> await dev.update()
>>> dev.alias >>> dev.alias
Living Room Bulb Living Room Bulb

View File

@ -20,10 +20,10 @@ from kasa.credentials import Credentials
from kasa.device import Device from kasa.device import Device
from kasa.device_type import DeviceType from kasa.device_type import DeviceType
from kasa.deviceconfig import ( from kasa.deviceconfig import (
ConnectionType,
DeviceConfig, DeviceConfig,
DeviceFamilyType, DeviceConnectionParameters,
EncryptType, DeviceEncryptionType,
DeviceFamily,
) )
from kasa.discover import Discover from kasa.discover import Discover
from kasa.emeterstatus import EmeterStatus from kasa.emeterstatus import EmeterStatus
@ -71,9 +71,9 @@ __all__ = [
"TimeoutError", "TimeoutError",
"Credentials", "Credentials",
"DeviceConfig", "DeviceConfig",
"ConnectionType", "DeviceConnectionParameters",
"EncryptType", "DeviceEncryptionType",
"DeviceFamilyType", "DeviceFamily",
] ]
from . import iot from . import iot
@ -89,11 +89,14 @@ deprecated_smart_devices = {
"SmartDimmer": iot.IotDimmer, "SmartDimmer": iot.IotDimmer,
"SmartBulbPreset": IotLightPreset, "SmartBulbPreset": IotLightPreset,
} }
deprecated_exceptions = { deprecated_classes = {
"SmartDeviceException": KasaException, "SmartDeviceException": KasaException,
"UnsupportedDeviceException": UnsupportedDeviceError, "UnsupportedDeviceException": UnsupportedDeviceError,
"AuthenticationException": AuthenticationError, "AuthenticationException": AuthenticationError,
"TimeoutException": TimeoutError, "TimeoutException": TimeoutError,
"ConnectionType": DeviceConnectionParameters,
"EncryptType": DeviceEncryptionType,
"DeviceFamilyType": DeviceFamily,
} }
@ -112,8 +115,8 @@ def __getattr__(name):
stacklevel=1, stacklevel=1,
) )
return new_class return new_class
if name in deprecated_exceptions: if name in deprecated_classes:
new_class = deprecated_exceptions[name] new_class = deprecated_classes[name]
msg = f"{name} is deprecated, use {new_class.__name__} instead" msg = f"{name} is deprecated, use {new_class.__name__} instead"
warn(msg, DeprecationWarning, stacklevel=1) warn(msg, DeprecationWarning, stacklevel=1)
return new_class return new_class
@ -133,6 +136,10 @@ if TYPE_CHECKING:
UnsupportedDeviceException = UnsupportedDeviceError UnsupportedDeviceException = UnsupportedDeviceError
AuthenticationException = AuthenticationError AuthenticationException = AuthenticationError
TimeoutException = TimeoutError TimeoutException = TimeoutError
ConnectionType = DeviceConnectionParameters
EncryptType = DeviceEncryptionType
DeviceFamilyType = DeviceFamily
# Instanstiate all classes so the type checkers catch abstract issues # Instanstiate all classes so the type checkers catch abstract issues
from . import smart from . import smart

View File

@ -18,13 +18,13 @@ from pydantic.v1 import ValidationError
from kasa import ( from kasa import (
AuthenticationError, AuthenticationError,
ConnectionType,
Credentials, Credentials,
Device, Device,
DeviceConfig, DeviceConfig,
DeviceFamilyType, DeviceConnectionParameters,
DeviceEncryptionType,
DeviceFamily,
Discover, Discover,
EncryptType,
Feature, Feature,
KasaException, KasaException,
Module, Module,
@ -87,11 +87,9 @@ TYPE_TO_CLASS = {
"smart.bulb": SmartDevice, "smart.bulb": SmartDevice,
} }
ENCRYPT_TYPES = [encrypt_type.value for encrypt_type in EncryptType] ENCRYPT_TYPES = [encrypt_type.value for encrypt_type in DeviceEncryptionType]
DEVICE_FAMILY_TYPES = [ DEVICE_FAMILY_TYPES = [device_family_type.value for device_family_type in DeviceFamily]
device_family_type.value for device_family_type in DeviceFamilyType
]
# Block list of commands which require no update # Block list of commands which require no update
SKIP_UPDATE_COMMANDS = ["wifi", "raw-command", "command"] SKIP_UPDATE_COMMANDS = ["wifi", "raw-command", "command"]
@ -374,9 +372,9 @@ async def cli(
if type is not None: if type is not None:
dev = TYPE_TO_CLASS[type](host) dev = TYPE_TO_CLASS[type](host)
elif device_family and encrypt_type: elif device_family and encrypt_type:
ctype = ConnectionType( ctype = DeviceConnectionParameters(
DeviceFamilyType(device_family), DeviceFamily(device_family),
EncryptType(encrypt_type), DeviceEncryptionType(encrypt_type),
login_version, login_version,
) )
config = DeviceConfig( config = DeviceConfig(

View File

@ -9,9 +9,16 @@ from datetime import datetime
from typing import TYPE_CHECKING, Any, Mapping, Sequence from typing import TYPE_CHECKING, Any, Mapping, Sequence
from warnings import warn from warnings import warn
from .credentials import Credentials from typing_extensions import TypeAlias
from .credentials import Credentials as _Credentials
from .device_type import DeviceType from .device_type import DeviceType
from .deviceconfig import DeviceConfig from .deviceconfig import (
DeviceConfig,
DeviceConnectionParameters,
DeviceEncryptionType,
DeviceFamily,
)
from .emeterstatus import EmeterStatus from .emeterstatus import EmeterStatus
from .exceptions import KasaException from .exceptions import KasaException
from .feature import Feature from .feature import Feature
@ -51,6 +58,22 @@ class Device(ABC):
or :func:`Discover.discover_single()`. or :func:`Discover.discover_single()`.
""" """
# All types required to create devices directly via connect are aliased here
# to avoid consumers having to do multiple imports.
#: The type of device
Type: TypeAlias = DeviceType
#: The credentials for authentication
Credentials: TypeAlias = _Credentials
#: Configuration for connecting to the device
Config: TypeAlias = DeviceConfig
#: The family of the device, e.g. SMART.KASASWITCH.
Family: TypeAlias = DeviceFamily
#: The encryption for the device, e.g. Klap or Aes
EncryptionType: TypeAlias = DeviceEncryptionType
#: The connection type for the device.
ConnectionParameters: TypeAlias = DeviceConnectionParameters
def __init__( def __init__(
self, self,
host: str, host: str,
@ -166,7 +189,7 @@ class Device(ABC):
return self.protocol._transport._port return self.protocol._transport._port
@property @property
def credentials(self) -> Credentials | None: def credentials(self) -> _Credentials | None:
"""The device credentials.""" """The device credentials."""
return self.protocol._transport._credentials return self.protocol._transport._credentials

View File

@ -5,11 +5,11 @@ via discovery or connect directly with :class:`DeviceConfig`.
Discovery returns a list of discovered devices: Discovery returns a list of discovered devices:
>>> from kasa import Discover, Credentials, Device, DeviceConfig >>> from kasa import Discover, Device
>>> device = await Discover.discover_single( >>> device = await Discover.discover_single(
>>> "127.0.0.3", >>> "127.0.0.3",
>>> credentials=Credentials("myusername", "mypassword"), >>> username="user@example.com",
>>> discovery_timeout=10 >>> password="great_password",
>>> ) >>> )
>>> print(device.alias) # Alias is None because update() has not been called >>> print(device.alias) # Alias is None because update() has not been called
None None
@ -21,7 +21,7 @@ None
: {'device_family': 'SMART.TAPOBULB', 'encryption_type': 'KLAP', 'login_version': 2},\ : {'device_family': 'SMART.TAPOBULB', 'encryption_type': 'KLAP', 'login_version': 2},\
'uses_http': True} 'uses_http': True}
>>> later_device = await Device.connect(config=DeviceConfig.from_dict(config_dict)) >>> later_device = await Device.connect(config=Device.Config.from_dict(config_dict))
>>> print(later_device.alias) # Alias is available as connect() calls update() >>> print(later_device.alias) # Alias is available as connect() calls update()
Living Room Bulb Living Room Bulb
@ -45,7 +45,7 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class EncryptType(Enum): class DeviceEncryptionType(Enum):
"""Encrypt type enum.""" """Encrypt type enum."""
Klap = "KLAP" Klap = "KLAP"
@ -53,7 +53,7 @@ class EncryptType(Enum):
Xor = "XOR" Xor = "XOR"
class DeviceFamilyType(Enum): class DeviceFamily(Enum):
"""Encrypt type enum.""" """Encrypt type enum."""
IotSmartPlugSwitch = "IOT.SMARTPLUGSWITCH" IotSmartPlugSwitch = "IOT.SMARTPLUGSWITCH"
@ -105,11 +105,11 @@ def _dataclass_to_dict(in_val):
@dataclass @dataclass
class ConnectionType: class DeviceConnectionParameters:
"""Class to hold the the parameters determining connection type.""" """Class to hold the the parameters determining connection type."""
device_family: DeviceFamilyType device_family: DeviceFamily
encryption_type: EncryptType encryption_type: DeviceEncryptionType
login_version: Optional[int] = None login_version: Optional[int] = None
@staticmethod @staticmethod
@ -117,12 +117,12 @@ class ConnectionType:
device_family: str, device_family: str,
encryption_type: str, encryption_type: str,
login_version: Optional[int] = None, login_version: Optional[int] = None,
) -> "ConnectionType": ) -> "DeviceConnectionParameters":
"""Return connection parameters from string values.""" """Return connection parameters from string values."""
try: try:
return ConnectionType( return DeviceConnectionParameters(
DeviceFamilyType(device_family), DeviceFamily(device_family),
EncryptType(encryption_type), DeviceEncryptionType(encryption_type),
login_version, login_version,
) )
except (ValueError, TypeError) as ex: except (ValueError, TypeError) as ex:
@ -132,7 +132,7 @@ class ConnectionType:
) from ex ) from ex
@staticmethod @staticmethod
def from_dict(connection_type_dict: Dict[str, str]) -> "ConnectionType": def from_dict(connection_type_dict: Dict[str, str]) -> "DeviceConnectionParameters":
"""Return connection parameters from dict.""" """Return connection parameters from dict."""
if ( if (
isinstance(connection_type_dict, dict) isinstance(connection_type_dict, dict)
@ -141,7 +141,7 @@ class ConnectionType:
): ):
if login_version := connection_type_dict.get("login_version"): if login_version := connection_type_dict.get("login_version"):
login_version = int(login_version) # type: ignore[assignment] login_version = int(login_version) # type: ignore[assignment]
return ConnectionType.from_values( return DeviceConnectionParameters.from_values(
device_family, device_family,
encryption_type, encryption_type,
login_version, # type: ignore[arg-type] login_version, # type: ignore[arg-type]
@ -180,9 +180,9 @@ class DeviceConfig:
#: The protocol specific type of connection. Defaults to the legacy type. #: The protocol specific type of connection. Defaults to the legacy type.
batch_size: Optional[int] = None batch_size: Optional[int] = None
#: The batch size for protoools supporting multiple request batches. #: The batch size for protoools supporting multiple request batches.
connection_type: ConnectionType = field( connection_type: DeviceConnectionParameters = field(
default_factory=lambda: ConnectionType( default_factory=lambda: DeviceConnectionParameters(
DeviceFamilyType.IotSmartPlugSwitch, EncryptType.Xor, 1 DeviceFamily.IotSmartPlugSwitch, DeviceEncryptionType.Xor, 1
) )
) )
#: True if the device uses http. Consumers should retrieve rather than set this #: True if the device uses http. Consumers should retrieve rather than set this
@ -195,8 +195,8 @@ class DeviceConfig:
def __post_init__(self): def __post_init__(self):
if self.connection_type is None: if self.connection_type is None:
self.connection_type = ConnectionType( self.connection_type = DeviceConnectionParameters(
DeviceFamilyType.IotSmartPlugSwitch, EncryptType.Xor DeviceFamily.IotSmartPlugSwitch, DeviceEncryptionType.Xor
) )
def to_dict( def to_dict(

View File

@ -18,17 +18,32 @@ devices.
Discovery returns a dict of {ip: discovered devices}: Discovery returns a dict of {ip: discovered devices}:
>>> import asyncio
>>> from kasa import Discover, Credentials >>> from kasa import Discover, Credentials
>>> >>>
>>> found_devices = await Discover.discover() >>> found_devices = await Discover.discover()
>>> [dev.model for dev in found_devices.values()] >>> [dev.model for dev in found_devices.values()]
['KP303(UK)', 'HS110(EU)', 'L530E', 'KL430(US)', 'HS220(US)'] ['KP303(UK)', 'HS110(EU)', 'L530E', 'KL430(US)', 'HS220(US)']
You can pass username and password for devices requiring authentication
>>> devices = await Discover.discover(
>>> username="user@example.com",
>>> password="great_password",
>>> )
>>> print(len(devices))
5
You can also pass a :class:`kasa.Credentials`
>>> creds = Credentials("user@example.com", "great_password")
>>> devices = await Discover.discover(credentials=creds)
>>> print(len(devices))
5
Discovery can also be targeted to a specific broadcast address instead of Discovery can also be targeted to a specific broadcast address instead of
the default 255.255.255.255: the default 255.255.255.255:
>>> found_devices = await Discover.discover(target="127.0.0.255") >>> found_devices = await Discover.discover(target="127.0.0.255", credentials=creds)
>>> print(len(found_devices)) >>> print(len(found_devices))
5 5
@ -49,29 +64,16 @@ It is also possible to pass a coroutine to be executed for each found device:
>>> await dev.update() >>> await dev.update()
>>> print(f"Discovered {dev.alias} (model: {dev.model})") >>> print(f"Discovered {dev.alias} (model: {dev.model})")
>>> >>>
>>> devices = await Discover.discover(on_discovered=print_dev_info) >>> devices = await Discover.discover(on_discovered=print_dev_info, credentials=creds)
Discovered Bedroom Power Strip (model: KP303(UK)) Discovered Bedroom Power Strip (model: KP303(UK))
Discovered Bedroom Lamp Plug (model: HS110(EU)) Discovered Bedroom Lamp Plug (model: HS110(EU))
Discovered Living Room Bulb (model: L530) Discovered Living Room Bulb (model: L530)
Discovered Bedroom Lightstrip (model: KL430(US)) Discovered Bedroom Lightstrip (model: KL430(US))
Discovered Living Room Dimmer Switch (model: HS220(US)) Discovered Living Room Dimmer Switch (model: HS220(US))
You can pass credentials for devices requiring authentication
>>> devices = await Discover.discover(
>>> credentials=Credentials("myusername", "mypassword"),
>>> discovery_timeout=10
>>> )
>>> print(len(devices))
5
Discovering a single device returns a kasa.Device object. Discovering a single device returns a kasa.Device object.
>>> device = await Discover.discover_single( >>> device = await Discover.discover_single("127.0.0.1", credentials=creds)
>>> "127.0.0.1",
>>> credentials=Credentials("myusername", "mypassword"),
>>> discovery_timeout=10
>>> )
>>> device.model >>> device.model
'KP303(UK)' 'KP303(UK)'
@ -98,7 +100,11 @@ from kasa.device_factory import (
get_device_class_from_sys_info, get_device_class_from_sys_info,
get_protocol, get_protocol,
) )
from kasa.deviceconfig import ConnectionType, DeviceConfig, EncryptType from kasa.deviceconfig import (
DeviceConfig,
DeviceConnectionParameters,
DeviceEncryptionType,
)
from kasa.exceptions import ( from kasa.exceptions import (
KasaException, KasaException,
TimeoutError, TimeoutError,
@ -296,6 +302,8 @@ class Discover:
interface=None, interface=None,
on_unsupported=None, on_unsupported=None,
credentials=None, credentials=None,
username: str | None = None,
password: str | None = None,
port=None, port=None,
timeout=None, timeout=None,
) -> DeviceDict: ) -> DeviceDict:
@ -323,11 +331,16 @@ class Discover:
:param discovery_packets: Number of discovery packets to broadcast :param discovery_packets: Number of discovery packets to broadcast
:param interface: Bind to specific interface :param interface: Bind to specific interface
:param on_unsupported: Optional callback when unsupported devices are discovered :param on_unsupported: Optional callback when unsupported devices are discovered
:param credentials: Credentials for devices requiring authentication :param credentials: Credentials for devices that require authentication.
username and password are ignored if provided.
:param username: Username for devices that require authentication
:param password: Password for devices that require authentication
:param port: Override the discovery port for devices listening on 9999 :param port: Override the discovery port for devices listening on 9999
:param timeout: Query timeout in seconds for devices returned by discovery :param timeout: Query timeout in seconds for devices returned by discovery
:return: dictionary with discovered devices :return: dictionary with discovered devices
""" """
if not credentials and username and password:
credentials = Credentials(username, password)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
transport, protocol = await loop.create_datagram_endpoint( transport, protocol = await loop.create_datagram_endpoint(
lambda: _DiscoverProtocol( lambda: _DiscoverProtocol(
@ -367,6 +380,8 @@ class Discover:
port: int | None = None, port: int | None = None,
timeout: int | None = None, timeout: int | None = None,
credentials: Credentials | None = None, credentials: Credentials | None = None,
username: str | None = None,
password: str | None = None,
) -> Device: ) -> Device:
"""Discover a single device by the given IP address. """Discover a single device by the given IP address.
@ -379,10 +394,15 @@ class Discover:
:param discovery_timeout: Timeout in seconds for discovery :param discovery_timeout: Timeout in seconds for discovery
:param port: Optionally set a different port for legacy devices using port 9999 :param port: Optionally set a different port for legacy devices using port 9999
:param timeout: Timeout in seconds device for devices queries :param timeout: Timeout in seconds device for devices queries
:param credentials: Credentials for devices that require authentication :param credentials: Credentials for devices that require authentication.
username and password are ignored if provided.
:param username: Username for devices that require authentication
:param password: Password for devices that require authentication
:rtype: SmartDevice :rtype: SmartDevice
:return: Object for querying/controlling found device. :return: Object for querying/controlling found device.
""" """
if not credentials and username and password:
credentials = Credentials(username, password)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
@ -469,8 +489,9 @@ class Discover:
device = device_class(config.host, config=config) device = device_class(config.host, config=config)
sys_info = info["system"]["get_sysinfo"] sys_info = info["system"]["get_sysinfo"]
if device_type := sys_info.get("mic_type", sys_info.get("type")): if device_type := sys_info.get("mic_type", sys_info.get("type")):
config.connection_type = ConnectionType.from_values( config.connection_type = DeviceConnectionParameters.from_values(
device_family=device_type, encryption_type=EncryptType.Xor.value device_family=device_type,
encryption_type=DeviceEncryptionType.Xor.value,
) )
device.protocol = get_protocol(config) # type: ignore[assignment] device.protocol = get_protocol(config) # type: ignore[assignment]
device.update_from_discover_info(info) device.update_from_discover_info(info)
@ -502,7 +523,7 @@ class Discover:
type_ = discovery_result.device_type type_ = discovery_result.device_type
try: try:
config.connection_type = ConnectionType.from_values( config.connection_type = DeviceConnectionParameters.from_values(
type_, type_,
discovery_result.mgt_encrypt_schm.encrypt_type, discovery_result.mgt_encrypt_schm.encrypt_type,
discovery_result.mgt_encrypt_schm.lv, discovery_result.mgt_encrypt_schm.lv,

View File

@ -116,13 +116,11 @@ def test_deprecated_devices(device_class, use_class):
getattr(module, use_class.__name__) getattr(module, use_class.__name__)
@pytest.mark.parametrize( @pytest.mark.parametrize("deprecated_class, use_class", kasa.deprecated_classes.items())
"exceptions_class, use_class", kasa.deprecated_exceptions.items() def test_deprecated_classes(deprecated_class, use_class):
) msg = f"{deprecated_class} is deprecated, use {use_class.__name__} instead"
def test_deprecated_exceptions(exceptions_class, use_class):
msg = f"{exceptions_class} is deprecated, use {use_class.__name__} instead"
with pytest.deprecated_call(match=msg): with pytest.deprecated_call(match=msg):
getattr(kasa, exceptions_class) getattr(kasa, deprecated_class)
getattr(kasa, use_class.__name__) getattr(kasa, use_class.__name__)
@ -266,3 +264,27 @@ async def test_deprecated_light_preset_attributes(dev: Device):
IotLightPreset(index=0, hue=100, brightness=100, saturation=0, color_temp=0), # type: ignore[call-arg] IotLightPreset(index=0, hue=100, brightness=100, saturation=0, color_temp=0), # type: ignore[call-arg]
will_raise=exc, will_raise=exc,
) )
async def test_device_type_aliases():
"""Test that the device type aliases in Device work."""
def _mock_connect(config, *args, **kwargs):
mock = Mock()
mock.config = config
return mock
with patch("kasa.device_factory.connect", side_effect=_mock_connect):
dev = await Device.connect(
config=Device.Config(
host="127.0.0.1",
credentials=Device.Credentials(username="user", password="foobar"), # noqa: S106
connection_type=Device.ConnectionParameters(
device_family=Device.Family.SmartKasaPlug,
encryption_type=Device.EncryptionType.Klap,
login_version=2,
),
)
)
assert isinstance(dev.config, DeviceConfig)
assert DeviceType.Dimmer == Device.Type.Dimmer

View File

@ -17,10 +17,10 @@ from kasa.device_factory import (
get_protocol, get_protocol,
) )
from kasa.deviceconfig import ( from kasa.deviceconfig import (
ConnectionType,
DeviceConfig, DeviceConfig,
DeviceFamilyType, DeviceConnectionParameters,
EncryptType, DeviceEncryptionType,
DeviceFamily,
) )
from kasa.discover import DiscoveryResult from kasa.discover import DiscoveryResult
from kasa.smart.smartdevice import SmartDevice from kasa.smart.smartdevice import SmartDevice
@ -31,12 +31,12 @@ def _get_connection_type_device_class(discovery_info):
device_class = Discover._get_device_class(discovery_info) device_class = Discover._get_device_class(discovery_info)
dr = DiscoveryResult(**discovery_info["result"]) dr = DiscoveryResult(**discovery_info["result"])
connection_type = ConnectionType.from_values( connection_type = DeviceConnectionParameters.from_values(
dr.device_type, dr.mgt_encrypt_schm.encrypt_type dr.device_type, dr.mgt_encrypt_schm.encrypt_type
) )
else: else:
connection_type = ConnectionType.from_values( connection_type = DeviceConnectionParameters.from_values(
DeviceFamilyType.IotSmartPlugSwitch.value, EncryptType.Xor.value DeviceFamily.IotSmartPlugSwitch.value, DeviceEncryptionType.Xor.value
) )
device_class = Discover._get_device_class(discovery_info) device_class = Discover._get_device_class(discovery_info)
@ -137,7 +137,7 @@ async def test_connect_http_client(discovery_data, mocker):
host=host, credentials=Credentials("foor", "bar"), connection_type=ctype host=host, credentials=Credentials("foor", "bar"), connection_type=ctype
) )
dev = await connect(config=config) dev = await connect(config=config)
if ctype.encryption_type != EncryptType.Xor: if ctype.encryption_type != DeviceEncryptionType.Xor:
assert dev.protocol._transport._http_client.client != http_client assert dev.protocol._transport._http_client.client != http_client
await dev.disconnect() await dev.disconnect()
@ -148,7 +148,7 @@ async def test_connect_http_client(discovery_data, mocker):
http_client=http_client, http_client=http_client,
) )
dev = await connect(config=config) dev = await connect(config=config)
if ctype.encryption_type != EncryptType.Xor: if ctype.encryption_type != DeviceEncryptionType.Xor:
assert dev.protocol._transport._http_client.client == http_client assert dev.protocol._transport._http_client.client == http_client
await dev.disconnect() await dev.disconnect()
await http_client.close() await http_client.close()

View File

@ -1,4 +1,6 @@
# type: ignore # type: ignore
# ruff: noqa: S106
import asyncio import asyncio
import re import re
import socket import socket
@ -16,8 +18,8 @@ from kasa import (
KasaException, KasaException,
) )
from kasa.deviceconfig import ( from kasa.deviceconfig import (
ConnectionType,
DeviceConfig, DeviceConfig,
DeviceConnectionParameters,
) )
from kasa.discover import DiscoveryResult, _DiscoverProtocol, json_dumps from kasa.discover import DiscoveryResult, _DiscoverProtocol, json_dumps
from kasa.exceptions import AuthenticationError, UnsupportedDeviceError from kasa.exceptions import AuthenticationError, UnsupportedDeviceError
@ -128,7 +130,7 @@ async def test_discover_single(discovery_mock, custom_port, mocker):
if discovery_mock.default_port == 80: if discovery_mock.default_port == 80:
assert x.alias is None assert x.alias is None
ct = ConnectionType.from_values( ct = DeviceConnectionParameters.from_values(
discovery_mock.device_type, discovery_mock.device_type,
discovery_mock.encrypt_type, discovery_mock.encrypt_type,
discovery_mock.login_version, discovery_mock.login_version,
@ -164,6 +166,60 @@ async def test_discover_single_hostname(discovery_mock, mocker):
x = await Discover.discover_single(host, credentials=Credentials()) x = await Discover.discover_single(host, credentials=Credentials())
async def test_discover_credentials(mocker):
"""Make sure that discover gives credentials precedence over un and pw."""
host = "127.0.0.1"
mocker.patch("kasa.discover._DiscoverProtocol.wait_for_discovery_to_complete")
def mock_discover(self, *_, **__):
self.discovered_devices = {host: MagicMock()}
mocker.patch.object(_DiscoverProtocol, "do_discover", mock_discover)
dp = mocker.spy(_DiscoverProtocol, "__init__")
# Only credentials passed
await Discover.discover(credentials=Credentials(), timeout=0)
assert dp.mock_calls[0].kwargs["credentials"] == Credentials()
# Credentials and un/pw passed
await Discover.discover(
credentials=Credentials(), username="Foo", password="Bar", timeout=0
)
assert dp.mock_calls[1].kwargs["credentials"] == Credentials()
# Only un/pw passed
await Discover.discover(username="Foo", password="Bar", timeout=0)
assert dp.mock_calls[2].kwargs["credentials"] == Credentials("Foo", "Bar")
# Only un passed, credentials should be None
await Discover.discover(username="Foo", timeout=0)
assert dp.mock_calls[3].kwargs["credentials"] is None
async def test_discover_single_credentials(mocker):
"""Make sure that discover_single gives credentials precedence over un and pw."""
host = "127.0.0.1"
mocker.patch("kasa.discover._DiscoverProtocol.wait_for_discovery_to_complete")
def mock_discover(self, *_, **__):
self.discovered_devices = {host: MagicMock()}
mocker.patch.object(_DiscoverProtocol, "do_discover", mock_discover)
dp = mocker.spy(_DiscoverProtocol, "__init__")
# Only credentials passed
await Discover.discover_single(host, credentials=Credentials(), timeout=0)
assert dp.mock_calls[0].kwargs["credentials"] == Credentials()
# Credentials and un/pw passed
await Discover.discover_single(
host, credentials=Credentials(), username="Foo", password="Bar", timeout=0
)
assert dp.mock_calls[1].kwargs["credentials"] == Credentials()
# Only un/pw passed
await Discover.discover_single(host, username="Foo", password="Bar", timeout=0)
assert dp.mock_calls[2].kwargs["credentials"] == Credentials("Foo", "Bar")
# Only un passed, credentials should be None
await Discover.discover_single(host, username="Foo", timeout=0)
assert dp.mock_calls[3].kwargs["credentials"] is None
async def test_discover_single_unsupported(unsupported_device_info, mocker): async def test_discover_single_unsupported(unsupported_device_info, mocker):
"""Make sure that discover_single handles unsupported devices correctly.""" """Make sure that discover_single handles unsupported devices correctly."""
host = "127.0.0.1" host = "127.0.0.1"