[feat] Streaming diffusion video generation output #3737
hsliuustc0106
left a comment
BLOCKING:
- Test Coverage — The test results section says "To be updated" for unit/integration tests, and smoke test/E2E are marked "Todo". Please paste actual test outputs and add at least one smoke test showing streaming video chunks are emitted correctly.
VERDICT: REQUEST_CHANGES
Performance Check (currently not on CI)
TLDR
Seems like for Helios,
As a result,
Note:
Test
An important request param to speed up generation (already made default in both examples and noted in README) is
Streaming, first time
Step-execution currently cannot log step-wise diffusion latency. Until that feature lands in a separate PR, I have to "guess" the latency from the logger timestamps.
Server side: from the end of the text encoder to the last chunk's VAE decode: 13s. Token-To-First-Chunk: 3s
Client side: similar. e2e is around 13s. Input-To-First-Chunk: 3.18s
Streaming (subsequent run)
TLDR: the same as the first run
Non streaming (baseline, first run)
Non streaming (baseline, subsequent run)
Why non-streaming mode has different performance across two runs
For Helios-Distilled with dummy
This behavior is fixed in step execution mode (otherwise this mode would fail to run at all). But the non-streaming mode implementation is intentionally unchanged in this PR, to minimize unnecessary changes and allow accuracy comparison between these two modes.
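A minimal sketch of how the client-side first-chunk and end-to-end numbers could be measured over any chunk iterator. The helper name `measure_chunk_latency` and the injectable `clock` parameter are assumptions for illustration, not part of this PR; timing starts when iteration begins, so it only approximates "Input-To-First-Chunk".

```python
import time
from typing import Callable, Iterable, Optional


def measure_chunk_latency(
    chunks: Iterable[bytes],
    clock: Callable[[], float] = time.perf_counter,
) -> dict:
    """Consume a chunk stream and report first-chunk and end-to-end latency."""
    start = clock()
    first_chunk_s: Optional[float] = None
    count = 0
    for _ in chunks:
        count += 1
        if first_chunk_s is None:
            # Record the elapsed time only when the first chunk arrives.
            first_chunk_s = clock() - start
    return {
        "num_chunks": count,
        "first_chunk_s": first_chunk_s,  # None if the stream was empty
        "e2e_s": clock() - start,
    }
```

Passing a fake `clock` keeps the helper deterministic in tests; in a real client the default `time.perf_counter` would be used around the WebSocket receive loop.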
Signed-off-by: Huang, Zeyu <11222265+fhfuih@users.noreply.github.com>
Pull request overview
Adds diffusion video streaming output support across the diffusion pipeline/executor/engine stack and exposes a new OpenAI-style WebSocket endpoint (/v1/videos/stream) that streams fragmented MP4 bytes as generation proceeds.
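A rough sketch of how a client might assemble the streamed bytes once messages have been received from the WebSocket. The event names `video.start` and `session.done` come from the protocol tests listed in this PR's test plan; the JSON shape (`{"type": ...}`), the `error` event, and the function name `assemble_stream` are assumptions for illustration only.

```python
import json
from typing import Iterable, Union


def assemble_stream(messages: Iterable[Union[str, bytes]]) -> bytes:
    """Concatenate binary fMP4 chunks that arrive between start and done events."""
    started = False
    buf = bytearray()
    for msg in messages:
        if isinstance(msg, (bytes, bytearray)):
            # Binary frames are treated as raw fragmented MP4 bytes.
            if not started:
                raise ValueError("binary chunk received before video.start")
            buf.extend(msg)
            continue
        event = json.loads(msg)  # text frames are assumed to be JSON events
        kind = event.get("type")
        if kind == "video.start":
            started = True
        elif kind == "session.done":
            return bytes(buf)
        elif kind == "error":  # assumed event name for generation errors
            raise RuntimeError(event.get("message", "generation failed"))
    raise ValueError("stream ended without session.done")
```

Keeping the assembly logic separate from the socket makes it testable with plain lists of messages; the actual transport (for example a `websockets` client loop) would feed it.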
Changes:
- Introduces `streaming_output` mode for diffusion requests, propagating chunked `DiffusionOutput` with `finished` semantics across workers, executors, engine, and orchestrator.
- Adds a WebSocket video output streaming endpoint plus incremental fMP4 encoding/finalization utilities.
- Expands unit/integration/e2e coverage for streamed chunk forwarding and streaming-vs-non-streaming similarity.
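The chunked-output idea with a `finished` flag can be sketched as a plain generator. `ChunkOutput`, `generate_chunks`, and `collect` are hypothetical stand-ins, not the real `DiffusionOutput` or pipeline code; the only behavior assumed from the PR is that intermediate chunks are not finished and the last one is.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass
class ChunkOutput:  # hypothetical stand-in, not the real DiffusionOutput
    request_id: str
    frames: List[int]
    finished: bool


def generate_chunks(request_id: str, total_frames: int, chunk_size: int) -> Iterator[ChunkOutput]:
    """Yield frame chunks; only the last chunk carries finished=True."""
    if total_frames <= 0 or chunk_size <= 0:
        raise ValueError("total_frames and chunk_size must be positive")
    for start in range(0, total_frames, chunk_size):
        end = min(start + chunk_size, total_frames)
        yield ChunkOutput(request_id, list(range(start, end)), finished=(end == total_frames))


def collect(stream: Iterable[ChunkOutput]) -> List[int]:
    """Consume chunks until the finished flag is seen."""
    frames: List[int] = []
    for out in stream:
        frames.extend(out.frames)
        if out.finished:
            break  # the request is complete; nothing more is expected
    return frames
```

A consumer that keys completion off `finished` rather than "first output received" is what lets each layer forward intermediate chunks without closing the request early.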
Reviewed changes
Copilot reviewed 46 out of 46 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| vllm_omni/outputs.py | Plumbs finished flag into diffusion outputs. |
| vllm_omni/entrypoints/openai/video_api_utils.py | Adds fMP4 streaming encoder + finalization helpers. |
| vllm_omni/entrypoints/openai/serving_video_output_stream.py | Implements /v1/videos/stream WebSocket handler. |
| vllm_omni/entrypoints/openai/api_server.py | Wires streaming video output handler into API server. |
| vllm_omni/entrypoints/omni_base.py | Forwards finished to request outputs. |
| vllm_omni/entrypoints/cli/serve.py | Adds --streaming-output CLI flag. |
| vllm_omni/engine/async_omni_engine.py | Passes streaming_output into diffusion stage config. |
| vllm_omni/diffusion/worker/diffusion_worker.py | Streams chunked outputs from workers when enabled. |
| vllm_omni/diffusion/worker/diffusion_model_runner.py | Supports pipeline generators for streaming output. |
| vllm_omni/diffusion/utils/media_utils.py | Adds incremental fragmented MP4 muxer + remux finalizer. |
| vllm_omni/diffusion/stage_diffusion_proc.py | Streams multiple ZMQ result envelopes per request. |
| vllm_omni/diffusion/sched/request_scheduler.py | Uses finished to decide request completion. |
| vllm_omni/diffusion/profiler/diffusion_pipeline_profiler.py | Adds generator-aware profiling wrapper. |
| vllm_omni/diffusion/models/interface.py | Defines streaming-output pipeline protocol. |
| vllm_omni/diffusion/models/helios/pipeline_helios.py | Implements chunk-yielding Helios forward path. |
| vllm_omni/diffusion/inline_stage_diffusion_client.py | Supports streaming chunk delivery inline. |
| vllm_omni/diffusion/executor/multiproc_executor.py | Adds execute_streaming_request generator RPC path. |
| vllm_omni/diffusion/executor/abstract.py | Adds streaming execution abstract method. |
| vllm_omni/diffusion/diffusion_engine.py | Adds async streaming step + streaming output queues. |
| vllm_omni/diffusion/data.py | Adds streaming_output config + streaming fields on outputs. |
| tests/helpers/runtime.py | Adds WS client helper for /v1/videos/stream. |
| tests/helpers/assertions.py | Adjusts Helios frame-count assertions. |
| tests/entrypoints/test_async_omni.py | Tests AsyncOmni yields intermediate diffusion chunks. |
| tests/entrypoints/openai_api/test_video_server.py | Updates extra_params expectations for Helios presets. |
| tests/entrypoints/openai_api/test_video_api_utils.py | Tests fMP4 encoder + finalization utilities. |
| tests/entrypoints/openai_api/test_serving_video_output_stream.py | Adds WebSocket protocol/unit tests for streaming. |
| tests/engine/test_orchestrator.py | Tests orchestrator forwards intermediate diffusion chunks. |
| tests/e2e/accuracy/wan22_i2v/test_wan22_i2v_video_similarity.py | Refactors to shared video similarity helpers. |
| tests/e2e/accuracy/test_video_streaming_output_similarity.py | Adds streaming vs non-streaming video similarity smoke test. |
| tests/e2e/accuracy/test_diffusers_backend_similarity.py | Imports shared ffmpeg similarity helper. |
| tests/e2e/accuracy/helpers.py | Adds ffmpeg/ffprobe-based video similarity helpers. |
| tests/diffusion/test_stage_diffusion_proc.py | Tests StageDiffusionProc yields every streaming chunk. |
| tests/diffusion/test_multiproc_engine_concurrency.py | Adds multiproc streaming behavior tests. |
| tests/diffusion/test_inline_stage_diffusion_client.py | Tests inline client streaming chunk delivery. |
| tests/diffusion/test_diffusion_streaming_output.py | Adds end-to-end streaming integration tests. |
| tests/diffusion/test_diffusion_step_pipeline.py | Adjusts configs/tests for new streaming flag. |
| tests/diffusion/test_diffusion_scheduler.py | Tests scheduler/engine streaming completion semantics. |
| tests/diffusion/test_diffusion_model_runner.py | Tests model runner forwards streaming generator outputs. |
| tests/diffusion/test_diffusion_engine_cleanup.py | Tests engine close completes streaming waiters. |
| pyproject.toml | Adds websockets dev dependency. |
| examples/online_serving/streaming_video_generation/video-stream-view.js | Browser MSE player for streamed fMP4 chunks. |
| examples/online_serving/streaming_video_generation/video-stream-view.html | HTML view for browser streaming player. |
| examples/online_serving/streaming_video_generation/streaming_video_client.py | CLI WS client example for streaming endpoint. |
| examples/online_serving/streaming_video_generation/README.md | Documents endpoint protocol + usage. |
| examples/online_serving/streaming_video_generation/gradio_demo.py | Gradio demo for browser-based streaming playback. |
| .buildkite/test-nightly.yml | Adds nightly e2e streaming similarity test. |
Does the latency of the first forward chunk come from the warmup overhead?
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Zeyu Huang | 黃澤宇 <11222265+fhfuih@users.noreply.github.com>
Yes, I think so.
Signed-off-by: Huang, Zeyu <11222265+fhfuih@users.noreply.github.com>
hsliuustc0106
left a comment
Left inline comments on the streaming timeout, streaming/RPC queue interaction, and streaming exec-time metric.
| for req_id in sched_output.scheduled_req_ids | ||
| ] | ||
| ) | ||
| met_error = True |
In streaming mode this drains generic RPCs between chunks while the active execute_model generator is still running. With the multiproc executor, a queued RPC will send another worker RPC and wait for one result on the same result queue that is also carrying streaming chunks. Since the worker cannot process the new RPC until the generator finishes, this can block streaming or let the RPC path consume a video chunk as its result. I think we should avoid processing generic RPCs during an active streaming request, or separate/correlate streaming replies from RPC replies.
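To illustrate the "correlate streaming replies from RPC replies" option, here is a small sketch in which every message on a shared result queue carries a kind and a correlation id. The `Reply` envelope and `wait_for_rpc` helper are hypothetical and do not reflect the executor's actual message format. Note that this only prevents an RPC waiter from consuming a video chunk; it does not address the worker being unable to serve the RPC until the generator finishes.

```python
import queue
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Reply:  # hypothetical envelope for messages on the shared result queue
    kind: str      # "chunk" for streaming output, "rpc" for generic RPC replies
    corr_id: str   # request id for chunks, call id for RPC replies
    payload: Any


def wait_for_rpc(result_q: "queue.Queue[Reply]", call_id: str, deferred: List[Reply]) -> Any:
    """Return the payload for one RPC call, deferring any unrelated replies."""
    while True:
        reply = result_q.get(timeout=1.0)  # raises queue.Empty if nothing arrives
        if reply.kind == "rpc" and reply.corr_id == call_id:
            return reply.payload
        # Streaming chunks (or replies to other calls) are kept for their real owner.
        deferred.append(reply)
```

The caller would later hand the `deferred` replies back to the streaming consumer in arrival order.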
The current version should have resolved this issue, since it requires the step-execution scheduler to enable streaming output. In this mode, both the Diffusion Engine and the worker should already handle RPC requests and interleave them between two denoise steps, which is a finer granularity than between video chunks.
Hmm, this is indeed a critical comment. After some investigation, I can confirm the problem is possible. This blocking could also nullify the subsequent feature of submitting in-flight prompt changes and conflict with the overall vision in #3632. Therefore, I will convert this PR to draft and rethink the diffusion engine architecture. Maybe using step execution, and maybe also making this PR dependent on #3099. (In that case, the changes should be mainly on the Diffusion Engine/Executor/Scheduler side. Layers above should be fine.) I will reopen it once ready.
Signed-off-by: Huang, Zeyu <11222265+fhfuih@users.noreply.github.com>
This PR is ready. CC-ing the following people; reviews are much appreciated!
@Gaohan123 @wtomin: general architecture and design.
@asukaqaq-s: for the schedulers, I have extended the step execution scheduler to handle intermediate chunk output. I wonder whether that fits your original step execution design.
@fake0fan @yinpeiqi: any comments on the diffusion engine, worker, and related parts?
@princepride @SHYuanBest: any comments on the streaming-and-step-execution mode of the Helios pipeline? (I admit much of it is vibe-coded 😆) One notable new piece is the handling of single num_inference_steps, which the step-execution mode relies on heavily.
LGTM! Thanks for your great effort!
@fhfuih Please resolve conflicts. Thanks
Gaohan123
left a comment
In a follow-up PR, I suggest moving the streaming capability from DiffusionEngine into a new engine class that inherits from it, to avoid intrusive modifications.
| @@ -0,0 +1,9 @@ | |||
| <div id="vllm-streaming-video-view" style="display:flex; flex-direction:column; gap:10px;"> | |||
Is this needed for gradio demo?
Yes, in my current implementation, because Gradio's own video component doesn't fully support streaming input. It does claim to support another format, but I could not get it working for some unknown reason (their Video component has minimal documentation and error messages, so it is harder to maintain and adapt). So I'd rather use native HTML, which supports the fragmented MP4 format for streaming.
Another reason not to go for the Gradio-style file format: I surveyed common video file formats for streaming, and (supposedly?) fragmented MP4 is the most modern go-to choice. Adding another format means adding extra logic to the API layer, and I could not see much benefit from supporting the Gradio-specific format.
| def prepare_encode(self, state: DiffusionRequestState, **kwargs: Any) -> DiffusionRequestState: | ||
| """Prepare request-level inputs and return initialized state.""" | ||
| ... |
This is the same as pass: an empty-function placeholder. It is better to add it explicitly to these interface definitions; it was missing before. The previous code was valid only because the methods had docstrings. Without a docstring, an empty function with no ... or pass raises an IndentationError.
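The point about docstrings, `...`, and `pass` can be checked directly with the built-in `compile()`. The helper name `compiles` and the sample sources are illustrative only.

```python
def compiles(source: str) -> bool:
    """Return True if the source compiles, False if it raises IndentationError."""
    try:
        compile(source, "<sketch>", "exec")
        return True
    except IndentationError:
        return False


# A docstring alone is a valid function body.
DOCSTRING_ONLY = 'def prepare_encode(state):\n    """Prepare inputs."""\n'
# An explicit placeholder after the docstring is also valid and makes the intent clear.
DOCSTRING_AND_ELLIPSIS = 'def prepare_encode(state):\n    """Prepare inputs."""\n    ...\n'
# No body at all: the compiler expects an indented block after the def line.
NO_BODY = "def prepare_encode(state):\n"
```

Here `compiles(DOCSTRING_ONLY)` and `compiles(DOCSTRING_AND_ELLIPSIS)` both succeed, while `compiles(NO_BODY)` fails with an `IndentationError`.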
| ) | ||
| # Diffusion model (mainly video generation models) streaming output mode | ||
| omni_config_group.add_argument( | ||
| "--streaming-output", |
I think the argument name is not clear. It is easily confused with the stream option of the LLM stage.
Agree, since AR and diffusion share this interface. I changed the CLI arg to --diffusion-streaming-output. When it is converted to an OmniDiffusionConfig field, I keep it as od_config.streaming_output, since this config is diffusion-internal.
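A small sketch of the naming split described above: a diffusion-prefixed CLI flag that maps onto a shorter, diffusion-internal config field. `DiffusionConfigSketch` is a hypothetical stand-in for `OmniDiffusionConfig`, and treating the flag as a `store_true` boolean is an assumption.

```python
import argparse
from dataclasses import dataclass


@dataclass
class DiffusionConfigSketch:  # hypothetical stand-in for OmniDiffusionConfig
    streaming_output: bool = False


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    # The prefix disambiguates this flag from token streaming of an LLM stage.
    parser.add_argument(
        "--diffusion-streaming-output",
        action="store_true",  # assumed to be a boolean switch
        help="Emit diffusion (video) output chunk by chunk.",
    )
    return parser


def to_diffusion_config(args: argparse.Namespace) -> DiffusionConfigSketch:
    # argparse turns dashes into underscores for the attribute name.
    return DiffusionConfigSketch(streaming_output=args.diffusion_streaming_output)
```

For example, `to_diffusion_config(build_parser().parse_args(["--diffusion-streaming-output"]))` yields a config with `streaming_output=True`.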
Gaohan123
left a comment
For the video demo, I suggest showing video generation with and without streaming, to make the improvement visually obvious.
Agree. And since this PR may not make it into this version, if a new engine implementation arrives in the next iteration, I can wait for it to merge first and adapt this PR to it.
For the Helios model with the current streaming implementation, streaming "TTFC" seems roughly equal to the full video generation time in non-streaming mode, so there isn't really an interaction-level speedup. If this gap is identified and resolved in the future, I can add such a comparison.
Signed-off-by: Huang, Zeyu <11222265+fhfuih@users.noreply.github.com>
Purpose
#3632 Phase 1
Gradio example
examples/online_serving/streaming_video_generation/gradio_demo.py
output.mp4
Test Plan
Unit test
Engine / scheduler (step-execution streaming and chunk delivery) (should be auto-captured by existing ready (L1) CI yaml)
- `tests/diffusion/test_diffusion_model_runner.py`: tests that `DiffusionModelRunner.execute_stepwise()` returns no result on non-decode steps and emits streaming `DiffusionOutput` chunks at chunk boundaries.
- `tests/diffusion/test_diffusion_scheduler.py`: tests `StepScheduler` request lifecycle, batching, LoRA-compatible batching, incompatible sampling separation, step-count priority, invalid initial step state rejection, streaming chunk notification, empty/aborted streaming completion notification, and selecting `StepScheduler` when `step_execution=True`.
- `tests/diffusion/test_diffusion_engine_cleanup.py`: tests that closing the diffusion engine completes pending streaming waiters with a terminal error output.
- `tests/diffusion/test_multiproc_engine_concurrency.py`: tests that multiproc step execution allows streaming output mode and returns the worker `RunnerOutput`, while preserving existing normal RPC behavior.
- `tests/diffusion/test_stage_diffusion_proc.py`: tests `StageDiffusionProc` yielding every streaming engine chunk with request metadata preserved.
- `tests/diffusion/test_inline_stage_diffusion_client.py`: tests inline stage client delivery of multiple streaming diffusion chunks.
- `tests/engine/test_orchestrator.py`: tests that the orchestrator forwards intermediate diffusion streaming chunks before the final output.
Entrypoint (should be auto-captured by existing ready (L1) CI yaml)
- `tests/entrypoints/test_async_omni.py`: tests that `AsyncOmni.generate()` yields intermediate streaming diffusion chunks before the final chunk.
- `tests/entrypoints/openai_api/test_serving_video_output_stream.py`: adds WebSocket `/v1/videos/stream` protocol tests for `video.start`, binary chunks, `session.done`, invalid format, final encoder delta, and generation errors.
- `tests/entrypoints/openai_api/test_video_api_utils.py`: (testing common video utility functions used in video-related endpoints) adds fragmented MP4 streaming encoder/finalization tests.
Integration test (should be auto-captured by existing ready (L1) CI yaml)
- `tests/diffusion/test_diffusion_streaming_output.py`: adds mock pipeline streaming integration coverage through ZMQ `StageDiffusionClient`, inline stage -> orchestrator -> `AsyncOmni`, and `/v1/videos/stream`; also tests midway pipeline errors, Helios step-execution streaming support, and rejecting unsupported pipelines.
E2E (Smoke) test (added in L4 `.buildkite/test-nightly.yml`)
- `tests/e2e/accuracy/test_video_streaming_output_similarity.py`: adds a Helios full-model smoke test comparing streaming vs non-streaming video output with matching metadata plus SSIM/PSNR thresholds.
- `tests/e2e/accuracy/helpers.py`: adds shared ffmpeg/ffprobe video similarity helpers; the Wan2.2 I2V test is refactored to reuse them.
- `tests/helpers/runtime.py`: adds an OpenAI client helper for native `/v1/videos/stream` WebSocket requests used by the E2E smoke test.
The following unit test files and cases are edited simply to add the new od_config field.
Test Result
Unit and integration test
Smoke test
Performance
See below: #3737 (comment). The performance test is not added to CI.