Add listen module to smartcam

This commit is contained in:
Steven B
2024-12-18 18:54:08 +00:00
parent 56261e649d
commit 99e8a2fd87
8 changed files with 438 additions and 15 deletions

61
kasa/cli/listen.py Normal file
View File

@@ -0,0 +1,61 @@
"""Module for cli light control commands."""
import asyncio
import sys
from typing import cast
import asyncclick as click
from kasa import (
Credentials,
Device,
)
from .common import echo, error, pass_dev_or_child
async def aioinput(string: str):
"""Non loop blocking get input."""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda s=string: sys.stdout.write(s + " ")) # type: ignore[misc]
return await loop.run_in_executor(None, sys.stdin.readline)
@click.command()
@click.option(
"--cam-username",
required=True,
envvar="KASA_CAMERA_USERNAME",
help="Camera account username address to authenticate to device.",
)
@click.option(
"--cam-password",
required=True,
envvar="KASA_CAMERA_PASSWORD",
help="Camera account password to use to authenticate to device.",
)
@pass_dev_or_child
async def listen(dev: Device, cam_username: str, cam_password: str) -> None:
"""Commands to control light settings."""
try:
import onvif # type: ignore[import-untyped] # noqa: F401
except ImportError:
error("python-kasa must be installed with [onvif] extra for listen.")
from kasa.smartcam.modules.listen import EventType, Listen
listen: Listen = cast(Listen, dev.modules.get(Listen._module_name()))
if not listen:
error(f"Device {dev.host} does not support listening for events.")
def on_event(event: EventType) -> None:
echo(f"Device {dev.host} received event {event}")
creds = Credentials(cam_username, cam_password)
await listen.listen(on_event, creds)
await aioinput("Listening, press enter to cancel\n")
echo("Stopping listener")
await listen.stop()

View File

