implement methods that depend on sys_info as properties and add update

In the case of smartbulb.update, light_state is also updated.
This commit is contained in:
Bas Nijholt 2019-11-15 17:48:36 +01:00
parent 8c25590186
commit 72d5443f1a
6 changed files with 413 additions and 221 deletions

View File

@ -151,7 +151,7 @@ def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3)
click.echo(f"Attempt {attempt} of {attempts}")
found_devs = Discover.discover(target=target, timeout=timeout).items()
for ip, dev in found_devs:
if dev.sync.get_alias().lower() == alias.lower():
if dev.alias.lower() == alias.lower():
host = dev.host
return host
return None
@ -161,8 +161,9 @@ def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3)
@pass_dev
def sysinfo(dev):
"""Print out full system information."""
dev.sync.update()
click.echo(click.style("== System info ==", bold=True))
click.echo(pf(dev.sync.get_sys_info()))
click.echo(pf(dev.sys_info))
@cli.command()
@ -170,23 +171,20 @@ def sysinfo(dev):
@click.pass_context
def state(ctx, dev: SmartDevice):
"""Print out device state and versions."""
click.echo(
click.style(
"== {} - {} ==".format(dev.sync.get_alias(), dev.sync.get_model()),
bold=True,
)
)
dev.sync.update()
click.echo(click.style("== {} - {} ==".format(dev.alias, dev.model), bold=True))
click.echo(
click.style(
"Device state: {}".format("ON" if dev.sync.is_on() else "OFF"),
fg="green" if dev.sync.is_on() else "red",
"Device state: {}".format("ON" if dev.is_on else "OFF"),
fg="green" if dev.is_on else "red",
)
)
if dev.is_strip:
for plug in dev.plugs: # type: ignore
is_on = plug.sync.get_is_on()
alias = plug.sync.get_alias()
plug.sync.update()
is_on = plug.is_on
alias = plug.alias
click.echo(
click.style(
" * {} state: {}".format(alias, ("ON" if is_on else "OFF")),
@ -195,15 +193,14 @@ def state(ctx, dev: SmartDevice):
)
click.echo(f"Host/IP: {dev.host}")
for k, v in dev.sync.get_state_information().items():
for k, v in dev.state_information.items():
click.echo(f"{k}: {v}")
hw_info = dev.sync.get_hw_info()
click.echo(click.style("== Generic information ==", bold=True))
click.echo("Time: {}".format(dev.sync.get_time()))
click.echo("Hardware: {}".format(hw_info["hw_ver"]))
click.echo("Software: {}".format(hw_info["sw_ver"]))
click.echo("MAC (rssi): {} ({})".format(dev.sync.get_mac(), dev.sync.get_rssi()))
click.echo("Location: {}".format(dev.sync.get_location()))
click.echo("Hardware: {}".format(dev.hw_info["hw_ver"]))
click.echo("Software: {}".format(dev.hw_info["sw_ver"]))
click.echo("MAC (rssi): {} ({})".format(dev.mac, dev.rssi))
click.echo("Location: {}".format(dev.location))
ctx.invoke(emeter)
@ -217,7 +214,7 @@ def alias(dev, new_alias):
click.echo(f"Setting alias to {new_alias}")
dev.sync.set_alias(new_alias)
click.echo(f"Alias: {dev.sync.get_alias()}")
click.echo(f"Alias: {dev.alias}")
@cli.command()
@ -232,6 +229,7 @@ def raw_command(dev: SmartDevice, module, command, parameters):
if parameters is not None:
parameters = ast.literal_eval(parameters)
res = dev.sync._query_helper(module, command, parameters)
dev.sync.update()
click.echo(res)
@ -243,7 +241,8 @@ def raw_command(dev: SmartDevice, module, command, parameters):
def emeter(dev, year, month, erase):
"""Query emeter for historical consumption."""
click.echo(click.style("== Emeter ==", bold=True))
if not dev.sync.get_has_emeter():
dev.sync.update()
if not dev.has_emeter:
click.echo("Device has no emeter")
return
@ -274,11 +273,12 @@ def emeter(dev, year, month, erase):
@pass_dev
def brightness(dev, brightness):
"""Get or set brightness."""
if not dev.sync.is_dimmable():
dev.sync.update()
if not dev.is_dimmable:
click.echo("This device does not support brightness.")
return
if brightness is None:
click.echo("Brightness: %s" % dev.sync.get_brightness())
click.echo("Brightness: %s" % dev.brightness)
else:
click.echo("Setting brightness to %s" % brightness)
dev.sync.set_brightness(brightness)
@ -292,14 +292,14 @@ def brightness(dev, brightness):
def temperature(dev: SmartBulb, temperature):
"""Get or set color temperature."""
if temperature is None:
click.echo(f"Color temperature: {dev.sync.get_color_temp()}")
valid_temperature_range = dev.sync.get_valid_temperature_range()
click.echo(f"Color temperature: {dev.color_temp}")
valid_temperature_range = dev.valid_temperature_range
if valid_temperature_range != (0, 0):
click.echo("(min: {}, max: {})".format(*valid_temperature_range))
else:
click.echo(
"Temperature range unknown, please open a github issue"
f" or a pull request for model '{dev.sync.get_model()}'"
f" or a pull request for model '{dev.model}'"
)
else:
click.echo(f"Setting color temperature to {temperature}")
@ -315,7 +315,7 @@ def temperature(dev: SmartBulb, temperature):
def hsv(dev, ctx, h, s, v):
"""Get or set color in HSV. (Bulb only)."""
if h is None or s is None or v is None:
click.echo("Current HSV: %s %s %s" % dev.sync.get_hsv())
click.echo("Current HSV: %s %s %s" % dev.hsv)
elif s is None or v is None:
raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx)
else:
@ -332,7 +332,7 @@ def led(dev, state):
click.echo("Turning led to %s" % state)
dev.sync.set_led(state)
else:
click.echo("LED state: %s" % dev.sync.get_led())
click.echo("LED state: %s" % dev.led)
@cli.command()

