Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion homeassistant/components/camera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,12 @@ async def create_stream(self) -> Stream | None:
source = await self.stream_source()
if not source:
return None
self.stream = create_stream(self.hass, source, options=self.stream_options)
self.stream = create_stream(
self.hass,
source,
options=self.stream_options,
stream_label=self.entity_id,
)
self.stream.set_update_callback(self.async_write_ha_state)
return self.stream

Expand Down
42 changes: 31 additions & 11 deletions homeassistant/components/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ def redact_credentials(data: str) -> str:


def create_stream(
hass: HomeAssistant, stream_source: str, options: dict[str, str]
hass: HomeAssistant,
stream_source: str,
options: dict[str, str],
stream_label: str | None = None,
) -> Stream:
"""Create a stream with the specified identfier based on the source url.

The stream_source is typically an rtsp url and options are passed into
pyav / ffmpeg as options.
The stream_source is typically an rtsp url (though any url accepted by ffmpeg is fine) and
options are passed into pyav / ffmpeg as options.

The stream_label is a string used as an additional message in logging.
"""
if DOMAIN not in hass.config.components:
raise HomeAssistantError("Stream integration is not set up.")
Expand All @@ -87,7 +92,7 @@ def create_stream(
**options,
}

stream = Stream(hass, stream_source, options=options)
stream = Stream(hass, stream_source, options=options, stream_label=stream_label)
hass.data[DOMAIN][ATTR_STREAMS].append(stream)
return stream

Expand Down Expand Up @@ -192,12 +197,17 @@ class Stream:
"""Represents a single stream."""

def __init__(
self, hass: HomeAssistant, source: str, options: dict[str, str]
self,
hass: HomeAssistant,
source: str,
options: dict[str, str],
stream_label: str | None = None,
) -> None:
"""Initialize a stream."""
self.hass = hass
self.source = source
self.options = options
self._stream_label = stream_label
self.keepalive = False
self.access_token: str | None = None
self._thread: threading.Thread | None = None
Expand All @@ -206,6 +216,11 @@ def __init__(
self._fast_restart_once = False
self._available: bool = True
self._update_callback: Callable[[], None] | None = None
self._logger = (
logging.getLogger(f"{__package__}.stream.{stream_label}")
if stream_label
else _LOGGER
)

def endpoint_url(self, fmt: str) -> str:
"""Start the stream and returns a url for the output format."""
Expand Down Expand Up @@ -285,11 +300,13 @@ def start(self) -> None:
target=self._run_worker,
)
self._thread.start()
_LOGGER.info("Started stream: %s", redact_credentials(str(self.source)))
self._logger.info(
"Started stream: %s", redact_credentials(str(self.source))
)

def update_source(self, new_source: str) -> None:
"""Restart the stream with a new stream source."""
_LOGGER.debug("Updating stream source %s", new_source)
self._logger.debug("Updating stream source %s", new_source)
self.source = new_source
self._fast_restart_once = True
self._thread_quit.set()
Expand All @@ -313,7 +330,8 @@ def _run_worker(self) -> None:
self._thread_quit,
)
except StreamWorkerError as err:
_LOGGER.error("Error from stream worker: %s", str(err))
self._logger.error("Error from stream worker: %s", str(err))
self._available = False

stream_state.discontinuity()
if not self.keepalive or self._thread_quit.is_set():
Expand All @@ -332,7 +350,7 @@ def _run_worker(self) -> None:
if time.time() - start_time > STREAM_RESTART_RESET_TIME:
wait_timeout = 0
wait_timeout += STREAM_RESTART_INCREMENT
_LOGGER.debug(
self._logger.debug(
"Restarting stream worker in %d seconds: %s",
wait_timeout,
self.source,
Expand Down Expand Up @@ -363,7 +381,9 @@ def _stop(self) -> None:
self._thread_quit.set()
self._thread.join()
self._thread = None
_LOGGER.info("Stopped stream: %s", redact_credentials(str(self.source)))
self._logger.info(
"Stopped stream: %s", redact_credentials(str(self.source))
)

async def async_record(
self, video_path: str, duration: int = 30, lookback: int = 5
Expand All @@ -390,7 +410,7 @@ async def async_record(
recorder.video_path = video_path

self.start()
_LOGGER.debug("Started a stream recording of %s seconds", duration)
self._logger.debug("Started a stream recording of %s seconds", duration)

# Take advantage of lookback
hls: HlsStreamOutput = cast(HlsStreamOutput, self.outputs().get(HLS_PROVIDER))
Expand Down
6 changes: 3 additions & 3 deletions tests/components/stream/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ async def test_durations(hass, record_worker_sync):
)

source = generate_h264_video(duration=SEGMENT_DURATION + 1)
stream = create_stream(hass, source, {})
stream = create_stream(hass, source, {}, stream_label="camera")

# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
Expand Down Expand Up @@ -797,7 +797,7 @@ async def test_has_keyframe(hass, record_worker_sync, h264_video):
},
)

stream = create_stream(hass, h264_video, {})
stream = create_stream(hass, h264_video, {}, stream_label="camera")

# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
Expand Down Expand Up @@ -836,7 +836,7 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
)

source = generate_h265_video()
stream = create_stream(hass, source, {})
stream = create_stream(hass, source, {}, stream_label="camera")

# use record_worker_sync to grab output segments
with patch.object(hass.config, "is_allowed_path", return_value=True):
Expand Down