2021-11-07 01:41:12 +00:00
|
|
|
"""Cloud module implementation."""
|
2024-04-16 18:21:20 +00:00
|
|
|
|
2024-11-20 13:34:26 +00:00
|
|
|
from dataclasses import dataclass
|
|
|
|
from typing import Annotated
|
|
|
|
|
|
|
|
from mashumaro import DataClassDictMixin
|
|
|
|
from mashumaro.types import Alias
|
2021-11-07 01:41:12 +00:00
|
|
|
|
2024-04-24 16:38:52 +00:00
|
|
|
from ...feature import Feature
|
2024-02-19 17:01:31 +00:00
|
|
|
from ..iotmodule import IotModule
|
2021-11-07 01:41:12 +00:00
|
|
|
|
|
|
|
|
2024-11-20 13:34:26 +00:00
|
|
|
@dataclass
|
|
|
|
class CloudInfo(DataClassDictMixin):
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Container for cloud settings."""
|
|
|
|
|
2024-11-20 13:34:26 +00:00
|
|
|
provisioned: Annotated[int, Alias("binded")]
|
|
|
|
cloud_connected: Annotated[int, Alias("cld_connection")]
|
|
|
|
firmware_download_page: Annotated[str, Alias("fwDlPage")]
|
|
|
|
firmware_notify_type: Annotated[int, Alias("fwNotifyType")]
|
|
|
|
illegal_type: Annotated[int, Alias("illegalType")]
|
2021-11-07 01:41:12 +00:00
|
|
|
server: str
|
2024-11-20 13:34:26 +00:00
|
|
|
stop_connect: Annotated[int, Alias("stopConnect")]
|
|
|
|
tcsp_info: Annotated[str, Alias("tcspInfo")]
|
|
|
|
tcsp_status: Annotated[int, Alias("tcspStatus")]
|
2021-11-07 01:41:12 +00:00
|
|
|
username: str
|
|
|
|
|
|
|
|
|
2024-02-04 15:20:08 +00:00
|
|
|
class Cloud(IotModule):
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Module implementing support for cloud services."""
|
|
|
|
|
2024-11-10 18:55:13 +00:00
|
|
|
def _initialize_features(self) -> None:
|
2024-09-28 18:14:31 +00:00
|
|
|
"""Initialize features after the initial update."""
|
2024-02-15 15:25:08 +00:00
|
|
|
self._add_feature(
|
|
|
|
Feature(
|
2024-09-28 18:14:31 +00:00
|
|
|
device=self._device,
|
2024-02-15 15:25:08 +00:00
|
|
|
container=self,
|
2024-05-07 09:13:35 +00:00
|
|
|
id="cloud_connection",
|
2024-02-15 15:25:08 +00:00
|
|
|
name="Cloud connection",
|
|
|
|
icon="mdi:cloud",
|
|
|
|
attribute_getter="is_connected",
|
2024-04-24 16:38:52 +00:00
|
|
|
type=Feature.Type.BinarySensor,
|
2024-05-07 09:13:35 +00:00
|
|
|
category=Feature.Category.Info,
|
2024-02-15 15:25:08 +00:00
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def is_connected(self) -> bool:
|
|
|
|
"""Return true if device is connected to the cloud."""
|
2024-11-20 13:34:26 +00:00
|
|
|
return bool(self.info.cloud_connected)
|
2024-02-15 15:25:08 +00:00
|
|
|
|
2024-11-10 18:55:13 +00:00
|
|
|
def query(self) -> dict:
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Request cloud connectivity info."""
|
|
|
|
return self.query_for_command("get_info")
|
|
|
|
|
|
|
|
@property
|
|
|
|
def info(self) -> CloudInfo:
|
|
|
|
"""Return information about the cloud connectivity."""
|
2024-11-20 13:34:26 +00:00
|
|
|
return CloudInfo.from_dict(self.data["get_info"])
|
2021-11-07 01:41:12 +00:00
|
|
|
|
2024-11-10 18:55:13 +00:00
|
|
|
def get_available_firmwares(self) -> dict:
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Return list of available firmwares."""
|
|
|
|
return self.query_for_command("get_intl_fw_list")
|
|
|
|
|
2024-11-10 18:55:13 +00:00
|
|
|
def set_server(self, url: str) -> dict:
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Set the update server URL."""
|
|
|
|
return self.query_for_command("set_server_url", {"server": url})
|
|
|
|
|
2024-11-10 18:55:13 +00:00
|
|
|
def connect(self, username: str, password: str) -> dict:
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Login to the cloud using given information."""
|
|
|
|
return self.query_for_command(
|
|
|
|
"bind", {"username": username, "password": password}
|
|
|
|
)
|
|
|
|
|
2024-11-10 18:55:13 +00:00
|
|
|
def disconnect(self) -> dict:
|
2021-11-07 01:41:12 +00:00
|
|
|
"""Disconnect from the cloud."""
|
|
|
|
return self.query_for_command("unbind")
|