View File

@ -1,9 +1,14 @@
"""Module for bulbs."""
import re
from typing import Any, Dict, Tuple
from typing import Any, Dict, Optional, Tuple
from pyHS100.protocol import TPLinkSmartHomeProtocol
from pyHS100.smartdevice import DeviceType, SmartDevice, SmartDeviceException
from pyHS100.smartdevice import (
DeviceType,
SmartDevice,
SmartDeviceException,
requires_update,
)
TPLINK_KELVIN = {
"LB130": (2500, 9000),
@ -24,14 +29,14 @@ class SmartBulb(SmartDevice):
p = SmartBulb("192.168.1.105")
# print the devices alias
print(p.sync.get_alias())
print(p.sync.alias)
# change state of bulb
p.sync.turn_on()
p.sync.turn_off()
# query and print current state of plug
print(p.sync.get_state_information())
print(p.sync.state_information())
# check whether the bulb supports color changes
if p.sync.is_color():
@ -40,7 +45,7 @@ class SmartBulb(SmartDevice):
p.sync.set_hsv(180, 100, 100)
# get the current HSV value
print(p.sync.get_hsv())
print(p.sync.hsv())
# check whether the bulb supports setting color temperature
if p.sync.is_variable_color_temp():
@ -48,16 +53,16 @@ class SmartBulb(SmartDevice):
p.sync.set_color_temp(3000)
# get the current color temperature
print(p.sync.get_color_temp())
print(p.sync.color_temp)
# check whether the bulb is dimmable
if p.sync.is_dimmable():
if p.is_dimmable:
# set the bulb to 50% brightness
p.sync.set_brightness(50)
# check the current brightness
print(p.sync.get_brightness())
print(p.brightness)
```
Omit the `sync` attribute to get coroutines.
@ -87,70 +92,94 @@ class SmartBulb(SmartDevice):
)
self.emeter_type = "smartlife.iot.common.emeter"
self._device_type = DeviceType.Bulb
self._light_state = None
async def is_color(self) -> bool:
@property
@requires_update
def is_color(self) -> bool:
"""Whether the bulb supports color changes.
:return: True if the bulb supports color changes, False otherwise
:rtype: bool
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return bool(sys_info["is_color"])
async def is_dimmable(self) -> bool:
@property
@requires_update
def is_dimmable(self) -> bool:
"""Whether the bulb supports brightness changes.
:return: True if the bulb supports brightness changes, False otherwise
:rtype: bool
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return bool(sys_info["is_dimmable"])
async def is_variable_color_temp(self) -> bool:
@property
@requires_update
def is_variable_color_temp(self) -> bool:
"""Whether the bulb supports color temperature changes.
:return: True if the bulb supports color temperature changes, False
otherwise
:rtype: bool
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return bool(sys_info["is_variable_color_temp"])
async def get_valid_temperature_range(self) -> Tuple[int, int]:
@property
@requires_update
def valid_temperature_range(self) -> Tuple[int, int]:
"""Return the device-specific white temperature range (in Kelvin).
:return: White temperature range in Kelvin (minimun, maximum)
:rtype: tuple
"""
if not await self.is_variable_color_temp():
if not self.is_variable_color_temp:
return (0, 0)
for model, temp_range in TPLINK_KELVIN.items():
sys_info = await self.get_sys_info()
sys_info = self.sys_info
if re.match(model, sys_info["model"]):
return temp_range
return (0, 0)
async def get_light_state(self) -> Dict:
async def update(self):
"""Update `sys_info and `light_state`."""
self._sys_info = await self.get_sys_info()
self._light_state = await self.get_light_state()
@property
@requires_update
def light_state(self) -> Optional[Dict[str, Dict]]:
"""Query the light state."""
return self._light_state
async def get_light_state(self) -> Dict[str, Dict]:
"""Query the light state."""
return await self._query_helper(self.LIGHT_SERVICE, "get_light_state")
async def set_light_state(self, state: Dict) -> Dict:
"""Set the light state."""
return await self._query_helper(
light_state = await self._query_helper(
self.LIGHT_SERVICE, "transition_light_state", state
)
await self.update()
return light_state
async def get_hsv(self) -> Tuple[int, int, int]:
@property
@requires_update
def hsv(self) -> Tuple[int, int, int]:
"""Return the current HSV state of the bulb.
:return: hue, saturation and value (degrees, %, %)
:rtype: tuple
"""
if not await self.is_color():
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
light_state = await self.get_light_state()
if not await self.is_on():
light_state = self.light_state
if not self.is_on:
hue = light_state["dft_on_state"]["hue"]
saturation = light_state["dft_on_state"]["saturation"]
value = light_state["dft_on_state"]["brightness"]
@ -167,6 +196,7 @@ class SmartBulb(SmartDevice):
"Invalid brightness value: {} " "(valid range: 0-100%)".format(value)
)
@requires_update
async def set_hsv(self, hue: int, saturation: int, value: int):
"""Set new HSV.
@ -174,7 +204,7 @@ class SmartBulb(SmartDevice):
:param int saturation: saturation in percentage [0,100]
:param int value: value in percentage [0, 100]
"""
if not await self.is_color():
if not self.is_color:
raise SmartDeviceException("Bulb does not support color.")
if not isinstance(hue, int) or not (0 <= hue <= 360):
@ -198,30 +228,33 @@ class SmartBulb(SmartDevice):
}
await self.set_light_state(light_state)
async def get_color_temp(self) -> int:
@property
@requires_update
def color_temp(self) -> int:
"""Return color temperature of the device.
:return: Color temperature in Kelvin
:rtype: int
"""
if not await self.is_variable_color_temp():
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
light_state = await self.get_light_state()
if not await self.is_on():
light_state = self.light_state
if not self.is_on:
return int(light_state["dft_on_state"]["color_temp"])
else:
return int(light_state["color_temp"])
@requires_update
async def set_color_temp(self, temp: int) -> None:
"""Set the color temperature of the device.
:param int temp: The new color temperature, in Kelvin
"""
if not await self.is_variable_color_temp():
if not self.is_variable_color_temp:
raise SmartDeviceException("Bulb does not support colortemp.")
valid_temperature_range = await self.get_valid_temperature_range()
valid_temperature_range = self.valid_temperature_range
if temp < valid_temperature_range[0] or temp > valid_temperature_range[1]:
raise ValueError(
"Temperature should be between {} "
@ -231,27 +264,30 @@ class SmartBulb(SmartDevice):
light_state = {"color_temp": temp}
await self.set_light_state(light_state)
async def get_brightness(self) -> int:
@property
@requires_update
def brightness(self) -> int:
"""Return the current brightness.
:return: brightness in percent
:rtype: int
"""
if not await self.is_dimmable(): # pragma: no cover
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
light_state = await self.get_light_state()
if not await self.is_on():
light_state = self.light_state
if not self.is_on:
return int(light_state["dft_on_state"]["brightness"])
else:
return int(light_state["brightness"])
@requires_update
async def set_brightness(self, brightness: int) -> None:
"""Set the brightness.
:param int brightness: brightness in percent
"""
if not await self.is_dimmable(): # pragma: no cover
if not self.is_dimmable: # pragma: no cover
raise SmartDeviceException("Bulb is not dimmable.")
self._raise_for_invalid_brightness(brightness)
@ -259,27 +295,31 @@ class SmartBulb(SmartDevice):
light_state = {"brightness": brightness}
await self.set_light_state(light_state)
async def get_state_information(self) -> Dict[str, Any]:
@property
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return bulb-specific state information.
:return: Bulb information dict, keys in user-presentable form.
:rtype: dict
"""
info: Dict[str, Any] = {
"Brightness": await self.get_brightness(),
"Is dimmable": await self.is_dimmable(),
"Brightness": self.brightness,
"Is dimmable": self.is_dimmable,
}
if await self.is_variable_color_temp():
info["Color temperature"] = await self.get_color_temp()
info["Valid temperature range"] = await self.get_valid_temperature_range()
if await self.is_color():
info["HSV"] = await self.get_hsv()
if self.is_variable_color_temp:
info["Color temperature"] = self.color_temp
info["Valid temperature range"] = self.valid_temperature_range
if self.is_color:
info["HSV"] = self.hsv
return info
async def is_on(self) -> bool:
@property
@requires_update
def is_on(self) -> bool:
"""Return whether the device is on."""
light_state = await self.get_light_state()
light_state = self.light_state
return bool(light_state["on_off"])
async def turn_off(self) -> None:
@ -290,6 +330,8 @@ class SmartBulb(SmartDevice):
"""Turn the bulb on."""
await self.set_light_state({"on_off": 1})
async def get_has_emeter(self) -> bool:
@property
@requires_update
def has_emeter(self) -> bool:
"""Return that the bulb has an emeter."""
return True

View File

@ -79,6 +79,26 @@ class EmeterStatus(dict):
raise SmartDeviceException("Unable to find a value for '%s'" % item)
def requires_update(f):
"""Indicate that `update` should be called before accessing this method.""" # noqa: D202
if inspect.iscoroutinefunction(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
self = args[0]
assert self._sys_info is not None
return await f(*args, **kwargs)
else:
@functools.wraps(f)
def wrapped(*args, **kwargs):
self = args[0]
assert self._sys_info is not None
return f(*args, **kwargs)
f.requires_update = True
return wrapped
class SmartDevice:
"""Base class for all supported device types."""
@ -117,6 +137,7 @@ class SmartDevice:
self._device_type = DeviceType.Unknown
self.ioloop = ioloop or asyncio.get_event_loop()
self.sync = SyncSmartDevice(self, ioloop=self.ioloop)
self._sys_info = None
def _result_from_cache(self, target, cmd) -> Optional[Dict]:
"""Return query result from cache if still fresh.
@ -197,7 +218,7 @@ class SmartDevice:
return result
async def get_has_emeter(self) -> bool:
def has_emeter(self) -> bool:
"""Return if device has an energy meter.
:return: True if energey meter is available
@ -214,23 +235,45 @@ class SmartDevice:
"""
return await self._query_helper("system", "get_sysinfo")
async def get_model(self) -> str:
async def update(self):
"""Update some of the attributes.
Needed for methods that are decorated with `requires_update`.
"""
self._sys_info = await self.get_sys_info()
@property
@requires_update
def sys_info(self) -> Dict[str, Any]:
"""Retrieve system information.
:return: sysinfo
:rtype dict
:raises SmartDeviceException: on error
"""
return self._sys_info
@property
@requires_update
def model(self) -> str:
"""Return device model.
:return: device model
:rtype: str
:raises SmartDeviceException: on error
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return str(sys_info["model"])
async def get_alias(self) -> str:
@property
@requires_update
def alias(self) -> str:
"""Return device name (alias).
:return: Device name aka alias.
:rtype: str
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return str(sys_info["alias"])
async def set_alias(self, alias: str) -> None:
@ -240,6 +283,7 @@ class SmartDevice:
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_dev_alias", {"alias": alias})
await self.update()
async def get_icon(self) -> Dict:
"""Return device icon.
@ -329,7 +373,9 @@ class SmartDevice:
"""
return await self._query_helper("time", "get_timezone")
async def get_hw_info(self) -> Dict:
@property
@requires_update
def hw_info(self) -> Dict:
"""Return hardware information.
:return: Information about hardware
@ -347,47 +393,53 @@ class SmartDevice:
"oemId",
"dev_name",
]
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return {key: sys_info[key] for key in keys if key in sys_info}
async def get_location(self) -> Dict:
@property
@requires_update
def location(self) -> Dict:
"""Return geographical location.
:return: latitude and longitude
:rtype: dict
"""
info = await self.get_sys_info()
sys_info = self.sys_info
loc = {"latitude": None, "longitude": None}
if "latitude" in info and "longitude" in info:
loc["latitude"] = info["latitude"]
loc["longitude"] = info["longitude"]
elif "latitude_i" in info and "longitude_i" in info:
loc["latitude"] = info["latitude_i"]
loc["longitude"] = info["longitude_i"]
if "latitude" in sys_info and "longitude" in sys_info:
loc["latitude"] = sys_info["latitude"]
loc["longitude"] = sys_info["longitude"]
elif "latitude_i" in sys_info and "longitude_i" in sys_info:
loc["latitude"] = sys_info["latitude_i"]
loc["longitude"] = sys_info["longitude_i"]
else:
_LOGGER.warning("Unsupported device location.")
return loc
async def get_rssi(self) -> Optional[int]:
@property
@requires_update
def rssi(self) -> Optional[int]:
"""Return WiFi signal strenth (rssi).
:return: rssi
:rtype: int
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
if "rssi" in sys_info:
return int(sys_info["rssi"])
return None
async def get_mac(self) -> str:
@property
@requires_update
def mac(self) -> str:
"""Return mac address.
:return: mac address in hexadecimal with colons, e.g. 01:23:45:67:89:ab
:rtype: str
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
if "mac" in sys_info:
return str(sys_info["mac"])
@ -407,7 +459,9 @@ class SmartDevice:
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_mac_addr", {"mac": mac})
await self.update()
@requires_update
async def get_emeter_realtime(self) -> EmeterStatus:
"""Retrieve current energy readings.
@ -415,11 +469,12 @@ class SmartDevice:
:rtype: dict, None
:raises SmartDeviceException: on error
"""
if not await self.get_has_emeter():
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
return EmeterStatus(await self._query_helper(self.emeter_type, "get_realtime"))
@requires_update
async def get_emeter_daily(
self, year: int = None, month: int = None, kwh: bool = True
) -> Dict:
@ -433,7 +488,7 @@ class SmartDevice:
:rtype: dict
:raises SmartDeviceException: on error
"""
if not await self.get_has_emeter():
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
if year is None:
@ -454,6 +509,7 @@ class SmartDevice:
return data
@requires_update
async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict:
"""Retrieve monthly statistics for a given year.
@ -463,7 +519,7 @@ class SmartDevice:
:rtype: dict
:raises SmartDeviceException: on error
"""
if not await self.get_has_emeter():
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
if year is None:
@ -480,24 +536,27 @@ class SmartDevice:
return {entry["month"]: entry[key] for entry in response}
@requires_update
async def erase_emeter_stats(self):
"""Erase energy meter statistics.
:return: True if statistics were deleted
:raises SmartDeviceException: on error
"""
if not await self.get_has_emeter():
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
await self._query_helper(self.emeter_type, "erase_emeter_stat", None)
await self.update()
@requires_update
async def current_consumption(self) -> float:
"""Get the current power consumption in Watt.
:return: the current power consumption in Watts.
:raises SmartDeviceException: on error
"""
if not await self.get_has_emeter():
if not self.has_emeter:
raise SmartDeviceException("Device has no emeter")
response = EmeterStatus(await self.get_emeter_realtime())
@ -518,19 +577,23 @@ class SmartDevice:
"""Turn off the device."""
raise NotImplementedError("Device subclass needs to implement this.")
async def is_off(self) -> bool:
@property
@requires_update
def is_off(self) -> bool:
"""Return True if device is off.
:return: True if device is off, False otherwise.
:rtype: bool
"""
return not await self.is_on()
return not self.is_on
async def turn_on(self) -> None:
"""Turn device on."""
raise NotImplementedError("Device subclass needs to implement this.")
async def is_on(self) -> bool:
@property
@requires_update
def is_on(self) -> bool:
"""Return if the device is on.
:return: True if the device is on, False otherwise.
@ -539,7 +602,9 @@ class SmartDevice:
"""
raise NotImplementedError("Device subclass needs to implement this.")
async def get_state_information(self) -> Dict[str, Any]:
@property
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return device-type specific, end-user friendly state information.
:return: dict with state information.
@ -567,22 +632,25 @@ class SmartDevice:
"""Return True if the device is a strip."""
return self._device_type == DeviceType.Strip
async def is_dimmable(self):
@property
def is_dimmable(self):
"""Return True if the device is dimmable."""
return False
async def is_variable_color_temp(self) -> bool:
@property
def is_variable_color_temp(self) -> bool:
"""Return True if the device supports color temperature."""
return False
def __repr__(self):
self.sync.update()
return "<{} model {} at {} ({}), is_on: {} - dev specific: {}>".format(
self.__class__.__name__,
self.sync.get_model(),
self.model,
self.host,
self.sync.get_alias(),
self.sync.is_on(),
self.sync.get_state_information(),
self.alias,
self.is_on,
self.sync.state_information,
)

View File

@ -4,7 +4,12 @@ import logging
from typing import Any, Dict
from pyHS100.protocol import TPLinkSmartHomeProtocol
from pyHS100.smartdevice import DeviceType, SmartDevice, SmartDeviceException
from pyHS100.smartdevice import (
DeviceType,
SmartDevice,
SmartDeviceException,
requires_update,
)
_LOGGER = logging.getLogger(__name__)
@ -17,14 +22,14 @@ class SmartPlug(SmartDevice):
p = SmartPlug("192.168.1.105")
# print the devices alias
print(p.sync.get_alias())
print(p.sync.alias)
# change state of plug
p.sync.turn_on()
p.sync.turn_off()
# query and print current state of plug
print(p.sync.get_state_information())
print(p.sync.state_information)
```
Omit the `sync` attribute to get coroutines.
@ -46,7 +51,9 @@ class SmartPlug(SmartDevice):
self.emeter_type = "emeter"
self._device_type = DeviceType.Plug
async def get_brightness(self) -> int:
@property
@requires_update
def brightness(self) -> int:
"""Return current brightness on dimmers.
Will return a range between 0 - 100.
@ -54,12 +61,13 @@ class SmartPlug(SmartDevice):
:returns: integer
:rtype: int
"""
if not await self.is_dimmable():
if not self.is_dimmable:
raise SmartDeviceException("Device is not dimmable.")
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return int(sys_info["brightness"])
@requires_update
async def set_brightness(self, value: int):
"""Set the new dimmer brightness level.
@ -70,7 +78,7 @@ class SmartPlug(SmartDevice):
:param value: integer between 1 and 100
"""
if not await self.is_dimmable():
if not self.is_dimmable:
raise SmartDeviceException("Device is not dimmable.")
if not isinstance(value, int):
@ -80,34 +88,41 @@ class SmartPlug(SmartDevice):
await self._query_helper(
"smartlife.iot.dimmer", "set_brightness", {"brightness": value}
)
await self.update()
else:
raise ValueError("Brightness value %s is not valid." % value)
async def is_dimmable(self):
@property
@requires_update
def is_dimmable(self):
"""Whether the switch supports brightness changes.
:return: True if switch supports brightness changes, False otherwise
:rtype: bool
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return "brightness" in sys_info
async def get_has_emeter(self):
@property
@requires_update
def has_emeter(self):
"""Return whether device has an energy meter.
:return: True if energy meter is available
False otherwise
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
features = sys_info["feature"].split(":")
return "ENE" in features
async def is_on(self) -> bool:
@property
@requires_update
def is_on(self) -> bool:
"""Return whether device is on.
:return: True if device is on, False otherwise
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return bool(sys_info["relay_state"])
async def turn_on(self):
@ -116,6 +131,7 @@ class SmartPlug(SmartDevice):
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 1})
await self.update()
async def turn_off(self):
"""Turn the switch off.
@ -123,14 +139,17 @@ class SmartPlug(SmartDevice):
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 0})
await self.update()
async def get_led(self) -> bool:
@property
@requires_update
def led(self) -> bool:
"""Return the state of the led.
:return: True if led is on, False otherwise
:rtype: bool
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
return bool(1 - sys_info["led_off"])
async def set_led(self, state: bool):
@ -140,14 +159,17 @@ class SmartPlug(SmartDevice):
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_led_off", {"off": int(not state)})
await self.update()
async def get_on_since(self) -> datetime.datetime:
@property
@requires_update
def on_since(self) -> datetime.datetime:
"""Return pretty-printed on-time.
:return: datetime for on since
:rtype: datetime
"""
sys_info = await self.get_sys_info()
sys_info = self.sys_info
if self.context:
for plug in sys_info["children"]:
if plug["id"] == self.context:
@ -158,16 +180,15 @@ class SmartPlug(SmartDevice):
return datetime.datetime.now() - datetime.timedelta(seconds=on_time)
async def get_state_information(self) -> Dict[str, Any]:
@property
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return switch-specific state information.
:return: Switch information dict, keys in user-presentable form.
:rtype: dict
"""
info = {
"LED state": await self.get_led(),
"On since": await self.get_on_since(),
}
if await self.is_dimmable():
info["Brightness"] = await self.get_brightness()
info = {"LED state": self.led, "On since": self.on_since}
if self.is_dimmable:
info["Brightness"] = self.brightness
return info

View File

@ -8,7 +8,7 @@ from collections import defaultdict
from typing import Any, DefaultDict, Dict, List
from pyHS100.protocol import TPLinkSmartHomeProtocol
from pyHS100.smartdevice import DeviceType
from pyHS100.smartdevice import DeviceType, requires_update
from pyHS100.smartplug import SmartPlug
_LOGGER = logging.getLogger(__name__)
@ -22,7 +22,7 @@ class SmartStrip(SmartPlug):
p = SmartStrip("192.168.1.105")
# query the state of the strip
print(p.sync.is_on())
print(p.is_on)
# change state of all outlets
p.sync.turn_on()
@ -30,7 +30,7 @@ class SmartStrip(SmartPlug):
# individual outlets are accessible through plugs variable
for plug in p.plugs:
print(f"{p}: {p.sync.is_on()}")
print(f"{p}: {p.is_on}")
# change state of a single outlet
p.plugs[0].sync.turn_on()
@ -66,20 +66,32 @@ class SmartStrip(SmartPlug):
)
)
async def is_on(self) -> bool:
@property
@requires_update
def is_on(self) -> bool:
"""Return if any of the outlets are on."""
for plug in self.plugs:
is_on = await plug.is_on()
is_on = plug.is_on
if is_on:
return True
return False
async def update(self):
"""Update some of the attributes.
Needed for methods that are decorated with `requires_update`.
"""
await super().update()
for plug in self.plugs:
await plug.update()
async def turn_on(self):
"""Turn the strip on.
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 1})
await self.update()
async def turn_off(self):
"""Turn the strip off.
@ -87,21 +99,26 @@ class SmartStrip(SmartPlug):
:raises SmartDeviceException: on error
"""
await self._query_helper("system", "set_relay_state", {"state": 0})
await self.update()
async def get_on_since(self) -> datetime.datetime:
@property
@requires_update
def on_since(self) -> datetime.datetime:
"""Return the maximum on-time of all outlets."""
return max([await plug.get_on_since() for plug in self.plugs])
return max(plug.on_since for plug in self.plugs)
async def state_information(self) -> Dict[str, Any]:
@property
@requires_update
def state_information(self) -> Dict[str, Any]:
"""Return strip-specific state information.
:return: Strip information dict, keys in user-presentable form.
:rtype: dict
"""
state: Dict[str, Any] = {"LED state": await self.get_led()}
state: Dict[str, Any] = {"LED state": self.led}
for plug in self.plugs:
if await plug.is_on():
state["Plug %s on since" % str(plug)] = await plug.get_on_since()
if plug.is_on:
state["Plug %s on since" % str(plug)] = self.on_since
return state
@ -116,7 +133,7 @@ class SmartStrip(SmartPlug):
return consumption
async def icon(self) -> Dict:
async def get_icon(self) -> Dict:
"""Icon for the device.
Overriden to keep the API, as the SmartStrip and children do not
@ -132,6 +149,7 @@ class SmartStrip(SmartPlug):
"""
return await super().set_alias(alias)
@requires_update
async def get_emeter_daily(
self, year: int = None, month: int = None, kwh: bool = True
) -> Dict:
@ -154,6 +172,7 @@ class SmartStrip(SmartPlug):
emeter_daily[day] += value
return emeter_daily
@requires_update
async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict:
"""Retrieve monthly statistics for a given year.
@ -170,6 +189,7 @@ class SmartStrip(SmartPlug):
emeter_monthly[month] += value
return emeter_monthly
@requires_update
async def erase_emeter_stats(self):
"""Erase energy meter statistics for all plugs.

