Switch from TPLinkSmartHomeProtocol to IotProtocol/XorTransport (#710)

* Switch from TPLinkSmartHomeProtocol to IotProtocol/XorTransport

* Add test

* Update docs

* Fix ruff deleting deprecated import
This commit is contained in:
Steven B
2024-01-26 09:11:31 +00:00
committed by GitHub
parent c318303255
commit 0d0f56414c
15 changed files with 172 additions and 355 deletions

View File

@@ -25,8 +25,8 @@ from kasa.deviceconfig import ConnectionType, DeviceConfig, EncryptType
from kasa.exceptions import TimeoutException, UnsupportedDeviceException
from kasa.json import dumps as json_dumps
from kasa.json import loads as json_loads
from kasa.protocol import TPLinkSmartHomeProtocol
from kasa.smartdevice import SmartDevice, SmartDeviceException
from kasa.xortransport import XorEncryption
_LOGGER = logging.getLogger(__name__)
@@ -103,7 +103,7 @@ class _DiscoverProtocol(asyncio.DatagramProtocol):
"""Send number of discovery datagrams."""
req = json_dumps(Discover.DISCOVERY_QUERY)
_LOGGER.debug("[DISCOVERY] %s >> %s", self.target, Discover.DISCOVERY_QUERY)
encrypted_req = TPLinkSmartHomeProtocol.encrypt(req)
encrypted_req = XorEncryption.encrypt(req)
sleep_between_packets = self.discovery_timeout / self.discovery_packets
for i in range(self.discovery_packets):
if self.target in self.seen_hosts: # Stop sending for discover_single
@@ -400,7 +400,7 @@ class Discover:
def _get_device_instance_legacy(data: bytes, config: DeviceConfig) -> SmartDevice:
"""Get SmartDevice from legacy 9999 response."""
try:
info = json_loads(TPLinkSmartHomeProtocol.decrypt(data))
info = json_loads(XorEncryption.decrypt(data))
except Exception as ex:
raise SmartDeviceException(
f"Unable to read response from device: {config.host}: {ex}"