Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffmpeg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from .ffmpeg import FFmpeg
from .progress import Progress

__version__ = "2.0.9"
__version__ = "2.0.10"
84 changes: 54 additions & 30 deletions ffmpeg/asyncio/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
import signal
import subprocess
from typing import Optional, Union
from typing import Optional, Union, AsyncIterable

from pyee.asyncio import AsyncIOEventEmitter
from typing_extensions import Self
Expand Down Expand Up @@ -33,6 +33,7 @@ def __init__(self, executable: str = "ffmpeg"):
self._process: asyncio.subprocess.Process
self._executed: bool = False
self._terminated: bool = False
self._tasks = {}

self._tracker = Tracker(self) # type: ignore

Expand Down Expand Up @@ -146,32 +147,13 @@ def output(
self._options.output(url, options, **kwargs)
return self

async def execute(
self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None
) -> bytes:
"""Execute FFmpeg using specified global options and files.

Args:
stream: A stream to input to the standard input. Defaults to None.
timeout: The maximum number of seconds to wait before returning. Defaults to None.

Raises:
FFmpegAlreadyExecuted: If FFmpeg is already executed.
FFmpegError: If FFmpeg process returns non-zero exit status.
asyncio.TimeoutError: If FFmpeg process does not terminate after `timeout` seconds.

Returns:
The output to the standard output.
"""
async def _start_execution(self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None):
if self._executed:
raise FFmpegAlreadyExecuted("FFmpeg is already executed", arguments=self.arguments)

self._executed = False
self._terminated = False

if stream is not None:
stream = ensure_stream_reader(stream)

self.emit("start", self.arguments)

self._process = await create_subprocess(
Expand All @@ -182,13 +164,9 @@ async def execute(
)

self._executed = True
tasks = [
asyncio.create_task(self._write_stdin(stream)),
asyncio.create_task(self._read_stdout()),
asyncio.create_task(self._handle_stderr()),
asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)),
]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

async def _wait_for_execution(self):
done, pending = await asyncio.wait(self._tasks.values(), return_when=asyncio.FIRST_EXCEPTION)
self._executed = False

for task in done:
Expand All @@ -205,9 +183,55 @@ async def execute(
elif self._terminated:
self.emit("terminated")
else:
raise FFmpegError.create(message=tasks[2].result(), arguments=self.arguments)
raise FFmpegError.create(message=self._tasks["stderr"].result(), arguments=self.arguments)

return tasks[1].result()
async def execute(
self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None,
) -> bytes:
"""Execute FFmpeg using specified global options and files.

Args:
stream: A stream to input to the standard input. Defaults to None.
timeout: The maximum number of seconds to wait before returning. Defaults to None.

Raises:
FFmpegAlreadyExecuted: If FFmpeg is already executed.
FFmpegError: If FFmpeg process returns non-zero exit status.
asyncio.TimeoutError: If FFmpeg process does not terminate after `timeout` seconds.

Returns:
The output to the standard output.
"""
if stream is not None:
stream = ensure_stream_reader(stream)
await self._start_execution(stream=stream)

self._tasks = {
"stdin": asyncio.create_task(self._write_stdin(stream)),
"stdout": asyncio.create_task(self._read_stdout()),
"stderr": asyncio.create_task(self._handle_stderr()),
"main": asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)),
}

await self._wait_for_execution()
return self._tasks["stdout"].result()

async def execute_stream_stdout(
self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None,
*, stream_chunk_size=io.DEFAULT_BUFFER_SIZE
) -> AsyncIterable[bytes]:
"""Like `execute`, but returns a generator of bytes, so you can read the output as it starts to come out."""
if stream is not None:
stream = ensure_stream_reader(stream)
await self._start_execution(stream=stream)
self._tasks = {
"stdin": asyncio.create_task(self._write_stdin(stream)),
"stderr": asyncio.create_task(self._handle_stderr()),
"main": asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)),
}
async for chunk in read_stream(self._process.stdout, size=stream_chunk_size):
yield chunk
await self._wait_for_execution()

def terminate(self):
"""Gracefully terminate the running FFmpeg process.
Expand Down
29 changes: 29 additions & 0 deletions tests/test_asyncio_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,32 @@ async def test_asyncio_output_via_stdout(

assert abs(float(source["format"]["duration"]) - float(target["format"]["duration"])) <= epsilon
assert target["format"]["format_name"] == "ogg"


@pytest.mark.asyncio
async def test_asyncio_output_stream(
assets_path: Path,
tmp_path: Path,
):
source_path = assets_path / "brewing.wav"
target_path = tmp_path / "brewing.ogg"

ffmpeg = (
FFmpeg()
.option("y")
.input(source_path)
.output(
"pipe:1",
f="ogg",
)
)
with open(target_path, "wb") as target_file:
async for chunk in ffmpeg.execute_stream_stdout():
target_file.write(chunk)

source = probe(source_path)
target = probe(target_path)

assert abs(float(source["format"]["duration"]) - float(target["format"]["duration"])) <= epsilon
assert target["format"]["format_name"] == "ogg"