From 00276e34b720a036b484de7e147aa040b5910920 Mon Sep 17 00:00:00 2001 From: Teemu R Date: Mon, 16 Mar 2020 14:52:40 +0100 Subject: [PATCH] Move child socket handling to its own SmartStripPlug class (#26) * All child device handling is moved out from the main smartdevice class, which simplifies the code. * This will also cleanup the constructors as only the subdevices require the ID and the parent reference. * SmartStripPlug offers SmartPlug like interface, but does not allow separate updates * Trying to update() on the children will cause a warning. --- kasa/cli.py | 4 +- kasa/smartbulb.py | 4 +- kasa/smartdevice.py | 44 +++-------- kasa/smartplug.py | 29 +------ kasa/smartstrip.py | 167 +++++++++++++++++++++++++++++++++++++++-- kasa/tests/conftest.py | 2 +- 6 files changed, 177 insertions(+), 73 deletions(-) diff --git a/kasa/cli.py b/kasa/cli.py index 0aeedb26..dc3d9666 100755 --- a/kasa/cli.py +++ b/kasa/cli.py @@ -2,8 +2,8 @@ import asyncio import json import logging -import re from pprint import pformat as pf +import re import click @@ -112,7 +112,7 @@ def dump_discover(ctx, scrub): if key in ["latitude_i", "longitude_i"]: val = 0 else: - val = re.sub("\w", "0", val) + val = re.sub(r"\w", "0", val) dev["system"]["get_sysinfo"][key] = val model = dev["system"]["get_sysinfo"]["model"] diff --git a/kasa/smartbulb.py b/kasa/smartbulb.py index c3a1a528..401214cd 100644 --- a/kasa/smartbulb.py +++ b/kasa/smartbulb.py @@ -71,8 +71,8 @@ class SmartBulb(SmartDevice): LIGHT_SERVICE = "smartlife.iot.smartbulb.lightingservice" - def __init__(self, host: str, *, child_id: str = None, cache_ttl: int = 3) -> None: - SmartDevice.__init__(self, host=host, child_id=child_id, cache_ttl=cache_ttl) + def __init__(self, host: str, *, cache_ttl: int = 3) -> None: + SmartDevice.__init__(self, host=host, cache_ttl=cache_ttl) self.emeter_type = "smartlife.iot.common.emeter" self._device_type = DeviceType.Bulb self._light_state = None diff --git a/kasa/smartdevice.py b/kasa/smartdevice.py index 450ce1d7..7cb39bf8 100755 --- a/kasa/smartdevice.py +++ b/kasa/smartdevice.py @@ -11,12 +11,12 @@ Stroetmann which is licensed under the Apache License, Version 2.0. You may obtain a copy of the license at http://www.apache.org/licenses/LICENSE-2.0 """ -import functools -import inspect -import logging from collections import defaultdict from datetime import datetime, timedelta from enum import Enum +import functools +import inspect +import logging from typing import Any, Dict, Optional from kasa.protocol import TPLinkSmartHomeProtocol @@ -103,7 +103,7 @@ def requires_update(f): class SmartDevice: """Base class for all supported device types.""" - def __init__(self, host: str, *, child_id: str = None, cache_ttl: int = 3) -> None: + def __init__(self, host: str, *, cache_ttl: int = 3) -> None: """Create a new SmartDevice instance. :param str host: host name or ip address on which the device listens @@ -113,17 +113,11 @@ class SmartDevice: self.protocol = TPLinkSmartHomeProtocol() self.emeter_type = "emeter" - self.child_id = child_id self.cache_ttl = timedelta(seconds=cache_ttl) - _LOGGER.debug( - "Initializing %s using child_id %s and cache ttl %s", - self.host, - self.child_id, - self.cache_ttl, - ) + _LOGGER.debug("Initializing %s with cache ttl %s", self.host, self.cache_ttl) self.cache = defaultdict(lambda: defaultdict(lambda: None)) # type: ignore self._device_type = DeviceType.Unknown - self._sys_info = None + self._sys_info: Optional[Dict] = None def _result_from_cache(self, target, cmd) -> Optional[Dict]: """Return query result from cache if still fresh. @@ -162,7 +156,7 @@ class SmartDevice: self.cache[target][cmd]["last_updated"] = datetime.utcnow() async def _query_helper( - self, target: str, cmd: str, arg: Optional[Dict] = None + self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None ) -> Any: """Handle result unwrapping and error handling. @@ -174,8 +168,8 @@ class SmartDevice: :raises SmartDeviceException: if command was not executed correctly """ request: Dict[str, Any] = {target: {cmd: arg}} - if self.child_id is not None: - request = {"context": {"child_ids": [self.child_id]}, target: {cmd: arg}} + if child_ids is not None: + request = {"context": {"child_ids": child_ids}, target: {cmd: arg}} try: response = self._result_from_cache(target, cmd) @@ -204,16 +198,6 @@ class SmartDevice: return result - def _get_child_info(self) -> Dict: - """Return the child information dict, if available. - - :raises SmartDeviceException: if there is no child or it cannot be found. - """ - for plug in self.sys_info["children"]: - if plug["id"] == self.child_id: - return plug - raise SmartDeviceException("Unable to find children %s") - def has_emeter(self) -> bool: """Return if device has an energy meter. @@ -614,11 +598,8 @@ class SmartDevice: def device_id(self) -> str: """Return unique ID for the device. - For regular devices this is the MAC address of the device, - for child devices a combination of MAC and child's ID. + This is the MAC address of the device. """ - if self.is_child_device: - return f"{self.mac}_{self.child_id}" return self.mac @property @@ -651,11 +632,6 @@ class SmartDevice: """Return True if the device supports color temperature.""" return False - @property - def is_child_device(self) -> bool: - """Return True if the device is a child device of another device.""" - return self.child_id is not None - def __repr__(self): return "<{} model {} at {} ({}), is_on: {} - dev specific: {}>".format( self.__class__.__name__, diff --git a/kasa/smartplug.py b/kasa/smartplug.py index a6537513..949bc319 100644 --- a/kasa/smartplug.py +++ b/kasa/smartplug.py @@ -35,8 +35,8 @@ class SmartPlug(SmartDevice): and should be handled by the user of the library. """ - def __init__(self, host: str, *, child_id: str = None, cache_ttl: int = 3) -> None: - SmartDevice.__init__(self, host, child_id=child_id, cache_ttl=cache_ttl) + def __init__(self, host: str, *, cache_ttl: int = 3) -> None: + SmartDevice.__init__(self, host, cache_ttl=cache_ttl) self.emeter_type = "emeter" self._device_type = DeviceType.Plug @@ -81,20 +81,6 @@ class SmartPlug(SmartDevice): else: raise ValueError("Brightness value %s is not valid." % value) - @property # type: ignore - @requires_update - def alias(self) -> str: - """Return device name (alias). - - :return: Device name aka alias. - :rtype: str - """ - if self.is_child_device: - info = self._get_child_info() - return info["alias"] - else: - return super().alias - @property # type: ignore @requires_update def is_dimmable(self): @@ -125,10 +111,6 @@ class SmartPlug(SmartDevice): :return: True if device is on, False otherwise """ - if self.is_child_device: - info = self._get_child_info() - return info["state"] - sys_info = self.sys_info return bool(sys_info["relay_state"]) @@ -176,12 +158,7 @@ class SmartPlug(SmartDevice): :return: datetime for on since :rtype: datetime """ - sys_info = self.sys_info - if self.is_child_device: - info = self._get_child_info() - on_time = info["on_time"] - else: - on_time = sys_info["on_time"] + on_time = self.sys_info["on_time"] return datetime.datetime.now() - datetime.timedelta(seconds=on_time) diff --git a/kasa/smartstrip.py b/kasa/smartstrip.py index baeeb7f6..36811b3a 100755 --- a/kasa/smartstrip.py +++ b/kasa/smartstrip.py @@ -2,18 +2,23 @@ .. todo:: describe how this interfaces with single plugs. """ +from collections import defaultdict import datetime import logging -from collections import defaultdict -from typing import Any, DefaultDict, Dict, List +from typing import Any, DefaultDict, Dict, List, Optional -from kasa.smartdevice import DeviceType, requires_update +from kasa.smartdevice import ( + DeviceType, + SmartDevice, + SmartDeviceException, + requires_update, +) from kasa.smartplug import SmartPlug _LOGGER = logging.getLogger(__name__) -class SmartStrip(SmartPlug): +class SmartStrip(SmartDevice): """Representation of a TP-Link Smart Power Strip. Usage example when used as library: @@ -40,11 +45,15 @@ class SmartStrip(SmartPlug): and should be handled by the user of the library. """ + def has_emeter(self) -> bool: + """Return True as strips has always an emeter.""" + return True + def __init__(self, host: str, *, cache_ttl: int = 3) -> None: - SmartPlug.__init__(self, host=host, cache_ttl=cache_ttl) + SmartDevice.__init__(self, host=host, cache_ttl=cache_ttl) self.emeter_type = "emeter" self._device_type = DeviceType.Strip - self.plugs: List[SmartPlug] = [] + self.plugs: List[SmartStripPlug] = [] @property # type: ignore @requires_update @@ -66,11 +75,12 @@ class SmartStrip(SmartPlug): # Initialize the child devices during the first update. if not self.plugs: children = self.sys_info["children"] - self.num_children = len(children) + _LOGGER.debug("Initializing %s child sockets", len(children)) for child in children: self.plugs.append( - SmartPlug( + SmartStripPlug( self.host, + parent=self, child_id=child["id"], cache_ttl=self.cache_ttl.total_seconds(), ) @@ -98,6 +108,30 @@ class SmartStrip(SmartPlug): """Return the maximum on-time of all outlets.""" return max(plug.on_since for plug in self.plugs) + @property # type: ignore + @requires_update + def led(self) -> bool: + """Return the state of the led. + + :return: True if led is on, False otherwise + :rtype: bool + """ + # TODO this is a copypaste from smartplug, + # check if led value is per socket or per device.. + sys_info = self.sys_info + return bool(1 - sys_info["led_off"]) + + async def set_led(self, state: bool): + """Set the state of the led (night mode). + + :param bool state: True to set led on, False to set led off + :raises SmartDeviceException: on error + """ + # TODO this is a copypaste from smartplug, + # check if led value is per socket or per device.. + await self._query_helper("system", "set_led_off", {"off": int(not state)}) + await self.update() + @property # type: ignore @requires_update def state_information(self) -> Dict[str, Any]: @@ -188,3 +222,120 @@ class SmartStrip(SmartPlug): """ for plug in self.plugs: await plug.erase_emeter_stats() + + +class SmartStripPlug(SmartPlug): + """Representation of a single socket in a power strip. + + This allows you to use the sockets as they were SmartPlug objects. + Instead of calling an update on any of these, you should call an update + on the parent device before accessing the properties. + """ + + def __init__( + self, host: str, parent: "SmartStrip", child_id: str, *, cache_ttl: int = 3 + ) -> None: + super().__init__(host, cache_ttl=cache_ttl) + + self.parent = parent + self.child_id = child_id + self._sys_info = self._get_child_info() + + async def update(self): + """Override the update to no-op and inform the user.""" + _LOGGER.warning( + "You called update() on a child device, which has no effect." + "Call update() on the parent device instead." + ) + return + + async def _query_helper( + self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None + ) -> Any: + """Override query helper to include the child_ids.""" + return await self.parent._query_helper( + target, cmd, arg, child_ids=[self.child_id] + ) + + @property # type: ignore + @requires_update + def is_on(self) -> bool: + """Return whether device is on. + + :return: True if device is on, False otherwise + """ + info = self._get_child_info() + return info["state"] + + @property # type: ignore + @requires_update + def led(self) -> bool: + """Return the state of the led. + + This is always false for subdevices. + + :return: True if led is on, False otherwise + :rtype: bool + """ + return False + + @property # type: ignore + @requires_update + def device_id(self) -> str: + """Return unique ID for the socket. + + This is a combination of MAC and child's ID. + """ + return f"{self.mac}_{self.child_id}" + + @property # type: ignore + @requires_update + def has_emeter(self): + """Single sockets have always an emeter.""" + return True + + @property # type: ignore + @requires_update + def alias(self) -> str: + """Return device name (alias). + + :return: Device name aka alias. + :rtype: str + """ + info = self._get_child_info() + return info["alias"] + + @property # type: ignore + @requires_update + def on_since(self) -> datetime.datetime: + """Return pretty-printed on-time. + + :return: datetime for on since + :rtype: datetime + """ + info = self._get_child_info() + on_time = info["on_time"] + + return datetime.datetime.now() - datetime.timedelta(seconds=on_time) + + @property # type: ignore + @requires_update + def model(self) -> str: + """Return device model for a child socket. + + :return: device model + :rtype: str + :raises SmartDeviceException: on error + """ + sys_info = self.parent.sys_info + return f"Socket for {sys_info['model']}" + + def _get_child_info(self) -> Dict: + """Return the subdevice information for this device. + + :raises SmartDeviceException: if the information is not found. + """ + for plug in self.parent.sys_info["children"]: + if plug["id"] == self.child_id: + return plug + raise SmartDeviceException(f"Unable to find children {self.child_id}") diff --git a/kasa/tests/conftest.py b/kasa/tests/conftest.py index 266b5b93..e5161eb3 100644 --- a/kasa/tests/conftest.py +++ b/kasa/tests/conftest.py @@ -52,7 +52,7 @@ dimmable = pytest.mark.parametrize( "dev", filter_model("dimmable", DIMMABLE), indirect=True ) non_dimmable = pytest.mark.parametrize( - "dev", filter_model("non-dimmable", ALL_DEVICES - DIMMABLE), indirect=True + "dev", filter_model("non-dimmable", ALL_DEVICES - DIMMABLE - STRIPS), indirect=True ) variable_temp = pytest.mark.parametrize(