@@ -71,6 +71,7 @@ def _legacy_type_to_class(_type: str) -> Any:
"device": None,
"feature": None,
"light": None,
"listen": None,
"wifi": None,
"time": None,
"schedule": None,
@@ -252,9 +253,7 @@ async def cli(
if target != DEFAULT_TARGET and host:
error("--target is not a valid option for single host discovery")
logging_config: dict[str, Any] = {
"level": logging.DEBUG if debug > 0 else logging.INFO
}
logging_config: dict[str, Any] = {"level": logging.WARNING}
try:
from rich.logging import RichHandler
@@ -269,6 +268,7 @@ async def cli(
# The configuration should be converted to use dictConfig,
# but this keeps mypy happy for now
logging.basicConfig(**logging_config) # type: ignore
logging.getLogger("kasa").setLevel(logging.DEBUG if debug > 0 else logging.INFO)
if ctx.invoked_subcommand == "discover":
return

View File

@@ -160,6 +160,7 @@ class Module(ABC):
# SMARTCAM only modules
Camera: Final[ModuleName[smartcam.Camera]] = ModuleName("Camera")
LensMask: Final[ModuleName[smartcam.LensMask]] = ModuleName("LensMask")
Motion: Final[ModuleName[smartcam.Motion]] = ModuleName("Motion")
def __init__(self, device: Device, module: str) -> None:
self._device = device

View File

@@ -0,0 +1,139 @@
"""Implementation of motion detection module."""
from __future__ import annotations
import asyncio
import logging
import os
import socket
from collections.abc import Callable
from datetime import timedelta
from enum import StrEnum, auto
from subprocess import check_output
import onvif # type: ignore[import-untyped]
from aiohttp import web
from onvif.managers import NotificationManager # type: ignore[import-untyped]
from ...credentials import Credentials
from ..smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
class EventType(StrEnum):
"""Listen event types."""
MOTION_DETECTED = auto()
PERSON_DETECTED = auto()
TAMPER_DETECTED = auto()
BABY_CRY_DETECTED = auto()
TOPIC_EVENT_TYPE = {
"tns1:RuleEngine/CellMotionDetector/Motion": EventType.MOTION_DETECTED,
"tns1:RuleEngine/CellMotionDetector/People": EventType.PERSON_DETECTED,
"tns1:RuleEngine/TamperDetector/Tamper": EventType.TAMPER_DETECTED,
}
class Listen(SmartCamModule):
"""Implementation of lens mask module."""
manager: NotificationManager
callback: Callable[[EventType], None]
topics: set[EventType] | None
listening = False
site: web.TCPSite
runner: web.AppRunner
async def _invoke_callback(self, event: EventType) -> None:
self.callback(event)
async def _handle_event(self, request: web.Request) -> web.Response:
content = await request.read()
result = self.manager.process(content)
for msg in result.NotificationMessage:
if (event := TOPIC_EVENT_TYPE.get(msg.Topic._value_1)) and (
not self.topics or event in self.topics
):
asyncio.create_task(self._invoke_callback(event))
return web.Response()
async def listen(
self,
callback: Callable[[EventType], None],
camera_credentials: Credentials,
*,
topics: set[EventType] | None = None,
listen_ip: str | None = None,
) -> None:
"""Start listening for events."""
self.callback = callback
self.topics = topics
def subscription_lost() -> None:
pass
wsdl = f"{os.path.dirname(onvif.__file__)}/wsdl/"
mycam = onvif.ONVIFCamera(
self._device.host,
2020,
camera_credentials.username,
camera_credentials.password,
wsdl,
)
await mycam.update_xaddrs()
address = await self._start_server(listen_ip)
self.manager = await mycam.create_notification_manager(
address=address,
interval=timedelta(minutes=10),
subscription_lost_callback=subscription_lost,
)
self.listening = True
_LOGGER.debug("Listener started for %s", self._device.host)
async def stop(self) -> None:
"""Stop the listener."""
if not self.listening:
return
self.listening = False
await self.site.stop()
await self.runner.shutdown()
async def _get_host_port(self, listen_ip: str | None) -> tuple[str, int]:
def _create_socket(listen_ip: str | None) -> tuple[str, int]:
if not listen_ip:
res = check_output(["hostname", "-I"]) # noqa: S603, S607
listen_ip, _, _ = res.decode().partition(" ")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((listen_ip, 0))
port = sock.getsockname()[1]
sock.close()
return listen_ip, port
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _create_socket, listen_ip)
async def _start_server(self, listen_ip: str | None) -> str:
app = web.Application()
app.add_routes([web.post("/", self._handle_event)])
self.runner = web.AppRunner(app)
await self.runner.setup()
listen_ip, port = await self._get_host_port(listen_ip)
self.site = web.TCPSite(self.runner, listen_ip, port)
await self.site.start()
_LOGGER.debug(
"Listen handler for %s running on %s:%s", self._device.host, listen_ip, port
)
return f"http://{listen_ip}:{port}"

View File

@@ -11,7 +11,7 @@ _LOGGER = logging.getLogger(__name__)
class Motion(SmartCamModule):
"""Implementation of lens mask module."""
"""Implementation of motion detection module."""
REQUIRED_COMPONENT = "detection"

View File

@@ -11,7 +11,7 @@ from ..module import Module
from ..protocols.smartcamprotocol import _ChildCameraProtocolWrapper
from ..smart import SmartChildDevice, SmartDevice
from ..smart.smartdevice import ComponentsRaw
from .modules import ChildDevice, DeviceModule
from .modules import Camera, ChildDevice, DeviceModule, Time
from .smartcammodule import SmartCamModule
_LOGGER = logging.getLogger(__name__)
@@ -128,22 +128,37 @@ class SmartCamDevice(SmartDevice):
self._children = children
def _try_add_listen_module(self) -> None:
try:
import onvif # type: ignore[import-untyped] # noqa: F401
except ImportError:
return
from .modules.listen import Listen
self._modules[Listen._module_name()] = Listen(self, Listen._module_name())
async def _initialize_modules(self) -> None:
"""Initialize modules based on component negotiation response."""
for mod in SmartCamModule.REGISTERED_MODULES.values():
required_component = cast(str, mod.REQUIRED_COMPONENT)
if (
mod.REQUIRED_COMPONENT
and mod.REQUIRED_COMPONENT not in self._components
# Always add Camera module to cameras
and (
mod._module_name() != Module.Camera
or self._device_type is not DeviceType.Camera
required_component in self._components
or any(
self.sys_info.get(key) is not None
for key in mod.SYSINFO_LOOKUP_KEYS
)
or mod in self.FIRST_UPDATE_MODULES
or mod is Time
):
continue
module = mod(self, mod._module_name())
if await module._check_supported():
self._modules[module.name] = module
module = mod(self, mod._module_name())
if await module._check_supported():
self._modules[module.name] = module
if self._device_type is DeviceType.Camera:
self._modules[Camera._module_name()] = Camera(self, Camera._module_name())
if Module.Motion in self._modules:
self._try_add_listen_module()
async def _initialize_features(self) -> None:
"""Initialize device features."""