View File

@ -32,10 +32,11 @@ from .newfakes import (
@plug
def test_plug_sysinfo(dev):
assert dev.sync.get_sys_info() is not None
PLUG_SCHEMA(dev.sync.get_sys_info())
dev.sync.update()
assert dev.sys_info is not None
PLUG_SCHEMA(dev.sys_info)
assert dev.sync.get_model() is not None
assert dev.model is not None
assert dev.device_type == DeviceType.Plug or dev.device_type == DeviceType.Strip
assert dev.is_plug or dev.is_strip
@ -43,23 +44,26 @@ def test_plug_sysinfo(dev):
@bulb
def test_bulb_sysinfo(dev):
assert dev.sync.get_sys_info() is not None
BULB_SCHEMA(dev.sync.get_sys_info())
dev.sync.update()
assert dev.sys_info is not None
BULB_SCHEMA(dev.sys_info)
assert dev.sync.get_model() is not None
assert dev.model is not None
assert dev.device_type == DeviceType.Bulb
assert dev.is_bulb
def test_state_info(dev):
assert isinstance(dev.sync.get_state_information(), dict)
dev.sync.update()
assert isinstance(dev.sync.state_information, dict)
def test_invalid_connection(dev):
with patch.object(FakeTransportProtocol, "query", side_effect=SmartDeviceException):
with pytest.raises(SmartDeviceException):
dev.sync.is_on()
dev.sync.update()
dev.is_on
def test_query_helper(dev):
@ -71,26 +75,28 @@ def test_query_helper(dev):
@turn_on
def test_state(dev, turn_on):
handle_turn_on(dev, turn_on)
orig_state = dev.sync.is_on()
dev.sync.update()
orig_state = dev.is_on
if orig_state:
dev.sync.turn_off()
assert not dev.sync.is_on()
assert dev.sync.is_off()
assert not dev.is_on
assert dev.is_off
dev.sync.turn_on()
assert dev.sync.is_on()
assert not dev.sync.is_off()
assert dev.is_on
assert not dev.is_off
else:
dev.sync.turn_on()
assert dev.sync.is_on()
assert not dev.sync.is_off()
assert dev.is_on
assert not dev.is_off
dev.sync.turn_off()
assert not dev.sync.is_on()
assert dev.sync.is_off()
assert not dev.is_on
assert dev.is_off
@no_emeter
def test_no_emeter(dev):
assert not dev.sync.get_has_emeter()
dev.sync.update()
assert not dev.has_emeter
with pytest.raises(SmartDeviceException):
dev.sync.get_emeter_realtime()
@ -104,10 +110,11 @@ def test_no_emeter(dev):
@has_emeter
def test_get_emeter_realtime(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.sync.get_has_emeter()
assert dev.has_emeter
current_emeter = dev.sync.get_emeter_realtime()
CURRENT_CONSUMPTION_SCHEMA(current_emeter)
@ -115,10 +122,11 @@ def test_get_emeter_realtime(dev):
@has_emeter
def test_get_emeter_daily(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.sync.get_has_emeter()
assert dev.has_emeter
assert dev.sync.get_emeter_daily(year=1900, month=1) == {}
@ -137,10 +145,11 @@ def test_get_emeter_daily(dev):
@has_emeter
def test_get_emeter_monthly(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.sync.get_has_emeter()
assert dev.has_emeter
assert dev.sync.get_emeter_monthly(year=1900) == {}
@ -159,10 +168,11 @@ def test_get_emeter_monthly(dev):
@has_emeter
def test_emeter_status(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
assert dev.sync.get_has_emeter()
assert dev.has_emeter
d = dev.sync.get_emeter_realtime()
@ -181,17 +191,19 @@ def test_emeter_status(dev):
@pytest.mark.skip("not clearing your stats..")
@has_emeter
def test_erase_emeter_stats(dev):
assert dev.sync.get_has_emeter()
dev.sync.update()
assert dev.has_emeter
dev.sync.erase_emeter()
@has_emeter
def test_current_consumption(dev):
dev.sync.update()
if dev.is_strip:
pytest.skip("Disabled for HS300 temporarily")
if dev.sync.get_has_emeter():
if dev.has_emeter:
x = dev.sync.current_consumption()
assert isinstance(x, float)
assert x >= 0.0
@ -200,34 +212,36 @@ def test_current_consumption(dev):
def test_alias(dev):
dev.sync.update()
test_alias = "TEST1234"
original = dev.sync.get_alias()
original = dev.sync.alias
assert isinstance(original, str)
dev.sync.set_alias(test_alias)
assert dev.sync.get_alias() == test_alias
assert dev.sync.alias == test_alias
dev.sync.set_alias(original)
assert dev.sync.get_alias() == original
assert dev.sync.alias == original
@plug
def test_led(dev):
original = dev.sync.get_led()
dev.sync.update()
original = dev.led
dev.sync.set_led(False)
assert not dev.sync.get_led()
dev.sync.set_led(True)
assert not dev.led
assert dev.sync.get_led()
dev.sync.set_led(True)
assert dev.led
dev.sync.set_led(original)
@plug
def test_on_since(dev):
assert isinstance(dev.sync.get_on_since(), datetime.datetime)
dev.sync.update()
assert isinstance(dev.on_since, datetime.datetime)
def test_icon(dev):
@ -244,35 +258,41 @@ def test_timezone(dev):
def test_hw_info(dev):
PLUG_SCHEMA(dev.sync.get_hw_info())
dev.sync.update()
PLUG_SCHEMA(dev.hw_info)
def test_location(dev):
PLUG_SCHEMA(dev.sync.get_location())
dev.sync.update()
PLUG_SCHEMA(dev.location)
def test_rssi(dev):
PLUG_SCHEMA({"rssi": dev.sync.get_rssi()}) # wrapping for vol
dev.sync.update()
PLUG_SCHEMA({"rssi": dev.rssi}) # wrapping for vol
def test_mac(dev):
PLUG_SCHEMA({"mac": dev.sync.get_mac()}) # wrapping for val
dev.sync.update()
PLUG_SCHEMA({"mac": dev.mac}) # wrapping for val
# TODO check setting?
@non_variable_temp
def test_temperature_on_nonsupporting(dev):
assert dev.sync.get_valid_temperature_range() == (0, 0)
dev.sync.update()
assert dev.valid_temperature_range == (0, 0)
# TODO test when device does not support temperature range
with pytest.raises(SmartDeviceException):
dev.sync.set_color_temp(2700)
with pytest.raises(SmartDeviceException):
print(dev.sync.get_color_temp())
print(dev.sync.color_temp)
@variable_temp
def test_out_of_range_temperature(dev):
dev.sync.update()
with pytest.raises(ValueError):
dev.sync.set_color_temp(1000)
with pytest.raises(ValueError):
@ -281,10 +301,11 @@ def test_out_of_range_temperature(dev):
@non_dimmable
def test_non_dimmable(dev):
assert not dev.sync.is_dimmable()
dev.sync.update()
assert not dev.is_dimmable
with pytest.raises(SmartDeviceException):
assert dev.sync.get_brightness() == 0
assert dev.brightness == 0
with pytest.raises(SmartDeviceException):
dev.sync.set_brightness(100)
@ -293,13 +314,14 @@ def test_non_dimmable(dev):
@turn_on
def test_dimmable_brightness(dev, turn_on):
handle_turn_on(dev, turn_on)
assert dev.sync.is_dimmable()
dev.sync.update()
assert dev.is_dimmable
dev.sync.set_brightness(50)
assert dev.sync.get_brightness() == 50
assert dev.brightness == 50
dev.sync.set_brightness(10)
assert dev.sync.get_brightness() == 10
assert dev.brightness == 10
with pytest.raises(ValueError):
dev.sync.set_brightness("foo")
@ -307,7 +329,8 @@ def test_dimmable_brightness(dev, turn_on):
@dimmable
def test_invalid_brightness(dev):
assert dev.sync.is_dimmable()
dev.sync.update()
assert dev.is_dimmable
with pytest.raises(ValueError):
dev.sync.set_brightness(110)
@ -320,16 +343,17 @@ def test_invalid_brightness(dev):
@turn_on
def test_hsv(dev, turn_on):
handle_turn_on(dev, turn_on)
assert dev.sync.is_color()
dev.sync.update()
assert dev.is_color
hue, saturation, brightness = dev.sync.get_hsv()
hue, saturation, brightness = dev.hsv
assert 0 <= hue <= 255
assert 0 <= saturation <= 100
assert 0 <= brightness <= 100
dev.sync.set_hsv(hue=1, saturation=1, value=1)
hue, saturation, brightness = dev.sync.get_hsv()
hue, saturation, brightness = dev.hsv
assert hue == 1
assert saturation == 1
assert brightness == 1
@ -339,8 +363,8 @@ def test_hsv(dev, turn_on):
@turn_on
def test_invalid_hsv(dev, turn_on):
handle_turn_on(dev, turn_on)
assert dev.sync.is_color()
dev.sync.update()
assert dev.is_color
for invalid_hue in [-1, 361, 0.5]:
with pytest.raises(ValueError):
@ -357,67 +381,79 @@ def test_invalid_hsv(dev, turn_on):
@non_color_bulb
def test_hsv_on_non_color(dev):
assert not dev.sync.is_color()
dev.sync.update()
assert not dev.is_color
with pytest.raises(SmartDeviceException):
dev.sync.set_hsv(0, 0, 0)
with pytest.raises(SmartDeviceException):
print(dev.sync.get_hsv())
print(dev.hsv)
@variable_temp
@turn_on
def test_try_set_colortemp(dev, turn_on):
dev.sync.update()
handle_turn_on(dev, turn_on)
dev.sync.set_color_temp(2700)
assert dev.sync.get_color_temp() == 2700
assert dev.sync.color_temp == 2700
@non_variable_temp
def test_non_variable_temp(dev):
with pytest.raises(SmartDeviceException):
dev.sync.update()
dev.sync.set_color_temp(2700)
@strip
@turn_on
def test_children_change_state(dev, turn_on):
dev.sync.update()
handle_turn_on(dev, turn_on)
for plug in dev.plugs:
orig_state = plug.sync.is_on()
plug.sync.update()
orig_state = plug.is_on
if orig_state:
plug.turn_off()
assert not plug.sync.is_on()
assert plug.sync.is_off()
plug.sync.update()
assert not plug.is_on
assert plug.is_off
plug.sync.turn_on()
assert plug.sync.is_on()
assert not plug.sync.is_off()
plug.sync.update()
assert plug.is_on
assert not plug.is_off
else:
plug.sync.turn_on()
assert plug.sync.is_on()
assert not plug.sync.is_off()
plug.sync.update()
assert plug.is_on
assert not plug.is_off
plug.sync.turn_off()
assert not plug.sync.is_on()
assert plug.sync.is_off()
plug.sync.update()
assert not plug.is_on
assert plug.is_off
@strip
def test_children_alias(dev):
test_alias = "TEST1234"
for plug in dev.plugs:
original = plug.sync.get_alias()
plug.sync.update()
original = plug.alias
plug.sync.set_alias(alias=test_alias)
assert plug.sync.get_alias() == test_alias
plug.sync.update()
assert plug.alias == test_alias
plug.sync.set_alias(alias=original)
assert plug.sync.get_alias() == original
plug.sync.update()
assert plug.alias == original
@strip
def test_children_on_since(dev):
for plug in dev.plugs:
assert plug.sync.get_on_since()
plug.sync.update()
assert plug.on_since
@pytest.mark.skip("this test will wear out your relays")
@ -435,7 +471,7 @@ def test_all_binary_states(dev):
dev.sync.turn_off(index=plug_index)
# check state map applied
for index, state in dev.sync.get_is_on().items():
for index, state in dev.is_on.items():
assert state_map[index] == state
# toggle each outlet with state map applied
@ -448,7 +484,7 @@ def test_all_binary_states(dev):
dev.sync.turn_on(index=plug_index)
# only target outlet should have state changed
for index, state in dev.sync.get_is_on().items():
for index, state in dev.is_on.items():
if index == plug_index:
assert state != state_map[index]
else:
@ -461,15 +497,17 @@ def test_all_binary_states(dev):
dev.sync.turn_off(index=plug_index)
# original state map should be restored
for index, state in dev.sync.get_is_on().items():
for index, state in dev.is_on.items():
assert state == state_map[index]
@strip
def test_children_get_emeter_realtime(dev):
assert dev.sync.get_has_emeter()
dev.sync.update()
assert dev.has_emeter
# test with index
for plug in dev.plugs:
plug.sync.update()
emeter = plug.sync.get_emeter_realtime()
CURRENT_CONSUMPTION_SCHEMA(emeter)
@ -482,9 +520,11 @@ def test_children_get_emeter_realtime(dev):
@strip
def test_children_get_emeter_daily(dev):
assert dev.sync.get_has_emeter()
dev.sync.update()
assert dev.has_emeter
# test individual emeters
for plug in dev.plugs:
plug.sync.update()
emeter = plug.sync.get_emeter_daily(year=1900, month=1)
assert emeter == {}
@ -505,9 +545,11 @@ def test_children_get_emeter_daily(dev):
@strip
def test_children_get_emeter_monthly(dev):
assert dev.sync.get_has_emeter()
dev.sync.update()
assert dev.has_emeter
# test individual emeters
for plug in dev.plugs:
plug.sync.update()
emeter = plug.sync.get_emeter_monthly(year=1900)
assert emeter == {}
@ -539,9 +581,9 @@ def test_children_get_emeter_monthly(dev):
# if dev.is_strip:
# CHECK_COUNT = 0
# dev.sync.get_sys_info()
# dev.sys_info
# assert query_mock.call_count == CHECK_COUNT
# dev.sync.get_sys_info()
# dev.sys_info
# assert query_mock.call_count == CHECK_COUNT
@ -553,15 +595,14 @@ def test_children_get_emeter_monthly(dev):
# with patch.object(
# FakeTransportProtocol, "query", wraps=dev.protocol.query
# ) as query_mock:
# dev.sync.get_sys_info()
# dev.sys_info
# assert query_mock.call_count == 1
# dev.sync.get_sys_info()
# dev.sys_info
# assert query_mock.call_count == 2
# # assert query_mock.called_once()
def test_representation(dev):
import re
pattern = re.compile("<.* model .* at .* (.*), is_on: .* - dev specific: .*>")
assert pattern.match(str(dev))