Refactor devices into subpackages and deprecate old names (#716)

* Refactor devices into subpackages and deprecate old names

* Tweak and add tests

* Fix linting

* Remove duplicate implementations affecting project coverage

* Update post review

* Add device base class attributes and rename subclasses

* Rename Module to BaseModule

* Remove has_emeter_history

* Fix missing _time in init

* Update post review

* Fix test_readmeexamples

* Fix erroneously duped files

* Clean up iot and smart imports

* Update post latest review

* Tweak Device docstring
This commit is contained in:
Steven B
2024-02-04 15:20:08 +00:00
committed by GitHub
parent 6afd05be59
commit 0d119e63d0
49 changed files with 1046 additions and 606 deletions

7
kasa/smart/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""Package for supporting tapo-branded and newer kasa devices."""
from .smartbulb import SmartBulb
from .smartchilddevice import SmartChildDevice
from .smartdevice import SmartDevice
from .smartplug import SmartPlug
__all__ = ["SmartDevice", "SmartPlug", "SmartBulb", "SmartChildDevice"]

276
kasa/smart/smartbulb.py Normal file
View File

@@ -0,0 +1,276 @@
"""Module for tapo-branded smart bulbs (L5**)."""
from typing import Any, Dict, List, Optional
from ..bulb import Bulb
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..exceptions import SmartDeviceException
from ..iot.iotbulb import HSV, BulbPreset, ColorTempRange
from ..smartprotocol import SmartProtocol
from .smartdevice import SmartDevice
AVAILABLE_EFFECTS = {
"L1": "Party",
"L2": "Relax",
}
class SmartBulb(SmartDevice, Bulb):
"""Representation of a TP-Link Tapo Bulb.
Documentation TBD. See :class:`~kasa.iot.Bulb` for now.
"""
def __init__(
self,
host: str,
*,
config: Optional[DeviceConfig] = None,
protocol: Optional[SmartProtocol] = None,
) -> None:
super().__init__(host=host, config=config, protocol=protocol)
self._device_type = DeviceType.Bulb
@property
def is_color(self) -> bool:
"""Whether the bulb supports color changes."""
# TODO: this makes an assumption that only color bulbs report this
return "hue" in self._info
@property
def is_dimmable(self) -> bool:
"""Whether the bulb supports brightness changes."""
# TODO: this makes an assumption that only dimmables report this
return "brightness" in self._info
@property
def is_variable_color_temp(self) -> bool:
"""Whether the bulb supports color temperature changes."""
ct = self._info.get("color_temp_range")
# L900 reports [9000, 9000] even when it doesn't support changing the ct
return ct is not None and ct[0] != ct[1]
@property
def valid_temperature_range(self) -> ColorTempRange:
"""Return the device-specific white temperature range (in Kelvin).
:return: White temperature range in Kelvin (minimum, maximum)
"""
if not self.is_variable_color_temp:
raise SmartDeviceException("Color temperature not supported")
ct_range = self._info.get("color_temp_range", [0, 0])
return ColorTempRange(min=ct_range[0], max=ct_range[1])
@property
def has_effects(self) -> bool:
"""Return True if the device supports effects."""
return "dynamic_light_effect_enable" in self._info
@property
def effect(self) -> Dict:
"""Return effect state.
This follows the format used by SmartLightStrip.
Example:
{'brightness': 50,
'custom': 0,
'enable': 0,
'id': '',
'name': ''}
"""
# If no effect is active, dynamic_light_effect_id does not appear in info
current_effect = self._info.get("dynamic_light_effect_id", "")
data = {
"brightness": self.brightness,
"enable": current_effect != "",
"id": current_effect,
"name": AVAILABLE_EFFECTS.get(current_effect, ""),
}
return data
@property
def effect_list(self) -> Optional[List[str]]:
"""Return built-in effects list.
Example:
['Party', 'Relax', ...]
"""
return list(AVAILABLE_EFFECTS.keys()) if self.has_effects else None
@property
def hsv(self) -> HSV:
"""Return the current HSV state of the bulb.
:return: hue, saturation and value (degrees, %, %)
"""
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
h, s, v = (
self._info.get("hue", 0),
self._info.get("saturation", 0),
self._info.get("brightness", 0),
)
return HSV(hue=h, saturation=s, value=v)
@property
def color_temp(self) -> int:
"""Whether the bulb supports color temperature changes."""
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
return self._info.get("color_temp", -1)
@property
def brightness(self) -> int:
"""Return the current brightness in percentage."""
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
return self._info.get("brightness", -1)
async def set_hsv(
self,
hue: int,
saturation: int,
value: Optional[int] = None,
*,
transition: Optional[int] = None,
) -> Dict:
"""Set new HSV.
Note, transition is not supported and will be ignored.
:param int hue: hue in degrees
:param int saturation: saturation in percentage [0,100]
:param int value: value in percentage [0, 100]
:param int transition: transition in milliseconds.
"""
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
if not isinstance(hue, int) or not (0 <= hue <= 360):
raise ValueError(f"Invalid hue value: {hue} (valid range: 0-360)")
if not isinstance(saturation, int) or not (0 <= saturation <= 100):
raise ValueError(
f"Invalid saturation value: {saturation} (valid range: 0-100%)"
)
if value is not None:
self._raise_for_invalid_brightness(value)
request_payload = {
"color_temp": 0, # If set, color_temp takes precedence over hue&sat
"hue": hue,
"saturation": saturation,
}
# The device errors on invalid brightness values.
if value is not None:
request_payload["brightness"] = value
return await self.protocol.query({"set_device_info": {**request_payload}})
async def set_color_temp(
self, temp: int, *, brightness=None, transition: Optional[int] = None
) -> Dict:
"""Set the color temperature of the device in kelvin.
Note, transition is not supported and will be ignored.
:param int temp: The new color temperature, in Kelvin
:param int transition: transition in milliseconds.
"""
# TODO: Note, trying to set brightness at the same time
# with color_temp causes error -1008
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
valid_temperature_range = self.valid_temperature_range
if temp < valid_temperature_range[0] or temp > valid_temperature_range[1]:
raise ValueError(
"Temperature should be between {} and {}, was {}".format(
*valid_temperature_range, temp
)
)
return await self.protocol.query({"set_device_info": {"color_temp": temp}})
async def set_brightness(
self, brightness: int, *, transition: Optional[int] = None
) -> Dict:
"""Set the brightness in percentage.
Note, transition is not supported and will be ignored.
:param int brightness: brightness in percent
:param int transition: transition in milliseconds.
"""
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
return await self.protocol.query(
{"set_device_info": {"brightness": brightness}}
)
# Default state information, should be made to settings
"""
"info": {
"default_states": {
"re_power_type": "always_on",
"type": "last_states",
"state": {
"brightness": 36,
"hue": 0,
"saturation": 0,
"color_temp": 2700,
},
},
"""
async def set_effect(
self,
effect: str,
*,
brightness: Optional[int] = None,
transition: Optional[int] = None,
) -> None:
"""Set an effect on the device."""
raise NotImplementedError()
# TODO: the code below does to activate the effect but gives no error
return await self.protocol.query(
{
"set_device_info": {
"dynamic_light_effect_enable": 1,
"dynamic_light_effect_id": effect,
}
}
)
@property # type: ignore
def state_information(self) -> Dict[str, Any]:
"""Return bulb-specific state information."""
info: Dict[str, Any] = {
# TODO: re-enable after we don't inherit from smartbulb
# **super().state_information
"Is dimmable": self.is_dimmable,
}
if self.is_dimmable:
info["Brightness"] = self.brightness
if self.is_variable_color_temp:
info["Color temperature"] = self.color_temp
info["Valid temperature range"] = self.valid_temperature_range
if self.is_color:
info["HSV"] = self.hsv
info["Presets"] = self.presets
return info
@property
def presets(self) -> List[BulbPreset]:
"""Return a list of available bulb setting presets."""
return []

View File

@@ -0,0 +1,41 @@
"""Child device implementation."""
from typing import Optional
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..smartprotocol import SmartProtocol, _ChildProtocolWrapper
from .smartdevice import SmartDevice
class SmartChildDevice(SmartDevice):
"""Presentation of a child device.
This wraps the protocol communications and sets internal data for the child.
"""
def __init__(
self,
parent: SmartDevice,
child_id: str,
config: Optional[DeviceConfig] = None,
protocol: Optional[SmartProtocol] = None,
) -> None:
super().__init__(parent.host, config=parent.config, protocol=parent.protocol)
self._parent = parent
self._id = child_id
self.protocol = _ChildProtocolWrapper(child_id, parent.protocol)
# TODO: remove the assignment after modularization is done,
# currently required to allow accessing time-related properties
self._time = parent._time
self._device_type = DeviceType.StripSocket
async def update(self, update_children: bool = True):
"""Noop update. The parent updates our internals."""
def update_internal_state(self, info):
"""Set internal state for the child."""
# TODO: cleanup the _last_update, _sys_info, _info, _data mess.
self._last_update = self._sys_info = self._info = info
def __repr__(self):
return f"<ChildDevice {self.alias} of {self._parent}>"

417
kasa/smart/smartdevice.py Normal file
View File

@@ -0,0 +1,417 @@
"""Module for a SMART device."""
import base64
import logging
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Set, cast
from ..aestransport import AesTransport
from ..device import Device, WifiNetwork
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..emeterstatus import EmeterStatus
from ..exceptions import AuthenticationException, SmartDeviceException
from ..smartprotocol import SmartProtocol
_LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
from .smartchilddevice import SmartChildDevice
class SmartDevice(Device):
"""Base class to represent a SMART protocol based device."""
def __init__(
self,
host: str,
*,
config: Optional[DeviceConfig] = None,
protocol: Optional[SmartProtocol] = None,
) -> None:
_protocol = protocol or SmartProtocol(
transport=AesTransport(config=config or DeviceConfig(host=host)),
)
super().__init__(host=host, config=config, protocol=_protocol)
self.protocol: SmartProtocol
self._components_raw: Optional[Dict[str, Any]] = None
self._components: Dict[str, int] = {}
self._children: Dict[str, "SmartChildDevice"] = {}
self._energy: Dict[str, Any] = {}
self._state_information: Dict[str, Any] = {}
self._time: Dict[str, Any] = {}
async def _initialize_children(self):
"""Initialize children for power strips."""
children = self._last_update["child_info"]["child_device_list"]
# TODO: Use the type information to construct children,
# as hubs can also have them.
from .smartchilddevice import SmartChildDevice
self._children = {
child["device_id"]: SmartChildDevice(
parent=self, child_id=child["device_id"]
)
for child in children
}
self._device_type = DeviceType.Strip
@property
def children(self) -> Sequence["SmartDevice"]:
"""Return list of children."""
return list(self._children.values())
async def update(self, update_children: bool = True):
"""Update the device."""
if self.credentials is None and self.credentials_hash is None:
raise AuthenticationException("Tapo plug requires authentication.")
if self._components_raw is None:
resp = await self.protocol.query("component_nego")
self._components_raw = resp["component_nego"]
self._components = {
comp["id"]: comp["ver_code"]
for comp in self._components_raw["component_list"]
}
await self._initialize_modules()
extra_reqs: Dict[str, Any] = {}
if "child_device" in self._components:
extra_reqs = {**extra_reqs, "get_child_device_list": None}
if "energy_monitoring" in self._components:
extra_reqs = {
**extra_reqs,
"get_energy_usage": None,
"get_current_power": None,
}
req = {
"get_device_info": None,
"get_device_usage": None,
"get_device_time": None,
**extra_reqs,
}
resp = await self.protocol.query(req)
self._info = resp["get_device_info"]
self._usage = resp["get_device_usage"]
self._time = resp["get_device_time"]
# Emeter is not always available, but we set them still for now.
self._energy = resp.get("get_energy_usage", {})
self._emeter = resp.get("get_current_power", {})
self._last_update = {
"components": self._components_raw,
"info": self._info,
"usage": self._usage,
"time": self._time,
"energy": self._energy,
"emeter": self._emeter,
"child_info": resp.get("get_child_device_list", {}),
}
if child_info := self._last_update.get("child_info"):
if not self.children:
await self._initialize_children()
for info in child_info["child_device_list"]:
self._children[info["device_id"]].update_internal_state(info)
_LOGGER.debug("Got an update: %s", self._last_update)
async def _initialize_modules(self):
"""Initialize modules based on component negotiation response."""
if "energy_monitoring" in self._components:
self.emeter_type = "emeter"
@property
def sys_info(self) -> Dict[str, Any]:
"""Returns the device info."""
return self._info # type: ignore
@property
def model(self) -> str:
"""Returns the device model."""
return str(self._info.get("model"))
@property
def alias(self) -> Optional[str]:
"""Returns the device alias or nickname."""
if self._info and (nickname := self._info.get("nickname")):
return base64.b64decode(nickname).decode()
else:
return None
@property
def time(self) -> datetime:
"""Return the time."""
td = timedelta(minutes=cast(float, self._time.get("time_diff")))
if self._time.get("region"):
tz = timezone(td, str(self._time.get("region")))
else:
# in case the device returns a blank region this will result in the
# tzname being a UTC offset
tz = timezone(td)
return datetime.fromtimestamp(
cast(float, self._time.get("timestamp")),
tz=tz,
)
@property
def timezone(self) -> Dict:
"""Return the timezone and time_difference."""
ti = self.time
return {"timezone": ti.tzname()}
@property
def hw_info(self) -> Dict:
"""Return hardware info for the device."""
return {
"sw_ver": self._info.get("fw_ver"),
"hw_ver": self._info.get("hw_ver"),
"mac": self._info.get("mac"),
"type": self._info.get("type"),
"hwId": self._info.get("device_id"),
"dev_name": self.alias,
"oemId": self._info.get("oem_id"),
}
@property
def location(self) -> Dict:
"""Return the device location."""
loc = {
"latitude": cast(float, self._info.get("latitude", 0)) / 10_000,
"longitude": cast(float, self._info.get("longitude", 0)) / 10_000,
}
return loc
@property
def rssi(self) -> Optional[int]:
"""Return the rssi."""
rssi = self._info.get("rssi")
return int(rssi) if rssi else None
@property
def mac(self) -> str:
"""Return the mac formatted with colons."""
return str(self._info.get("mac")).replace("-", ":")
@property
def device_id(self) -> str:
"""Return the device id."""
return str(self._info.get("device_id"))
@property
def internal_state(self) -> Any:
"""Return all the internal state data."""
return self._last_update
async def _query_helper(
self, method: str, params: Optional[Dict] = None, child_ids=None
) -> Any:
res = await self.protocol.query({method: params})
return res
@property
def state_information(self) -> Dict[str, Any]:
"""Return the key state information."""
ssid = self._info.get("ssid")
ssid = base64.b64decode(ssid).decode() if ssid else "No SSID"
return {
"overheated": self._info.get("overheated"),
"signal_level": self._info.get("signal_level"),
"SSID": ssid,
}
@property
def features(self) -> Set[str]:
"""Return the list of supported features."""
# TODO:
return set()
@property
def has_emeter(self) -> bool:
"""Return if the device has emeter."""
return "energy_monitoring" in self._components
@property
def is_on(self) -> bool:
"""Return true if the device is on."""
return bool(self._info.get("device_on"))
async def turn_on(self, **kwargs):
"""Turn on the device."""
await self.protocol.query({"set_device_info": {"device_on": True}})
async def turn_off(self, **kwargs):
"""Turn off the device."""
await self.protocol.query({"set_device_info": {"device_on": False}})
def update_from_discover_info(self, info):
"""Update state from info from the discover call."""
self._discovery_info = info
self._info = info
async def get_emeter_realtime(self) -> EmeterStatus:
"""Retrieve current energy readings."""
self._verify_emeter()
resp = await self.protocol.query("get_energy_usage")
self._energy = resp["get_energy_usage"]
return self.emeter_realtime
def _convert_energy_data(self, data, scale) -> Optional[float]:
"""Return adjusted emeter information."""
return data if not data else data * scale
def _verify_emeter(self) -> None:
"""Raise an exception if there is no emeter."""
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
if self.emeter_type not in self._last_update:
raise SmartDeviceException("update() required prior accessing emeter")
@property
def emeter_realtime(self) -> EmeterStatus:
"""Get the emeter status."""
return EmeterStatus(
{
"power_mw": self._energy.get("current_power"),
"total": self._convert_energy_data(
self._energy.get("today_energy"), 1 / 1000
),
}
)
@property
def emeter_this_month(self) -> Optional[float]:
"""Get the emeter value for this month."""
return self._convert_energy_data(self._energy.get("month_energy"), 1 / 1000)
@property
def emeter_today(self) -> Optional[float]:
"""Get the emeter value for today."""
return self._convert_energy_data(self._energy.get("today_energy"), 1 / 1000)
@property
def on_since(self) -> Optional[datetime]:
"""Return the time that the device was turned on or None if turned off."""
if (
not self._info.get("device_on")
or (on_time := self._info.get("on_time")) is None
):
return None
on_time = cast(float, on_time)
return datetime.now().replace(microsecond=0) - timedelta(seconds=on_time)
async def wifi_scan(self) -> List[WifiNetwork]:
"""Scan for available wifi networks."""
def _net_for_scan_info(res):
return WifiNetwork(
ssid=base64.b64decode(res["ssid"]).decode(),
cipher_type=res["cipher_type"],
key_type=res["key_type"],
channel=res["channel"],
signal_level=res["signal_level"],
bssid=res["bssid"],
)
async def _query_networks(networks=None, start_index=0):
_LOGGER.debug("Querying networks using start_index=%s", start_index)
if networks is None:
networks = []
resp = await self.protocol.query(
{"get_wireless_scan_info": {"start_index": start_index}}
)
network_list = [
_net_for_scan_info(net)
for net in resp["get_wireless_scan_info"]["ap_list"]
]
networks.extend(network_list)
if resp["get_wireless_scan_info"].get("sum", 0) > start_index + 10:
return await _query_networks(networks, start_index=start_index + 10)
return networks
return await _query_networks()
async def wifi_join(self, ssid: str, password: str, keytype: str = "wpa2_psk"):
"""Join the given wifi network.
This method returns nothing as the device tries to activate the new
settings immediately instead of responding to the request.
If joining the network fails, the device will return to the previous state
after some delay.
"""
if not self.credentials:
raise AuthenticationException("Device requires authentication.")
payload = {
"account": {
"username": base64.b64encode(
self.credentials.username.encode()
).decode(),
"password": base64.b64encode(
self.credentials.password.encode()
).decode(),
},
"wireless": {
"key_type": keytype,
"password": base64.b64encode(password.encode()).decode(),
"ssid": base64.b64encode(ssid.encode()).decode(),
},
"time": self.internal_state["time"],
}
# The device does not respond to the request but changes the settings
# immediately which causes us to timeout.
# Thus, We limit retries and suppress the raised exception as useless.
try:
return await self.protocol.query({"set_qs_info": payload}, retry_count=0)
except SmartDeviceException as ex:
if ex.error_code: # Re-raise on device-reported errors
raise
_LOGGER.debug("Received an expected for wifi join, but this is expected")
async def update_credentials(self, username: str, password: str):
"""Update device credentials.
This will replace the existing authentication credentials on the device.
"""
t = self.internal_state["time"]
payload = {
"account": {
"username": base64.b64encode(username.encode()).decode(),
"password": base64.b64encode(password.encode()).decode(),
},
"time": t,
}
return await self.protocol.query({"set_qs_info": payload})
async def set_alias(self, alias: str):
"""Set the device name (alias)."""
return await self.protocol.query(
{"set_device_info": {"nickname": base64.b64encode(alias.encode()).decode()}}
)
async def reboot(self, delay: int = 1) -> None:
"""Reboot the device.
Note that giving a delay of zero causes this to block,
as the device reboots immediately without responding to the call.
"""
await self.protocol.query({"device_reboot": {"delay": delay}})
async def factory_reset(self) -> None:
"""Reset device back to factory settings.
Note, this does not downgrade the firmware.
"""
await self.protocol.query("device_reset")

37
kasa/smart/smartplug.py Normal file
View File

@@ -0,0 +1,37 @@
"""Module for a TAPO Plug."""
import logging
from typing import Any, Dict, Optional
from ..device_type import DeviceType
from ..deviceconfig import DeviceConfig
from ..plug import Plug
from ..smartprotocol import SmartProtocol
from .smartdevice import SmartDevice
_LOGGER = logging.getLogger(__name__)
class SmartPlug(SmartDevice, Plug):
"""Class to represent a TAPO Plug."""
def __init__(
self,
host: str,
*,
config: Optional[DeviceConfig] = None,
protocol: Optional[SmartProtocol] = None,
) -> None:
super().__init__(host=host, config=config, protocol=protocol)
self._device_type = DeviceType.Plug
@property
def state_information(self) -> Dict[str, Any]:
"""Return the key state information."""
return {
**super().state_information,
**{
"On since": self.on_since,
"auto_off_status": self._info.get("auto_off_status"),
"auto_off_remain_time": self._info.get("auto_off_remain_time"),
},
}