mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-01-08 22:07:06 +00:00
da441bc697
Also updates the CI PyPy versions to 3.9 and 3.10, which are the currently [supported versions](https://www.pypy.org/posts/2024/01/pypy-v7315-release.html). Otherwise, the latest cryptography release doesn't ship with pypy3.8 wheels and is unable to build on Windows. Also updates the `codecov-action` to v4, which fixed some intermittent uploading errors.
74 lines
2.0 KiB
Python
74 lines
2.0 KiB
Python
"""Cloud module implementation."""
|
|
|
|
try:
|
|
from pydantic.v1 import BaseModel
|
|
except ImportError:
|
|
from pydantic import BaseModel
|
|
|
|
from ...feature import Feature, FeatureType
|
|
from ..iotmodule import IotModule
|
|
|
|
|
|
class CloudInfo(BaseModel):
    """Typed view of the cloud settings payload.

    ``Cloud.info`` builds instances of this model from the ``get_info``
    entry of the module data via ``parse_obj``. Field names mirror the
    payload keys verbatim, which is why they are not snake_case.
    """

    # Read by ``Cloud.is_connected`` as the "connected to the cloud" flag.
    binded: bool
    # NOTE(review): meaning of the remaining integer/status fields is not
    # visible in this module; consult the device protocol before relying
    # on specific values.
    cld_connection: int
    fwDlPage: str
    fwNotifyType: int
    illegalType: int
    server: str
    stopConnect: int
    tcspInfo: str
    tcspStatus: int
    username: str
|
|
|
|
|
|
class Cloud(IotModule):
    """IOT module exposing the device's cloud service integration."""

    def __init__(self, device, module):
        super().__init__(device, module)
        # Surface the cloud state as a binary sensor backed by the
        # ``is_connected`` property of this module.
        connection_feature = Feature(
            device=device,
            container=self,
            name="Cloud connection",
            icon="mdi:cloud",
            attribute_getter="is_connected",
            type=FeatureType.BinarySensor,
        )
        self._add_feature(connection_feature)

    @property
    def is_connected(self) -> bool:
        """Tell whether the device is connected to the cloud.

        The value is the ``binded`` field of the parsed cloud info.
        """
        cloud_info = self.info
        return cloud_info.binded

    def query(self):
        """Return the query that requests cloud connectivity info."""
        command = "get_info"
        return self.query_for_command(command)

    @property
    def info(self) -> CloudInfo:
        """Parse the stored ``get_info`` data into a :class:`CloudInfo`."""
        raw_info = self.data["get_info"]
        return CloudInfo.parse_obj(raw_info)

    def get_available_firmwares(self):
        """Request the list of available firmwares."""
        command = "get_intl_fw_list"
        return self.query_for_command(command)

    def set_server(self, url: str):
        """Point the device at the given update server URL."""
        payload = {"server": url}
        return self.query_for_command("set_server_url", payload)

    def connect(self, username: str, password: str):
        """Log in to the cloud with the supplied credentials."""
        credentials = {"username": username, "password": password}
        return self.query_for_command("bind", credentials)

    def disconnect(self):
        """Disconnect the device from the cloud."""
        command = "unbind"
        return self.query_for_command(command)
|