From 3692e4812ff4207f2c30d122287d6f9132d1a7a4 Mon Sep 17 00:00:00 2001 From: Teemu R Date: Wed, 3 Jan 2024 22:45:16 +0100 Subject: [PATCH] Implement wifi interface for tapodevice (#606) * Implement wifi interface for tapodevice * Implement wifi_join Tested to work on P110 * Fix linting --- kasa/cli.py | 4 +-- kasa/smartdevice.py | 7 ++-- kasa/tapo/tapodevice.py | 80 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/kasa/cli.py b/kasa/cli.py index 13458b0e..1fb522cf 100755 --- a/kasa/cli.py +++ b/kasa/cli.py @@ -345,10 +345,10 @@ async def scan(dev): @wifi.command() @click.argument("ssid") +@click.option("--keytype", prompt=True) @click.option("--password", prompt=True, hide_input=True) -@click.option("--keytype", default=3) @pass_dev -async def join(dev: SmartDevice, ssid, password, keytype): +async def join(dev: SmartDevice, ssid: str, password: str, keytype: str): """Join the given wifi network.""" echo(f"Asking the device to connect to {ssid}..") res = await dev.wifi_join(ssid, password, keytype=keytype) diff --git a/kasa/smartdevice.py b/kasa/smartdevice.py index 97b46ddc..c3561812 100755 --- a/kasa/smartdevice.py +++ b/kasa/smartdevice.py @@ -42,6 +42,9 @@ class WifiNetwork: channel: Optional[int] = None rssi: Optional[int] = None + # For SMART devices + signal_level: Optional[int] = None + def merge(d, u): """Update dict recursively.""" @@ -687,7 +690,7 @@ class SmartDevice: return [WifiNetwork(**x) for x in info["ap_list"]] - async def wifi_join(self, ssid, password, keytype=3): # noqa: D202 + async def wifi_join(self, ssid: str, password: str, keytype: str = "3"): # noqa: D202 """Join the given wifi network. If joining the network fails, the device will return to AP mode after a while. @@ -696,7 +699,7 @@ class SmartDevice: async def _join(target, payload): return await self._query_helper(target, "set_stainfo", payload) - payload = {"ssid": ssid, "password": password, "key_type": keytype} + payload = {"ssid": ssid, "password": password, "key_type": int(keytype)} try: return await _join("netif", payload) except SmartDeviceException as ex: diff --git a/kasa/tapo/tapodevice.py b/kasa/tapo/tapodevice.py index 9a7731d3..785269a3 100644 --- a/kasa/tapo/tapodevice.py +++ b/kasa/tapo/tapodevice.py @@ -2,15 +2,15 @@ import base64 import logging from datetime import datetime, timedelta, timezone -from typing import Any, Dict, Optional, Set, cast +from typing import Any, Dict, List, Optional, Set, cast from ..aestransport import AesTransport from ..deviceconfig import DeviceConfig from ..emeterstatus import EmeterStatus -from ..exceptions import AuthenticationException +from ..exceptions import AuthenticationException, SmartDeviceException from ..modules import Emeter from ..protocol import TPLinkProtocol -from ..smartdevice import SmartDevice +from ..smartdevice import SmartDevice, WifiNetwork from ..smartprotocol import SmartProtocol _LOGGER = logging.getLogger(__name__) @@ -247,3 +247,77 @@ class TapoDevice(SmartDevice): def emeter_today(self) -> Optional[float]: """Get the emeter value for today.""" return self._convert_energy_data(self._energy.get("today_energy"), 1 / 1000) + + async def wifi_scan(self) -> List[WifiNetwork]: + """Scan for available wifi networks.""" + + def _net_for_scan_info(res): + return WifiNetwork( + ssid=base64.b64decode(res["ssid"]).decode(), + cipher_type=res["cipher_type"], + key_type=res["key_type"], + channel=res["channel"], + signal_level=res["signal_level"], + bssid=res["bssid"], + ) + + async def _query_networks(networks=None, start_index=0): + _LOGGER.debug("Querying networks using start_index=%s", start_index) + if networks is None: + networks = [] + + resp = await self.protocol.query( + {"get_wireless_scan_info": {"start_index": start_index}} + ) + network_list = [ + _net_for_scan_info(net) + for net in resp["get_wireless_scan_info"]["ap_list"] + ] + networks.extend(network_list) + + if resp["get_wireless_scan_info"]["sum"] > start_index + 10: + return await _query_networks(networks, start_index=start_index + 10) + + return networks + + return await _query_networks() + + async def wifi_join(self, ssid: str, password: str, keytype: str = "wpa2_psk"): + """Join the given wifi network. + + This method returns nothing as the device tries to activate the new + settings immediately instead of responding to the request. + + If joining the network fails, the device will return to the previous state + after some delay. + """ + if not self.credentials: + raise AuthenticationException("Device requires authentication.") + + payload = { + "account": { + "username": base64.b64encode( + self.credentials.username.encode() + ).decode(), + "password": base64.b64encode( + self.credentials.password.encode() + ).decode(), + }, + "wireless": { + "key_type": keytype, + "password": base64.b64encode(password.encode()).decode(), + "ssid": base64.b64encode(ssid.encode()).decode(), + }, + "time": self.internal_state["time"], + } + + # The device does not respond to the request but changes the settings + # immediately which causes us to timeout. + # Thus, We limit retries and suppress the raised exception as useless. + try: + return await self.protocol.query({"set_qs_info": payload}, retry_count=0) + except SmartDeviceException as ex: + if ex.error_code: # Re-raise on device-reported errors + raise + + _LOGGER.debug("Received an expected for wifi join, but this is expected")