Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion homeassistant/components/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
STREAM_RESTART_RESET_TIME,
TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
from .core import PROVIDERS, IdleTimer, StreamOutput, StreamSettings
from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings
from .hls import HlsStreamOutput, async_setup_hls

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -137,6 +137,8 @@ def libav_filter(record: logging.LogRecord) -> bool:

# Set log level to error for libav.mp4
logging.getLogger("libav.mp4").setLevel(logging.ERROR)
# Suppress "deprecated pixel format" WARNING
logging.getLogger("libav.swscaler").setLevel(logging.ERROR)


async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
Expand Down Expand Up @@ -214,6 +216,7 @@ def __init__(
self._thread_quit = threading.Event()
self._outputs: dict[str, StreamOutput] = {}
self._fast_restart_once = False
self._keyframe_converter = KeyFrameConverter(hass)
self._available: bool = True
self._update_callback: Callable[[], None] | None = None
self._logger = (
Expand Down Expand Up @@ -327,6 +330,7 @@ def _run_worker(self) -> None:
self.source,
self.options,
stream_state,
self._keyframe_converter,
self._thread_quit,
)
except StreamWorkerError as err:
Expand Down Expand Up @@ -419,3 +423,12 @@ async def async_record(
# Wait for latest segment, then add the lookback
await hls.recv()
recorder.prepend(list(hls.get_segments())[-num_segments:])

async def get_image(
    self,
    width: int | None = None,
    height: int | None = None,
) -> bytes | None:
    """Delegate to this stream's KeyFrameConverter and return the keyframe jpeg.

    Returns None if no image has been generated yet.
    """
    converter = self._keyframe_converter
    return await converter.get_image(width=width, height=height)
85 changes: 85 additions & 0 deletions homeassistant/components/stream/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from .const import ATTR_STREAMS, DOMAIN

if TYPE_CHECKING:
from av import CodecContext, Packet

from . import Stream

PROVIDERS = Registry()
Expand Down Expand Up @@ -356,3 +358,86 @@ async def handle(
) -> web.StreamResponse:
"""Handle the stream request."""
raise NotImplementedError()


class KeyFrameConverter:
    """
    Generate and hold the keyframe as a jpeg.

    An overview of the thread and state interaction:
        the worker thread sets a packet
        at any time, main loop can run a get_image call
        _generate_image will try to create an image from the packet
        Running _generate_image will clear the packet, so there will only
        be one attempt per packet
    If successful, _image will be updated and returned by get_image
    If unsuccessful, get_image will return the previous image
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize."""

        # Keep import here so that we can import stream integration without installing reqs
        # pylint: disable=import-outside-toplevel
        from homeassistant.components.camera.img_util import TurboJPEGSingleton

        # Latest video keyframe set by the worker thread; cleared by
        # _generate_image so each packet is decoded at most once.
        self.packet: Packet | None = None
        self._hass = hass
        # Most recently generated jpeg; returned as-is if a newer decode fails.
        self._image: bytes | None = None
        self._turbojpeg = TurboJPEGSingleton.instance()
        # Serializes get_image calls so only one decode attempt runs at a time.
        self._lock = asyncio.Lock()
        self._codec_context: CodecContext | None = None

    def create_codec_context(self, codec_context: CodecContext) -> None:
        """
        Create a codec context to be used for decoding the keyframes.

        This is run by the worker thread and will only be called once per worker.
        """

        # Keep import here so that we can import stream integration without installing reqs
        # pylint: disable=import-outside-toplevel
        from av import CodecContext

        self._codec_context = CodecContext.create(codec_context.name, "r")
        self._codec_context.extradata = codec_context.extradata
        # Only keyframes are decoded; threading is disabled since this context
        # is used sparingly for single-frame snapshots.
        self._codec_context.skip_frame = "NONKEY"
        self._codec_context.thread_type = "NONE"

    def _generate_image(self, width: int | None, height: int | None) -> None:
        """
        Generate the keyframe image.

        This is run in an executor thread, but since it is called within the
        asyncio lock from the main thread, there will only be one entry
        at a time per instance.
        """

        # Nothing to do without an encoder, a pending packet, and a decoder.
        if not (self._turbojpeg and self.packet and self._codec_context):
            return
        packet = self.packet
        self.packet = None
        # decode packet (flush afterwards)
        frames = self._codec_context.decode(packet)
        for _i in range(2):
            if frames:
                break
            frames = self._codec_context.decode(None)
        if frames:
            frame = frames[0]
            if width and height:
                frame = frame.reformat(width=width, height=height)
            bgr_array = frame.to_ndarray(format="bgr24")
            self._image = bytes(self._turbojpeg.encode(bgr_array))

    async def get_image(
        self,
        width: int | None = None,
        height: int | None = None,
    ) -> bytes | None:
        """Fetch an image from the Stream and return it as a jpeg in bytes."""

        # Use a lock to ensure only one thread is working on the keyframe at a time
        async with self._lock:
            await self._hass.async_add_executor_job(self._generate_image, width, height)
        return self._image
2 changes: 1 addition & 1 deletion homeassistant/components/stream/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"domain": "stream",
"name": "Stream",
"documentation": "https://www.home-assistant.io/integrations/stream",
"requirements": ["ha-av==8.0.4-rc.1"],
"requirements": ["ha-av==8.0.4-rc.1", "PyTurboJPEG==1.6.3"],
"dependencies": ["http"],
"codeowners": ["@hunterjm", "@uvjustin", "@allenporter"],
"quality_scale": "internal",
Expand Down
9 changes: 7 additions & 2 deletions homeassistant/components/stream/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from homeassistant.core import HomeAssistant

from . import redact_credentials
from . import KeyFrameConverter, redact_credentials
from .const import (
ATTR_SETTINGS,
AUDIO_CODECS,
Expand Down Expand Up @@ -439,6 +439,7 @@ def stream_worker(
source: str,
options: dict[str, str],
stream_state: StreamState,
keyframe_converter: KeyFrameConverter,
quit_event: Event,
) -> None:
"""Handle consuming streams."""
Expand All @@ -453,6 +454,7 @@ def stream_worker(
video_stream = container.streams.video[0]
except (KeyError, IndexError) as ex:
raise StreamWorkerError("Stream has no video") from ex
keyframe_converter.create_codec_context(codec_context=video_stream.codec_context)
try:
audio_stream = container.streams.audio[0]
except (KeyError, IndexError):
Expand All @@ -474,7 +476,7 @@ def stream_worker(

def is_video(packet: av.Packet) -> Any:
"""Return true if the packet is for the video stream."""
return packet.stream == video_stream
return packet.stream.type == "video"

# Have to work around two problems with RTSP feeds in ffmpeg
# 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
Expand Down Expand Up @@ -535,3 +537,6 @@ def is_video(packet: av.Packet) -> Any:
raise StreamWorkerError("Error demuxing stream: %s" % str(ex)) from ex

muxer.mux_packet(packet)

if packet.is_keyframe and is_video(packet):
keyframe_converter.packet = packet
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ PySocks==1.7.1
PyTransportNSW==0.1.1

# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.6.3

# homeassistant.components.vicare
Expand Down
1 change: 1 addition & 0 deletions requirements_test_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ PyRMVtransport==0.3.3
PyTransportNSW==0.1.1

# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.6.3

# homeassistant.components.vicare
Expand Down
1 change: 1 addition & 0 deletions tests/components/camera/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ def mock_turbo_jpeg(
(second_width, second_height, 0, 0),
]
mocked_turbo_jpeg.scale_with_quality.return_value = EMPTY_8_6_JPEG
mocked_turbo_jpeg.encode.return_value = EMPTY_8_6_JPEG
return mocked_turbo_jpeg
45 changes: 43 additions & 2 deletions tests/components/stream/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import av
import pytest

from homeassistant.components.stream import Stream, create_stream
from homeassistant.components.stream import KeyFrameConverter, Stream, create_stream
from homeassistant.components.stream.const import (
ATTR_SETTINGS,
CONF_LL_HLS,
Expand All @@ -45,6 +45,7 @@
)
from homeassistant.setup import async_setup_component

from tests.components.camera.common import EMPTY_8_6_JPEG, mock_turbo_jpeg
from tests.components.stream.common import generate_h264_video, generate_h265_video
from tests.components.stream.test_ll_hls import TEST_PART_DURATION

Expand Down Expand Up @@ -97,6 +98,17 @@ class FakeCodec:

self.codec = FakeCodec()

class FakeCodecContext:
name = "h264"
extradata = None

self.codec_context = FakeCodecContext()

@property
def type(self):
    """Return "video" or "audio" based on the fake stream's name."""
    if self.name == VIDEO_STREAM_FORMAT:
        return "video"
    return "audio"

def __str__(self) -> str:
"""Return a stream name for debugging."""
return f"FakePyAvStream<{self.name}, {self.time_base}>"
Expand Down Expand Up @@ -195,6 +207,7 @@ def add_stream(self, template=None):
class FakeAvOutputStream:
def __init__(self, capture_packets):
self.capture_packets = capture_packets
self.type = "ignored-type"

def close(self):
return
Expand Down Expand Up @@ -258,7 +271,9 @@ def open(self, stream_source, *args, **kwargs):
def run_worker(hass, stream, stream_source):
    """Run the stream worker under test."""
    quit_event = threading.Event()
    worker_state = StreamState(hass, stream.outputs)
    # Each run gets a fresh KeyFrameConverter, matching production Stream setup.
    stream_worker(
        stream_source,
        {},
        worker_state,
        KeyFrameConverter(hass),
        quit_event,
    )


async def async_decode_stream(hass, packets, py_av=None):
Expand Down Expand Up @@ -854,3 +869,29 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
await record_worker_sync.join()

stream.stop()


async def test_get_image(hass, record_worker_sync):
    """Test that stream.get_image returns the last keyframe as a jpeg."""
    await async_setup_component(hass, "stream", {"stream": {}})

    source = generate_h264_video()

    # Since libjpeg-turbo is not installed on the CI runner, we use a mock
    with patch(
        "homeassistant.components.camera.img_util.TurboJPEGSingleton"
    ) as mock_turbo_jpeg_singleton:
        mock_turbo_jpeg_singleton.instance.return_value = mock_turbo_jpeg()
        stream = create_stream(hass, source, {})

    # use record_worker_sync to grab output segments
    with patch.object(hass.config, "is_allowed_path", return_value=True):
        await stream.async_record("/example/path")

    # No image has been generated yet - get_image has not been called
    assert stream._keyframe_converter._image is None

    await record_worker_sync.join()

    assert await stream.get_image() == EMPTY_8_6_JPEG

    stream.stop()