Implement wifi interface for tapodevice (#606)

* Implement wifi interface for tapodevice

* Implement wifi_join

Tested to work on P110

* Fix linting
This commit is contained in:
Teemu R 2024-01-03 22:45:16 +01:00 committed by GitHub
parent c810298b04
commit 3692e4812f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 7 deletions

View File

@ -345,10 +345,10 @@ async def scan(dev):
@wifi.command() @wifi.command()
@click.argument("ssid") @click.argument("ssid")
@click.option("--keytype", prompt=True)
@click.option("--password", prompt=True, hide_input=True) @click.option("--password", prompt=True, hide_input=True)
@click.option("--keytype", default=3)
@pass_dev @pass_dev
async def join(dev: SmartDevice, ssid, password, keytype): async def join(dev: SmartDevice, ssid: str, password: str, keytype: str):
"""Join the given wifi network.""" """Join the given wifi network."""
echo(f"Asking the device to connect to {ssid}..") echo(f"Asking the device to connect to {ssid}..")
res = await dev.wifi_join(ssid, password, keytype=keytype) res = await dev.wifi_join(ssid, password, keytype=keytype)

View File

@ -42,6 +42,9 @@ class WifiNetwork:
channel: Optional[int] = None channel: Optional[int] = None
rssi: Optional[int] = None rssi: Optional[int] = None
# For SMART devices
signal_level: Optional[int] = None
def merge(d, u): def merge(d, u):
"""Update dict recursively.""" """Update dict recursively."""
@ -687,7 +690,7 @@ class SmartDevice:
return [WifiNetwork(**x) for x in info["ap_list"]] return [WifiNetwork(**x) for x in info["ap_list"]]
async def wifi_join(self, ssid, password, keytype=3): # noqa: D202 async def wifi_join(self, ssid: str, password: str, keytype: str = "3"): # noqa: D202
"""Join the given wifi network. """Join the given wifi network.
If joining the network fails, the device will return to AP mode after a while. If joining the network fails, the device will return to AP mode after a while.
@ -696,7 +699,7 @@ class SmartDevice:
async def _join(target, payload): async def _join(target, payload):
return await self._query_helper(target, "set_stainfo", payload) return await self._query_helper(target, "set_stainfo", payload)
payload = {"ssid": ssid, "password": password, "key_type": keytype} payload = {"ssid": ssid, "password": password, "key_type": int(keytype)}
try: try:
return await _join("netif", payload) return await _join("netif", payload)
except SmartDeviceException as ex: except SmartDeviceException as ex:

View File

@ -2,15 +2,15 @@
import base64 import base64
import logging import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Set, cast from typing import Any, Dict, List, Optional, Set, cast
from ..aestransport import AesTransport from ..aestransport import AesTransport
from ..deviceconfig import DeviceConfig from ..deviceconfig import DeviceConfig
from ..emeterstatus import EmeterStatus from ..emeterstatus import EmeterStatus
from ..exceptions import AuthenticationException from ..exceptions import AuthenticationException, SmartDeviceException
from ..modules import Emeter from ..modules import Emeter
from ..protocol import TPLinkProtocol from ..protocol import TPLinkProtocol
from ..smartdevice import SmartDevice from ..smartdevice import SmartDevice, WifiNetwork
from ..smartprotocol import SmartProtocol from ..smartprotocol import SmartProtocol
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -247,3 +247,77 @@ class TapoDevice(SmartDevice):
def emeter_today(self) -> Optional[float]: def emeter_today(self) -> Optional[float]:
"""Get the emeter value for today.""" """Get the emeter value for today."""
return self._convert_energy_data(self._energy.get("today_energy"), 1 / 1000) return self._convert_energy_data(self._energy.get("today_energy"), 1 / 1000)
async def wifi_scan(self) -> List[WifiNetwork]:
"""Scan for available wifi networks."""
def _net_for_scan_info(res):
return WifiNetwork(
ssid=base64.b64decode(res["ssid"]).decode(),
cipher_type=res["cipher_type"],
key_type=res["key_type"],
channel=res["channel"],
signal_level=res["signal_level"],
bssid=res["bssid"],
)
async def _query_networks(networks=None, start_index=0):
_LOGGER.debug("Querying networks using start_index=%s", start_index)
if networks is None:
networks = []
resp = await self.protocol.query(
{"get_wireless_scan_info": {"start_index": start_index}}
)
network_list = [
_net_for_scan_info(net)
for net in resp["get_wireless_scan_info"]["ap_list"]
]
networks.extend(network_list)
if resp["get_wireless_scan_info"]["sum"] > start_index + 10:
return await _query_networks(networks, start_index=start_index + 10)
return networks
return await _query_networks()
async def wifi_join(self, ssid: str, password: str, keytype: str = "wpa2_psk"):
"""Join the given wifi network.
This method returns nothing as the device tries to activate the new
settings immediately instead of responding to the request.
If joining the network fails, the device will return to the previous state
after some delay.
"""
if not self.credentials:
raise AuthenticationException("Device requires authentication.")
payload = {
"account": {
"username": base64.b64encode(
self.credentials.username.encode()
).decode(),
"password": base64.b64encode(
self.credentials.password.encode()
).decode(),
},
"wireless": {
"key_type": keytype,
"password": base64.b64encode(password.encode()).decode(),
"ssid": base64.b64encode(ssid.encode()).decode(),
},
"time": self.internal_state["time"],
}
# The device does not respond to the request but changes the settings
# immediately which causes us to timeout.
# Thus, We limit retries and suppress the raised exception as useless.
try:
return await self.protocol.query({"set_qs_info": payload}, retry_count=0)
except SmartDeviceException as ex:
if ex.error_code: # Re-raise on device-reported errors
raise
_LOGGER.debug("Received an expected for wifi join, but this is expected")