mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-12-20 03:30:12 +00:00
Add module support & query their information during update cycle (#243)
* Add module support & modularize existing query This creates a base to expose more features on the supported devices. At the moment, the most visible change is that each update cycle gets information from all available modules: * Basic system info * Cloud (new) * Countdown (new) * Antitheft (new) * Schedule (new) * Time (existing, implements the time/timezone handling) * Emeter (existing, partially separated from smartdevice) * Fix imports * Fix linting * Use device host instead of alias in module repr * Add property to list available modules, print them in cli state report * usage: fix the get_realtime query * separate usage from schedule to avoid multi-inheritance * Fix module querying * Add is_supported property to modules
This commit is contained in:
50
kasa/modules/cloud.py
Normal file
50
kasa/modules/cloud.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Cloud module implementation."""
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .module import Module
|
||||
|
||||
|
||||
class CloudInfo(BaseModel):
|
||||
"""Container for cloud settings."""
|
||||
|
||||
binded: bool
|
||||
cld_connection: int
|
||||
fwDlPage: str
|
||||
fwNotifyType: int
|
||||
illegalType: int
|
||||
server: str
|
||||
stopConnect: int
|
||||
tcspInfo: str
|
||||
tcspStatus: int
|
||||
username: str
|
||||
|
||||
|
||||
class Cloud(Module):
|
||||
"""Module implementing support for cloud services."""
|
||||
|
||||
def query(self):
|
||||
"""Request cloud connectivity info."""
|
||||
return self.query_for_command("get_info")
|
||||
|
||||
@property
|
||||
def info(self) -> CloudInfo:
|
||||
"""Return information about the cloud connectivity."""
|
||||
return CloudInfo.parse_obj(self.data["get_info"])
|
||||
|
||||
def get_available_firmwares(self):
|
||||
"""Return list of available firmwares."""
|
||||
return self.query_for_command("get_intl_fw_list")
|
||||
|
||||
def set_server(self, url: str):
|
||||
"""Set the update server URL."""
|
||||
return self.query_for_command("set_server_url", {"server": url})
|
||||
|
||||
def connect(self, username: str, password: str):
|
||||
"""Login to the cloud using given information."""
|
||||
return self.query_for_command(
|
||||
"bind", {"username": username, "password": password}
|
||||
)
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from the cloud."""
|
||||
return self.query_for_command("unbind")
|
||||
Reference in New Issue
Block a user