diff --git a/pyHS100/cli.py b/pyHS100/cli.py index 4921531e..594ec165 100755 --- a/pyHS100/cli.py +++ b/pyHS100/cli.py @@ -151,7 +151,7 @@ def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3) click.echo(f"Attempt {attempt} of {attempts}") found_devs = Discover.discover(target=target, timeout=timeout).items() for ip, dev in found_devs: - if dev.sync.get_alias().lower() == alias.lower(): + if dev.alias.lower() == alias.lower(): host = dev.host return host return None @@ -161,8 +161,9 @@ def find_host_from_alias(alias, target="255.255.255.255", timeout=1, attempts=3) @pass_dev def sysinfo(dev): """Print out full system information.""" + dev.sync.update() click.echo(click.style("== System info ==", bold=True)) - click.echo(pf(dev.sync.get_sys_info())) + click.echo(pf(dev.sys_info)) @cli.command() @@ -170,23 +171,20 @@ def sysinfo(dev): @click.pass_context def state(ctx, dev: SmartDevice): """Print out device state and versions.""" - click.echo( - click.style( - "== {} - {} ==".format(dev.sync.get_alias(), dev.sync.get_model()), - bold=True, - ) - ) + dev.sync.update() + click.echo(click.style("== {} - {} ==".format(dev.alias, dev.model), bold=True)) click.echo( click.style( - "Device state: {}".format("ON" if dev.sync.is_on() else "OFF"), - fg="green" if dev.sync.is_on() else "red", + "Device state: {}".format("ON" if dev.is_on else "OFF"), + fg="green" if dev.is_on else "red", ) ) if dev.is_strip: for plug in dev.plugs: # type: ignore - is_on = plug.sync.get_is_on() - alias = plug.sync.get_alias() + plug.sync.update() + is_on = plug.is_on + alias = plug.alias click.echo( click.style( " * {} state: {}".format(alias, ("ON" if is_on else "OFF")), @@ -195,15 +193,14 @@ def state(ctx, dev: SmartDevice): ) click.echo(f"Host/IP: {dev.host}") - for k, v in dev.sync.get_state_information().items(): + for k, v in dev.state_information.items(): click.echo(f"{k}: {v}") - hw_info = dev.sync.get_hw_info() click.echo(click.style("== Generic information ==", bold=True)) click.echo("Time: {}".format(dev.sync.get_time())) - click.echo("Hardware: {}".format(hw_info["hw_ver"])) - click.echo("Software: {}".format(hw_info["sw_ver"])) - click.echo("MAC (rssi): {} ({})".format(dev.sync.get_mac(), dev.sync.get_rssi())) - click.echo("Location: {}".format(dev.sync.get_location())) + click.echo("Hardware: {}".format(dev.hw_info["hw_ver"])) + click.echo("Software: {}".format(dev.hw_info["sw_ver"])) + click.echo("MAC (rssi): {} ({})".format(dev.mac, dev.rssi)) + click.echo("Location: {}".format(dev.location)) ctx.invoke(emeter) @@ -217,7 +214,7 @@ def alias(dev, new_alias): click.echo(f"Setting alias to {new_alias}") dev.sync.set_alias(new_alias) - click.echo(f"Alias: {dev.sync.get_alias()}") + click.echo(f"Alias: {dev.alias}") @cli.command() @@ -232,6 +229,7 @@ def raw_command(dev: SmartDevice, module, command, parameters): if parameters is not None: parameters = ast.literal_eval(parameters) res = dev.sync._query_helper(module, command, parameters) + dev.sync.update() click.echo(res) @@ -243,7 +241,8 @@ def raw_command(dev: SmartDevice, module, command, parameters): def emeter(dev, year, month, erase): """Query emeter for historical consumption.""" click.echo(click.style("== Emeter ==", bold=True)) - if not dev.sync.get_has_emeter(): + dev.sync.update() + if not dev.has_emeter: click.echo("Device has no emeter") return @@ -274,11 +273,12 @@ def emeter(dev, year, month, erase): @pass_dev def brightness(dev, brightness): """Get or set brightness.""" - if not dev.sync.is_dimmable(): + dev.sync.update() + if not dev.is_dimmable: click.echo("This device does not support brightness.") return if brightness is None: - click.echo("Brightness: %s" % dev.sync.get_brightness()) + click.echo("Brightness: %s" % dev.brightness) else: click.echo("Setting brightness to %s" % brightness) dev.sync.set_brightness(brightness) @@ -292,14 +292,14 @@ def brightness(dev, brightness): def temperature(dev: SmartBulb, temperature): """Get or set color temperature.""" if temperature is None: - click.echo(f"Color temperature: {dev.sync.get_color_temp()}") - valid_temperature_range = dev.sync.get_valid_temperature_range() + click.echo(f"Color temperature: {dev.color_temp}") + valid_temperature_range = dev.valid_temperature_range if valid_temperature_range != (0, 0): click.echo("(min: {}, max: {})".format(*valid_temperature_range)) else: click.echo( "Temperature range unknown, please open a github issue" - f" or a pull request for model '{dev.sync.get_model()}'" + f" or a pull request for model '{dev.model}'" ) else: click.echo(f"Setting color temperature to {temperature}") @@ -315,7 +315,7 @@ def temperature(dev: SmartBulb, temperature): def hsv(dev, ctx, h, s, v): """Get or set color in HSV. (Bulb only).""" if h is None or s is None or v is None: - click.echo("Current HSV: %s %s %s" % dev.sync.get_hsv()) + click.echo("Current HSV: %s %s %s" % dev.hsv) elif s is None or v is None: raise click.BadArgumentUsage("Setting a color requires 3 values.", ctx) else: @@ -332,7 +332,7 @@ def led(dev, state): click.echo("Turning led to %s" % state) dev.sync.set_led(state) else: - click.echo("LED state: %s" % dev.sync.get_led()) + click.echo("LED state: %s" % dev.led) @cli.command() diff --git a/pyHS100/smartbulb.py b/pyHS100/smartbulb.py index 5dd02071..70b40bf2 100644 --- a/pyHS100/smartbulb.py +++ b/pyHS100/smartbulb.py @@ -1,9 +1,14 @@ """Module for bulbs.""" import re -from typing import Any, Dict, Tuple +from typing import Any, Dict, Optional, Tuple from pyHS100.protocol import TPLinkSmartHomeProtocol -from pyHS100.smartdevice import DeviceType, SmartDevice, SmartDeviceException +from pyHS100.smartdevice import ( + DeviceType, + SmartDevice, + SmartDeviceException, + requires_update, +) TPLINK_KELVIN = { "LB130": (2500, 9000), @@ -24,14 +29,14 @@ class SmartBulb(SmartDevice): p = SmartBulb("192.168.1.105") # print the devices alias - print(p.sync.get_alias()) + print(p.sync.alias) # change state of bulb p.sync.turn_on() p.sync.turn_off() # query and print current state of plug - print(p.sync.get_state_information()) + print(p.sync.state_information()) # check whether the bulb supports color changes if p.sync.is_color(): @@ -40,7 +45,7 @@ class SmartBulb(SmartDevice): p.sync.set_hsv(180, 100, 100) # get the current HSV value - print(p.sync.get_hsv()) + print(p.sync.hsv()) # check whether the bulb supports setting color temperature if p.sync.is_variable_color_temp(): @@ -48,16 +53,16 @@ class SmartBulb(SmartDevice): p.sync.set_color_temp(3000) # get the current color temperature - print(p.sync.get_color_temp()) + print(p.sync.color_temp) # check whether the bulb is dimmable - if p.sync.is_dimmable(): + if p.is_dimmable: # set the bulb to 50% brightness p.sync.set_brightness(50) # check the current brightness - print(p.sync.get_brightness()) + print(p.brightness) ``` Omit the `sync` attribute to get coroutines. @@ -87,70 +92,94 @@ class SmartBulb(SmartDevice): ) self.emeter_type = "smartlife.iot.common.emeter" self._device_type = DeviceType.Bulb + self._light_state = None - async def is_color(self) -> bool: + @property + @requires_update + def is_color(self) -> bool: """Whether the bulb supports color changes. :return: True if the bulb supports color changes, False otherwise :rtype: bool """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return bool(sys_info["is_color"]) - async def is_dimmable(self) -> bool: + @property + @requires_update + def is_dimmable(self) -> bool: """Whether the bulb supports brightness changes. :return: True if the bulb supports brightness changes, False otherwise :rtype: bool """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return bool(sys_info["is_dimmable"]) - async def is_variable_color_temp(self) -> bool: + @property + @requires_update + def is_variable_color_temp(self) -> bool: """Whether the bulb supports color temperature changes. :return: True if the bulb supports color temperature changes, False otherwise :rtype: bool """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return bool(sys_info["is_variable_color_temp"]) - async def get_valid_temperature_range(self) -> Tuple[int, int]: + @property + @requires_update + def valid_temperature_range(self) -> Tuple[int, int]: """Return the device-specific white temperature range (in Kelvin). :return: White temperature range in Kelvin (minimun, maximum) :rtype: tuple """ - if not await self.is_variable_color_temp(): + if not self.is_variable_color_temp: return (0, 0) for model, temp_range in TPLINK_KELVIN.items(): - sys_info = await self.get_sys_info() + sys_info = self.sys_info if re.match(model, sys_info["model"]): return temp_range return (0, 0) - async def get_light_state(self) -> Dict: + async def update(self): + """Update `sys_info and `light_state`.""" + self._sys_info = await self.get_sys_info() + self._light_state = await self.get_light_state() + + @property + @requires_update + def light_state(self) -> Optional[Dict[str, Dict]]: + """Query the light state.""" + return self._light_state + + async def get_light_state(self) -> Dict[str, Dict]: """Query the light state.""" return await self._query_helper(self.LIGHT_SERVICE, "get_light_state") async def set_light_state(self, state: Dict) -> Dict: """Set the light state.""" - return await self._query_helper( + light_state = await self._query_helper( self.LIGHT_SERVICE, "transition_light_state", state ) + await self.update() + return light_state - async def get_hsv(self) -> Tuple[int, int, int]: + @property + @requires_update + def hsv(self) -> Tuple[int, int, int]: """Return the current HSV state of the bulb. :return: hue, saturation and value (degrees, %, %) :rtype: tuple """ - if not await self.is_color(): + if not self.is_color: raise SmartDeviceException("Bulb does not support color.") - light_state = await self.get_light_state() - if not await self.is_on(): + light_state = self.light_state + if not self.is_on: hue = light_state["dft_on_state"]["hue"] saturation = light_state["dft_on_state"]["saturation"] value = light_state["dft_on_state"]["brightness"] @@ -167,6 +196,7 @@ class SmartBulb(SmartDevice): "Invalid brightness value: {} " "(valid range: 0-100%)".format(value) ) + @requires_update async def set_hsv(self, hue: int, saturation: int, value: int): """Set new HSV. @@ -174,7 +204,7 @@ class SmartBulb(SmartDevice): :param int saturation: saturation in percentage [0,100] :param int value: value in percentage [0, 100] """ - if not await self.is_color(): + if not self.is_color: raise SmartDeviceException("Bulb does not support color.") if not isinstance(hue, int) or not (0 <= hue <= 360): @@ -198,30 +228,33 @@ class SmartBulb(SmartDevice): } await self.set_light_state(light_state) - async def get_color_temp(self) -> int: + @property + @requires_update + def color_temp(self) -> int: """Return color temperature of the device. :return: Color temperature in Kelvin :rtype: int """ - if not await self.is_variable_color_temp(): + if not self.is_variable_color_temp: raise SmartDeviceException("Bulb does not support colortemp.") - light_state = await self.get_light_state() - if not await self.is_on(): + light_state = self.light_state + if not self.is_on: return int(light_state["dft_on_state"]["color_temp"]) else: return int(light_state["color_temp"]) + @requires_update async def set_color_temp(self, temp: int) -> None: """Set the color temperature of the device. :param int temp: The new color temperature, in Kelvin """ - if not await self.is_variable_color_temp(): + if not self.is_variable_color_temp: raise SmartDeviceException("Bulb does not support colortemp.") - valid_temperature_range = await self.get_valid_temperature_range() + valid_temperature_range = self.valid_temperature_range if temp < valid_temperature_range[0] or temp > valid_temperature_range[1]: raise ValueError( "Temperature should be between {} " @@ -231,27 +264,30 @@ class SmartBulb(SmartDevice): light_state = {"color_temp": temp} await self.set_light_state(light_state) - async def get_brightness(self) -> int: + @property + @requires_update + def brightness(self) -> int: """Return the current brightness. :return: brightness in percent :rtype: int """ - if not await self.is_dimmable(): # pragma: no cover + if not self.is_dimmable: # pragma: no cover raise SmartDeviceException("Bulb is not dimmable.") - light_state = await self.get_light_state() - if not await self.is_on(): + light_state = self.light_state + if not self.is_on: return int(light_state["dft_on_state"]["brightness"]) else: return int(light_state["brightness"]) + @requires_update async def set_brightness(self, brightness: int) -> None: """Set the brightness. :param int brightness: brightness in percent """ - if not await self.is_dimmable(): # pragma: no cover + if not self.is_dimmable: # pragma: no cover raise SmartDeviceException("Bulb is not dimmable.") self._raise_for_invalid_brightness(brightness) @@ -259,27 +295,31 @@ class SmartBulb(SmartDevice): light_state = {"brightness": brightness} await self.set_light_state(light_state) - async def get_state_information(self) -> Dict[str, Any]: + @property + @requires_update + def state_information(self) -> Dict[str, Any]: """Return bulb-specific state information. :return: Bulb information dict, keys in user-presentable form. :rtype: dict """ info: Dict[str, Any] = { - "Brightness": await self.get_brightness(), - "Is dimmable": await self.is_dimmable(), + "Brightness": self.brightness, + "Is dimmable": self.is_dimmable, } - if await self.is_variable_color_temp(): - info["Color temperature"] = await self.get_color_temp() - info["Valid temperature range"] = await self.get_valid_temperature_range() - if await self.is_color(): - info["HSV"] = await self.get_hsv() + if self.is_variable_color_temp: + info["Color temperature"] = self.color_temp + info["Valid temperature range"] = self.valid_temperature_range + if self.is_color: + info["HSV"] = self.hsv return info - async def is_on(self) -> bool: + @property + @requires_update + def is_on(self) -> bool: """Return whether the device is on.""" - light_state = await self.get_light_state() + light_state = self.light_state return bool(light_state["on_off"]) async def turn_off(self) -> None: @@ -290,6 +330,8 @@ class SmartBulb(SmartDevice): """Turn the bulb on.""" await self.set_light_state({"on_off": 1}) - async def get_has_emeter(self) -> bool: + @property + @requires_update + def has_emeter(self) -> bool: """Return that the bulb has an emeter.""" return True diff --git a/pyHS100/smartdevice.py b/pyHS100/smartdevice.py index 8c3c1d57..f80f54d3 100755 --- a/pyHS100/smartdevice.py +++ b/pyHS100/smartdevice.py @@ -79,6 +79,26 @@ class EmeterStatus(dict): raise SmartDeviceException("Unable to find a value for '%s'" % item) +def requires_update(f): + """Indicate that `update` should be called before accessing this method.""" # noqa: D202 + if inspect.iscoroutinefunction(f): + @functools.wraps(f) + async def wrapped(*args, **kwargs): + self = args[0] + assert self._sys_info is not None + return await f(*args, **kwargs) + + else: + @functools.wraps(f) + def wrapped(*args, **kwargs): + self = args[0] + assert self._sys_info is not None + return f(*args, **kwargs) + + f.requires_update = True + return wrapped + + class SmartDevice: """Base class for all supported device types.""" @@ -117,6 +137,7 @@ class SmartDevice: self._device_type = DeviceType.Unknown self.ioloop = ioloop or asyncio.get_event_loop() self.sync = SyncSmartDevice(self, ioloop=self.ioloop) + self._sys_info = None def _result_from_cache(self, target, cmd) -> Optional[Dict]: """Return query result from cache if still fresh. @@ -197,7 +218,7 @@ class SmartDevice: return result - async def get_has_emeter(self) -> bool: + def has_emeter(self) -> bool: """Return if device has an energy meter. :return: True if energey meter is available @@ -214,23 +235,45 @@ class SmartDevice: """ return await self._query_helper("system", "get_sysinfo") - async def get_model(self) -> str: + async def update(self): + """Update some of the attributes. + + Needed for methods that are decorated with `requires_update`. + """ + self._sys_info = await self.get_sys_info() + + @property + @requires_update + def sys_info(self) -> Dict[str, Any]: + """Retrieve system information. + + :return: sysinfo + :rtype dict + :raises SmartDeviceException: on error + """ + return self._sys_info + + @property + @requires_update + def model(self) -> str: """Return device model. :return: device model :rtype: str :raises SmartDeviceException: on error """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return str(sys_info["model"]) - async def get_alias(self) -> str: + @property + @requires_update + def alias(self) -> str: """Return device name (alias). :return: Device name aka alias. :rtype: str """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return str(sys_info["alias"]) async def set_alias(self, alias: str) -> None: @@ -240,6 +283,7 @@ class SmartDevice: :raises SmartDeviceException: on error """ await self._query_helper("system", "set_dev_alias", {"alias": alias}) + await self.update() async def get_icon(self) -> Dict: """Return device icon. @@ -329,7 +373,9 @@ class SmartDevice: """ return await self._query_helper("time", "get_timezone") - async def get_hw_info(self) -> Dict: + @property + @requires_update + def hw_info(self) -> Dict: """Return hardware information. :return: Information about hardware @@ -347,47 +393,53 @@ class SmartDevice: "oemId", "dev_name", ] - sys_info = await self.get_sys_info() + sys_info = self.sys_info return {key: sys_info[key] for key in keys if key in sys_info} - async def get_location(self) -> Dict: + @property + @requires_update + def location(self) -> Dict: """Return geographical location. :return: latitude and longitude :rtype: dict """ - info = await self.get_sys_info() + sys_info = self.sys_info loc = {"latitude": None, "longitude": None} - if "latitude" in info and "longitude" in info: - loc["latitude"] = info["latitude"] - loc["longitude"] = info["longitude"] - elif "latitude_i" in info and "longitude_i" in info: - loc["latitude"] = info["latitude_i"] - loc["longitude"] = info["longitude_i"] + if "latitude" in sys_info and "longitude" in sys_info: + loc["latitude"] = sys_info["latitude"] + loc["longitude"] = sys_info["longitude"] + elif "latitude_i" in sys_info and "longitude_i" in sys_info: + loc["latitude"] = sys_info["latitude_i"] + loc["longitude"] = sys_info["longitude_i"] else: _LOGGER.warning("Unsupported device location.") return loc - async def get_rssi(self) -> Optional[int]: + @property + @requires_update + def rssi(self) -> Optional[int]: """Return WiFi signal strenth (rssi). :return: rssi :rtype: int """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info if "rssi" in sys_info: return int(sys_info["rssi"]) return None - async def get_mac(self) -> str: + @property + @requires_update + def mac(self) -> str: """Return mac address. :return: mac address in hexadecimal with colons, e.g. 01:23:45:67:89:ab :rtype: str """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info if "mac" in sys_info: return str(sys_info["mac"]) @@ -407,7 +459,9 @@ class SmartDevice: :raises SmartDeviceException: on error """ await self._query_helper("system", "set_mac_addr", {"mac": mac}) + await self.update() + @requires_update async def get_emeter_realtime(self) -> EmeterStatus: """Retrieve current energy readings. @@ -415,11 +469,12 @@ class SmartDevice: :rtype: dict, None :raises SmartDeviceException: on error """ - if not await self.get_has_emeter(): + if not self.has_emeter: raise SmartDeviceException("Device has no emeter") return EmeterStatus(await self._query_helper(self.emeter_type, "get_realtime")) + @requires_update async def get_emeter_daily( self, year: int = None, month: int = None, kwh: bool = True ) -> Dict: @@ -433,7 +488,7 @@ class SmartDevice: :rtype: dict :raises SmartDeviceException: on error """ - if not await self.get_has_emeter(): + if not self.has_emeter: raise SmartDeviceException("Device has no emeter") if year is None: @@ -454,6 +509,7 @@ class SmartDevice: return data + @requires_update async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict: """Retrieve monthly statistics for a given year. @@ -463,7 +519,7 @@ class SmartDevice: :rtype: dict :raises SmartDeviceException: on error """ - if not await self.get_has_emeter(): + if not self.has_emeter: raise SmartDeviceException("Device has no emeter") if year is None: @@ -480,24 +536,27 @@ class SmartDevice: return {entry["month"]: entry[key] for entry in response} + @requires_update async def erase_emeter_stats(self): """Erase energy meter statistics. :return: True if statistics were deleted :raises SmartDeviceException: on error """ - if not await self.get_has_emeter(): + if not self.has_emeter: raise SmartDeviceException("Device has no emeter") await self._query_helper(self.emeter_type, "erase_emeter_stat", None) + await self.update() + @requires_update async def current_consumption(self) -> float: """Get the current power consumption in Watt. :return: the current power consumption in Watts. :raises SmartDeviceException: on error """ - if not await self.get_has_emeter(): + if not self.has_emeter: raise SmartDeviceException("Device has no emeter") response = EmeterStatus(await self.get_emeter_realtime()) @@ -518,19 +577,23 @@ class SmartDevice: """Turn off the device.""" raise NotImplementedError("Device subclass needs to implement this.") - async def is_off(self) -> bool: + @property + @requires_update + def is_off(self) -> bool: """Return True if device is off. :return: True if device is off, False otherwise. :rtype: bool """ - return not await self.is_on() + return not self.is_on async def turn_on(self) -> None: """Turn device on.""" raise NotImplementedError("Device subclass needs to implement this.") - async def is_on(self) -> bool: + @property + @requires_update + def is_on(self) -> bool: """Return if the device is on. :return: True if the device is on, False otherwise. @@ -539,7 +602,9 @@ class SmartDevice: """ raise NotImplementedError("Device subclass needs to implement this.") - async def get_state_information(self) -> Dict[str, Any]: + @property + @requires_update + def state_information(self) -> Dict[str, Any]: """Return device-type specific, end-user friendly state information. :return: dict with state information. @@ -567,22 +632,25 @@ class SmartDevice: """Return True if the device is a strip.""" return self._device_type == DeviceType.Strip - async def is_dimmable(self): + @property + def is_dimmable(self): """Return True if the device is dimmable.""" return False - async def is_variable_color_temp(self) -> bool: + @property + def is_variable_color_temp(self) -> bool: """Return True if the device supports color temperature.""" return False def __repr__(self): + self.sync.update() return "<{} model {} at {} ({}), is_on: {} - dev specific: {}>".format( self.__class__.__name__, - self.sync.get_model(), + self.model, self.host, - self.sync.get_alias(), - self.sync.is_on(), - self.sync.get_state_information(), + self.alias, + self.is_on, + self.sync.state_information, ) diff --git a/pyHS100/smartplug.py b/pyHS100/smartplug.py index a5a389e1..1013cc60 100644 --- a/pyHS100/smartplug.py +++ b/pyHS100/smartplug.py @@ -4,7 +4,12 @@ import logging from typing import Any, Dict from pyHS100.protocol import TPLinkSmartHomeProtocol -from pyHS100.smartdevice import DeviceType, SmartDevice, SmartDeviceException +from pyHS100.smartdevice import ( + DeviceType, + SmartDevice, + SmartDeviceException, + requires_update, +) _LOGGER = logging.getLogger(__name__) @@ -17,14 +22,14 @@ class SmartPlug(SmartDevice): p = SmartPlug("192.168.1.105") # print the devices alias - print(p.sync.get_alias()) + print(p.sync.alias) # change state of plug p.sync.turn_on() p.sync.turn_off() # query and print current state of plug - print(p.sync.get_state_information()) + print(p.sync.state_information) ``` Omit the `sync` attribute to get coroutines. @@ -46,7 +51,9 @@ class SmartPlug(SmartDevice): self.emeter_type = "emeter" self._device_type = DeviceType.Plug - async def get_brightness(self) -> int: + @property + @requires_update + def brightness(self) -> int: """Return current brightness on dimmers. Will return a range between 0 - 100. @@ -54,12 +61,13 @@ class SmartPlug(SmartDevice): :returns: integer :rtype: int """ - if not await self.is_dimmable(): + if not self.is_dimmable: raise SmartDeviceException("Device is not dimmable.") - sys_info = await self.get_sys_info() + sys_info = self.sys_info return int(sys_info["brightness"]) + @requires_update async def set_brightness(self, value: int): """Set the new dimmer brightness level. @@ -70,7 +78,7 @@ class SmartPlug(SmartDevice): :param value: integer between 1 and 100 """ - if not await self.is_dimmable(): + if not self.is_dimmable: raise SmartDeviceException("Device is not dimmable.") if not isinstance(value, int): @@ -80,34 +88,41 @@ class SmartPlug(SmartDevice): await self._query_helper( "smartlife.iot.dimmer", "set_brightness", {"brightness": value} ) + await self.update() else: raise ValueError("Brightness value %s is not valid." % value) - async def is_dimmable(self): + @property + @requires_update + def is_dimmable(self): """Whether the switch supports brightness changes. :return: True if switch supports brightness changes, False otherwise :rtype: bool """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return "brightness" in sys_info - async def get_has_emeter(self): + @property + @requires_update + def has_emeter(self): """Return whether device has an energy meter. :return: True if energy meter is available False otherwise """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info features = sys_info["feature"].split(":") return "ENE" in features - async def is_on(self) -> bool: + @property + @requires_update + def is_on(self) -> bool: """Return whether device is on. :return: True if device is on, False otherwise """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return bool(sys_info["relay_state"]) async def turn_on(self): @@ -116,6 +131,7 @@ class SmartPlug(SmartDevice): :raises SmartDeviceException: on error """ await self._query_helper("system", "set_relay_state", {"state": 1}) + await self.update() async def turn_off(self): """Turn the switch off. @@ -123,14 +139,17 @@ class SmartPlug(SmartDevice): :raises SmartDeviceException: on error """ await self._query_helper("system", "set_relay_state", {"state": 0}) + await self.update() - async def get_led(self) -> bool: + @property + @requires_update + def led(self) -> bool: """Return the state of the led. :return: True if led is on, False otherwise :rtype: bool """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info return bool(1 - sys_info["led_off"]) async def set_led(self, state: bool): @@ -140,14 +159,17 @@ class SmartPlug(SmartDevice): :raises SmartDeviceException: on error """ await self._query_helper("system", "set_led_off", {"off": int(not state)}) + await self.update() - async def get_on_since(self) -> datetime.datetime: + @property + @requires_update + def on_since(self) -> datetime.datetime: """Return pretty-printed on-time. :return: datetime for on since :rtype: datetime """ - sys_info = await self.get_sys_info() + sys_info = self.sys_info if self.context: for plug in sys_info["children"]: if plug["id"] == self.context: @@ -158,16 +180,15 @@ class SmartPlug(SmartDevice): return datetime.datetime.now() - datetime.timedelta(seconds=on_time) - async def get_state_information(self) -> Dict[str, Any]: + @property + @requires_update + def state_information(self) -> Dict[str, Any]: """Return switch-specific state information. :return: Switch information dict, keys in user-presentable form. :rtype: dict """ - info = { - "LED state": await self.get_led(), - "On since": await self.get_on_since(), - } - if await self.is_dimmable(): - info["Brightness"] = await self.get_brightness() + info = {"LED state": self.led, "On since": self.on_since} + if self.is_dimmable: + info["Brightness"] = self.brightness return info diff --git a/pyHS100/smartstrip.py b/pyHS100/smartstrip.py index ff3e44b9..f27c2578 100755 --- a/pyHS100/smartstrip.py +++ b/pyHS100/smartstrip.py @@ -8,7 +8,7 @@ from collections import defaultdict from typing import Any, DefaultDict, Dict, List from pyHS100.protocol import TPLinkSmartHomeProtocol -from pyHS100.smartdevice import DeviceType +from pyHS100.smartdevice import DeviceType, requires_update from pyHS100.smartplug import SmartPlug _LOGGER = logging.getLogger(__name__) @@ -22,7 +22,7 @@ class SmartStrip(SmartPlug): p = SmartStrip("192.168.1.105") # query the state of the strip - print(p.sync.is_on()) + print(p.is_on) # change state of all outlets p.sync.turn_on() @@ -30,7 +30,7 @@ class SmartStrip(SmartPlug): # individual outlets are accessible through plugs variable for plug in p.plugs: - print(f"{p}: {p.sync.is_on()}") + print(f"{p}: {p.is_on}") # change state of a single outlet p.plugs[0].sync.turn_on() @@ -66,20 +66,32 @@ class SmartStrip(SmartPlug): ) ) - async def is_on(self) -> bool: + @property + @requires_update + def is_on(self) -> bool: """Return if any of the outlets are on.""" for plug in self.plugs: - is_on = await plug.is_on() + is_on = plug.is_on if is_on: return True return False + async def update(self): + """Update some of the attributes. + + Needed for methods that are decorated with `requires_update`. + """ + await super().update() + for plug in self.plugs: + await plug.update() + async def turn_on(self): """Turn the strip on. :raises SmartDeviceException: on error """ await self._query_helper("system", "set_relay_state", {"state": 1}) + await self.update() async def turn_off(self): """Turn the strip off. @@ -87,21 +99,26 @@ class SmartStrip(SmartPlug): :raises SmartDeviceException: on error """ await self._query_helper("system", "set_relay_state", {"state": 0}) + await self.update() - async def get_on_since(self) -> datetime.datetime: + @property + @requires_update + def on_since(self) -> datetime.datetime: """Return the maximum on-time of all outlets.""" - return max([await plug.get_on_since() for plug in self.plugs]) + return max(plug.on_since for plug in self.plugs) - async def state_information(self) -> Dict[str, Any]: + @property + @requires_update + def state_information(self) -> Dict[str, Any]: """Return strip-specific state information. :return: Strip information dict, keys in user-presentable form. :rtype: dict """ - state: Dict[str, Any] = {"LED state": await self.get_led()} + state: Dict[str, Any] = {"LED state": self.led} for plug in self.plugs: - if await plug.is_on(): - state["Plug %s on since" % str(plug)] = await plug.get_on_since() + if plug.is_on: + state["Plug %s on since" % str(plug)] = self.on_since return state @@ -116,7 +133,7 @@ class SmartStrip(SmartPlug): return consumption - async def icon(self) -> Dict: + async def get_icon(self) -> Dict: """Icon for the device. Overriden to keep the API, as the SmartStrip and children do not @@ -132,6 +149,7 @@ class SmartStrip(SmartPlug): """ return await super().set_alias(alias) + @requires_update async def get_emeter_daily( self, year: int = None, month: int = None, kwh: bool = True ) -> Dict: @@ -154,6 +172,7 @@ class SmartStrip(SmartPlug): emeter_daily[day] += value return emeter_daily + @requires_update async def get_emeter_monthly(self, year: int = None, kwh: bool = True) -> Dict: """Retrieve monthly statistics for a given year. @@ -170,6 +189,7 @@ class SmartStrip(SmartPlug): emeter_monthly[month] += value return emeter_monthly + @requires_update async def erase_emeter_stats(self): """Erase energy meter statistics for all plugs. diff --git a/pyHS100/tests/test_fixtures.py b/pyHS100/tests/test_fixtures.py index d20fcb00..fa1d7661 100644 --- a/pyHS100/tests/test_fixtures.py +++ b/pyHS100/tests/test_fixtures.py @@ -32,10 +32,11 @@ from .newfakes import ( @plug def test_plug_sysinfo(dev): - assert dev.sync.get_sys_info() is not None - PLUG_SCHEMA(dev.sync.get_sys_info()) + dev.sync.update() + assert dev.sys_info is not None + PLUG_SCHEMA(dev.sys_info) - assert dev.sync.get_model() is not None + assert dev.model is not None assert dev.device_type == DeviceType.Plug or dev.device_type == DeviceType.Strip assert dev.is_plug or dev.is_strip @@ -43,23 +44,26 @@ def test_plug_sysinfo(dev): @bulb def test_bulb_sysinfo(dev): - assert dev.sync.get_sys_info() is not None - BULB_SCHEMA(dev.sync.get_sys_info()) + dev.sync.update() + assert dev.sys_info is not None + BULB_SCHEMA(dev.sys_info) - assert dev.sync.get_model() is not None + assert dev.model is not None assert dev.device_type == DeviceType.Bulb assert dev.is_bulb def test_state_info(dev): - assert isinstance(dev.sync.get_state_information(), dict) + dev.sync.update() + assert isinstance(dev.sync.state_information, dict) def test_invalid_connection(dev): with patch.object(FakeTransportProtocol, "query", side_effect=SmartDeviceException): with pytest.raises(SmartDeviceException): - dev.sync.is_on() + dev.sync.update() + dev.is_on def test_query_helper(dev): @@ -71,26 +75,28 @@ def test_query_helper(dev): @turn_on def test_state(dev, turn_on): handle_turn_on(dev, turn_on) - orig_state = dev.sync.is_on() + dev.sync.update() + orig_state = dev.is_on if orig_state: dev.sync.turn_off() - assert not dev.sync.is_on() - assert dev.sync.is_off() + assert not dev.is_on + assert dev.is_off dev.sync.turn_on() - assert dev.sync.is_on() - assert not dev.sync.is_off() + assert dev.is_on + assert not dev.is_off else: dev.sync.turn_on() - assert dev.sync.is_on() - assert not dev.sync.is_off() + assert dev.is_on + assert not dev.is_off dev.sync.turn_off() - assert not dev.sync.is_on() - assert dev.sync.is_off() + assert not dev.is_on + assert dev.is_off @no_emeter def test_no_emeter(dev): - assert not dev.sync.get_has_emeter() + dev.sync.update() + assert not dev.has_emeter with pytest.raises(SmartDeviceException): dev.sync.get_emeter_realtime() @@ -104,10 +110,11 @@ def test_no_emeter(dev): @has_emeter def test_get_emeter_realtime(dev): + dev.sync.update() if dev.is_strip: pytest.skip("Disabled for HS300 temporarily") - assert dev.sync.get_has_emeter() + assert dev.has_emeter current_emeter = dev.sync.get_emeter_realtime() CURRENT_CONSUMPTION_SCHEMA(current_emeter) @@ -115,10 +122,11 @@ def test_get_emeter_realtime(dev): @has_emeter def test_get_emeter_daily(dev): + dev.sync.update() if dev.is_strip: pytest.skip("Disabled for HS300 temporarily") - assert dev.sync.get_has_emeter() + assert dev.has_emeter assert dev.sync.get_emeter_daily(year=1900, month=1) == {} @@ -137,10 +145,11 @@ def test_get_emeter_daily(dev): @has_emeter def test_get_emeter_monthly(dev): + dev.sync.update() if dev.is_strip: pytest.skip("Disabled for HS300 temporarily") - assert dev.sync.get_has_emeter() + assert dev.has_emeter assert dev.sync.get_emeter_monthly(year=1900) == {} @@ -159,10 +168,11 @@ def test_get_emeter_monthly(dev): @has_emeter def test_emeter_status(dev): + dev.sync.update() if dev.is_strip: pytest.skip("Disabled for HS300 temporarily") - assert dev.sync.get_has_emeter() + assert dev.has_emeter d = dev.sync.get_emeter_realtime() @@ -181,17 +191,19 @@ def test_emeter_status(dev): @pytest.mark.skip("not clearing your stats..") @has_emeter def test_erase_emeter_stats(dev): - assert dev.sync.get_has_emeter() + dev.sync.update() + assert dev.has_emeter dev.sync.erase_emeter() @has_emeter def test_current_consumption(dev): + dev.sync.update() if dev.is_strip: pytest.skip("Disabled for HS300 temporarily") - if dev.sync.get_has_emeter(): + if dev.has_emeter: x = dev.sync.current_consumption() assert isinstance(x, float) assert x >= 0.0 @@ -200,34 +212,36 @@ def test_current_consumption(dev): def test_alias(dev): + dev.sync.update() test_alias = "TEST1234" - original = dev.sync.get_alias() + original = dev.sync.alias assert isinstance(original, str) - dev.sync.set_alias(test_alias) - assert dev.sync.get_alias() == test_alias + assert dev.sync.alias == test_alias dev.sync.set_alias(original) - assert dev.sync.get_alias() == original + assert dev.sync.alias == original @plug def test_led(dev): - original = dev.sync.get_led() + dev.sync.update() + original = dev.led dev.sync.set_led(False) - assert not dev.sync.get_led() - dev.sync.set_led(True) + assert not dev.led - assert dev.sync.get_led() + dev.sync.set_led(True) + assert dev.led dev.sync.set_led(original) @plug def test_on_since(dev): - assert isinstance(dev.sync.get_on_since(), datetime.datetime) + dev.sync.update() + assert isinstance(dev.on_since, datetime.datetime) def test_icon(dev): @@ -244,35 +258,41 @@ def test_timezone(dev): def test_hw_info(dev): - PLUG_SCHEMA(dev.sync.get_hw_info()) + dev.sync.update() + PLUG_SCHEMA(dev.hw_info) def test_location(dev): - PLUG_SCHEMA(dev.sync.get_location()) + dev.sync.update() + PLUG_SCHEMA(dev.location) def test_rssi(dev): - PLUG_SCHEMA({"rssi": dev.sync.get_rssi()}) # wrapping for vol + dev.sync.update() + PLUG_SCHEMA({"rssi": dev.rssi}) # wrapping for vol def test_mac(dev): - PLUG_SCHEMA({"mac": dev.sync.get_mac()}) # wrapping for val + dev.sync.update() + PLUG_SCHEMA({"mac": dev.mac}) # wrapping for val # TODO check setting? @non_variable_temp def test_temperature_on_nonsupporting(dev): - assert dev.sync.get_valid_temperature_range() == (0, 0) + dev.sync.update() + assert dev.valid_temperature_range == (0, 0) # TODO test when device does not support temperature range with pytest.raises(SmartDeviceException): dev.sync.set_color_temp(2700) with pytest.raises(SmartDeviceException): - print(dev.sync.get_color_temp()) + print(dev.sync.color_temp) @variable_temp def test_out_of_range_temperature(dev): + dev.sync.update() with pytest.raises(ValueError): dev.sync.set_color_temp(1000) with pytest.raises(ValueError): @@ -281,10 +301,11 @@ def test_out_of_range_temperature(dev): @non_dimmable def test_non_dimmable(dev): - assert not dev.sync.is_dimmable() + dev.sync.update() + assert not dev.is_dimmable with pytest.raises(SmartDeviceException): - assert dev.sync.get_brightness() == 0 + assert dev.brightness == 0 with pytest.raises(SmartDeviceException): dev.sync.set_brightness(100) @@ -293,13 +314,14 @@ def test_non_dimmable(dev): @turn_on def test_dimmable_brightness(dev, turn_on): handle_turn_on(dev, turn_on) - assert dev.sync.is_dimmable() + dev.sync.update() + assert dev.is_dimmable dev.sync.set_brightness(50) - assert dev.sync.get_brightness() == 50 + assert dev.brightness == 50 dev.sync.set_brightness(10) - assert dev.sync.get_brightness() == 10 + assert dev.brightness == 10 with pytest.raises(ValueError): dev.sync.set_brightness("foo") @@ -307,7 +329,8 @@ def test_dimmable_brightness(dev, turn_on): @dimmable def test_invalid_brightness(dev): - assert dev.sync.is_dimmable() + dev.sync.update() + assert dev.is_dimmable with pytest.raises(ValueError): dev.sync.set_brightness(110) @@ -320,16 +343,17 @@ def test_invalid_brightness(dev): @turn_on def test_hsv(dev, turn_on): handle_turn_on(dev, turn_on) - assert dev.sync.is_color() + dev.sync.update() + assert dev.is_color - hue, saturation, brightness = dev.sync.get_hsv() + hue, saturation, brightness = dev.hsv assert 0 <= hue <= 255 assert 0 <= saturation <= 100 assert 0 <= brightness <= 100 dev.sync.set_hsv(hue=1, saturation=1, value=1) - hue, saturation, brightness = dev.sync.get_hsv() + hue, saturation, brightness = dev.hsv assert hue == 1 assert saturation == 1 assert brightness == 1 @@ -339,8 +363,8 @@ def test_hsv(dev, turn_on): @turn_on def test_invalid_hsv(dev, turn_on): handle_turn_on(dev, turn_on) - - assert dev.sync.is_color() + dev.sync.update() + assert dev.is_color for invalid_hue in [-1, 361, 0.5]: with pytest.raises(ValueError): @@ -357,67 +381,79 @@ def test_invalid_hsv(dev, turn_on): @non_color_bulb def test_hsv_on_non_color(dev): - assert not dev.sync.is_color() + dev.sync.update() + assert not dev.is_color with pytest.raises(SmartDeviceException): dev.sync.set_hsv(0, 0, 0) with pytest.raises(SmartDeviceException): - print(dev.sync.get_hsv()) + print(dev.hsv) @variable_temp @turn_on def test_try_set_colortemp(dev, turn_on): + dev.sync.update() handle_turn_on(dev, turn_on) - dev.sync.set_color_temp(2700) - assert dev.sync.get_color_temp() == 2700 + assert dev.sync.color_temp == 2700 @non_variable_temp def test_non_variable_temp(dev): with pytest.raises(SmartDeviceException): + dev.sync.update() dev.sync.set_color_temp(2700) @strip @turn_on def test_children_change_state(dev, turn_on): + dev.sync.update() handle_turn_on(dev, turn_on) for plug in dev.plugs: - orig_state = plug.sync.is_on() + plug.sync.update() + orig_state = plug.is_on if orig_state: plug.turn_off() - assert not plug.sync.is_on() - assert plug.sync.is_off() + plug.sync.update() + assert not plug.is_on + assert plug.is_off plug.sync.turn_on() - assert plug.sync.is_on() - assert not plug.sync.is_off() + plug.sync.update() + assert plug.is_on + assert not plug.is_off else: plug.sync.turn_on() - assert plug.sync.is_on() - assert not plug.sync.is_off() + plug.sync.update() + assert plug.is_on + assert not plug.is_off plug.sync.turn_off() - assert not plug.sync.is_on() - assert plug.sync.is_off() + plug.sync.update() + assert not plug.is_on + assert plug.is_off @strip def test_children_alias(dev): test_alias = "TEST1234" for plug in dev.plugs: - original = plug.sync.get_alias() + plug.sync.update() + original = plug.alias plug.sync.set_alias(alias=test_alias) - assert plug.sync.get_alias() == test_alias + plug.sync.update() + assert plug.alias == test_alias plug.sync.set_alias(alias=original) - assert plug.sync.get_alias() == original + plug.sync.update() + assert plug.alias == original @strip def test_children_on_since(dev): for plug in dev.plugs: - assert plug.sync.get_on_since() + plug.sync.update() + assert plug.on_since @pytest.mark.skip("this test will wear out your relays") @@ -435,7 +471,7 @@ def test_all_binary_states(dev): dev.sync.turn_off(index=plug_index) # check state map applied - for index, state in dev.sync.get_is_on().items(): + for index, state in dev.is_on.items(): assert state_map[index] == state # toggle each outlet with state map applied @@ -448,7 +484,7 @@ def test_all_binary_states(dev): dev.sync.turn_on(index=plug_index) # only target outlet should have state changed - for index, state in dev.sync.get_is_on().items(): + for index, state in dev.is_on.items(): if index == plug_index: assert state != state_map[index] else: @@ -461,15 +497,17 @@ def test_all_binary_states(dev): dev.sync.turn_off(index=plug_index) # original state map should be restored - for index, state in dev.sync.get_is_on().items(): + for index, state in dev.is_on.items(): assert state == state_map[index] @strip def test_children_get_emeter_realtime(dev): - assert dev.sync.get_has_emeter() + dev.sync.update() + assert dev.has_emeter # test with index for plug in dev.plugs: + plug.sync.update() emeter = plug.sync.get_emeter_realtime() CURRENT_CONSUMPTION_SCHEMA(emeter) @@ -482,9 +520,11 @@ def test_children_get_emeter_realtime(dev): @strip def test_children_get_emeter_daily(dev): - assert dev.sync.get_has_emeter() + dev.sync.update() + assert dev.has_emeter # test individual emeters for plug in dev.plugs: + plug.sync.update() emeter = plug.sync.get_emeter_daily(year=1900, month=1) assert emeter == {} @@ -505,9 +545,11 @@ def test_children_get_emeter_daily(dev): @strip def test_children_get_emeter_monthly(dev): - assert dev.sync.get_has_emeter() + dev.sync.update() + assert dev.has_emeter # test individual emeters for plug in dev.plugs: + plug.sync.update() emeter = plug.sync.get_emeter_monthly(year=1900) assert emeter == {} @@ -539,9 +581,9 @@ def test_children_get_emeter_monthly(dev): # if dev.is_strip: # CHECK_COUNT = 0 -# dev.sync.get_sys_info() +# dev.sys_info # assert query_mock.call_count == CHECK_COUNT -# dev.sync.get_sys_info() +# dev.sys_info # assert query_mock.call_count == CHECK_COUNT @@ -553,15 +595,14 @@ def test_children_get_emeter_monthly(dev): # with patch.object( # FakeTransportProtocol, "query", wraps=dev.protocol.query # ) as query_mock: -# dev.sync.get_sys_info() +# dev.sys_info # assert query_mock.call_count == 1 -# dev.sync.get_sys_info() +# dev.sys_info # assert query_mock.call_count == 2 # # assert query_mock.called_once() def test_representation(dev): import re - pattern = re.compile("<.* model .* at .* (.*), is_on: .* - dev specific: .*>") assert pattern.match(str(dev))