python-kasa/kasa/device_factory.py
sdb9696 9728866afb
Re-add protocol_class parameter to connect (#551)
Co-authored-by: J. Nick Koston <nick@koston.org>
2023-11-28 20:13:15 +01:00

130 lines
4.4 KiB
Python
Executable File

"""Device creation by type."""
import logging
import time
from typing import Any, Dict, Optional, Type
from .credentials import Credentials
from .device_type import DeviceType
from .exceptions import UnsupportedDeviceException
from .protocol import TPLinkProtocol
from .smartbulb import SmartBulb
from .smartdevice import SmartDevice, SmartDeviceException
from .smartdimmer import SmartDimmer
from .smartlightstrip import SmartLightStrip
from .smartplug import SmartPlug
from .smartstrip import SmartStrip
DEVICE_TYPE_TO_CLASS = {
DeviceType.Plug: SmartPlug,
DeviceType.Bulb: SmartBulb,
DeviceType.Strip: SmartStrip,
DeviceType.Dimmer: SmartDimmer,
DeviceType.LightStrip: SmartLightStrip,
}
_LOGGER = logging.getLogger(__name__)
async def connect(
host: str,
*,
port: Optional[int] = None,
timeout=5,
credentials: Optional[Credentials] = None,
device_type: Optional[DeviceType] = None,
protocol_class: Optional[Type[TPLinkProtocol]] = None,
) -> "SmartDevice":
"""Connect to a single device by the given IP address.
This method avoids the UDP based discovery process and
will connect directly to the device to query its type.
It is generally preferred to avoid :func:`discover_single()` and
use this function instead as it should perform better when
the WiFi network is congested or the device is not responding
to discovery requests.
The device type is discovered by querying the device.
:param host: Hostname of device to query
:param device_type: Device type to use for the device.
If not given, the device type is discovered by querying the device.
If the device type is already known, it is preferred to pass it
to avoid the extra query to the device to discover its type.
:param protocol_class: Optionally provide the protocol class
to use.
:rtype: SmartDevice
:return: Object for querying/controlling found device.
"""
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if debug_enabled:
start_time = time.perf_counter()
if device_type and (klass := DEVICE_TYPE_TO_CLASS.get(device_type)):
dev: SmartDevice = klass(
host=host, port=port, credentials=credentials, timeout=timeout
)
if protocol_class is not None:
dev.protocol = protocol_class(host, credentials=credentials)
await dev.update()
if debug_enabled:
end_time = time.perf_counter()
_LOGGER.debug(
"Device %s with known type (%s) took %.2f seconds to connect",
host,
device_type.value,
end_time - start_time,
)
return dev
unknown_dev = SmartDevice(
host=host, port=port, credentials=credentials, timeout=timeout
)
if protocol_class is not None:
unknown_dev.protocol = protocol_class(host, credentials=credentials)
await unknown_dev.update()
device_class = get_device_class_from_info(unknown_dev.internal_state)
dev = device_class(host=host, port=port, credentials=credentials, timeout=timeout)
# Reuse the connection from the unknown device
# so we don't have to reconnect
dev.protocol = unknown_dev.protocol
await dev.update()
if debug_enabled:
end_time = time.perf_counter()
_LOGGER.debug(
"Device %s with unknown type (%s) took %.2f seconds to connect",
host,
dev.device_type.value,
end_time - start_time,
)
return dev
def get_device_class_from_info(info: Dict[str, Any]) -> Type[SmartDevice]:
"""Find SmartDevice subclass for device described by passed data."""
if "system" not in info or "get_sysinfo" not in info["system"]:
raise SmartDeviceException("No 'system' or 'get_sysinfo' in response")
sysinfo: Dict[str, Any] = info["system"]["get_sysinfo"]
type_: Optional[str] = sysinfo.get("type", sysinfo.get("mic_type"))
if type_ is None:
raise SmartDeviceException("Unable to find the device type field!")
if "dev_name" in sysinfo and "Dimmer" in sysinfo["dev_name"]:
return SmartDimmer
if "smartplug" in type_.lower():
if "children" in sysinfo:
return SmartStrip
return SmartPlug
if "smartbulb" in type_.lower():
if "length" in sysinfo: # strips have length
return SmartLightStrip
return SmartBulb
raise UnsupportedDeviceException("Unknown device type: %s" % type_)