mirror of
https://github.com/python-kasa/python-kasa.git
synced 2024-12-22 11:13:34 +00:00
Use DeviceInfo consistently across devices (#1338)
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run
Some checks are pending
CI / Perform linting checks (3.13) (push) Waiting to run
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, macos-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, ubuntu-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (false, windows-latest, 3.13) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.11) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.12) (push) Blocked by required conditions
CI / Python ${{ matrix.python-version}} on ${{ matrix.os }}${{ fromJSON('[" (extras)", ""]')[matrix.extras == ''] }} (true, ubuntu-latest, 3.13) (push) Blocked by required conditions
CodeQL checks / Analyze (python) (push) Waiting to run
- Make model exclude region for `iot` devices. This is consistent with `smart` and `smartcam` devices. - Make region it's own attribute on `Device`. - Ensure that devices consistently use `_get_device_info` static methods for all information relating to device models. - Fix issue with firmware and hardware being the wrong way round for `smartcam` devices.
This commit is contained in:
parent
5f84c69774
commit
223f3318ea
@ -41,8 +41,14 @@ async def state(ctx, dev: Device):
|
|||||||
echo(f"Device state: {dev.is_on}")
|
echo(f"Device state: {dev.is_on}")
|
||||||
|
|
||||||
echo(f"Time: {dev.time} (tz: {dev.timezone})")
|
echo(f"Time: {dev.time} (tz: {dev.timezone})")
|
||||||
echo(f"Hardware: {dev.hw_info['hw_ver']}")
|
echo(
|
||||||
echo(f"Software: {dev.hw_info['sw_ver']}")
|
f"Hardware: {dev.device_info.hardware_version}"
|
||||||
|
f"{' (' + dev.region + ')' if dev.region else ''}"
|
||||||
|
)
|
||||||
|
echo(
|
||||||
|
f"Firmware: {dev.device_info.firmware_version}"
|
||||||
|
f" {dev.device_info.firmware_build}"
|
||||||
|
)
|
||||||
echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
|
echo(f"MAC (rssi): {dev.mac} ({dev.rssi})")
|
||||||
if verbose:
|
if verbose:
|
||||||
echo(f"Location: {dev.location}")
|
echo(f"Location: {dev.location}")
|
||||||
|
@ -29,7 +29,7 @@ All devices provide several informational properties:
|
|||||||
>>> dev.alias
|
>>> dev.alias
|
||||||
Bedroom Lamp Plug
|
Bedroom Lamp Plug
|
||||||
>>> dev.model
|
>>> dev.model
|
||||||
HS110(EU)
|
HS110
|
||||||
>>> dev.rssi
|
>>> dev.rssi
|
||||||
-71
|
-71
|
||||||
>>> dev.mac
|
>>> dev.mac
|
||||||
@ -151,7 +151,7 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class _DeviceInfo:
|
class DeviceInfo:
|
||||||
"""Device Model Information."""
|
"""Device Model Information."""
|
||||||
|
|
||||||
short_name: str
|
short_name: str
|
||||||
@ -208,7 +208,7 @@ class Device(ABC):
|
|||||||
self.protocol: BaseProtocol = protocol or IotProtocol(
|
self.protocol: BaseProtocol = protocol or IotProtocol(
|
||||||
transport=XorTransport(config=config or DeviceConfig(host=host)),
|
transport=XorTransport(config=config or DeviceConfig(host=host)),
|
||||||
)
|
)
|
||||||
self._last_update: Any = None
|
self._last_update: dict[str, Any] = {}
|
||||||
_LOGGER.debug("Initializing %s of type %s", host, type(self))
|
_LOGGER.debug("Initializing %s of type %s", host, type(self))
|
||||||
self._device_type = DeviceType.Unknown
|
self._device_type = DeviceType.Unknown
|
||||||
# TODO: typing Any is just as using dict | None would require separate
|
# TODO: typing Any is just as using dict | None would require separate
|
||||||
@ -334,9 +334,21 @@ class Device(ABC):
|
|||||||
"""Returns the device model."""
|
"""Returns the device model."""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
def region(self) -> str | None:
|
||||||
|
"""Returns the device region."""
|
||||||
|
return self.device_info.region
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_info(self) -> DeviceInfo:
|
||||||
|
"""Return device info."""
|
||||||
|
return self._get_device_info(self._last_update, self._discovery_info)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def _model_region(self) -> str:
|
def _get_device_info(
|
||||||
"""Return device full model name and region."""
|
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
||||||
|
) -> DeviceInfo:
|
||||||
|
"""Get device info."""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
|
@ -22,7 +22,7 @@ Discovery returns a dict of {ip: discovered devices}:
|
|||||||
>>>
|
>>>
|
||||||
>>> found_devices = await Discover.discover()
|
>>> found_devices = await Discover.discover()
|
||||||
>>> [dev.model for dev in found_devices.values()]
|
>>> [dev.model for dev in found_devices.values()]
|
||||||
['KP303(UK)', 'HS110(EU)', 'L530E', 'KL430(US)', 'HS220(US)']
|
['KP303', 'HS110', 'L530E', 'KL430', 'HS220']
|
||||||
|
|
||||||
You can pass username and password for devices requiring authentication
|
You can pass username and password for devices requiring authentication
|
||||||
|
|
||||||
@ -65,17 +65,17 @@ It is also possible to pass a coroutine to be executed for each found device:
|
|||||||
>>> print(f"Discovered {dev.alias} (model: {dev.model})")
|
>>> print(f"Discovered {dev.alias} (model: {dev.model})")
|
||||||
>>>
|
>>>
|
||||||
>>> devices = await Discover.discover(on_discovered=print_dev_info, credentials=creds)
|
>>> devices = await Discover.discover(on_discovered=print_dev_info, credentials=creds)
|
||||||
Discovered Bedroom Power Strip (model: KP303(UK))
|
Discovered Bedroom Power Strip (model: KP303)
|
||||||
Discovered Bedroom Lamp Plug (model: HS110(EU))
|
Discovered Bedroom Lamp Plug (model: HS110)
|
||||||
Discovered Living Room Bulb (model: L530)
|
Discovered Living Room Bulb (model: L530)
|
||||||
Discovered Bedroom Lightstrip (model: KL430(US))
|
Discovered Bedroom Lightstrip (model: KL430)
|
||||||
Discovered Living Room Dimmer Switch (model: HS220(US))
|
Discovered Living Room Dimmer Switch (model: HS220)
|
||||||
|
|
||||||
Discovering a single device returns a kasa.Device object.
|
Discovering a single device returns a kasa.Device object.
|
||||||
|
|
||||||
>>> device = await Discover.discover_single("127.0.0.1", credentials=creds)
|
>>> device = await Discover.discover_single("127.0.0.1", credentials=creds)
|
||||||
>>> device.model
|
>>> device.model
|
||||||
'KP303(UK)'
|
'KP303'
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ from datetime import datetime, timedelta, tzinfo
|
|||||||
from typing import TYPE_CHECKING, Any, cast
|
from typing import TYPE_CHECKING, Any, cast
|
||||||
from warnings import warn
|
from warnings import warn
|
||||||
|
|
||||||
from ..device import Device, WifiNetwork, _DeviceInfo
|
from ..device import Device, DeviceInfo, WifiNetwork
|
||||||
from ..device_type import DeviceType
|
from ..device_type import DeviceType
|
||||||
from ..deviceconfig import DeviceConfig
|
from ..deviceconfig import DeviceConfig
|
||||||
from ..exceptions import KasaException
|
from ..exceptions import KasaException
|
||||||
@ -43,7 +43,7 @@ def requires_update(f: Callable) -> Any:
|
|||||||
@functools.wraps(f)
|
@functools.wraps(f)
|
||||||
async def wrapped(*args: Any, **kwargs: Any) -> Any:
|
async def wrapped(*args: Any, **kwargs: Any) -> Any:
|
||||||
self = args[0]
|
self = args[0]
|
||||||
if self._last_update is None and (
|
if not self._last_update and (
|
||||||
self._sys_info is None or f.__name__ not in self._sys_info
|
self._sys_info is None or f.__name__ not in self._sys_info
|
||||||
):
|
):
|
||||||
raise KasaException("You need to await update() to access the data")
|
raise KasaException("You need to await update() to access the data")
|
||||||
@ -54,7 +54,7 @@ def requires_update(f: Callable) -> Any:
|
|||||||
@functools.wraps(f)
|
@functools.wraps(f)
|
||||||
def wrapped(*args: Any, **kwargs: Any) -> Any:
|
def wrapped(*args: Any, **kwargs: Any) -> Any:
|
||||||
self = args[0]
|
self = args[0]
|
||||||
if self._last_update is None and (
|
if not self._last_update and (
|
||||||
self._sys_info is None or f.__name__ not in self._sys_info
|
self._sys_info is None or f.__name__ not in self._sys_info
|
||||||
):
|
):
|
||||||
raise KasaException("You need to await update() to access the data")
|
raise KasaException("You need to await update() to access the data")
|
||||||
@ -112,7 +112,7 @@ class IotDevice(Device):
|
|||||||
>>> dev.alias
|
>>> dev.alias
|
||||||
Bedroom Lamp Plug
|
Bedroom Lamp Plug
|
||||||
>>> dev.model
|
>>> dev.model
|
||||||
HS110(EU)
|
HS110
|
||||||
>>> dev.rssi
|
>>> dev.rssi
|
||||||
-71
|
-71
|
||||||
>>> dev.mac
|
>>> dev.mac
|
||||||
@ -310,7 +310,7 @@ class IotDevice(Device):
|
|||||||
# If this is the initial update, check only for the sysinfo
|
# If this is the initial update, check only for the sysinfo
|
||||||
# This is necessary as some devices crash on unexpected modules
|
# This is necessary as some devices crash on unexpected modules
|
||||||
# See #105, #120, #161
|
# See #105, #120, #161
|
||||||
if self._last_update is None:
|
if not self._last_update:
|
||||||
_LOGGER.debug("Performing the initial update to obtain sysinfo")
|
_LOGGER.debug("Performing the initial update to obtain sysinfo")
|
||||||
response = await self.protocol.query(req)
|
response = await self.protocol.query(req)
|
||||||
self._last_update = response
|
self._last_update = response
|
||||||
@ -452,7 +452,9 @@ class IotDevice(Device):
|
|||||||
# This allows setting of some info properties directly
|
# This allows setting of some info properties directly
|
||||||
# from partial discovery info that will then be found
|
# from partial discovery info that will then be found
|
||||||
# by the requires_update decorator
|
# by the requires_update decorator
|
||||||
self._set_sys_info(info)
|
discovery_model = info["device_model"]
|
||||||
|
no_region_model, _, _ = discovery_model.partition("(")
|
||||||
|
self._set_sys_info({**info, "model": no_region_model})
|
||||||
|
|
||||||
def _set_sys_info(self, sys_info: dict[str, Any]) -> None:
|
def _set_sys_info(self, sys_info: dict[str, Any]) -> None:
|
||||||
"""Set sys_info."""
|
"""Set sys_info."""
|
||||||
@ -471,18 +473,13 @@ class IotDevice(Device):
|
|||||||
"""
|
"""
|
||||||
return self._sys_info # type: ignore
|
return self._sys_info # type: ignore
|
||||||
|
|
||||||
@property # type: ignore
|
|
||||||
@requires_update
|
|
||||||
def model(self) -> str:
|
|
||||||
"""Return device model."""
|
|
||||||
sys_info = self._sys_info
|
|
||||||
return str(sys_info["model"])
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@requires_update
|
@requires_update
|
||||||
def _model_region(self) -> str:
|
def model(self) -> str:
|
||||||
"""Return device full model name and region."""
|
"""Returns the device model."""
|
||||||
return self.model
|
if self._last_update:
|
||||||
|
return self.device_info.short_name
|
||||||
|
return self._sys_info["model"]
|
||||||
|
|
||||||
@property # type: ignore
|
@property # type: ignore
|
||||||
def alias(self) -> str | None:
|
def alias(self) -> str | None:
|
||||||
@ -748,7 +745,7 @@ class IotDevice(Device):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_device_info(
|
def _get_device_info(
|
||||||
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
||||||
) -> _DeviceInfo:
|
) -> DeviceInfo:
|
||||||
"""Get model information for a device."""
|
"""Get model information for a device."""
|
||||||
sys_info = _extract_sys_info(info)
|
sys_info = _extract_sys_info(info)
|
||||||
|
|
||||||
@ -766,7 +763,7 @@ class IotDevice(Device):
|
|||||||
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
|
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
|
||||||
auth = bool(discovery_info and ("mgt_encrypt_schm" in discovery_info))
|
auth = bool(discovery_info and ("mgt_encrypt_schm" in discovery_info))
|
||||||
|
|
||||||
return _DeviceInfo(
|
return DeviceInfo(
|
||||||
short_name=long_name,
|
short_name=long_name,
|
||||||
long_name=long_name,
|
long_name=long_name,
|
||||||
brand="kasa",
|
brand="kasa",
|
||||||
|
@ -6,6 +6,7 @@ import logging
|
|||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from ..device import DeviceInfo
|
||||||
from ..device_type import DeviceType
|
from ..device_type import DeviceType
|
||||||
from ..deviceconfig import DeviceConfig
|
from ..deviceconfig import DeviceConfig
|
||||||
from ..protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper
|
from ..protocols.smartprotocol import SmartProtocol, _ChildProtocolWrapper
|
||||||
@ -50,6 +51,22 @@ class SmartChildDevice(SmartDevice):
|
|||||||
self._components_raw = component_info_raw
|
self._components_raw = component_info_raw
|
||||||
self._components = self._parse_components(self._components_raw)
|
self._components = self._parse_components(self._components_raw)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_info(self) -> DeviceInfo:
|
||||||
|
"""Return device info.
|
||||||
|
|
||||||
|
Child device does not have it info and components in _last_update so
|
||||||
|
this overrides the base implementation to call _get_device_info with
|
||||||
|
info and components combined as they would be in _last_update.
|
||||||
|
"""
|
||||||
|
return self._get_device_info(
|
||||||
|
{
|
||||||
|
"get_device_info": self._info,
|
||||||
|
"component_nego": self._components_raw,
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
|
||||||
async def update(self, update_children: bool = True) -> None:
|
async def update(self, update_children: bool = True) -> None:
|
||||||
"""Update child module info.
|
"""Update child module info.
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ from collections.abc import Mapping, Sequence
|
|||||||
from datetime import UTC, datetime, timedelta, tzinfo
|
from datetime import UTC, datetime, timedelta, tzinfo
|
||||||
from typing import TYPE_CHECKING, Any, TypeAlias, cast
|
from typing import TYPE_CHECKING, Any, TypeAlias, cast
|
||||||
|
|
||||||
from ..device import Device, WifiNetwork, _DeviceInfo
|
from ..device import Device, DeviceInfo, WifiNetwork
|
||||||
from ..device_type import DeviceType
|
from ..device_type import DeviceType
|
||||||
from ..deviceconfig import DeviceConfig
|
from ..deviceconfig import DeviceConfig
|
||||||
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
|
from ..exceptions import AuthenticationError, DeviceError, KasaException, SmartErrorCode
|
||||||
@ -69,7 +69,6 @@ class SmartDevice(Device):
|
|||||||
self._modules: dict[str | ModuleName[Module], SmartModule] = {}
|
self._modules: dict[str | ModuleName[Module], SmartModule] = {}
|
||||||
self._parent: SmartDevice | None = None
|
self._parent: SmartDevice | None = None
|
||||||
self._children: Mapping[str, SmartDevice] = {}
|
self._children: Mapping[str, SmartDevice] = {}
|
||||||
self._last_update = {}
|
|
||||||
self._last_update_time: float | None = None
|
self._last_update_time: float | None = None
|
||||||
self._on_since: datetime | None = None
|
self._on_since: datetime | None = None
|
||||||
self._info: dict[str, Any] = {}
|
self._info: dict[str, Any] = {}
|
||||||
@ -497,18 +496,13 @@ class SmartDevice(Device):
|
|||||||
@property
|
@property
|
||||||
def model(self) -> str:
|
def model(self) -> str:
|
||||||
"""Returns the device model."""
|
"""Returns the device model."""
|
||||||
return str(self._info.get("model"))
|
# If update hasn't been called self._device_info can't be used
|
||||||
|
if self._last_update:
|
||||||
|
return self.device_info.short_name
|
||||||
|
|
||||||
@property
|
disco_model = str(self._info.get("device_model"))
|
||||||
def _model_region(self) -> str:
|
long_name, _, _ = disco_model.partition("(")
|
||||||
"""Return device full model name and region."""
|
return long_name
|
||||||
if (disco := self._discovery_info) and (
|
|
||||||
disco_model := disco.get("device_model")
|
|
||||||
):
|
|
||||||
return disco_model
|
|
||||||
# Some devices have the region in the specs element.
|
|
||||||
region = f"({specs})" if (specs := self._info.get("specs")) else ""
|
|
||||||
return f"{self.model}{region}"
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def alias(self) -> str | None:
|
def alias(self) -> str | None:
|
||||||
@ -808,7 +802,7 @@ class SmartDevice(Device):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_device_info(
|
def _get_device_info(
|
||||||
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
||||||
) -> _DeviceInfo:
|
) -> DeviceInfo:
|
||||||
"""Get model information for a device."""
|
"""Get model information for a device."""
|
||||||
di = info["get_device_info"]
|
di = info["get_device_info"]
|
||||||
components = [comp["id"] for comp in info["component_nego"]["component_list"]]
|
components = [comp["id"] for comp in info["component_nego"]["component_list"]]
|
||||||
@ -837,7 +831,7 @@ class SmartDevice(Device):
|
|||||||
# Brand inferred from SMART.KASAPLUG/SMART.TAPOPLUG etc.
|
# Brand inferred from SMART.KASAPLUG/SMART.TAPOPLUG etc.
|
||||||
brand = devicetype[:4].lower()
|
brand = devicetype[:4].lower()
|
||||||
|
|
||||||
return _DeviceInfo(
|
return DeviceInfo(
|
||||||
short_name=short_name,
|
short_name=short_name,
|
||||||
long_name=long_name,
|
long_name=long_name,
|
||||||
brand=brand,
|
brand=brand,
|
||||||
|
@ -5,7 +5,7 @@ from __future__ import annotations
|
|||||||
import logging
|
import logging
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
|
||||||
from ..device import _DeviceInfo
|
from ..device import DeviceInfo
|
||||||
from ..device_type import DeviceType
|
from ..device_type import DeviceType
|
||||||
from ..module import Module
|
from ..module import Module
|
||||||
from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper
|
from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper
|
||||||
@ -37,7 +37,7 @@ class SmartCamDevice(SmartDevice):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_device_info(
|
def _get_device_info(
|
||||||
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
info: dict[str, Any], discovery_info: dict[str, Any] | None
|
||||||
) -> _DeviceInfo:
|
) -> DeviceInfo:
|
||||||
"""Get model information for a device."""
|
"""Get model information for a device."""
|
||||||
basic_info = info["getDeviceInfo"]["device_info"]["basic_info"]
|
basic_info = info["getDeviceInfo"]["device_info"]["basic_info"]
|
||||||
short_name = basic_info["device_model"]
|
short_name = basic_info["device_model"]
|
||||||
@ -45,7 +45,7 @@ class SmartCamDevice(SmartDevice):
|
|||||||
device_type = SmartCamDevice._get_device_type_from_sysinfo(basic_info)
|
device_type = SmartCamDevice._get_device_type_from_sysinfo(basic_info)
|
||||||
fw_version_full = basic_info["sw_version"]
|
fw_version_full = basic_info["sw_version"]
|
||||||
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
|
firmware_version, firmware_build = fw_version_full.split(" ", maxsplit=1)
|
||||||
return _DeviceInfo(
|
return DeviceInfo(
|
||||||
short_name=basic_info["device_model"],
|
short_name=basic_info["device_model"],
|
||||||
long_name=long_name,
|
long_name=long_name,
|
||||||
brand="tapo",
|
brand="tapo",
|
||||||
@ -248,8 +248,8 @@ class SmartCamDevice(SmartDevice):
|
|||||||
def hw_info(self) -> dict:
|
def hw_info(self) -> dict:
|
||||||
"""Return hardware info for the device."""
|
"""Return hardware info for the device."""
|
||||||
return {
|
return {
|
||||||
"sw_ver": self._info.get("hw_ver"),
|
"sw_ver": self._info.get("fw_ver"),
|
||||||
"hw_ver": self._info.get("fw_ver"),
|
"hw_ver": self._info.get("hw_ver"),
|
||||||
"mac": self._info.get("mac"),
|
"mac": self._info.get("mac"),
|
||||||
"type": self._info.get("type"),
|
"type": self._info.get("type"),
|
||||||
"hwId": self._info.get("hwId"),
|
"hwId": self._info.get("hwId"),
|
||||||
|
@ -473,8 +473,12 @@ def get_nearest_fixture_to_ip(dev):
|
|||||||
assert protocol_fixtures, "Unknown device type"
|
assert protocol_fixtures, "Unknown device type"
|
||||||
|
|
||||||
# This will get the best fixture with a match on model region
|
# This will get the best fixture with a match on model region
|
||||||
if model_region_fixtures := filter_fixtures(
|
if (di := dev.device_info) and (
|
||||||
"", model_filter={dev._model_region}, fixture_list=protocol_fixtures
|
model_region_fixtures := filter_fixtures(
|
||||||
|
"",
|
||||||
|
model_filter={di.long_name + (f"({di.region})" if di.region else "")},
|
||||||
|
fixture_list=protocol_fixtures,
|
||||||
|
)
|
||||||
):
|
):
|
||||||
return next(iter(model_region_fixtures))
|
return next(iter(model_region_fixtures))
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ async def test_invalid_connection(mocker, dev):
|
|||||||
@has_emeter_iot
|
@has_emeter_iot
|
||||||
async def test_initial_update_emeter(dev, mocker):
|
async def test_initial_update_emeter(dev, mocker):
|
||||||
"""Test that the initial update performs second query if emeter is available."""
|
"""Test that the initial update performs second query if emeter is available."""
|
||||||
dev._last_update = None
|
dev._last_update = {}
|
||||||
dev._legacy_features = set()
|
dev._legacy_features = set()
|
||||||
spy = mocker.spy(dev.protocol, "query")
|
spy = mocker.spy(dev.protocol, "query")
|
||||||
await dev.update()
|
await dev.update()
|
||||||
@ -111,7 +111,7 @@ async def test_initial_update_emeter(dev, mocker):
|
|||||||
@no_emeter_iot
|
@no_emeter_iot
|
||||||
async def test_initial_update_no_emeter(dev, mocker):
|
async def test_initial_update_no_emeter(dev, mocker):
|
||||||
"""Test that the initial update performs second query if emeter is available."""
|
"""Test that the initial update performs second query if emeter is available."""
|
||||||
dev._last_update = None
|
dev._last_update = {}
|
||||||
dev._legacy_features = set()
|
dev._legacy_features = set()
|
||||||
spy = mocker.spy(dev.protocol, "query")
|
spy = mocker.spy(dev.protocol, "query")
|
||||||
await dev.update()
|
await dev.update()
|
||||||
|
@ -62,11 +62,14 @@ async def test_device_type_no_update(discovery_mock, caplog: pytest.LogCaptureFi
|
|||||||
assert repr(dev) == f"<DeviceType.Unknown at {DISCOVERY_MOCK_IP} - update() needed>"
|
assert repr(dev) == f"<DeviceType.Unknown at {DISCOVERY_MOCK_IP} - update() needed>"
|
||||||
|
|
||||||
discovery_result = copy.deepcopy(discovery_mock.discovery_data["result"])
|
discovery_result = copy.deepcopy(discovery_mock.discovery_data["result"])
|
||||||
|
|
||||||
|
disco_model = discovery_result["device_model"]
|
||||||
|
short_model, _, _ = disco_model.partition("(")
|
||||||
dev.update_from_discover_info(discovery_result)
|
dev.update_from_discover_info(discovery_result)
|
||||||
assert dev.device_type is DeviceType.Unknown
|
assert dev.device_type is DeviceType.Unknown
|
||||||
assert (
|
assert (
|
||||||
repr(dev)
|
repr(dev)
|
||||||
== f"<DeviceType.Unknown at {DISCOVERY_MOCK_IP} - None (None) - update() needed>"
|
== f"<DeviceType.Unknown at {DISCOVERY_MOCK_IP} - None ({short_model}) - update() needed>"
|
||||||
)
|
)
|
||||||
discovery_result["device_type"] = "SMART.FOOBAR"
|
discovery_result["device_type"] = "SMART.FOOBAR"
|
||||||
dev.update_from_discover_info(discovery_result)
|
dev.update_from_discover_info(discovery_result)
|
||||||
@ -74,7 +77,7 @@ async def test_device_type_no_update(discovery_mock, caplog: pytest.LogCaptureFi
|
|||||||
assert dev.device_type is DeviceType.Plug
|
assert dev.device_type is DeviceType.Plug
|
||||||
assert (
|
assert (
|
||||||
repr(dev)
|
repr(dev)
|
||||||
== f"<DeviceType.Plug at {DISCOVERY_MOCK_IP} - None (None) - update() needed>"
|
== f"<DeviceType.Plug at {DISCOVERY_MOCK_IP} - None ({short_model}) - update() needed>"
|
||||||
)
|
)
|
||||||
assert "Unknown device type, falling back to plug" in caplog.text
|
assert "Unknown device type, falling back to plug" in caplog.text
|
||||||
|
|
||||||
|
@ -390,13 +390,12 @@ async def test_device_update_from_new_discovery_info(discovery_mock):
|
|||||||
device_class = Discover._get_device_class(discovery_data)
|
device_class = Discover._get_device_class(discovery_data)
|
||||||
device = device_class("127.0.0.1")
|
device = device_class("127.0.0.1")
|
||||||
discover_info = DiscoveryResult.from_dict(discovery_data["result"])
|
discover_info = DiscoveryResult.from_dict(discovery_data["result"])
|
||||||
discover_dump = discover_info.to_dict()
|
|
||||||
model, _, _ = discover_dump["device_model"].partition("(")
|
|
||||||
discover_dump["model"] = model
|
|
||||||
device.update_from_discover_info(discover_dump)
|
|
||||||
|
|
||||||
assert device.mac == discover_dump["mac"].replace("-", ":")
|
device.update_from_discover_info(discovery_data["result"])
|
||||||
assert device.model == model
|
|
||||||
|
assert device.mac == discover_info.mac.replace("-", ":")
|
||||||
|
no_region_model, _, _ = discover_info.device_model.partition("(")
|
||||||
|
assert device.model == no_region_model
|
||||||
|
|
||||||
# TODO implement requires_update for SmartDevice
|
# TODO implement requires_update for SmartDevice
|
||||||
if isinstance(device, IotDevice):
|
if isinstance(device, IotDevice):
|
||||||
|
@ -19,7 +19,7 @@ def test_bulb_examples(mocker):
|
|||||||
assert not res["failed"]
|
assert not res["failed"]
|
||||||
|
|
||||||
|
|
||||||
def test_smartdevice_examples(mocker):
|
def test_iotdevice_examples(mocker):
|
||||||
"""Use HS110 for emeter examples."""
|
"""Use HS110 for emeter examples."""
|
||||||
p = asyncio.run(get_device_for_fixture_protocol("HS110(EU)_1.0_1.2.5.json", "IOT"))
|
p = asyncio.run(get_device_for_fixture_protocol("HS110(EU)_1.0_1.2.5.json", "IOT"))
|
||||||
asyncio.run(p.set_alias("Bedroom Lamp Plug"))
|
asyncio.run(p.set_alias("Bedroom Lamp Plug"))
|
||||||
|
Loading…
Reference in New Issue
Block a user