From 862b14dae0573ae1676e9bfdfd76da3b681e8911 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Wed, 15 Dec 2021 18:04:06 +0000 Subject: [PATCH 01/20] Add get_image method to Stream --- homeassistant/components/stream/__init__.py | 11 ++++- homeassistant/components/stream/core.py | 46 +++++++++++++++++++ homeassistant/components/stream/manifest.json | 2 +- homeassistant/components/stream/worker.py | 5 +- requirements_all.txt | 1 + requirements_test_all.txt | 1 + tests/components/stream/test_worker.py | 26 ++++++++++- 7 files changed, 87 insertions(+), 5 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 0556bc2c7a9dde..7e1755f1c9310f 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -50,7 +50,7 @@ STREAM_RESTART_RESET_TIME, TARGET_SEGMENT_DURATION_NON_LL_HLS, ) -from .core import PROVIDERS, IdleTimer, StreamOutput, StreamSettings +from .core import PROVIDERS, IdleTimer, KeyFrame, StreamOutput, StreamSettings from .hls import HlsStreamOutput, async_setup_hls _LOGGER = logging.getLogger(__name__) @@ -137,6 +137,8 @@ def libav_filter(record: logging.LogRecord) -> bool: # Set log level to error for libav.mp4 logging.getLogger("libav.mp4").setLevel(logging.ERROR) + # Suppress "deprecated pixel format" WARNING + logging.getLogger("libav.swscaler").setLevel(logging.ERROR) async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: @@ -214,6 +216,7 @@ def __init__( self._thread_quit = threading.Event() self._outputs: dict[str, StreamOutput] = {} self._fast_restart_once = False + self.last_keyframe = KeyFrame() self._available: bool = True self._update_callback: Callable[[], None] | None = None self._logger = ( @@ -327,6 +330,7 @@ def _run_worker(self) -> None: self.source, self.options, stream_state, + self.last_keyframe, self._thread_quit, ) except StreamWorkerError as err: @@ -419,3 +423,8 @@ async def async_record( # Wait for latest segment, then add the lookback await hls.recv() recorder.prepend(list(hls.get_segments())[-num_segments:]) + + async def get_image(self) -> bytes | None: + """Fetch an image from the Stream and return it as a jpeg in bytes.""" + + return await self.hass.async_add_executor_job(self.last_keyframe.get_bytes) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index b51c953e915956..71a79957662529 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -5,6 +5,7 @@ from collections import deque from collections.abc import Iterable import datetime +from io import BytesIO from typing import TYPE_CHECKING from aiohttp import web @@ -19,6 +20,8 @@ from .const import ATTR_STREAMS, DOMAIN if TYPE_CHECKING: + from av import Packet + from . import Stream PROVIDERS = Registry() @@ -356,3 +359,46 @@ async def handle( ) -> web.StreamResponse: """Handle the stream request.""" raise NotImplementedError() + + +class KeyFrame: + """Hold the last keyframe. + + The keyframe can be either a Packet or a jpeg of bytes. This is so that a + new keyframe assignment can remain atomic to avoid threading issues. + """ + + def __init__(self) -> None: + """Initialize.""" + + # Keep import here so that we can import stream integration without installing reqs + # pylint: disable=import-outside-toplevel + from homeassistant.components.camera.img_util import TurboJPEGSingleton + + self.keyframe: Packet | bytes | None = None + self.turbojpeg = TurboJPEGSingleton.instance() + self.get_bytes = self._get_bytes + if not self.turbojpeg: + self.get_bytes = lambda: None + + def _get_bytes(self) -> bytes | None: + """Get the keyframe as bytes.""" + + # Keep import here so that we can import stream integration without installing reqs + # pylint: disable=import-outside-toplevel + from av import Packet + + last_packet_or_image = self.keyframe + if isinstance(last_packet_or_image, Packet): + # decode packet (try up to 3 times) + for _i in range(3): + # pylint: disable=maybe-no-member + if frames := last_packet_or_image.decode(): + break + if frames: + image_file = BytesIO() + bgr_array = frames[0].to_ndarray(format="bgr24") + image_file.write(self.turbojpeg.encode(bgr_array)) + self.keyframe = last_packet_or_image = image_file.getvalue() + image_file.close() + return last_packet_or_image diff --git a/homeassistant/components/stream/manifest.json b/homeassistant/components/stream/manifest.json index d5115754e2eb9f..60d4a6e66ebe48 100644 --- a/homeassistant/components/stream/manifest.json +++ b/homeassistant/components/stream/manifest.json @@ -2,7 +2,7 @@ "domain": "stream", "name": "Stream", "documentation": "https://www.home-assistant.io/integrations/stream", - "requirements": ["ha-av==8.0.4-rc.1"], + "requirements": ["ha-av==8.0.4-rc.1", "PyTurboJPEG==1.6.3"], "dependencies": ["http"], "codeowners": ["@hunterjm", "@uvjustin", "@allenporter"], "quality_scale": "internal", diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index b1d79e528008fa..0903dbf60ea28c 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -14,7 +14,7 @@ from homeassistant.core import HomeAssistant -from . import redact_credentials +from . import KeyFrame, redact_credentials from .const import ( ATTR_SETTINGS, AUDIO_CODECS, @@ -439,6 +439,7 @@ def stream_worker( source: str, options: dict[str, str], stream_state: StreamState, + last_keyframe: KeyFrame, quit_event: Event, ) -> None: """Handle consuming streams.""" @@ -535,3 +536,5 @@ def is_video(packet: av.Packet) -> Any: raise StreamWorkerError("Error demuxing stream: %s" % str(ex)) from ex muxer.mux_packet(packet) + if packet.is_keyframe and packet.stream.type == "video": + last_keyframe.keyframe = packet diff --git a/requirements_all.txt b/requirements_all.txt index 3939bfa9f68a2d..bbe03d5c678079 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -55,6 +55,7 @@ PySocks==1.7.1 PyTransportNSW==0.1.1 # homeassistant.components.camera +# homeassistant.components.stream PyTurboJPEG==1.6.3 # homeassistant.components.vicare diff --git a/requirements_test_all.txt b/requirements_test_all.txt index a20c95e1603b3f..b1e954241b65ad 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -33,6 +33,7 @@ PyRMVtransport==0.3.3 PyTransportNSW==0.1.1 # homeassistant.components.camera +# homeassistant.components.stream PyTurboJPEG==1.6.3 # homeassistant.components.vicare diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index f05b2ece8293a6..fda18f24ed8740 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -23,7 +23,7 @@ import av import pytest -from homeassistant.components.stream import Stream, create_stream +from homeassistant.components.stream import KeyFrame, Stream, create_stream from homeassistant.components.stream.const import ( ATTR_SETTINGS, CONF_LL_HLS, @@ -195,6 +195,7 @@ def add_stream(self, template=None): class FakeAvOutputStream: def __init__(self, capture_packets): self.capture_packets = capture_packets + self.type = "ignored-type" def close(self): return @@ -258,7 +259,7 @@ def open(self, stream_source, *args, **kwargs): def run_worker(hass, stream, stream_source): """Run the stream worker under test.""" stream_state = StreamState(hass, stream.outputs) - stream_worker(stream_source, {}, stream_state, threading.Event()) + stream_worker(stream_source, {}, stream_state, KeyFrame(), threading.Event()) async def async_decode_stream(hass, packets, py_av=None): @@ -854,3 +855,24 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync): await record_worker_sync.join() stream.stop() + + +async def test_get_image(hass, record_worker_sync): + """Test that the has_keyframe metadata matches the media.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + source = generate_h264_video() + stream = create_stream(hass, source, {}) + + # use record_worker_sync to grab output segments + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record("/example/path") + + await record_worker_sync.join() + + assert isinstance(stream.last_keyframe.keyframe, av.Packet) + image = await stream.get_image() + assert image == stream.last_keyframe.keyframe + assert isinstance(image, bytes) + + stream.stop() From d4776669fdc54fab0c8f468e8ed079324228c75a Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Thu, 16 Dec 2021 09:40:16 +0000 Subject: [PATCH 02/20] Fix tests --- tests/components/camera/common.py | 1 + tests/components/stream/test_worker.py | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/components/camera/common.py b/tests/components/camera/common.py index 756a553f3c72c4..bd3841cc4e85c6 100644 --- a/tests/components/camera/common.py +++ b/tests/components/camera/common.py @@ -29,4 +29,5 @@ def mock_turbo_jpeg( (second_width, second_height, 0, 0), ] mocked_turbo_jpeg.scale_with_quality.return_value = EMPTY_8_6_JPEG + mocked_turbo_jpeg.encode.return_value = EMPTY_8_6_JPEG return mocked_turbo_jpeg diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index fda18f24ed8740..255ed10dc32452 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -45,6 +45,7 @@ ) from homeassistant.setup import async_setup_component +from tests.components.camera.common import mock_turbo_jpeg from tests.components.stream.common import generate_h264_video, generate_h265_video from tests.components.stream.test_ll_hls import TEST_PART_DURATION @@ -97,6 +98,11 @@ class FakeCodec: self.codec = FakeCodec() + class FakeCodecContext: + thread_type = "SLICE" + + self.codec_context = FakeCodecContext() + def __str__(self) -> str: """Return a stream name for debugging.""" return f"FakePyAvStream<{self.name}, {self.time_base}>" @@ -862,7 +868,13 @@ async def test_get_image(hass, record_worker_sync): await async_setup_component(hass, "stream", {"stream": {}}) source = generate_h264_video() - stream = create_stream(hass, source, {}) + + # Since libjpeg-turbo is not installed on the CI runner, we use a mock + with patch( + "homeassistant.components.camera.img_util.TurboJPEGSingleton" + ) as mock_turbo_jpeg_singleton: + mock_turbo_jpeg_singleton.instance.return_value = mock_turbo_jpeg() + stream = create_stream(hass, source, {}) # use record_worker_sync to grab output segments with patch.object(hass.config, "is_allowed_path", return_value=True): @@ -871,6 +883,7 @@ async def test_get_image(hass, record_worker_sync): await record_worker_sync.join() assert isinstance(stream.last_keyframe.keyframe, av.Packet) + image = await stream.get_image() assert image == stream.last_keyframe.keyframe assert isinstance(image, bytes) From d90ef18f785887d2d47372d577be84e741308b12 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Thu, 16 Dec 2021 09:43:48 +0000 Subject: [PATCH 03/20] Disable threading for pyav decoder --- homeassistant/components/stream/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 0903dbf60ea28c..831c4d56f094f9 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -454,6 +454,8 @@ def stream_worker( video_stream = container.streams.video[0] except (KeyError, IndexError) as ex: raise StreamWorkerError("Stream has no video") from ex + # Since we are only decoding one frame at a time we don't need threading + video_stream.codec_context.thread_type = "NONE" try: audio_stream = container.streams.audio[0] except (KeyError, IndexError): From 1e6935ebc83931454208a8fbf1778f59c1133897 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Thu, 16 Dec 2021 09:46:34 +0000 Subject: [PATCH 04/20] Minor formatting changes --- homeassistant/components/stream/core.py | 2 +- homeassistant/components/stream/worker.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 71a79957662529..81ed3d2623f00f 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -376,8 +376,8 @@ def __init__(self) -> None: from homeassistant.components.camera.img_util import TurboJPEGSingleton self.keyframe: Packet | bytes | None = None - self.turbojpeg = TurboJPEGSingleton.instance() self.get_bytes = self._get_bytes + self.turbojpeg = TurboJPEGSingleton.instance() if not self.turbojpeg: self.get_bytes = lambda: None diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 831c4d56f094f9..660df124eb5cb2 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -538,5 +538,6 @@ def is_video(packet: av.Packet) -> Any: raise StreamWorkerError("Error demuxing stream: %s" % str(ex)) from ex muxer.mux_packet(packet) + if packet.is_keyframe and packet.stream.type == "video": last_keyframe.keyframe = packet From 4df3d9dbfd7f15033fb5168224196fef66ae8723 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Thu, 16 Dec 2021 10:46:58 +0000 Subject: [PATCH 05/20] Get next keyframe instead of last keyframe Refactor and do keyframe conversion in worker --- homeassistant/components/stream/__init__.py | 10 ++-- homeassistant/components/stream/const.py | 2 + homeassistant/components/stream/core.py | 64 +++++++++++---------- homeassistant/components/stream/worker.py | 12 ++-- tests/components/stream/test_worker.py | 19 +++--- 5 files changed, 59 insertions(+), 48 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 7e1755f1c9310f..83134a76289607 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -50,7 +50,7 @@ STREAM_RESTART_RESET_TIME, TARGET_SEGMENT_DURATION_NON_LL_HLS, ) -from .core import PROVIDERS, IdleTimer, KeyFrame, StreamOutput, StreamSettings +from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings from .hls import HlsStreamOutput, async_setup_hls _LOGGER = logging.getLogger(__name__) @@ -216,7 +216,7 @@ def __init__( self._thread_quit = threading.Event() self._outputs: dict[str, StreamOutput] = {} self._fast_restart_once = False - self.last_keyframe = KeyFrame() + self.keyframe_converter = KeyFrameConverter() self._available: bool = True self._update_callback: Callable[[], None] | None = None self._logger = ( @@ -330,7 +330,7 @@ def _run_worker(self) -> None: self.source, self.options, stream_state, - self.last_keyframe, + self.keyframe_converter, self._thread_quit, ) except StreamWorkerError as err: @@ -424,7 +424,7 @@ async def async_record( await hls.recv() recorder.prepend(list(hls.get_segments())[-num_segments:]) - async def get_image(self) -> bytes | None: + async def get_image(self) -> bytes: """Fetch an image from the Stream and return it as a jpeg in bytes.""" - return await self.hass.async_add_executor_job(self.last_keyframe.get_bytes) + return await self.keyframe_converter.get_keyframe_image() diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 50ae43df0d0fbe..9bccdcd126658d 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -42,3 +42,5 @@ CONF_LL_HLS = "ll_hls" CONF_PART_DURATION = "part_duration" CONF_SEGMENT_DURATION = "segment_duration" + +KEYFRAME_TIMEOUT = 60 # Number of seconds to wait for the next keyframe in get_image diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 81ed3d2623f00f..050402912ff6bf 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -17,7 +17,7 @@ from homeassistant.helpers.event import async_call_later from homeassistant.util.decorator import Registry -from .const import ATTR_STREAMS, DOMAIN +from .const import ATTR_STREAMS, DOMAIN, KEYFRAME_TIMEOUT if TYPE_CHECKING: from av import Packet @@ -361,12 +361,8 @@ async def handle( raise NotImplementedError() -class KeyFrame: - """Hold the last keyframe. - - The keyframe can be either a Packet or a jpeg of bytes. This is so that a - new keyframe assignment can remain atomic to avoid threading issues. - """ +class KeyFrameConverter: + """Generate and hold the keyframe as a jpeg.""" def __init__(self) -> None: """Initialize.""" @@ -375,30 +371,36 @@ def __init__(self) -> None: # pylint: disable=import-outside-toplevel from homeassistant.components.camera.img_util import TurboJPEGSingleton - self.keyframe: Packet | bytes | None = None - self.get_bytes = self._get_bytes + self.image_requested = False + self._image = b"" + self._event = asyncio.Event() self.turbojpeg = TurboJPEGSingleton.instance() if not self.turbojpeg: - self.get_bytes = lambda: None + self.get_bytes = lambda: b"" - def _get_bytes(self) -> bytes | None: - """Get the keyframe as bytes.""" - - # Keep import here so that we can import stream integration without installing reqs - # pylint: disable=import-outside-toplevel - from av import Packet - - last_packet_or_image = self.keyframe - if isinstance(last_packet_or_image, Packet): - # decode packet (try up to 3 times) - for _i in range(3): - # pylint: disable=maybe-no-member - if frames := last_packet_or_image.decode(): - break - if frames: - image_file = BytesIO() - bgr_array = frames[0].to_ndarray(format="bgr24") - image_file.write(self.turbojpeg.encode(bgr_array)) - self.keyframe = last_packet_or_image = image_file.getvalue() - image_file.close() - return last_packet_or_image + async def get_keyframe_image(self, timeout: int = KEYFRAME_TIMEOUT) -> bytes: + """Get the next keyframe image.""" + self.image_requested = True + try: + async with async_timeout.timeout(timeout): + await self._event.wait() + except asyncio.TimeoutError: + return b"" + return self._image + + def generate_keyframe_image(self, keyframe_packet: Packet) -> None: + """Generate the keyframe image in the worker.""" + + self.image_requested = False + # decode packet (try up to 3 times) + for _i in range(3): + if frames := keyframe_packet.decode(): + break + if frames: + image_file = BytesIO() + bgr_array = frames[0].to_ndarray(format="bgr24") + image_file.write(self.turbojpeg.encode(bgr_array)) + self._image = image_file.getvalue() + image_file.close() + self._event.set() + self._event.clear() diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 660df124eb5cb2..1f368c733409a8 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -14,7 +14,7 @@ from homeassistant.core import HomeAssistant -from . import KeyFrame, redact_credentials +from . import KeyFrameConverter, redact_credentials from .const import ( ATTR_SETTINGS, AUDIO_CODECS, @@ -439,7 +439,7 @@ def stream_worker( source: str, options: dict[str, str], stream_state: StreamState, - last_keyframe: KeyFrame, + keyframe_converter: KeyFrameConverter, quit_event: Event, ) -> None: """Handle consuming streams.""" @@ -539,5 +539,9 @@ def is_video(packet: av.Packet) -> Any: muxer.mux_packet(packet) - if packet.is_keyframe and packet.stream.type == "video": - last_keyframe.keyframe = packet + if ( + keyframe_converter.image_requested + and packet.is_keyframe + and packet.stream.type == "video" + ): + keyframe_converter.generate_keyframe_image(packet) diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 255ed10dc32452..c48b7281971a90 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -13,6 +13,7 @@ failure modes or corner cases like how out of order packets are handled. """ +import asyncio import fractions import io import logging @@ -23,7 +24,7 @@ import av import pytest -from homeassistant.components.stream import KeyFrame, Stream, create_stream +from homeassistant.components.stream import KeyFrameConverter, Stream, create_stream from homeassistant.components.stream.const import ( ATTR_SETTINGS, CONF_LL_HLS, @@ -45,7 +46,7 @@ ) from homeassistant.setup import async_setup_component -from tests.components.camera.common import mock_turbo_jpeg +from tests.components.camera.common import EMPTY_8_6_JPEG, mock_turbo_jpeg from tests.components.stream.common import generate_h264_video, generate_h265_video from tests.components.stream.test_ll_hls import TEST_PART_DURATION @@ -265,7 +266,9 @@ def open(self, stream_source, *args, **kwargs): def run_worker(hass, stream, stream_source): """Run the stream worker under test.""" stream_state = StreamState(hass, stream.outputs) - stream_worker(stream_source, {}, stream_state, KeyFrame(), threading.Event()) + stream_worker( + stream_source, {}, stream_state, KeyFrameConverter(), threading.Event() + ) async def async_decode_stream(hass, packets, py_av=None): @@ -880,12 +883,12 @@ async def test_get_image(hass, record_worker_sync): with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - await record_worker_sync.join() + assert stream.keyframe_converter._image == b"" + image_future = asyncio.create_task(stream.get_image()) - assert isinstance(stream.last_keyframe.keyframe, av.Packet) + await record_worker_sync.join() - image = await stream.get_image() - assert image == stream.last_keyframe.keyframe - assert isinstance(image, bytes) + image = await image_future + assert image == EMPTY_8_6_JPEG stream.stop() From acb29b5a89fb99bd3c1af8d53e5bc131793c7dbc Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Thu, 16 Dec 2021 17:14:28 +0000 Subject: [PATCH 06/20] Change behavior back to last keyframe --- homeassistant/components/stream/__init__.py | 7 ++-- homeassistant/components/stream/core.py | 38 ++++++++++++++------- homeassistant/components/stream/worker.py | 11 +++--- tests/components/stream/test_worker.py | 2 +- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 83134a76289607..d0823da3177858 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -216,7 +216,7 @@ def __init__( self._thread_quit = threading.Event() self._outputs: dict[str, StreamOutput] = {} self._fast_restart_once = False - self.keyframe_converter = KeyFrameConverter() + self._keyframe_converter = KeyFrameConverter() self._available: bool = True self._update_callback: Callable[[], None] | None = None self._logger = ( @@ -330,7 +330,7 @@ def _run_worker(self) -> None: self.source, self.options, stream_state, - self.keyframe_converter, + self._keyframe_converter, self._thread_quit, ) except StreamWorkerError as err: @@ -426,5 +426,4 @@ async def async_record( async def get_image(self) -> bytes: """Fetch an image from the Stream and return it as a jpeg in bytes.""" - - return await self.keyframe_converter.get_keyframe_image() + return await self._keyframe_converter.get_image() diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 050402912ff6bf..843972eb64a2e0 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -372,35 +372,47 @@ def __init__(self) -> None: from homeassistant.components.camera.img_util import TurboJPEGSingleton self.image_requested = False + self.packet: Packet = None self._image = b"" self._event = asyncio.Event() - self.turbojpeg = TurboJPEGSingleton.instance() - if not self.turbojpeg: - self.get_bytes = lambda: b"" + self.generate_image = self._generate_image + self._turbojpeg = TurboJPEGSingleton.instance() + if not self._turbojpeg: + self.generate_image = lambda: None - async def get_keyframe_image(self, timeout: int = KEYFRAME_TIMEOUT) -> bytes: - """Get the next keyframe image.""" + async def get_image(self) -> bytes: + """ + Fetch an image from the Stream and return it as a jpeg in bytes. + + This is called from outside the worker. + """ self.image_requested = True try: - async with async_timeout.timeout(timeout): + async with async_timeout.timeout(KEYFRAME_TIMEOUT): await self._event.wait() except asyncio.TimeoutError: return b"" return self._image - def generate_keyframe_image(self, keyframe_packet: Packet) -> None: - """Generate the keyframe image in the worker.""" + def check_generate_image(self) -> None: + """Generate the image if necessary and signal it is done from the worker.""" + if self.packet: + self.generate_image() + self.packet = None + if self._image: + self.image_requested = False + self._event.set() + self._event.clear() - self.image_requested = False + def _generate_image(self) -> None: + """Generate the keyframe image in the worker.""" # decode packet (try up to 3 times) for _i in range(3): - if frames := keyframe_packet.decode(): + if frames := self.packet.decode(): break if frames: image_file = BytesIO() bgr_array = frames[0].to_ndarray(format="bgr24") - image_file.write(self.turbojpeg.encode(bgr_array)) + image_file.write(self._turbojpeg.encode(bgr_array)) self._image = image_file.getvalue() image_file.close() - self._event.set() - self._event.clear() diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 1f368c733409a8..a890a303d1bc45 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -539,9 +539,8 @@ def is_video(packet: av.Packet) -> Any: muxer.mux_packet(packet) - if ( - keyframe_converter.image_requested - and packet.is_keyframe - and packet.stream.type == "video" - ): - keyframe_converter.generate_keyframe_image(packet) + if packet.is_keyframe and packet.stream.type == "video": + keyframe_converter.packet = packet + + if keyframe_converter.image_requested: + keyframe_converter.check_generate_image() diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index c48b7281971a90..2edd172a920c06 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -883,7 +883,7 @@ async def test_get_image(hass, record_worker_sync): with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - assert stream.keyframe_converter._image == b"" + assert stream._keyframe_converter._image == b"" image_future = asyncio.create_task(stream.get_image()) await record_worker_sync.join() From 8ec4b60538f41290dd68c4e543d1df075d9c4eed Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Dec 2021 04:35:18 +0000 Subject: [PATCH 07/20] Allow get_image to return None Refactor KeyFrameConverter Add KeyFrameConverter comments Adjust KEYFRAME_TIMEOUT --- homeassistant/components/stream/__init__.py | 2 +- homeassistant/components/stream/const.py | 2 +- homeassistant/components/stream/core.py | 28 +++++++++++++-------- tests/components/stream/test_worker.py | 2 +- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index d0823da3177858..6c7803b3a5340d 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -424,6 +424,6 @@ async def async_record( await hls.recv() recorder.prepend(list(hls.get_segments())[-num_segments:]) - async def get_image(self) -> bytes: + async def get_image(self) -> bytes | None: """Fetch an image from the Stream and return it as a jpeg in bytes.""" return await self._keyframe_converter.get_image() diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 9bccdcd126658d..e41f041518412d 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -43,4 +43,4 @@ CONF_PART_DURATION = "part_duration" CONF_SEGMENT_DURATION = "segment_duration" -KEYFRAME_TIMEOUT = 60 # Number of seconds to wait for the next keyframe in get_image +KEYFRAME_TIMEOUT = 10 # Number of seconds to wait for the worker to get the keyframe diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 843972eb64a2e0..60f781e93548a2 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -362,7 +362,14 @@ async def handle( class KeyFrameConverter: - """Generate and hold the keyframe as a jpeg.""" + """ + Generate and hold the keyframe as a jpeg. + + get_image is called from outside the worker. It sets the image_requested flag + and waits for an event from the worker. + check_generate_image and generate_image are called in the worker when the + image_requested flag is set. + """ def __init__(self) -> None: """Initialize.""" @@ -373,14 +380,11 @@ def __init__(self) -> None: self.image_requested = False self.packet: Packet = None - self._image = b"" + self._image: bytes | None = None self._event = asyncio.Event() - self.generate_image = self._generate_image self._turbojpeg = TurboJPEGSingleton.instance() - if not self._turbojpeg: - self.generate_image = lambda: None - async def get_image(self) -> bytes: + async def get_image(self) -> bytes | None: """ Fetch an image from the Stream and return it as a jpeg in bytes. @@ -391,21 +395,24 @@ async def get_image(self) -> bytes: async with async_timeout.timeout(KEYFRAME_TIMEOUT): await self._event.wait() except asyncio.TimeoutError: - return b"" + return None return self._image def check_generate_image(self) -> None: """Generate the image if necessary and signal it is done from the worker.""" if self.packet: - self.generate_image() + self._image = self.generate_image() self.packet = None if self._image: self.image_requested = False self._event.set() self._event.clear() - def _generate_image(self) -> None: + def generate_image(self) -> bytes | None: """Generate the keyframe image in the worker.""" + if not self._turbojpeg: + return None + image = None # decode packet (try up to 3 times) for _i in range(3): if frames := self.packet.decode(): @@ -414,5 +421,6 @@ def _generate_image(self) -> None: image_file = BytesIO() bgr_array = frames[0].to_ndarray(format="bgr24") image_file.write(self._turbojpeg.encode(bgr_array)) - self._image = image_file.getvalue() + image = image_file.getvalue() image_file.close() + return image diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 2edd172a920c06..0237b88db02d60 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -883,7 +883,7 @@ async def test_get_image(hass, record_worker_sync): with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - assert stream._keyframe_converter._image == b"" + assert stream._keyframe_converter._image is None image_future = asyncio.create_task(stream.get_image()) await record_worker_sync.join() From 26b301c9fa746c75446ea473c03158ccda3a190a Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Dec 2021 16:34:22 +0000 Subject: [PATCH 08/20] Call generate_image from thread executor and not worker --- homeassistant/components/stream/__init__.py | 14 ++++++- homeassistant/components/stream/const.py | 2 - homeassistant/components/stream/core.py | 46 ++++----------------- homeassistant/components/stream/worker.py | 3 -- tests/components/stream/test_worker.py | 7 +--- 5 files changed, 21 insertions(+), 51 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 6c7803b3a5340d..3a5b67d04235aa 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -425,5 +425,15 @@ async def async_record( recorder.prepend(list(hls.get_segments())[-num_segments:]) async def get_image(self) -> bytes | None: - """Fetch an image from the Stream and return it as a jpeg in bytes.""" - return await self._keyframe_converter.get_image() + """ + Fetch an image from the Stream and return it as a jpeg in bytes. + + This should only be called from the main thread. + """ + # Use a lock to ensure only one thread is working on the keyframe at a time + await self._keyframe_converter.lock.acquire() + self._keyframe_converter.image = await self.hass.async_add_executor_job( + self._keyframe_converter.generate_image + ) + self._keyframe_converter.lock.release() + return self._keyframe_converter.image diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index e41f041518412d..50ae43df0d0fbe 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -42,5 +42,3 @@ CONF_LL_HLS = "ll_hls" CONF_PART_DURATION = "part_duration" CONF_SEGMENT_DURATION = "segment_duration" - -KEYFRAME_TIMEOUT = 10 # Number of seconds to wait for the worker to get the keyframe diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 60f781e93548a2..6480c47e620b83 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -17,7 +17,7 @@ from homeassistant.helpers.event import async_call_later from homeassistant.util.decorator import Registry -from .const import ATTR_STREAMS, DOMAIN, KEYFRAME_TIMEOUT +from .const import ATTR_STREAMS, DOMAIN if TYPE_CHECKING: from av import Packet @@ -362,14 +362,7 @@ async def handle( class KeyFrameConverter: - """ - Generate and hold the keyframe as a jpeg. - - get_image is called from outside the worker. It sets the image_requested flag - and waits for an event from the worker. - check_generate_image and generate_image are called in the worker when the - image_requested flag is set. - """ + """Generate and hold the keyframe as a jpeg.""" def __init__(self) -> None: """Initialize.""" @@ -378,40 +371,15 @@ def __init__(self) -> None: # pylint: disable=import-outside-toplevel from homeassistant.components.camera.img_util import TurboJPEGSingleton - self.image_requested = False self.packet: Packet = None - self._image: bytes | None = None - self._event = asyncio.Event() + self.image: bytes | None = None self._turbojpeg = TurboJPEGSingleton.instance() - - async def get_image(self) -> bytes | None: - """ - Fetch an image from the Stream and return it as a jpeg in bytes. - - This is called from outside the worker. - """ - self.image_requested = True - try: - async with async_timeout.timeout(KEYFRAME_TIMEOUT): - await self._event.wait() - except asyncio.TimeoutError: - return None - return self._image - - def check_generate_image(self) -> None: - """Generate the image if necessary and signal it is done from the worker.""" - if self.packet: - self._image = self.generate_image() - self.packet = None - if self._image: - self.image_requested = False - self._event.set() - self._event.clear() + self.lock = asyncio.Lock() def generate_image(self) -> bytes | None: - """Generate the keyframe image in the worker.""" - if not self._turbojpeg: - return None + """Generate the keyframe image. This is called in an executor thread.""" + if not (self._turbojpeg and self.packet): + return self.image image = None # decode packet (try up to 3 times) for _i in range(3): diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index a890a303d1bc45..6c08a5dd622ea7 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -541,6 +541,3 @@ def is_video(packet: av.Packet) -> Any: if packet.is_keyframe and packet.stream.type == "video": keyframe_converter.packet = packet - - if keyframe_converter.image_requested: - keyframe_converter.check_generate_image() diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 0237b88db02d60..08ebddcd577677 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -13,7 +13,6 @@ failure modes or corner cases like how out of order packets are handled. """ -import asyncio import fractions import io import logging @@ -883,12 +882,10 @@ async def test_get_image(hass, record_worker_sync): with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - assert stream._keyframe_converter._image is None - image_future = asyncio.create_task(stream.get_image()) + assert stream._keyframe_converter.image is None await record_worker_sync.join() - image = await image_future - assert image == EMPTY_8_6_JPEG + assert await stream.get_image() == EMPTY_8_6_JPEG stream.stop() From 7dfd7b3e266b8aee0f1cfe8c2f2ab861a58688b1 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Dec 2021 19:34:20 +0000 Subject: [PATCH 09/20] Fix thread safety issue --- homeassistant/components/stream/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 6480c47e620b83..08e8549d6081b8 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -381,9 +381,11 @@ def generate_image(self) -> bytes | None: if not (self._turbojpeg and self.packet): return self.image image = None + packet = self.packet + self.packet = None # decode packet (try up to 3 times) for _i in range(3): - if frames := self.packet.decode(): + if frames := packet.decode(): break if frames: image_file = BytesIO() From 43b2464caf2db3175166103e87c8fd73758829dc Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Dec 2021 19:46:30 +0000 Subject: [PATCH 10/20] Remove BytesIO --- homeassistant/components/stream/core.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 08e8549d6081b8..52b6ee9ff7aa5f 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -5,7 +5,6 @@ from collections import deque from collections.abc import Iterable import datetime -from io import BytesIO from typing import TYPE_CHECKING from aiohttp import web @@ -388,9 +387,6 @@ def generate_image(self) -> bytes | None: if frames := packet.decode(): break if frames: - image_file = BytesIO() bgr_array = frames[0].to_ndarray(format="bgr24") - image_file.write(self._turbojpeg.encode(bgr_array)) - image = image_file.getvalue() - image_file.close() + image = bytes(self._turbojpeg.encode(bgr_array)) return image From 357a240a6a1719dd0a79ddf036f7bf475d315eae Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Dec 2021 19:58:51 +0000 Subject: [PATCH 11/20] Add width and height parameters to get_image --- homeassistant/components/stream/__init__.py | 8 ++++++-- homeassistant/components/stream/core.py | 7 +++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 3a5b67d04235aa..fba65f662bc763 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -424,7 +424,11 @@ async def async_record( await hls.recv() recorder.prepend(list(hls.get_segments())[-num_segments:]) - async def get_image(self) -> bytes | None: + async def get_image( + self, + width: int | None = None, + height: int | None = None, + ) -> bytes | None: """ Fetch an image from the Stream and return it as a jpeg in bytes. @@ -433,7 +437,7 @@ async def get_image(self) -> bytes | None: # Use a lock to ensure only one thread is working on the keyframe at a time await self._keyframe_converter.lock.acquire() self._keyframe_converter.image = await self.hass.async_add_executor_job( - self._keyframe_converter.generate_image + self._keyframe_converter.generate_image, width, height ) self._keyframe_converter.lock.release() return self._keyframe_converter.image diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 52b6ee9ff7aa5f..fa89049f3131a0 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -375,7 +375,7 @@ def __init__(self) -> None: self._turbojpeg = TurboJPEGSingleton.instance() self.lock = asyncio.Lock() - def generate_image(self) -> bytes | None: + def generate_image(self, width: int | None, height: int | None) -> bytes | None: """Generate the keyframe image. This is called in an executor thread.""" if not (self._turbojpeg and self.packet): return self.image @@ -387,6 +387,9 @@ def generate_image(self) -> bytes | None: if frames := packet.decode(): break if frames: - bgr_array = frames[0].to_ndarray(format="bgr24") + frame = frames[0] + if width and height: + frame = frame.reformat(width=width, height=height) + bgr_array = frame.to_ndarray(format="bgr24") image = bytes(self._turbojpeg.encode(bgr_array)) return image From 510c66483fe2525bac1804930ff87f9fb15151bd Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Dec 2021 20:03:39 +0000 Subject: [PATCH 12/20] Use async_with syntax for lock --- homeassistant/components/stream/__init__.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index fba65f662bc763..27df3acd801e21 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -435,9 +435,8 @@ async def get_image( This should only be called from the main thread. """ # Use a lock to ensure only one thread is working on the keyframe at a time - await self._keyframe_converter.lock.acquire() - self._keyframe_converter.image = await self.hass.async_add_executor_job( - self._keyframe_converter.generate_image, width, height - ) - self._keyframe_converter.lock.release() + async with self._keyframe_converter.lock: + self._keyframe_converter.image = await self.hass.async_add_executor_job( + self._keyframe_converter.generate_image, width, height + ) return self._keyframe_converter.image From 6229c870911a57e9f3df0e03fbc34095ae59e57f Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Mon, 20 Dec 2021 06:24:27 +0000 Subject: [PATCH 13/20] Flush decoder instead of retrying packet --- homeassistant/components/stream/core.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index fa89049f3131a0..df132b096d52af 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -382,10 +382,12 @@ def generate_image(self, width: int | None, height: int | None) -> bytes | None: image = None packet = self.packet self.packet = None - # decode packet (try up to 3 times) - for _i in range(3): - if frames := packet.decode(): + # decode packet (flush afterwards) + frames = packet.decode() + for _i in range(2): + if frames: break + frames = packet.stream.codec_context.decode(None) if frames: frame = frames[0] if width and height: From dddfd480808afb074afd043289cb2fe725afcdb6 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Mon, 20 Dec 2021 06:36:51 +0000 Subject: [PATCH 14/20] Use separate codec_context in KeyframeConverter --- homeassistant/components/stream/core.py | 21 +++++++++++++++++---- homeassistant/components/stream/worker.py | 3 +-- tests/components/stream/test_worker.py | 5 ----- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index df132b096d52af..6014c93969a505 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -19,7 +19,7 @@ from .const import ATTR_STREAMS, DOMAIN if TYPE_CHECKING: - from av import Packet + from av import CodecContext, Packet from . import Stream @@ -374,20 +374,33 @@ def __init__(self) -> None: self.image: bytes | None = None self._turbojpeg = TurboJPEGSingleton.instance() self.lock = asyncio.Lock() + self._codec_context: CodecContext | None = None + + def create_codec_context(self, codec_context: CodecContext) -> None: + """Create a codec context to be used for decoding the keyframes.""" + + # Keep import here so that we can import stream integration without installing reqs + # pylint: disable=import-outside-toplevel + from av import CodecContext + + self._codec_context = CodecContext.create(codec_context.name, "r") + self._codec_context.extradata = codec_context.extradata + self._codec_context.skip_frame = "NONKEY" + self._codec_context.thread_type = "NONE" def generate_image(self, width: int | None, height: int | None) -> bytes | None: """Generate the keyframe image. This is called in an executor thread.""" - if not (self._turbojpeg and self.packet): + if not (self._turbojpeg and self.packet and self._codec_context): return self.image image = None packet = self.packet self.packet = None # decode packet (flush afterwards) - frames = packet.decode() + frames = self._codec_context.decode(packet) for _i in range(2): if frames: break - frames = packet.stream.codec_context.decode(None) + frames = self._codec_context.decode(None) if frames: frame = frames[0] if width and height: diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 6c08a5dd622ea7..ac1c25739e2b3a 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -454,8 +454,7 @@ def stream_worker( video_stream = container.streams.video[0] except (KeyError, IndexError) as ex: raise StreamWorkerError("Stream has no video") from ex - # Since we are only decoding one frame at a time we don't need threading - video_stream.codec_context.thread_type = "NONE" + keyframe_converter.create_codec_context(codec_context=video_stream.codec_context) try: audio_stream = container.streams.audio[0] except (KeyError, IndexError): diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 08ebddcd577677..f3129b164290b6 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -98,11 +98,6 @@ class FakeCodec: self.codec = FakeCodec() - class FakeCodecContext: - thread_type = "SLICE" - - self.codec_context = FakeCodecContext() - def __str__(self) -> str: """Return a stream name for debugging.""" return f"FakePyAvStream<{self.name}, {self.time_base}>" From f549f31e0e8ae01315e2211cb934f2b26f56dfc4 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Mon, 20 Dec 2021 07:40:04 +0000 Subject: [PATCH 15/20] Fix test_worker --- tests/components/stream/test_worker.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index f3129b164290b6..e77ced6b68c3cb 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -98,6 +98,12 @@ class FakeCodec: self.codec = FakeCodec() + class FakeCodecContext: + name = "h264" + extradata = None + + self.codec_context = FakeCodecContext() + def __str__(self) -> str: """Return a stream name for debugging.""" return f"FakePyAvStream<{self.name}, {self.time_base}>" From bbe0982ad9a7e92050fc03a6dabc05d9444384a8 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Wed, 22 Dec 2021 08:49:06 +0000 Subject: [PATCH 16/20] Use is_video for detecting packet stream type --- homeassistant/components/stream/worker.py | 4 ++-- tests/components/stream/test_worker.py | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index ac1c25739e2b3a..e633d146444252 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -476,7 +476,7 @@ def stream_worker( def is_video(packet: av.Packet) -> Any: """Return true if the packet is for the video stream.""" - return packet.stream == video_stream + return packet.stream.type == "video" # Have to work around two problems with RTSP feeds in ffmpeg # 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018 @@ -538,5 +538,5 @@ def is_video(packet: av.Packet) -> Any: muxer.mux_packet(packet) - if packet.is_keyframe and packet.stream.type == "video": + if packet.is_keyframe and is_video(packet): keyframe_converter.packet = packet diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index e77ced6b68c3cb..193be8675d91a7 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -104,6 +104,11 @@ class FakeCodecContext: self.codec_context = FakeCodecContext() + @property + def type(self): + """Return packet type.""" + return "video" if self.name == VIDEO_STREAM_FORMAT else "audio" + def __str__(self) -> str: """Return a stream name for debugging.""" return f"FakePyAvStream<{self.name}, {self.time_base}>" From 55193235b8ba4e942b404ce3833b6663704ed157 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Wed, 22 Dec 2021 09:12:10 +0000 Subject: [PATCH 17/20] Refactor get_image into KeyFrameConverter Add class comments --- homeassistant/components/stream/__init__.py | 16 ++------ homeassistant/components/stream/core.py | 44 ++++++++++++++++----- tests/components/stream/test_worker.py | 4 +- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 27df3acd801e21..fec9731136ff3e 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -216,7 +216,7 @@ def __init__( self._thread_quit = threading.Event() self._outputs: dict[str, StreamOutput] = {} self._fast_restart_once = False - self._keyframe_converter = KeyFrameConverter() + self._keyframe_converter = KeyFrameConverter(hass) self._available: bool = True self._update_callback: Callable[[], None] | None = None self._logger = ( @@ -429,14 +429,6 @@ async def get_image( width: int | None = None, height: int | None = None, ) -> bytes | None: - """ - Fetch an image from the Stream and return it as a jpeg in bytes. - - This should only be called from the main thread. - """ - # Use a lock to ensure only one thread is working on the keyframe at a time - async with self._keyframe_converter.lock: - self._keyframe_converter.image = await self.hass.async_add_executor_job( - self._keyframe_converter.generate_image, width, height - ) - return self._keyframe_converter.image + """Wrap get_image from KeyFrameConverter.""" + + return await self._keyframe_converter.get_image(width=width, height=height) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 6014c93969a505..956f35e2bb0394 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -361,9 +361,25 @@ async def handle( class KeyFrameConverter: - """Generate and hold the keyframe as a jpeg.""" + """ + Generate and hold the keyframe as a jpeg. + + Each of the functions here is only run once at any time per instance. + create_codec_context is run in the worker thread + get_image is called from the main thread and obtains the instance lock + _generate_image is called by get_image and run in an executor thread + + An overview of the thread and state interaction: + the worker thread sets a packet + at any time, main loop can run a get_image call + _generate_image will try to create an image from the packet + Running _generate_image will clear the packet, so there will only + be one attempt per packet + If successful, _image will be updated and returned by get_image + If unsuccessful, get_image will return the previous image + """ - def __init__(self) -> None: + def __init__(self, hass: HomeAssistant) -> None: """Initialize.""" # Keep import here so that we can import stream integration without installing reqs @@ -371,7 +387,8 @@ def __init__(self) -> None: from homeassistant.components.camera.img_util import TurboJPEGSingleton self.packet: Packet = None - self.image: bytes | None = None + self._hass = hass + self._image: bytes | None = None self._turbojpeg = TurboJPEGSingleton.instance() self.lock = asyncio.Lock() self._codec_context: CodecContext | None = None @@ -388,11 +405,10 @@ def create_codec_context(self, codec_context: CodecContext) -> None: self._codec_context.skip_frame = "NONKEY" self._codec_context.thread_type = "NONE" - def generate_image(self, width: int | None, height: int | None) -> bytes | None: - """Generate the keyframe image. This is called in an executor thread.""" + def _generate_image(self, width: int | None, height: int | None) -> None: + """Generate the keyframe image.""" if not (self._turbojpeg and self.packet and self._codec_context): - return self.image - image = None + return packet = self.packet self.packet = None # decode packet (flush afterwards) @@ -406,5 +422,15 @@ def generate_image(self, width: int | None, height: int | None) -> bytes | None: if width and height: frame = frame.reformat(width=width, height=height) bgr_array = frame.to_ndarray(format="bgr24") - image = bytes(self._turbojpeg.encode(bgr_array)) - return image + self._image = bytes(self._turbojpeg.encode(bgr_array)) + + async def get_image( + self, + width: int | None = None, + height: int | None = None, + ) -> bytes | None: + """Fetch an image from the Stream and return it as a jpeg in bytes.""" + # Use a lock to ensure only one thread is working on the keyframe at a time + async with self.lock: + await self._hass.async_add_executor_job(self._generate_image, width, height) + return self._image diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 193be8675d91a7..eb50e76a80a621 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -272,7 +272,7 @@ def run_worker(hass, stream, stream_source): """Run the stream worker under test.""" stream_state = StreamState(hass, stream.outputs) stream_worker( - stream_source, {}, stream_state, KeyFrameConverter(), threading.Event() + stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event() ) @@ -888,7 +888,7 @@ async def test_get_image(hass, record_worker_sync): with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - assert stream._keyframe_converter.image is None + assert stream._keyframe_converter._image is None await record_worker_sync.join() From fb9b8fec1a4c10476ecd29a4db7cd5d4050b149b Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Wed, 22 Dec 2021 09:17:07 +0000 Subject: [PATCH 18/20] Make lock private --- homeassistant/components/stream/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 956f35e2bb0394..b6902d1e665d49 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -390,7 +390,7 @@ def __init__(self, hass: HomeAssistant) -> None: self._hass = hass self._image: bytes | None = None self._turbojpeg = TurboJPEGSingleton.instance() - self.lock = asyncio.Lock() + self._lock = asyncio.Lock() self._codec_context: CodecContext | None = None def create_codec_context(self, codec_context: CodecContext) -> None: @@ -431,6 +431,6 @@ async def get_image( ) -> bytes | None: """Fetch an image from the Stream and return it as a jpeg in bytes.""" # Use a lock to ensure only one thread is working on the keyframe at a time - async with self.lock: + async with self._lock: await self._hass.async_add_executor_job(self._generate_image, width, height) return self._image From aec32bc650d76d0df455c8e6177cf891be9fa974 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Wed, 22 Dec 2021 09:39:45 +0000 Subject: [PATCH 19/20] Update KeyFrameConverter comments --- homeassistant/components/stream/core.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index b6902d1e665d49..5de1d09ce5d00e 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -364,11 +364,6 @@ class KeyFrameConverter: """ Generate and hold the keyframe as a jpeg. - Each of the functions here is only run once at any time per instance. - create_codec_context is run in the worker thread - get_image is called from the main thread and obtains the instance lock - _generate_image is called by get_image and run in an executor thread - An overview of the thread and state interaction: the worker thread sets a packet at any time, main loop can run a get_image call @@ -394,7 +389,11 @@ def __init__(self, hass: HomeAssistant) -> None: self._codec_context: CodecContext | None = None def create_codec_context(self, codec_context: CodecContext) -> None: - """Create a codec context to be used for decoding the keyframes.""" + """ + Create a codec context to be used for decoding the keyframes. + + This is run by the worker thread and will only be called once. + """ # Keep import here so that we can import stream integration without installing reqs # pylint: disable=import-outside-toplevel @@ -406,7 +405,14 @@ def create_codec_context(self, codec_context: CodecContext) -> None: self._codec_context.thread_type = "NONE" def _generate_image(self, width: int | None, height: int | None) -> None: - """Generate the keyframe image.""" + """ + Generate the keyframe image. + + This is run in an executor thread, but since it is called within an + the asyncio lock from the main thread, there will only be one entry + at a time per instance. + """ + if not (self._turbojpeg and self.packet and self._codec_context): return packet = self.packet @@ -430,6 +436,7 @@ async def get_image( height: int | None = None, ) -> bytes | None: """Fetch an image from the Stream and return it as a jpeg in bytes.""" + # Use a lock to ensure only one thread is working on the keyframe at a time async with self._lock: await self._hass.async_add_executor_job(self._generate_image, width, height) From b2f17a6e66001ec8dd7244a4f82af78136e2dc52 Mon Sep 17 00:00:00 2001 From: Justin Wong <46082645+uvjustin@users.noreply.github.com> Date: Wed, 22 Dec 2021 09:46:27 +0000 Subject: [PATCH 20/20] Update comment --- homeassistant/components/stream/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 5de1d09ce5d00e..08397fb68762b5 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -392,7 +392,7 @@ def create_codec_context(self, codec_context: CodecContext) -> None: """ Create a codec context to be used for decoding the keyframes. - This is run by the worker thread and will only be called once. + This is run by the worker thread and will only be called once per worker. """ # Keep import here so that we can import stream integration without installing reqs