mirror of
https://github.com/python-kasa/python-kasa.git
synced 2025-08-06 10:44:04 +00:00
Provide alternative camera urls (#1316)
This commit is contained in:
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from enum import StrEnum
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from ...credentials import Credentials
|
||||
@@ -15,6 +16,14 @@ from ..smartcammodule import SmartCamModule
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
LOCAL_STREAMING_PORT = 554
|
||||
ONVIF_PORT = 2020
|
||||
|
||||
|
||||
class StreamResolution(StrEnum):
|
||||
"""Class for stream resolution."""
|
||||
|
||||
HD = "HD"
|
||||
SD = "SD"
|
||||
|
||||
|
||||
class Camera(SmartCamModule):
|
||||
@@ -64,7 +73,12 @@ class Camera(SmartCamModule):
|
||||
|
||||
return None
|
||||
|
||||
def stream_rtsp_url(self, credentials: Credentials | None = None) -> str | None:
|
||||
def stream_rtsp_url(
|
||||
self,
|
||||
credentials: Credentials | None = None,
|
||||
*,
|
||||
stream_resolution: StreamResolution = StreamResolution.HD,
|
||||
) -> str | None:
|
||||
"""Return the local rtsp streaming url.
|
||||
|
||||
:param credentials: Credentials for camera account.
|
||||
@@ -73,17 +87,27 @@ class Camera(SmartCamModule):
|
||||
:return: rtsp url with escaped credentials or None if no credentials or
|
||||
camera is off.
|
||||
"""
|
||||
if not self.is_on:
|
||||
streams = {
|
||||
StreamResolution.HD: "stream1",
|
||||
StreamResolution.SD: "stream2",
|
||||
}
|
||||
if (stream := streams.get(stream_resolution)) is None:
|
||||
return None
|
||||
dev = self._device
|
||||
|
||||
if not credentials:
|
||||
credentials = self._get_credentials()
|
||||
|
||||
if not credentials or not credentials.username or not credentials.password:
|
||||
return None
|
||||
|
||||
username = quote_plus(credentials.username)
|
||||
password = quote_plus(credentials.password)
|
||||
return f"rtsp://{username}:{password}@{dev.host}:{LOCAL_STREAMING_PORT}/stream1"
|
||||
|
||||
return f"rtsp://{username}:{password}@{self._device.host}:{LOCAL_STREAMING_PORT}/{stream}"
|
||||
|
||||
def onvif_url(self) -> str | None:
|
||||
"""Return the onvif url."""
|
||||
return f"http://{self._device.host}:{ONVIF_PORT}/onvif/device_service"
|
||||
|
||||
async def set_state(self, on: bool) -> dict:
|
||||
"""Set the device state."""
|
||||
|
Reference in New Issue
Block a user