diff --git a/ffmpeg/__init__.py b/ffmpeg/__init__.py index c1f79cf..5d85069 100644 --- a/ffmpeg/__init__.py +++ b/ffmpeg/__init__.py @@ -9,4 +9,4 @@ from .ffmpeg import FFmpeg from .progress import Progress -__version__ = "2.0.9" +__version__ = "2.0.10" diff --git a/ffmpeg/asyncio/ffmpeg.py b/ffmpeg/asyncio/ffmpeg.py index 6bedec9..26e4d8c 100644 --- a/ffmpeg/asyncio/ffmpeg.py +++ b/ffmpeg/asyncio/ffmpeg.py @@ -5,7 +5,7 @@ import os import signal import subprocess -from typing import Optional, Union +from typing import Optional, Union, AsyncIterable from pyee.asyncio import AsyncIOEventEmitter from typing_extensions import Self @@ -33,6 +33,7 @@ def __init__(self, executable: str = "ffmpeg"): self._process: asyncio.subprocess.Process self._executed: bool = False self._terminated: bool = False + self._tasks = {} self._tracker = Tracker(self) # type: ignore @@ -146,32 +147,13 @@ def output( self._options.output(url, options, **kwargs) return self - async def execute( - self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None - ) -> bytes: - """Execute FFmpeg using specified global options and files. - - Args: - stream: A stream to input to the standard input. Defaults to None. - timeout: The maximum number of seconds to wait before returning. Defaults to None. - - Raises: - FFmpegAlreadyExecuted: If FFmpeg is already executed. - FFmpegError: If FFmpeg process returns non-zero exit status. - asyncio.TimeoutError: If FFmpeg process does not terminate after `timeout` seconds. - - Returns: - The output to the standard output. - """ + async def _start_execution(self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None): if self._executed: raise FFmpegAlreadyExecuted("FFmpeg is already executed", arguments=self.arguments) self._executed = False self._terminated = False - if stream is not None: - stream = ensure_stream_reader(stream) - self.emit("start", self.arguments) self._process = await create_subprocess( @@ -182,13 +164,9 @@ async def execute( ) self._executed = True - tasks = [ - asyncio.create_task(self._write_stdin(stream)), - asyncio.create_task(self._read_stdout()), - asyncio.create_task(self._handle_stderr()), - asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)), - ] - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + + async def _wait_for_execution(self): + done, pending = await asyncio.wait(self._tasks.values(), return_when=asyncio.FIRST_EXCEPTION) self._executed = False for task in done: @@ -205,9 +183,55 @@ async def execute( elif self._terminated: self.emit("terminated") else: - raise FFmpegError.create(message=tasks[2].result(), arguments=self.arguments) + raise FFmpegError.create(message=self._tasks["stderr"].result(), arguments=self.arguments) - return tasks[1].result() + async def execute( + self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None, + ) -> bytes: + """Execute FFmpeg using specified global options and files. + + Args: + stream: A stream to input to the standard input. Defaults to None. + timeout: The maximum number of seconds to wait before returning. Defaults to None. + + Raises: + FFmpegAlreadyExecuted: If FFmpeg is already executed. + FFmpegError: If FFmpeg process returns non-zero exit status. + asyncio.TimeoutError: If FFmpeg process does not terminate after `timeout` seconds. + + Returns: + The output to the standard output. + """ + if stream is not None: + stream = ensure_stream_reader(stream) + await self._start_execution(stream=stream) + + self._tasks = { + "stdin": asyncio.create_task(self._write_stdin(stream)), + "stdout": asyncio.create_task(self._read_stdout()), + "stderr": asyncio.create_task(self._handle_stderr()), + "main": asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)), + } + + await self._wait_for_execution() + return self._tasks["stdout"].result() + + async def execute_stream_stdout( + self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None, + *, stream_chunk_size=io.DEFAULT_BUFFER_SIZE + ) -> AsyncIterable[bytes]: + """Like `execute`, but returns a generator of bytes, so you can read the output as it starts to come out.""" + if stream is not None: + stream = ensure_stream_reader(stream) + await self._start_execution(stream=stream) + self._tasks = { + "stdin": asyncio.create_task(self._write_stdin(stream)), + "stderr": asyncio.create_task(self._handle_stderr()), + "main": asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)), + } + async for chunk in read_stream(self._process.stdout, size=stream_chunk_size): + yield chunk + await self._wait_for_execution() def terminate(self): """Gracefully terminate the running FFmpeg process. diff --git a/tests/test_asyncio_pipe.py b/tests/test_asyncio_pipe.py index 939f06b..f64d0f1 100644 --- a/tests/test_asyncio_pipe.py +++ b/tests/test_asyncio_pipe.py @@ -66,3 +66,32 @@ async def test_asyncio_output_via_stdout( assert abs(float(source["format"]["duration"]) - float(target["format"]["duration"])) <= epsilon assert target["format"]["format_name"] == "ogg" + + +@pytest.mark.asyncio +async def test_asyncio_output_stream( + assets_path: Path, + tmp_path: Path, +): + source_path = assets_path / "brewing.wav" + target_path = tmp_path / "brewing.ogg" + + ffmpeg = ( + FFmpeg() + .option("y") + .input(source_path) + .output( + "pipe:1", + f="ogg", + ) + ) + with open(target_path, "wb") as target_file: + async for chunk in ffmpeg.execute_stream_stdout(): + target_file.write(chunk) + + source = probe(source_path) + target = probe(target_path) + + assert abs(float(source["format"]["duration"]) - float(target["format"]["duration"])) <= epsilon + assert target["format"]["format_name"] == "ogg" +