Move child socket handling to its own SmartStripPlug class (#26)

* All child device handling is moved out from the main smartdevice class, which simplifies the code.
* This will also cleanup the constructors as only the subdevices require the ID and the parent reference.
* SmartStripPlug offers SmartPlug like interface, but does not allow separate updates
  * Trying to update() on the children will cause a warning.
This commit is contained in:
Teemu R 2020-03-16 14:52:40 +01:00 committed by GitHub
parent 489a550582
commit 00276e34b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 177 additions and 73 deletions

View File

@ -2,8 +2,8 @@
import asyncio
import json
import logging
import re
from pprint import pformat as pf
import re
import click
@ -112,7 +112,7 @@ def dump_discover(ctx, scrub):
if key in ["latitude_i", "longitude_i"]:
val = 0
else:
val = re.sub("\w", "0", val)
val = re.sub(r"\w", "0", val)
dev["system"]["get_sysinfo"][key] = val
model = dev["system"]["get_sysinfo"]["model"]

View File

@ -71,8 +71,8 @@ class SmartBulb(SmartDevice):
LIGHT_SERVICE = "smartlife.iot.smartbulb.lightingservice"
def __init__(self, host: str, *, child_id: str = None, cache_ttl: int = 3) -> None:
SmartDevice.__init__(self, host=host, child_id=child_id, cache_ttl=cache_ttl)
def __init__(self, host: str, *, cache_ttl: int = 3) -> None:
SmartDevice.__init__(self, host=host, cache_ttl=cache_ttl)
self.emeter_type = "smartlife.iot.common.emeter"
self._device_type = DeviceType.Bulb
self._light_state = None

View File

@ -11,12 +11,12 @@ Stroetmann which is licensed under the Apache License, Version 2.0.
You may obtain a copy of the license at
http://www.apache.org/licenses/LICENSE-2.0
"""
import functools
import inspect
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from enum import Enum
import functools
import inspect
import logging
from typing import Any, Dict, Optional
from kasa.protocol import TPLinkSmartHomeProtocol
@ -103,7 +103,7 @@ def requires_update(f):
class SmartDevice:
"""Base class for all supported device types."""
def __init__(self, host: str, *, child_id: str = None, cache_ttl: int = 3) -> None:
def __init__(self, host: str, *, cache_ttl: int = 3) -> None:
"""Create a new SmartDevice instance.
:param str host: host name or ip address on which the device listens
@ -113,17 +113,11 @@ class SmartDevice:
self.protocol = TPLinkSmartHomeProtocol()
self.emeter_type = "emeter"
self.child_id = child_id
self.cache_ttl = timedelta(seconds=cache_ttl)
_LOGGER.debug(
"Initializing %s using child_id %s and cache ttl %s",
self.host,
self.child_id,
self.cache_ttl,
)
_LOGGER.debug("Initializing %s with cache ttl %s", self.host, self.cache_ttl)
self.cache = defaultdict(lambda: defaultdict(lambda: None)) # type: ignore
self._device_type = DeviceType.Unknown
self._sys_info = None
self._sys_info: Optional[Dict] = None
def _result_from_cache(self, target, cmd) -> Optional[Dict]:
"""Return query result from cache if still fresh.
@ -162,7 +156,7 @@ class SmartDevice:
self.cache[target][cmd]["last_updated"] = datetime.utcnow()
async def _query_helper(
self, target: str, cmd: str, arg: Optional[Dict] = None
self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None
) -> Any:
"""Handle result unwrapping and error handling.
@ -174,8 +168,8 @@ class SmartDevice:
:raises SmartDeviceException: if command was not executed correctly
"""
request: Dict[str, Any] = {target: {cmd: arg}}
if self.child_id is not None:
request = {"context": {"child_ids": [self.child_id]}, target: {cmd: arg}}
if child_ids is not None:
request = {"context": {"child_ids": child_ids}, target: {cmd: arg}}
try:
response = self._result_from_cache(target, cmd)
@ -204,16 +198,6 @@ class SmartDevice:
return result
def _get_child_info(self) -> Dict:
"""Return the child information dict, if available.
:raises SmartDeviceException: if there is no child or it cannot be found.
"""
for plug in self.sys_info["children"]:
if plug["id"] == self.child_id:
return plug
raise SmartDeviceException("Unable to find children %s")
def has_emeter(self) -> bool:
"""Return if device has an energy meter.
@ -614,11 +598,8 @@ class SmartDevice:
def device_id(self) -> str:
"""Return unique ID for the device.
For regular devices this is the MAC address of the device,
for child devices a combination of MAC and child's ID.
This is the MAC address of the device.
"""
if self.is_child_device:
return f"{self.mac}_{self.child_id}"
return self.mac
@property
@ -651,11 +632,6 @@ class SmartDevice:
"""Return True if the device supports color temperature."""
return False
@property
def is_child_device(self) -> bool:
"""Return True if the device is a child device of another device."""
return self.child_id is not None
def __repr__(self):
return "<{} model {} at {} ({}), is_on: {} - dev specific: {}>".format(
self.__class__.__name__,

View File

@ -35,8 +35,8 @@ class SmartPlug(SmartDevice):
and should be handled by the user of the library.
"""
def __init__(self, host: str, *, child_id: str = None, cache_ttl: int = 3) -> None:
SmartDevice.__init__(self, host, child_id=child_id, cache_ttl=cache_ttl)
def __init__(self, host: str, *, cache_ttl: int = 3) -> None:
SmartDevice.__init__(self, host, cache_ttl=cache_ttl)
self.emeter_type = "emeter"
self._device_type = DeviceType.Plug
@ -81,20 +81,6 @@ class SmartPlug(SmartDevice):
else:
raise ValueError("Brightness value %s is not valid." % value)
@property # type: ignore
@requires_update
def alias(self) -> str:
"""Return device name (alias).
:return: Device name aka alias.
:rtype: str
"""
if self.is_child_device:
info = self._get_child_info()
return info["alias"]
else:
return super().alias
@property # type: ignore
@requires_update
def is_dimmable(self):
@ -125,10 +111,6 @@ class SmartPlug(SmartDevice):
:return: True if device is on, False otherwise
"""
if self.is_child_device:
info = self._get_child_info()
return info["state"]
sys_info = self.sys_info
return bool(sys_info["relay_state"])
@ -176,12 +158,7 @@ class SmartPlug(SmartDevice):
:return: datetime for on since
:rtype: datetime
"""
sys_info = self.sys_info
if self.is_child_device:
info = self._get_child_info()
on_time = info["on_time"]
else:
on_time = sys_info["on_time"]
on_time = self.sys_info["on_time"]
return datetime.datetime.now() - datetime.timedelta(seconds=on_time)

View File

@ -2,18 +2,23 @@
.. todo:: describe how this interfaces with single plugs.
"""
from collections import defaultdict
import datetime
import logging
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List
from typing import Any, DefaultDict, Dict, List, Optional
from kasa.smartdevice import DeviceType, requires_update
from kasa.smartdevice import (
DeviceType,
SmartDevice,
SmartDeviceException,
requires_update,
)
from kasa.smartplug import SmartPlug
_LOGGER = logging.getLogger(__name__)
class SmartStrip(SmartPlug):
class SmartStrip(SmartDevice):
"""Representation of a TP-Link Smart Power Strip.
Usage example when used as library:
@ -40,11 +45,15 @@ class SmartStrip(SmartPlug):
and should be handled by the user of the library.
"""
def has_emeter(self) -> bool:
"""Return True as strips has always an emeter."""
return True
def __init__(self, host: str, *, cache_ttl: int = 3) -> None:
SmartPlug.__init__(self, host=host, cache_ttl=cache_ttl)
SmartDevice.__init__(self, host=host, cache_ttl=cache_ttl)
self.emeter_type = "emeter"
self._device_type = DeviceType.Strip
self.plugs: List[SmartPlug] = []
self.plugs: List[SmartStripPlug] = []
@property # type: ignore
@requires_update
@ -66,11 +75,12 @@ class SmartStrip(SmartPlug):
# Initialize the child devices during the first update.
if not self.plugs:
children = self.sys_info["children"]
self.num_children = len(children)
_LOGGER.debug("Initializing %s child sockets", len(children))
for child in children:
self.plugs.append(
SmartPlug(
SmartStripPlug(
self.host,
parent=self,
child_id=child["id"],
cache_ttl=self.cache_ttl.total_seconds(),
)
@ -98,6 +108,30 @@ class SmartStrip(SmartPlug):
"""Return the maximum on-time of all outlets."""
return max(plug.on_since for plug in self.plugs)
@property # type: ignore
@requires_update
def led(self) -> bool:
"""Return the state of the led.
:return: True if led is on, False otherwise
:rtype: bool
"""
# TODO this is a copypaste from smartplug,
# check if led value is per socket or per device..
sys_info = self.sys_info
return bool(1 - sys_info["led_off"])
async def set_led(self, state: bool):
"""Set the state of the led (night mode).
:param bool state: True to set led on, False to set led off
:raises SmartDeviceException: on error
"""
# TODO this is a copypaste from smartplug,
# check if led value is per socket or per device..
await self._query_helper("system", "set_led_off", {"off": int(not state)})
await self.update()
@property # type: ignore
@requires_update
def state_information(self) -> Dict[str, Any]:
@ -188,3 +222,120 @@ class SmartStrip(SmartPlug):
"""
for plug in self.plugs:
await plug.erase_emeter_stats()
class SmartStripPlug(SmartPlug):
"""Representation of a single socket in a power strip.
This allows you to use the sockets as they were SmartPlug objects.
Instead of calling an update on any of these, you should call an update
on the parent device before accessing the properties.
"""
def __init__(
self, host: str, parent: "SmartStrip", child_id: str, *, cache_ttl: int = 3
) -> None:
super().__init__(host, cache_ttl=cache_ttl)
self.parent = parent
self.child_id = child_id
self._sys_info = self._get_child_info()
async def update(self):
"""Override the update to no-op and inform the user."""
_LOGGER.warning(
"You called update() on a child device, which has no effect."
"Call update() on the parent device instead."
)
return
async def _query_helper(
self, target: str, cmd: str, arg: Optional[Dict] = None, child_ids=None
) -> Any:
"""Override query helper to include the child_ids."""
return await self.parent._query_helper(
target, cmd, arg, child_ids=[self.child_id]
)
@property # type: ignore
@requires_update
def is_on(self) -> bool:
"""Return whether device is on.
:return: True if device is on, False otherwise
"""
info = self._get_child_info()
return info["state"]
@property # type: ignore
@requires_update
def led(self) -> bool:
"""Return the state of the led.
This is always false for subdevices.
:return: True if led is on, False otherwise
:rtype: bool
"""
return False
@property # type: ignore
@requires_update
def device_id(self) -> str:
"""Return unique ID for the socket.
This is a combination of MAC and child's ID.
"""
return f"{self.mac}_{self.child_id}"
@property # type: ignore
@requires_update
def has_emeter(self):
"""Single sockets have always an emeter."""
return True
@property # type: ignore
@requires_update
def alias(self) -> str:
"""Return device name (alias).
:return: Device name aka alias.
:rtype: str
"""
info = self._get_child_info()
return info["alias"]
@property # type: ignore
@requires_update
def on_since(self) -> datetime.datetime:
"""Return pretty-printed on-time.
:return: datetime for on since
:rtype: datetime
"""
info = self._get_child_info()
on_time = info["on_time"]
return datetime.datetime.now() - datetime.timedelta(seconds=on_time)
@property # type: ignore
@requires_update
def model(self) -> str:
"""Return device model for a child socket.
:return: device model
:rtype: str
:raises SmartDeviceException: on error
"""
sys_info = self.parent.sys_info
return f"Socket for {sys_info['model']}"
def _get_child_info(self) -> Dict:
"""Return the subdevice information for this device.
:raises SmartDeviceException: if the information is not found.
"""
for plug in self.parent.sys_info["children"]:
if plug["id"] == self.child_id:
return plug
raise SmartDeviceException(f"Unable to find children {self.child_id}")

View File

@ -52,7 +52,7 @@ dimmable = pytest.mark.parametrize(
"dev", filter_model("dimmable", DIMMABLE), indirect=True
)
non_dimmable = pytest.mark.parametrize(
"dev", filter_model("non-dimmable", ALL_DEVICES - DIMMABLE), indirect=True
"dev", filter_model("non-dimmable", ALL_DEVICES - DIMMABLE - STRIPS), indirect=True
)
variable_temp = pytest.mark.parametrize(