python-kasa/kasa/tests/newfakes.py

620 lines
20 KiB
Python

import base64
import copy
import logging
import re
import warnings
from json import loads as json_loads
from voluptuous import (
REMOVE_EXTRA,
All,
Any,
Coerce, # type: ignore
Invalid,
Optional,
Range,
Schema,
)
from ..credentials import Credentials
from ..deviceconfig import DeviceConfig
from ..exceptions import SmartDeviceException
from ..iotprotocol import IotProtocol
from ..protocol import BaseTransport
from ..smartprotocol import SmartProtocol
from ..xortransport import XorTransport
_LOGGER = logging.getLogger(__name__)
def check_int_bool(x):
if x != 0 and x != 1:
raise Invalid(x)
return x
def check_mac(x):
if re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", x.lower()):
return x
raise Invalid(x)
def check_mode(x):
if x in ["schedule", "none", "count_down"]:
return x
raise Invalid(f"invalid mode {x}")
def lb_dev_state(x):
if x in ["normal"]:
return x
raise Invalid(f"Invalid dev_state {x}")
TZ_SCHEMA = Schema(
{"zone_str": str, "dst_offset": int, "index": All(int, Range(min=0)), "tz_str": str}
)
CURRENT_CONSUMPTION_SCHEMA = Schema(
Any(
{
"voltage": Any(All(float, Range(min=0, max=300)), None),
"power": Any(Coerce(float, Range(min=0)), None),
"total": Any(Coerce(float, Range(min=0)), None),
"current": Any(All(float, Range(min=0)), None),
"voltage_mv": Any(
All(float, Range(min=0, max=300000)), int, None
), # TODO can this be int?
"power_mw": Any(Coerce(float, Range(min=0)), None),
"total_wh": Any(Coerce(float, Range(min=0)), None),
"current_ma": Any(
All(float, Range(min=0)), int, None
), # TODO can this be int?
"slot_id": Any(Coerce(int, Range(min=0)), None),
},
None,
)
)
# these schemas should go to the mainlib as
# they can be useful when adding support for new features/devices
# as well as to check that faked devices are operating properly.
PLUG_SCHEMA = Schema(
{
"active_mode": check_mode,
"alias": str,
"dev_name": str,
"deviceId": str,
"feature": str,
"fwId": str,
"hwId": str,
"hw_ver": str,
"icon_hash": str,
"led_off": check_int_bool,
"latitude": Any(All(float, Range(min=-90, max=90)), 0, None),
"latitude_i": Any(
All(int, Range(min=-900000, max=900000)),
All(float, Range(min=-900000, max=900000)),
0,
None,
),
"longitude": Any(All(float, Range(min=-180, max=180)), 0, None),
"longitude_i": Any(
All(int, Range(min=-18000000, max=18000000)),
All(float, Range(min=-18000000, max=18000000)),
0,
None,
),
"mac": check_mac,
"model": str,
"oemId": str,
"on_time": int,
"relay_state": int,
"rssi": Any(int, None), # rssi can also be positive, see #54
"sw_ver": str,
"type": str,
"mic_type": str,
"updating": check_int_bool,
# these are available on hs220
"brightness": int,
"preferred_state": [
{"brightness": All(int, Range(min=0, max=100)), "index": int}
],
"next_action": {"type": int},
"child_num": Optional(Any(None, int)), # TODO fix hs300 checks
"children": Optional(list), # TODO fix hs300
# TODO some tplink simulator entries contain invalid (mic_mac, _i variants for lat/lon)
# Therefore we add REMOVE_EXTRA..
# "INVALIDmac": Optional,
# "INVALIDlatitude": Optional,
# "INVALIDlongitude": Optional,
},
extra=REMOVE_EXTRA,
)
LIGHT_STATE_SCHEMA = Schema(
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": int,
"hue": All(int, Range(min=0, max=360)),
"mode": str,
"on_off": check_int_bool,
"saturation": All(int, Range(min=0, max=100)),
"dft_on_state": Optional(
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": All(int, Range(min=0, max=9000)),
"hue": All(int, Range(min=0, max=360)),
"mode": str,
"saturation": All(int, Range(min=0, max=100)),
}
),
"err_code": int,
}
)
BULB_SCHEMA = PLUG_SCHEMA.extend(
{
"ctrl_protocols": Optional(dict),
"description": Optional(str), # TODO: LBxxx similar to dev_name
"dev_state": lb_dev_state,
"disco_ver": str,
"heapsize": int,
"is_color": check_int_bool,
"is_dimmable": check_int_bool,
"is_factory": bool,
"is_variable_color_temp": check_int_bool,
"light_state": LIGHT_STATE_SCHEMA,
"preferred_state": [
{
"brightness": All(int, Range(min=0, max=100)),
"color_temp": int,
"hue": All(int, Range(min=0, max=360)),
"index": int,
"saturation": All(int, Range(min=0, max=100)),
}
],
}
)
def get_realtime(obj, x, *args):
return {
"current": 0.268587,
"voltage": 125.836131,
"power": 33.495623,
"total": 0.199000,
}
def get_monthstat(obj, x, *args):
if x["year"] < 2016:
return {"month_list": []}
return {
"month_list": [
{"year": 2016, "month": 11, "energy": 1.089000},
{"year": 2016, "month": 12, "energy": 1.582000},
]
}
def get_daystat(obj, x, *args):
if x["year"] < 2016:
return {"day_list": []}
return {
"day_list": [
{"year": 2016, "month": 11, "day": 24, "energy": 0.026000},
{"year": 2016, "month": 11, "day": 25, "energy": 0.109000},
]
}
emeter_support = {
"get_realtime": get_realtime,
"get_monthstat": get_monthstat,
"get_daystat": get_daystat,
}
def get_realtime_units(obj, x, *args):
return {"power_mw": 10800}
def get_monthstat_units(obj, x, *args):
if x["year"] < 2016:
return {"month_list": []}
return {
"month_list": [
{"year": 2016, "month": 11, "energy_wh": 32},
{"year": 2016, "month": 12, "energy_wh": 16},
]
}
def get_daystat_units(obj, x, *args):
if x["year"] < 2016:
return {"day_list": []}
return {
"day_list": [
{"year": 2016, "month": 11, "day": 24, "energy_wh": 20},
{"year": 2016, "month": 11, "day": 25, "energy_wh": 32},
]
}
emeter_units_support = {
"get_realtime": get_realtime_units,
"get_monthstat": get_monthstat_units,
"get_daystat": get_daystat_units,
}
emeter_commands = {
"emeter": emeter_support,
"smartlife.iot.common.emeter": emeter_units_support,
}
def error(msg="default msg"):
return {"err_code": -1323, "msg": msg}
def success(res):
if res:
res.update({"err_code": 0})
else:
res = {"err_code": 0}
return res
# plugs and bulbs use a different module for time information,
# so we define the contents here to avoid repeating ourselves
TIME_MODULE = {
"get_time": {
"year": 2017,
"month": 1,
"mday": 2,
"hour": 3,
"min": 4,
"sec": 5,
},
"get_timezone": {
"zone_str": "test",
"dst_offset": -1,
"index": 12,
"tz_str": "test2",
},
"set_timezone": None,
}
class FakeSmartProtocol(SmartProtocol):
def __init__(self, info):
super().__init__(
transport=FakeSmartTransport(info),
)
async def query(self, request, retry_count: int = 3):
"""Implement query here so can still patch SmartProtocol.query."""
resp_dict = await self._query(request, retry_count)
return resp_dict
class FakeSmartTransport(BaseTransport):
def __init__(self, info):
super().__init__(
config=DeviceConfig(
"127.0.0.123",
credentials=Credentials(
username="dummy_user",
password="dummy_password", # noqa: S106
),
),
)
self.info = info
self.components = {
comp["id"]: comp["ver_code"]
for comp in self.info["component_nego"]["component_list"]
}
@property
def default_port(self):
"""Default port for the transport."""
return 80
@property
def credentials_hash(self):
"""The hashed credentials used by the transport."""
return self._credentials.username + self._credentials.password + "hash"
FIXTURE_MISSING_MAP = {
"get_wireless_scan_info": ("wireless", {"ap_list": [], "wep_supported": False}),
}
async def send(self, request: str):
request_dict = json_loads(request)
method = request_dict["method"]
params = request_dict["params"]
if method == "multipleRequest":
responses = []
for request in params["requests"]:
response = self._send_request(request) # type: ignore[arg-type]
response["method"] = request["method"] # type: ignore[index]
responses.append(response)
return {"result": {"responses": responses}, "error_code": 0}
else:
return self._send_request(request_dict)
def _send_request(self, request_dict: dict):
method = request_dict["method"]
params = request_dict["params"]
if method == "component_nego" or method[:4] == "get_":
if method in self.info:
return {"result": self.info[method], "error_code": 0}
elif (
missing_result := self.FIXTURE_MISSING_MAP.get(method)
) and missing_result[0] in self.components:
warnings.warn(
UserWarning(
f"Fixture missing expected method {method}, try to regenerate"
),
stacklevel=1,
)
return {"result": missing_result[1], "error_code": 0}
else:
raise SmartDeviceException(f"Fixture doesn't support {method}")
elif method == "set_qs_info":
return {"error_code": 0}
elif method[:4] == "set_":
target_method = f"get_{method[4:]}"
self.info[target_method].update(params)
return {"error_code": 0}
async def close(self) -> None:
pass
async def reset(self) -> None:
pass
class FakeTransportProtocol(IotProtocol):
def __init__(self, info):
super().__init__(
transport=XorTransport(
config=DeviceConfig("127.0.0.123"),
)
)
self.discovery_data = info
self.writer = None
self.reader = None
proto = copy.deepcopy(FakeTransportProtocol.baseproto)
for target in info:
# print("target %s" % target)
if target != "discovery_result":
for cmd in info[target]:
# print("initializing tgt %s cmd %s" % (target, cmd))
proto[target][cmd] = info[target][cmd]
# if we have emeter support, we need to add the missing pieces
for module in ["emeter", "smartlife.iot.common.emeter"]:
if (
module in info
and "err_code" in info[module]
and info[module]["err_code"] != 0
):
proto[module] = info[module]
else:
for etype in ["get_realtime", "get_daystat", "get_monthstat"]:
if (
module in info and etype in info[module]
): # if the fixture has the data, use it
# print("got %s %s from fixture: %s" % (module, etype, info[module][etype]))
proto[module][etype] = info[module][etype]
else: # otherwise fall back to the static one
dummy_data = emeter_commands[module][etype]
# print("got %s %s from dummy: %s" % (module, etype, dummy_data))
proto[module][etype] = dummy_data
# print("initialized: %s" % proto[module])
self.proto = proto
def set_alias(self, x, child_ids=None):
if child_ids is None:
child_ids = []
_LOGGER.debug("Setting alias to %s, child_ids: %s", x["alias"], child_ids)
if child_ids:
for child in self.proto["system"]["get_sysinfo"]["children"]:
if child["id"] in child_ids:
child["alias"] = x["alias"]
else:
self.proto["system"]["get_sysinfo"]["alias"] = x["alias"]
def set_relay_state(self, x, child_ids=None):
if child_ids is None:
child_ids = []
_LOGGER.debug("Setting relay state to %s", x["state"])
if not child_ids and "children" in self.proto["system"]["get_sysinfo"]:
for child in self.proto["system"]["get_sysinfo"]["children"]:
child_ids.append(child["id"])
_LOGGER.info("child_ids: %s", child_ids)
if child_ids:
for child in self.proto["system"]["get_sysinfo"]["children"]:
if child["id"] in child_ids:
_LOGGER.info("Found %s, turning to %s", child, x["state"])
child["state"] = x["state"]
else:
self.proto["system"]["get_sysinfo"]["relay_state"] = x["state"]
def set_led_off(self, x, *args):
_LOGGER.debug("Setting led off to %s", x)
self.proto["system"]["get_sysinfo"]["led_off"] = x["off"]
def set_mac(self, x, *args):
_LOGGER.debug("Setting mac to %s", x)
self.proto["system"]["get_sysinfo"]["mac"] = x["mac"]
def set_hs220_brightness(self, x, *args):
_LOGGER.debug("Setting brightness to %s", x)
self.proto["system"]["get_sysinfo"]["brightness"] = x["brightness"]
def set_hs220_dimmer_transition(self, x, *args):
_LOGGER.debug("Setting dimmer transition to %s", x)
brightness = x["brightness"]
if brightness == 0:
self.proto["system"]["get_sysinfo"]["relay_state"] = 0
else:
self.proto["system"]["get_sysinfo"]["relay_state"] = 1
self.proto["system"]["get_sysinfo"]["brightness"] = x["brightness"]
def set_lighting_effect(self, effect, *args):
_LOGGER.debug("Setting light effect to %s", effect)
self.proto["system"]["get_sysinfo"]["lighting_effect_state"] = dict(effect)
def transition_light_state(self, state_changes, *args):
_LOGGER.debug("Setting light state to %s", state_changes)
light_state = self.proto["system"]["get_sysinfo"]["light_state"]
_LOGGER.debug("Current light state: %s", light_state)
new_state = light_state
# turn on requested, if we were off, use the dft_on_state as a base
if state_changes["on_off"] == 1 and not light_state["on_off"]:
_LOGGER.debug("Bulb was off, using dft_on_state")
new_state = light_state["dft_on_state"]
# override the existing settings
new_state.update(state_changes)
if (
not state_changes["on_off"] and "dft_on_state" not in light_state
): # if not already off, pack the data inside dft_on_state
_LOGGER.debug(
"Bulb was on and turn_off was requested, saving to dft_on_state"
)
new_state = {"dft_on_state": light_state, "on_off": 0}
_LOGGER.debug("New light state: %s", new_state)
self.proto["system"]["get_sysinfo"]["light_state"] = new_state
def set_preferred_state(self, new_state, *args):
"""Implement set_preferred_state."""
self.proto["system"]["get_sysinfo"]["preferred_state"][
new_state["index"]
] = new_state
def light_state(self, x, *args):
light_state = self.proto["system"]["get_sysinfo"]["light_state"]
# Our tests have light state off, so we simply return the dft_on_state when device is on.
_LOGGER.debug("reporting light state: %s", light_state)
# TODO: hack to go around KL430 fixture differences
if light_state["on_off"] and "dft_on_state" in light_state:
return light_state["dft_on_state"]
else:
return light_state
baseproto = {
"system": {
"set_relay_state": set_relay_state,
"set_dev_alias": set_alias,
"set_led_off": set_led_off,
"get_dev_icon": {"icon": None, "hash": None},
"set_mac_addr": set_mac,
"get_sysinfo": None,
},
"emeter": {
"get_realtime": None,
"get_daystat": None,
"get_monthstat": None,
"erase_emeter_state": None,
},
"smartlife.iot.common.emeter": {
"get_realtime": None,
"get_daystat": None,
"get_monthstat": None,
"erase_emeter_state": None,
},
"smartlife.iot.smartbulb.lightingservice": {
"get_light_state": light_state,
"transition_light_state": transition_light_state,
"set_preferred_state": set_preferred_state,
},
"smartlife.iot.lighting_effect": {
"set_lighting_effect": set_lighting_effect,
},
# lightstrip follows the same payloads but uses different module & method
"smartlife.iot.lightStrip": {
"set_light_state": transition_light_state,
"get_light_state": light_state,
"set_preferred_state": set_preferred_state,
},
"smartlife.iot.common.system": {
"set_dev_alias": set_alias,
},
"time": TIME_MODULE,
"smartlife.iot.common.timesetting": TIME_MODULE,
# HS220 brightness, different setter and getter
"smartlife.iot.dimmer": {
"set_brightness": set_hs220_brightness,
"set_dimmer_transition": set_hs220_dimmer_transition,
},
"smartlife.iot.LAS": {},
"smartlife.iot.PIR": {},
}
async def query(self, request, port=9999):
proto = self.proto
# collect child ids from context
try:
child_ids = request["context"]["child_ids"]
request.pop("context", None)
except KeyError:
child_ids = []
def get_response_for_module(target):
if target not in proto:
return error(msg="target not found")
if "err_code" in proto[target] and proto[target]["err_code"] != 0:
return {target: proto[target]}
def get_response_for_command(cmd):
if cmd not in proto[target]:
return error(msg=f"command {cmd} not found")
params = request[target][cmd]
_LOGGER.debug(f"Going to execute {target}.{cmd} (params: {params}).. ")
if callable(proto[target][cmd]):
res = proto[target][cmd](self, params, child_ids)
_LOGGER.debug("[callable] %s.%s: %s", target, cmd, res)
return success(res)
elif isinstance(proto[target][cmd], dict):
res = proto[target][cmd]
_LOGGER.debug("[static] %s.%s: %s", target, cmd, res)
return success(res)
else:
raise NotImplementedError(f"target {target} cmd {cmd}")
from collections import defaultdict
cmd_responses = defaultdict(dict)
for cmd in request[target]:
cmd_responses[target][cmd] = get_response_for_command(cmd)
return cmd_responses
response = {}
for target in request:
response.update(get_response_for_module(target))
return response