18 changes: 12 additions & 6 deletions examples/online_serving/qwen3_omni/README.md
@@ -18,9 +18,7 @@ Asynchronous chunk streaming operates as **enabled by default** within this bund
Additionally, NPU, ROCm, and XPU per-platform configuration deltas are deterministically merged from the
`platforms:` section of the corresponding YAML.

-**Note:** The OpenAI-style **`/v1/realtime`** WebSocket interface (facilitating streaming PCM audio input alongside audio and transcription output)
-is currently **unsupported** while the `async_chunk` configuration attribute is enabled.
-It is requisite to instantiate the default omni architecture or utilize a deployment configuration specifying `async_chunk: false` to facilitate real-time streaming sessions.
+The OpenAI-style **`/v1/realtime`** WebSocket interface accepts streaming PCM audio input and returns audio plus transcription events. With `async_chunk: true`, realtime sessions use a commit-then-generate bridge: the server buffers uploaded PCM chunks until `input_audio_buffer.commit` with `final: true`, then submits one normal multimodal Qwen3-Omni request through the async-chunk Thinker -> Talker -> Code2Wav pipeline. Use `--no-async-chunk` only when you specifically want the legacy streaming-input path where generation can start from a non-final commit.
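The commit-then-generate flow described above can be sketched from the client side as a message sequence: one `session.update`, a run of `input_audio_buffer.append` events, and a single final commit. This is a minimal sketch, not the repository's client. The helper name `build_realtime_events` is hypothetical, and the base64 `audio` field on append events is an assumption borrowed from the OpenAI realtime convention; the `session.update` and `commit` payload shapes follow the test code in this PR.

```python
import base64
import json


def build_realtime_events(pcm: bytes, model: str, chunk_ms: int = 200) -> list[str]:
    """Build the JSON messages for one commit-then-generate session (hypothetical helper)."""
    # PCM16 mono @ 16 kHz: 16000 samples/s * 2 bytes/sample = 32 bytes per millisecond.
    bytes_per_ms = 16000 * 2 // 1000
    chunk_bytes = max(bytes_per_ms * chunk_ms, 2)

    events = [json.dumps({"type": "session.update", "model": model})]
    for start in range(0, len(pcm), chunk_bytes):
        chunk = pcm[start:start + chunk_bytes]
        events.append(json.dumps({
            "type": "input_audio_buffer.append",
            # Assumption: OpenAI-style base64 payload under "audio".
            "audio": base64.b64encode(chunk).decode("ascii"),
        }))
    # With async_chunk enabled, generation starts only after this final commit.
    events.append(json.dumps({"type": "input_audio_buffer.commit", "final": True}))
    return events
```

For one second of audio (32000 bytes) and `chunk_ms=200`, this yields five append events between the session update and the final commit.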

To use a custom deployment YAML, specify its path explicitly:
```bash
@@ -141,7 +139,9 @@ parser defaults). If you don't pass a flag, the YAML value wins.
> bool. Pipelines that implement alternate processor functions for
> chunked vs end-to-end modes (e.g. qwen3_tts code2wav) dispatch
> automatically based on that bool — no extra flag or variant yaml is
-> needed.
+> needed. For `/v1/realtime`, `async_chunk: true` waits for a final commit
+> before generation, while `--no-async-chunk` preserves the legacy
+> streaming-input behavior.

> ⚠️ **For multi-stage models that share GPUs (qwen3_omni_moe by default
> shares cuda:1 between stages 1 and 2), avoid using global memory flags.**
@@ -255,7 +255,7 @@ python examples/online_serving/openai_chat_completion_client_for_multimodal_gene

[`openai_realtime_client.py`](./openai_realtime_client.py) connects to **`ws://<host>:<port>/v1/realtime`**, streams a local WAV as **PCM16 mono @ 16 kHz** in fixed-size chunks (OpenAI-style `input_audio_buffer.append` / `commit`), and receives **`response.audio.delta`** (incremental PCM for the reply) plus **`transcription.*`** events. By default it concatenates audio deltas and writes **`--output-wav`** (model output is typically **24 kHz**). Optional **`--delta-dump-dir`** saves each delta as `delta_000001.wav`, … for debugging.
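As an illustration of the "concatenate audio deltas and write a WAV" step, the sketch below joins raw PCM16 deltas and wraps them in a mono WAV container using only the standard library. It assumes the deltas have already been decoded to raw PCM16 bytes and that the output rate is the typical 24 kHz mentioned above. The function names are hypothetical and are not taken from `openai_realtime_client.py`.

```python
import io
import wave


def wav_bytes_from_pcm16(pcm: bytes, sample_rate_hz: int) -> bytes:
    """Wrap raw PCM16 mono samples in a WAV container."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(1)       # mono
        wf.setsampwidth(2)       # 16-bit samples
        wf.setframerate(sample_rate_hz)
        wf.writeframes(pcm)
    return buf.getvalue()


def join_deltas(deltas: list[bytes], sample_rate_hz: int = 24000) -> bytes:
    """Concatenate decoded audio deltas in arrival order into one WAV file."""
    return wav_bytes_from_pcm16(b"".join(deltas), sample_rate_hz)
```

Writing each delta through the same `wav_bytes_from_pcm16` helper would give per-delta files comparable to what `--delta-dump-dir` produces.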

-Streaming input works well for translation-style use cases; if the Thinker runs while input is still incomplete, consider limiting **`max_tokens`** in your session / server defaults to avoid over-generation.
+Streaming input works well for translation-style use cases. In `async_chunk: true` mode, the client can still upload chunks incrementally, but generation starts after the final commit. In `--no-async-chunk` mode, a non-final commit starts the legacy streaming-input path, so consider limiting **`max_tokens`** in your session / server defaults if the Thinker runs while input is still incomplete.

**Dependencies:**

@@ -289,12 +289,18 @@ python openai_realtime_client.py \
| `--num-requests` | `1` | Number of sequential sessions (see `--concurrency`) |
| `--concurrency` | `1` | Max concurrent WebSocket sessions when `--num-requests` > 1 |

-Ensure the server is running **without** `async_chunk` if you use `/v1/realtime`, for example:
+The default Qwen3-Omni deployment enables `async_chunk`, so the command below uses the commit-then-generate bridge:

```bash
vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091
```

+To use the legacy realtime streaming-input path instead, disable async chunking explicitly:
+
+```bash
+vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091 --no-async-chunk
+```
+
The Python client supports the following command-line arguments:

- `--query-type` (or `-q`): Query type (default: `use_video`). Options: `text`, `use_audio`, `use_image`, `use_video`
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@

import asyncio
import base64
+import inspect
import io
import json
import os
@@ -23,7 +24,7 @@
     generate_synthetic_audio,
 )
 from tests.helpers.runtime import OmniServerParams
-from tests.helpers.stage_config import get_deploy_config_path
+from tests.helpers.stage_config import get_deploy_config_path, modify_stage_config

os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

@@ -37,6 +38,7 @@
 # The new-schema CI overlay bakes in async_chunk: False and covers CUDA/ROCm/XPU
 # via its ``platforms:`` section, so one path serves all three.
 default_stage_config = get_deploy_config_path("ci/qwen3_omni_moe.yaml")
+async_chunk_stage_config = modify_stage_config(default_stage_config, updates={"async_chunk": True})

 realtime_server_params = [
     pytest.param(
@@ -46,7 +48,15 @@
             use_stage_cli=True,
             server_args=["--no-async-chunk"],
         ),
-        id="default",
+        id="no_async_chunk",
     ),
+    pytest.param(
+        OmniServerParams(
+            model=MODEL,
+            stage_config_path=async_chunk_stage_config,
+            use_stage_cli=True,
+        ),
+        id="async_chunk",
+    ),
 ]

@@ -74,6 +84,13 @@ def _wav_bytes_from_pcm16(pcm: bytes, sample_rate_hz: int) -> bytes:
     return buf.getvalue()


+async def _connect_local_websocket(uri: str):
+    kwargs: dict = {"max_size": 64 * 1024 * 1024}
+    if "proxy" in inspect.signature(websockets.connect).parameters:
+        kwargs["proxy"] = None
+    return await websockets.connect(uri, **kwargs)
+
+
 async def _run_realtime_audio_roundtrip(
     host: str,
     port: int,
@@ -92,7 +109,18 @@ async def _run_realtime_audio_roundtrip(
     bytes_per_ms = 16000 * 2 // 1000
     chunk_bytes = max(bytes_per_ms * chunk_ms, 2)

-    async with websockets.connect(uri, max_size=64 * 1024 * 1024) as ws:
+    last_connect_error: Exception | None = None
+    for attempt in range(5):
+        try:
+            ws = await _connect_local_websocket(uri)
+            break
+        except (OSError, websockets.exceptions.InvalidMessage) as exc:
+            last_connect_error = exc
+            await asyncio.sleep(1 + attempt)
+    else:
+        raise AssertionError(f"Could not connect to realtime websocket: {last_connect_error!r}")
+
+    async with ws:
         await ws.send(json.dumps({"type": "session.update", "model": model}))
         await ws.send(json.dumps({"type": "input_audio_buffer.commit", "final": False}))
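The connect-and-retry loop in this hunk follows a common pattern: try a bounded number of times, sleep a little longer after each failure, and raise with the last error if every attempt fails. A standalone sketch of that pattern is shown below, assuming a generic zero-argument async `connect` callable. The helper name `connect_with_retry` and its parameters are hypothetical and are not part of the test file, and it returns from inside the loop where the hunk uses `for`/`else`.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 1.0,
) -> T:
    """Call `connect` until it succeeds, backing off linearly between failures."""
    last_error: Exception | None = None
    for attempt in range(attempts):
        try:
            return await connect()
        except OSError as exc:
            last_error = exc
            # Linear backoff: base_delay, 2 * base_delay, 3 * base_delay, ...
            await asyncio.sleep(base_delay * (1 + attempt))
    raise ConnectionError(f"could not connect after {attempts} attempts: {last_error!r}")
```

Keeping the last exception in the final error message, as the hunk does with `last_connect_error!r`, makes a failure from a server that is still starting easier to diagnose.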
