mirror of
				https://github.com/python-kasa/python-kasa.git
				synced 2025-11-03 22:22:06 +00:00 
			
		
		
		
	make SmartStrip use asyncio
This commit is contained in:
		@@ -1,9 +1,11 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
from typing import Any, Dict, Optional, Union
 | 
			
		||||
from deprecation import deprecated
 | 
			
		||||
 | 
			
		||||
from pyHS100 import SmartPlug, SmartDeviceException, EmeterStatus, DeviceType
 | 
			
		||||
from deprecation import deprecated
 | 
			
		||||
from pyHS100 import DeviceType, EmeterStatus, SmartDeviceException, SmartPlug
 | 
			
		||||
 | 
			
		||||
from .protocol import TPLinkSmartHomeProtocol
 | 
			
		||||
 | 
			
		||||
_LOGGER = logging.getLogger(__name__)
 | 
			
		||||
@@ -42,7 +44,9 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        self.emeter_type = "emeter"
 | 
			
		||||
        self._device_type = DeviceType.Strip
 | 
			
		||||
        self.plugs = {}
 | 
			
		||||
        children = self.sys_info["children"]
 | 
			
		||||
 | 
			
		||||
        sys_info = asyncio.run(self.get_sys_info())
 | 
			
		||||
        children = sys_info["children"]
 | 
			
		||||
        self.num_children = len(children)
 | 
			
		||||
        for plug in range(self.num_children):
 | 
			
		||||
            self.plugs[plug] = SmartPlug(
 | 
			
		||||
@@ -59,14 +63,7 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        if index not in range(self.num_children):
 | 
			
		||||
            raise SmartStripException("plug index of %d " "is out of bounds" % index)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    @deprecated(details="use is_on, get_is_on()")
 | 
			
		||||
    def state(self) -> bool:
 | 
			
		||||
        if self.is_on:
 | 
			
		||||
            return self.STATE_ON
 | 
			
		||||
        return self.STATE_OFF
 | 
			
		||||
 | 
			
		||||
    def get_state(self, *, index=-1) -> Dict[int, str]:
 | 
			
		||||
    async def get_state(self, *, index=-1) -> Dict[int, str]:
 | 
			
		||||
        """Retrieve the switch state
 | 
			
		||||
 | 
			
		||||
        :returns: list with the state of each child plug
 | 
			
		||||
@@ -75,36 +72,14 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :rtype: dict
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        def _state_for_bool(b):
 | 
			
		||||
            return SmartPlug.STATE_ON if b else SmartPlug.STATE_OFF
 | 
			
		||||
        def _state_for_bool(_bool):
 | 
			
		||||
            return SmartPlug.STATE_ON if _bool else SmartPlug.STATE_OFF
 | 
			
		||||
 | 
			
		||||
        is_on = self.get_is_on(index=index)
 | 
			
		||||
        is_on = await self.get_is_on(index=index)
 | 
			
		||||
        if isinstance(is_on, bool):
 | 
			
		||||
            return _state_for_bool(is_on)
 | 
			
		||||
 | 
			
		||||
        print(is_on)
 | 
			
		||||
 | 
			
		||||
        return {k: _state_for_bool(v) for k, v in self.get_is_on().items()}
 | 
			
		||||
 | 
			
		||||
    @state.setter
 | 
			
		||||
    @deprecated(details="use turn_on(), turn_off()")
 | 
			
		||||
    def state(self, value: str):
 | 
			
		||||
        """Sets the state of all plugs in the strip
 | 
			
		||||
 | 
			
		||||
        :param value: one of
 | 
			
		||||
                    STATE_ON
 | 
			
		||||
                    STATE_OFF
 | 
			
		||||
        :raises ValueError: on invalid state
 | 
			
		||||
        :raises SmartDeviceException: on error
 | 
			
		||||
        """
 | 
			
		||||
        if not isinstance(value, str):
 | 
			
		||||
            raise ValueError("State must be str, not of %s.", type(value))
 | 
			
		||||
        elif value.upper() == SmartPlug.STATE_ON:
 | 
			
		||||
            self.turn_on()
 | 
			
		||||
        elif value.upper() == SmartPlug.STATE_OFF:
 | 
			
		||||
            self.turn_off()
 | 
			
		||||
        else:
 | 
			
		||||
            raise ValueError("State %s is not valid.", value)
 | 
			
		||||
        return {k: _state_for_bool(v) for k, v in is_on.items()}
 | 
			
		||||
 | 
			
		||||
    def set_state(self, value: str, *, index: int = -1):
 | 
			
		||||
        """Sets the state of a plug on the strip
 | 
			
		||||
@@ -123,12 +98,12 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            self.plugs[index].state = value
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def is_on(self) -> bool:
 | 
			
		||||
    async def is_on(self) -> bool:
 | 
			
		||||
        """Return if any of the outlets are on"""
 | 
			
		||||
        return any(state == "ON" for state in self.get_state().values())
 | 
			
		||||
        states = await self.get_state()
 | 
			
		||||
        return any(state == "ON" for state in states.values())
 | 
			
		||||
 | 
			
		||||
    def get_is_on(self, *, index: int = -1) -> Any:
 | 
			
		||||
    async def get_is_on(self, *, index: int = -1) -> Any:
 | 
			
		||||
        """
 | 
			
		||||
        Returns whether device is on.
 | 
			
		||||
 | 
			
		||||
@@ -138,7 +113,8 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
                Dict[int, bool] if no index provided
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        children = self.sys_info["children"]
 | 
			
		||||
        sys_info = await self.get_sys_info()
 | 
			
		||||
        children = sys_info["children"]
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            is_on = {}
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
@@ -148,14 +124,14 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            return bool(children[index]["state"])
 | 
			
		||||
 | 
			
		||||
    def get_is_off(self, *, index: int = -1) -> Any:
 | 
			
		||||
        is_on = self.get_is_on(index=index)
 | 
			
		||||
    async def get_is_off(self, *, index: int = -1) -> Any:
 | 
			
		||||
        is_on = await self.get_is_on(index=index)
 | 
			
		||||
        if isinstance(is_on, bool):
 | 
			
		||||
            return not is_on
 | 
			
		||||
        else:
 | 
			
		||||
            return {k: not v for k, v in is_on}
 | 
			
		||||
 | 
			
		||||
    def turn_on(self, *, index: int = -1):
 | 
			
		||||
    async def turn_on(self, *, index: int = -1):
 | 
			
		||||
        """
 | 
			
		||||
        Turns outlets on
 | 
			
		||||
 | 
			
		||||
@@ -164,12 +140,12 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            self._query_helper("system", "set_relay_state", {"state": 1})
 | 
			
		||||
            await self._query_helper("system", "set_relay_state", {"state": 1})
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            self.plugs[index].turn_on()
 | 
			
		||||
            await self.plugs[index].turn_on()
 | 
			
		||||
 | 
			
		||||
    def turn_off(self, *, index: int = -1):
 | 
			
		||||
    async def turn_off(self, *, index: int = -1):
 | 
			
		||||
        """
 | 
			
		||||
        Turns outlets off
 | 
			
		||||
 | 
			
		||||
@@ -178,17 +154,17 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            self._query_helper("system", "set_relay_state", {"state": 0})
 | 
			
		||||
            await self._query_helper("system", "set_relay_state", {"state": 0})
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            self.plugs[index].turn_off()
 | 
			
		||||
            await self.plugs[index].turn_off()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def on_since(self) -> datetime:
 | 
			
		||||
    async def get_on_since(self) -> datetime:
 | 
			
		||||
        """Returns the maximum on-time of all outlets."""
 | 
			
		||||
        return max(v for v in self.get_on_since().values())
 | 
			
		||||
        on_since = await self._get_on_since()
 | 
			
		||||
        return max(v for v in await on_since.values())
 | 
			
		||||
 | 
			
		||||
    def get_on_since(self, *, index: int = -1) -> Any:
 | 
			
		||||
    async def _get_on_since(self, *, index: int = -1) -> Any:
 | 
			
		||||
        """
 | 
			
		||||
        Returns pretty-printed on-time
 | 
			
		||||
 | 
			
		||||
@@ -200,7 +176,8 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        """
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            on_since = {}
 | 
			
		||||
            children = self.sys_info["children"]
 | 
			
		||||
            sys_info = await self.get_sys_info()
 | 
			
		||||
            children = sys_info["children"]
 | 
			
		||||
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
                child_ontime = children[plug]["on_time"]
 | 
			
		||||
@@ -210,19 +187,18 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
            return on_since
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            return self.plugs[index].on_since
 | 
			
		||||
            return await self.plugs[index].get_on_since()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def state_information(self) -> Dict[str, Any]:
 | 
			
		||||
    async def get_state_information(self) -> Dict[str, Any]:
 | 
			
		||||
        """
 | 
			
		||||
        Returns strip-specific state information.
 | 
			
		||||
 | 
			
		||||
        :return: Strip information dict, keys in user-presentable form.
 | 
			
		||||
        :rtype: dict
 | 
			
		||||
        """
 | 
			
		||||
        state = {"LED state": self.led}
 | 
			
		||||
        is_on = self.get_is_on()
 | 
			
		||||
        on_since = self.get_on_since()
 | 
			
		||||
        state = {"LED state": await self.get_led()}  # XXX: from where?
 | 
			
		||||
        is_on = await self.get_is_on()
 | 
			
		||||
        on_since = await self._get_on_since()
 | 
			
		||||
        for plug_index in range(self.num_children):
 | 
			
		||||
            plug_number = plug_index + 1
 | 
			
		||||
            if is_on[plug_index]:
 | 
			
		||||
@@ -230,7 +206,7 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
 | 
			
		||||
        return state
 | 
			
		||||
 | 
			
		||||
    def get_emeter_realtime(self, *, index: int = -1) -> Optional[Any]:
 | 
			
		||||
    async def get_emeter_realtime(self, *, index: int = -1) -> Optional[Any]:
 | 
			
		||||
        """
 | 
			
		||||
        Retrieve current energy readings from device
 | 
			
		||||
 | 
			
		||||
@@ -243,19 +219,19 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartDeviceException: on error
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if not self.has_emeter:  # pragma: no cover
 | 
			
		||||
        if not await self.get_has_emeter():  # pragma: no cover
 | 
			
		||||
            raise SmartStripException("Device has no emeter")
 | 
			
		||||
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            emeter_status = {}
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
                emeter_status[plug] = self.plugs[plug].get_emeter_realtime()
 | 
			
		||||
                emeter_status[plug] = await self.plugs[plug].get_emeter_realtime()
 | 
			
		||||
            return emeter_status
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            return self.plugs[index].get_emeter_realtime()
 | 
			
		||||
            return await self.plugs[index].get_emeter_realtime()
 | 
			
		||||
 | 
			
		||||
    def current_consumption(self, *, index: int = -1) -> Optional[Any]:
 | 
			
		||||
    async def current_consumption(self, *, index: int = -1) -> Optional[Any]:
 | 
			
		||||
        """
 | 
			
		||||
        Get the current power consumption in Watts.
 | 
			
		||||
 | 
			
		||||
@@ -269,19 +245,19 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartDeviceException: on error
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if not self.has_emeter:  # pragma: no cover
 | 
			
		||||
        if not await self.get_has_emeter():  # pragma: no cover
 | 
			
		||||
            raise SmartStripException("Device has no emeter")
 | 
			
		||||
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            consumption = {}
 | 
			
		||||
            emeter_reading = self.get_emeter_realtime()
 | 
			
		||||
            emeter_reading = await self.get_emeter_realtime()
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
                response = EmeterStatus(emeter_reading[plug])
 | 
			
		||||
                consumption[plug] = response["power"]
 | 
			
		||||
            return consumption
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            response = EmeterStatus(self.get_emeter_realtime(index=index))
 | 
			
		||||
            response = EmeterStatus(await self.get_emeter_realtime(index=index))
 | 
			
		||||
            return response["power"]
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
@@ -291,7 +267,7 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        """
 | 
			
		||||
        return {"icon": "SMARTSTRIP-DUMMY", "hash": "SMARTSTRIP-DUMMY"}
 | 
			
		||||
 | 
			
		||||
    def get_alias(self, *, index: int = -1) -> Union[str, Dict[int, str]]:
 | 
			
		||||
    async def get_alias(self, *, index: int = -1) -> Union[str, Dict[int, str]]:
 | 
			
		||||
        """Gets the alias for a plug.
 | 
			
		||||
 | 
			
		||||
        :param index: plug index (-1 for all)
 | 
			
		||||
@@ -301,7 +277,8 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
                Dict[int, str] if no index provided
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        children = self.sys_info["children"]
 | 
			
		||||
        sys_info = await self.get_sys_info()
 | 
			
		||||
        children = sys_info["children"]
 | 
			
		||||
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            alias = {}
 | 
			
		||||
@@ -312,7 +289,7 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            return children[index]["alias"]
 | 
			
		||||
 | 
			
		||||
    def set_alias(self, alias: str, *, index: int = -1):
 | 
			
		||||
    async def set_alias(self, alias: str, *, index: int = -1):
 | 
			
		||||
        """Sets the alias for a plug
 | 
			
		||||
 | 
			
		||||
        :param index: plug index
 | 
			
		||||
@@ -325,9 +302,9 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
            return super().set_alias(alias)
 | 
			
		||||
 | 
			
		||||
        self.raise_for_index(index)
 | 
			
		||||
        self.plugs[index].set_alias(alias)
 | 
			
		||||
        await self.plugs[index].set_alias(alias)
 | 
			
		||||
 | 
			
		||||
    def get_emeter_daily(
 | 
			
		||||
    async def get_emeter_daily(
 | 
			
		||||
        self, year: int = None, month: int = None, kwh: bool = True, *, index: int = -1
 | 
			
		||||
    ) -> Dict:
 | 
			
		||||
        """Retrieve daily statistics for a given month
 | 
			
		||||
@@ -342,21 +319,23 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartDeviceException: on error
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if not self.has_emeter:  # pragma: no cover
 | 
			
		||||
        if not await self.get_has_emeter():  # pragma: no cover
 | 
			
		||||
            raise SmartStripException("Device has no emeter")
 | 
			
		||||
 | 
			
		||||
        emeter_daily = {}
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
                emeter_daily = self.plugs[plug].get_emeter_daily(
 | 
			
		||||
                emeter_daily = await self.plugs[plug].get_emeter_daily(
 | 
			
		||||
                    year=year, month=month, kwh=kwh
 | 
			
		||||
                )
 | 
			
		||||
            return emeter_daily
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            return self.plugs[index].get_emeter_daily(year=year, month=month, kwh=kwh)
 | 
			
		||||
            return await self.plugs[index].get_emeter_daily(
 | 
			
		||||
                year=year, month=month, kwh=kwh
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def get_emeter_monthly(
 | 
			
		||||
    async def get_emeter_monthly(
 | 
			
		||||
        self, year: int = None, kwh: bool = True, *, index: int = -1
 | 
			
		||||
    ) -> Dict:
 | 
			
		||||
        """Retrieve monthly statistics for a given year.
 | 
			
		||||
@@ -369,19 +348,21 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartDeviceException: on error
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if not self.has_emeter:  # pragma: no cover
 | 
			
		||||
        if not await self.get_has_emeter():  # pragma: no cover
 | 
			
		||||
            raise SmartStripException("Device has no emeter")
 | 
			
		||||
 | 
			
		||||
        emeter_monthly = {}
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
                emeter_monthly = self.plugs[plug].get_emeter_monthly(year=year, kwh=kwh)
 | 
			
		||||
                emeter_monthly[plug] = await self.plugs[plug].get_emeter_monthly(
 | 
			
		||||
                    year=year, kwh=kwh
 | 
			
		||||
                )
 | 
			
		||||
            return emeter_monthly
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            return self.plugs[index].get_emeter_monthly(year=year, kwh=kwh)
 | 
			
		||||
            return await self.plugs[index].get_emeter_monthly(year=year, kwh=kwh)
 | 
			
		||||
 | 
			
		||||
    def erase_emeter_stats(self, *, index: int = -1) -> bool:
 | 
			
		||||
    async def erase_emeter_stats(self, *, index: int = -1) -> bool:
 | 
			
		||||
        """Erase energy meter statistics
 | 
			
		||||
 | 
			
		||||
        :param index: plug index (-1 for all)
 | 
			
		||||
@@ -391,15 +372,15 @@ class SmartStrip(SmartPlug):
 | 
			
		||||
        :raises SmartDeviceException: on error
 | 
			
		||||
        :raises SmartStripException: index out of bounds
 | 
			
		||||
        """
 | 
			
		||||
        if not self.has_emeter:  # pragma: no cover
 | 
			
		||||
        if not await self.get_has_emeter():  # pragma: no cover
 | 
			
		||||
            raise SmartStripException("Device has no emeter")
 | 
			
		||||
 | 
			
		||||
        if index < 0:
 | 
			
		||||
            for plug in range(self.num_children):
 | 
			
		||||
                self.plugs[plug].erase_emeter_stats()
 | 
			
		||||
                await self.plugs[plug].erase_emeter_stats()
 | 
			
		||||
        else:
 | 
			
		||||
            self.raise_for_index(index)
 | 
			
		||||
            self.plugs[index].erase_emeter_stats()
 | 
			
		||||
            await self.plugs[index].erase_emeter_stats()
 | 
			
		||||
 | 
			
		||||
        # As query_helper raises exception in case of failure, we have
 | 
			
		||||
        # succeeded when we are this far.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user