From 48b15096702fadddc13f744a254fa5263d90b5b0 Mon Sep 17 00:00:00 2001 From: SammLSH Date: Tue, 26 May 2026 20:01:04 +0000 Subject: [PATCH 01/10] =?UTF-8?q?[Perf]=20Realtime=20ASR=20=E2=80=94=20ski?= =?UTF-8?q?p=20PCM/WAV=20codec=20round-trip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PCM16 from the WebSocket path was being encoded into WAV bytes only for load_audio to decode it back into a float ndarray. Convert directly to float samples (1/32768 normalization matches soundfile.read default for signed 16-bit, so the float values are bit-equal to the old path), and teach load_audio to accept a pre-decoded ndarray as a no-op passthrough. ASR/cache semantics unchanged — this only removes the WAV adapter layer. A future optimization could maintain decoded samples incrementally to avoid re-converting the cumulative PCM buffer on every chunk. --- .../srt/entrypoints/openai/realtime/session.py | 18 ++++++++---------- python/sglang/srt/utils/common.py | 7 +++++++ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index c5951993e25d..9fc4357b0bd8 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -9,7 +9,6 @@ from __future__ import annotations import asyncio -import io import json import logging import math @@ -18,7 +17,6 @@ import numpy as np import pybase64 -import soundfile as sf from fastapi import WebSocket, WebSocketDisconnect from openai.types.realtime import ( ConversationItemCreatedEvent, @@ -99,11 +97,11 @@ def _resample_to_target_rate(pcm: bytes, src_rate: int, target_rate: int) -> byt return (np.clip(samples, -1.0, 1.0) * 32767.0).astype(np.int16).tobytes() -def _pcm_to_wav(pcm: bytes, sample_rate: int) -> bytes: - samples = np.frombuffer(pcm, dtype=np.int16) - buf = io.BytesIO() - sf.write(buf, samples, sample_rate, format="WAV") - return buf.getvalue() +def _pcm_to_float_samples(pcm: bytes) -> np.ndarray: + # Match soundfile.read default normalization for signed 16-bit PCM + # (divide by 32768.0) so callers see the same float values they used to + # get from the PCM -> WAV (sf.write) -> WAV (sf.read) round trip. + return np.frombuffer(pcm, dtype=np.int16).astype(np.float64) / 32768.0 _CLIENT_EVENT_TYPES: Dict[str, type] = { @@ -582,15 +580,15 @@ async def _run_inference(self, is_last: bool) -> bool: """Run ASR on the current cumulative buffer. Returns False on failure: commit-time emits transcription.failed and rolls the item; append-time emits a generic error envelope and closes the WebSocket.""" - wav_data = await asyncio.to_thread( - _pcm_to_wav, bytes(self.audio.pcm_buffer), self.model_sample_rate + audio_samples = await asyncio.to_thread( + _pcm_to_float_samples, bytes(self.audio.pcm_buffer) ) try: delta = await process_asr_chunk( tokenizer_manager=self.tokenizer_manager, adapter=self.adapter, state=self.audio.state, - audio_data=wav_data, + audio_data=audio_samples, sampling_params=self.config.sampling_params, is_last=is_last, ) diff --git a/python/sglang/srt/utils/common.py b/python/sglang/srt/utils/common.py index feb505d5dd6b..20ebd3037b80 100644 --- a/python/sglang/srt/utils/common.py +++ b/python/sglang/srt/utils/common.py @@ -777,6 +777,13 @@ def load_audio( if sr is None: sr = 16000 + # Caller must pre-resample to `sr`. Multi-channel layout assumed + # (n_samples, n_channels) per soundfile.read. + if isinstance(audio_file, np.ndarray): + if mono and audio_file.ndim > 1: + return np.mean(audio_file, axis=1) + return audio_file + # Normalize input: resolve URL / base64 / file:// to bytes or path if isinstance(audio_file, bytes): source = audio_file From 9c7fa6938ab1a4380dfb2a0addc24d17c4deacb7 Mon Sep 17 00:00:00 2001 From: SammLSH Date: Sat, 30 May 2026 06:46:28 +0000 Subject: [PATCH 02/10] =?UTF-8?q?[Perf]=20Realtime=20ASR=20=E2=80=94=20sli?= =?UTF-8?q?ce=20pending=20audio=20after=20committed-text=20rolls=20forward?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the WS /v1/realtime cumulative inference path (re-send the whole PCM buffer on every chunk) with input slicing once a committed-text prefix exists. Once StreamingASRState has stable emitted text, is past the K-token holdback gate, and has accumulated at least eight chunks (~16 s) of cumulative context, the model runs on ``pcm_buffer[committed_audio_until_bytes - left_overlap_bytes:]`` plus a 2 s left overlap instead of the full buffer. The prompt stays at ``adapter.prompt_template`` — emitted_text is not injected as a continuation prefix; the retained acoustic overlap plus a word-level dedupe (with CJK char-level fallback) takes its place. The first gated call still starts at offset 0 because committed_audio_until_bytes is initialized to 0; only chunk 9 onward is bounded to overlap + new chunk. Performance (TED-LIUM long-form sweep on Qwen3-ASR-0.6B, H100): audio cumul wall sliced wall save 30 s 1.51 s 1.29 s 14 % 60 s 3.00 s 2.52 s 16 % 120 s 6.17 s 5.11 s 17 % 240 s 14.78 s 10.07 s 32 % 300 s 19.49 s 12.01 s 38 % 600 s 77.24 s 26.68 s 65 % 900 s 171.37 s 38.23 s 78 % Per-chunk model-call wall stays flat at ~80 ms mean / ~121 ms max across the sweep instead of growing to 137 ms mean / 399 ms max in the cumulative path at 300 s. Realtime-paced sum of per-chunk inference wall drops 40-42 % on both 0.6B and 1.7B Qwen3-ASR. Implementation: - ``adapter.realtime_slicing_config`` returns left_overlap_ms (default 2000) and min_audio_sec (default 16.0); slicing_min_chunk_index is derived as ceil(min_audio_sec / chunk_size_sec). - ``_slice_pcm_from`` snapshots the bytearray via memoryview so the per-chunk copy is slice-sized instead of full-buffer + slice (~7.7 MB -> ~128 KB at 240 s when slicing engaged). - ``dedupe_overlap`` normalizes only the tail of committed_text bounded by len(candidate_words), so dedupe cost does not grow with session length. - ``process_asr_chunk`` gains ``prompt: Optional[str]`` and ``dedupe_against: Optional[str]`` kwargs; the realtime path uses them, the HTTP / HTTP SSE path keeps existing behavior via defaults. - ``load_audio`` annotation widened from ``str`` to ``Union[str, bytes, np.ndarray]`` to match the existing isinstance branches; not exposed through any Pydantic schema path. Tests: 21-case CI unit suite at test/registered/unit/entrypoints/openai/test_streaming_asr.py covering dedupe_overlap (word + CJK + suffix-only-history invariant), _pcm_to_float_samples (normalization + soundfile-round-trip bit-equality + odd-length raises), and _slice_pcm_from validation. --- .../entrypoints/openai/realtime/session.py | 88 +++++++++++-- .../srt/entrypoints/openai/streaming_asr.py | 122 ++++++++++++++++-- .../openai/transcription_adapters/base.py | 14 ++ python/sglang/srt/utils/common.py | 4 +- .../entrypoints/openai/test_streaming_asr.py | 113 ++++++++++++++++ 5 files changed, 318 insertions(+), 23 deletions(-) create mode 100644 test/registered/unit/entrypoints/openai/test_streaming_asr.py diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index 9fc4357b0bd8..dca5f1f12333 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -81,6 +81,15 @@ _SAMPLE_WIDTH = 2 +def _slice_pcm_from(buffer, start: int) -> bytes: + """Immutable snapshot of ``buffer[start:]`` via memoryview — one slice-sized + copy instead of the two ``bytes(bytearray)[start:]`` would do. ``buffer`` is + bytes or bytearray. Raises instead of silently returning empty.""" + if not (0 <= start <= len(buffer)): + raise ValueError(f"_slice_pcm_from: start={start} not in [0, {len(buffer)}]") + return bytes(memoryview(buffer)[start:]) + + def _resample_to_target_rate(pcm: bytes, src_rate: int, target_rate: int) -> bytes: if src_rate == target_rate or not pcm: return pcm @@ -98,9 +107,8 @@ def _resample_to_target_rate(pcm: bytes, src_rate: int, target_rate: int) -> byt def _pcm_to_float_samples(pcm: bytes) -> np.ndarray: - # Match soundfile.read default normalization for signed 16-bit PCM - # (divide by 32768.0) so callers see the same float values they used to - # get from the PCM -> WAV (sf.write) -> WAV (sf.read) round trip. + # /32768.0 matches soundfile.read's default int16 normalization so the + # samples are bit-equal to the prior PCM→WAV→sf.read path. return np.frombuffer(pcm, dtype=np.int16).astype(np.float64) / 32768.0 @@ -137,17 +145,29 @@ class _SessionConfig: @dataclass class _AudioState: - """Per-item audio state: PCM buffer accumulated from - input_audio_buffer.append, the chunked ASR rollback state, and the - static buffer-size limits set at __init__. pcm_buffer / state / - last_inference_offset reset on commit-roll and clear; the size limits - stay constant for the session's lifetime.""" + """Per-item audio state. Once the slicing gate is reached (``state.emitted_text`` + non-empty AND ``state.chunk_index >= slicing_min_chunk_index``), inference + switches from the cumulative buffer to a tail slice at + ``pcm_buffer[committed_audio_until_bytes - left_overlap_bytes:]``. The FIRST + gated call still starts at offset 0 because ``committed_audio_until_bytes`` is + initialized to 0; only subsequent calls are bounded to the left overlap plus + newly appended audio. ``emitted_text`` is not injected into the prompt — the + retained acoustic overlap plus output-side dedupe takes the place of a + continuation prefix.""" max_buffer_bytes: int chunk_size_bytes: int + left_overlap_bytes: int + slicing_min_chunk_index: int state: StreamingASRState + # False when left_overlap covers the whole unfixed-chunk window, which + # leaves the K-unfixed dedupe target unreachable; flipped at session + # construction. When False, _run_inference always takes the cumulative + # path even after emitted_text becomes non-empty. + slicing_enabled: bool = True pcm_buffer: bytearray = field(default_factory=bytearray) last_inference_offset: int = 0 + committed_audio_until_bytes: int = 0 @dataclass @@ -188,6 +208,11 @@ def __init__( self.config = _SessionConfig() + slicing_cfg = adapter.realtime_slicing_config + left_overlap_ms = int(slicing_cfg["left_overlap_ms"]) + min_audio_sec = float(slicing_cfg["min_audio_sec"]) + left_overlap_bytes = int(left_overlap_ms / 1000 * self.bytes_per_second) + state = StreamingASRState(**adapter.chunked_streaming_config) chunk_size_bytes = int(state.chunk_size_sec * self.bytes_per_second) if chunk_size_bytes <= 0: @@ -195,10 +220,24 @@ def __init__( f"adapter.chunked_streaming_config produced non-positive " f"chunk_size_sec; got {state.chunk_size_sec!r}" ) + slicing_min_chunk_index = math.ceil(min_audio_sec / state.chunk_size_sec) + slicing_enabled = ( + left_overlap_bytes < state.unfixed_chunk_num * chunk_size_bytes + ) + if not slicing_enabled: + logger.warning( + "[realtime] left_overlap=%dms >= unfixed_chunks_duration=%dms; " + "audio slicing disabled, falling back to cumulative inference", + left_overlap_ms, + state.unfixed_chunk_num * int(state.chunk_size_sec * 1000), + ) self.audio = _AudioState( max_buffer_bytes=self.max_buffer_seconds * self.bytes_per_second, chunk_size_bytes=chunk_size_bytes, state=state, + left_overlap_bytes=left_overlap_bytes, + slicing_min_chunk_index=slicing_min_chunk_index, + slicing_enabled=slicing_enabled, ) self.item = _ItemState(current_item_id=f"item_{random_uuid()}") @@ -541,8 +580,7 @@ async def _on_input_audio_buffer_commit( tail = self.audio.state.finalize() await self._emit_transcription_delta(tail) - # Build from emitted_deltas, not state.full_transcript: prefix injection - # means the last chunk's full_transcript is only the continuation tail. + # Use emitted_deltas: under slicing, state.full_transcript is the deduped tail. transcript = normalize_whitespace("".join(self.item.emitted_deltas)) await self._send( @@ -580,9 +618,29 @@ async def _run_inference(self, is_last: bool) -> bool: """Run ASR on the current cumulative buffer. Returns False on failure: commit-time emits transcription.failed and rolls the item; append-time emits a generic error envelope and closes the WebSocket.""" - audio_samples = await asyncio.to_thread( - _pcm_to_float_samples, bytes(self.audio.pcm_buffer) + # Bare prompt under slicing: emitted_text is not injected as a + # continuation prefix; the retained overlap + output dedupe + # takes its place. + committed_text = self.audio.state.get_prefix_text() + use_slicing = ( + self.audio.slicing_enabled + and bool(committed_text) + and self.audio.state.chunk_index >= self.audio.slicing_min_chunk_index ) + if use_slicing: + prompt: Optional[str] = self.adapter.prompt_template + dedupe_against: Optional[str] = committed_text + slice_start = max( + 0, + self.audio.committed_audio_until_bytes - self.audio.left_overlap_bytes, + ) + else: + prompt = None + dedupe_against = None + slice_start = 0 + + pcm_slice = _slice_pcm_from(self.audio.pcm_buffer, slice_start) + audio_samples = await asyncio.to_thread(_pcm_to_float_samples, pcm_slice) try: delta = await process_asr_chunk( tokenizer_manager=self.tokenizer_manager, @@ -591,6 +649,8 @@ async def _run_inference(self, is_last: bool) -> bool: audio_data=audio_samples, sampling_params=self.config.sampling_params, is_last=is_last, + prompt=prompt, + dedupe_against=dedupe_against, ) except Exception: logger.exception( @@ -630,6 +690,9 @@ async def _run_inference(self, is_last: bool) -> bool: ) return False + if use_slicing: + self.audio.committed_audio_until_bytes = len(self.audio.pcm_buffer) + self.audio.last_inference_offset = len(self.audio.pcm_buffer) await self._emit_transcription_delta(delta) return True @@ -667,6 +730,7 @@ def _reset_inference_state(self) -> None: self.audio.pcm_buffer.clear() # in-place; reuses the buffer's allocation self.item.emitted_deltas.clear() self.audio.last_inference_offset = 0 + self.audio.committed_audio_until_bytes = 0 def _build_session_info(self) -> TranscriptionSessionConfig: # id / object aren't SDK fields; round-trip via extra='allow' so diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index a347cc8f3e33..a37f17eeb4b5 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -3,8 +3,9 @@ import logging import re from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +import numpy as np import soundfile as sf from fastapi import Request @@ -130,18 +131,15 @@ def normalize_whitespace(text: str) -> str: def _is_cjk(c: str) -> bool: - """Whether char is a CJK-context glyph that doesn't take inter-word - spaces — ideographs, Japanese kana, CJK punctuation, fullwidth forms. - Excludes Hangul / Devanagari / Arabic etc., which are non-ASCII but - space-separated and need the normal boundary space.""" + """CJK-context glyph that doesn't take inter-word spaces.""" cp = ord(c) return ( - 0x3000 <= cp <= 0x303F # CJK Symbols and Punctuation (,。、《》「」…) + 0x3000 <= cp <= 0x303F # CJK Symbols and Punctuation or 0x3040 <= cp <= 0x309F # Hiragana or 0x30A0 <= cp <= 0x30FF # Katakana or 0x3400 <= cp <= 0x4DBF # CJK Unified Ideographs Ext A or 0x4E00 <= cp <= 0x9FFF # CJK Unified Ideographs - or 0xFF00 <= cp <= 0xFFEF # Halfwidth & Fullwidth Forms (fullwidth ASCII) + or 0xFF00 <= cp <= 0xFFEF # Halfwidth & Fullwidth Forms ) @@ -162,18 +160,120 @@ def needs_space(prev: str, cur: str) -> bool: return True +# Trailing punctuation stripped during dedupe match. Includes em dash +# (U+2014), hyphen-minus, and CJK fullwidth equivalents. +_DEDUPE_NORM_STRIP = ",.!?;:—-,。!?;:、" + + +def _dedupe_norm(word: str) -> str: + """Lowercase + strip trailing punctuation for dedupe matching.""" + return word.strip(_DEDUPE_NORM_STRIP).lower() + + +def _dedupe_word_level(committed_text: str, candidate_out: str) -> str: + """Drop the longest prefix of ``candidate_out`` matching the suffix of + ``committed_text`` word-for-word (case- and punctuation-insensitive).""" + cand_words = candidate_out.split() + if not cand_words: + return candidate_out + c_words = committed_text.split() + if not c_words: + return candidate_out + # Longest possible overlap is bounded by candidate length; normalize + # only that tail of committed text instead of scanning the whole history. + # Pre-normalize once instead of O(k²) calls inside the inner loop, then + # compare list slices in C rather than glyph-by-glyph in Python. + max_k = min(len(c_words), len(cand_words)) + c_norm = [_dedupe_norm(w) for w in c_words[-max_k:]] + cand_norm = [_dedupe_norm(w) for w in cand_words] + for k in range(max_k, 0, -1): + if c_norm[-k:] == cand_norm[:k]: + return " ".join(cand_words[k:]) + return candidate_out + + +def _find_kth_cjk_pos(text: str, k: int) -> Optional[int]: + """Return index after the k-th CJK glyph in text, or None if text + contains fewer than k CJK glyphs.""" + seen = 0 + for i, c in enumerate(text): + if c.isspace() or not _is_cjk(c): + continue + seen += 1 + if seen == k: + return i + 1 + return None + + +def _dedupe_cjk_char_level(committed_text: str, candidate_out: str) -> str: + """Drop leading CJK glyphs of ``candidate_out`` matching the CJK-tail of + ``committed_text``. Non-CJK glyphs are skipped during match, preserved + in trimmed output.""" + cand_chars = [c for c in candidate_out if not c.isspace() and _is_cjk(c)] + if not cand_chars: + return candidate_out + # Longest possible overlap is bounded by candidate CJK length; collect + # only that tail of committed CJK glyphs instead of scanning the whole + # history. We iterate committed_text in reverse and stop once we have + # len(cand_chars) CJK glyphs. + max_cand = len(cand_chars) + c_tail_rev = [] + for c in reversed(committed_text): + if c.isspace() or not _is_cjk(c): + continue + c_tail_rev.append(c) + if len(c_tail_rev) >= max_cand: + break + if not c_tail_rev: + return candidate_out + c_chars = list(reversed(c_tail_rev)) + max_k = min(len(c_chars), len(cand_chars)) + for k in range(max_k, 0, -1): + if c_chars[-k:] != cand_chars[:k]: + continue + cut_pos = _find_kth_cjk_pos(candidate_out, k) + if cut_pos is None: + return "" + return candidate_out[cut_pos:].lstrip() + return candidate_out + + +def dedupe_overlap(committed_text: str, candidate_out: str) -> str: + """Trim words/CJK glyphs at the start of ``candidate_out`` that + re-transcribe ``committed_text``'s tail. Word-level first, CJK + char-level fallback.""" + if not committed_text or not candidate_out: + return candidate_out + deduped = _dedupe_word_level(committed_text, candidate_out) + if deduped != candidate_out: + return deduped + if any(_is_cjk(c) for c in committed_text) or any( + _is_cjk(c) for c in candidate_out + ): + return _dedupe_cjk_char_level(committed_text, candidate_out) + return candidate_out + + async def process_asr_chunk( tokenizer_manager: TokenizerManager, adapter: TranscriptionAdapter, state: StreamingASRState, - audio_data: bytes, + audio_data: Union[bytes, np.ndarray], sampling_params: Dict[str, Any], is_last: bool, raw_request: Optional[Request] = None, routing_key: Optional[str] = None, + prompt: Optional[str] = None, + dedupe_against: Optional[str] = None, ) -> str: - """Run inference on one audio chunk. Shared by the HTTP and WebSocket paths.""" - prompt = adapter.prompt_template + state.get_prefix_text() + """Run inference on one audio chunk. Shared by the HTTP and WS paths. + + ``audio_data`` accepts WAV bytes or pre-decoded float samples. + ``prompt`` overrides the default ``adapter.prompt_template + state.get_prefix_text()``. + ``dedupe_against`` triggers ``dedupe_overlap`` on raw model output before ``state`` ingests it. + """ + if prompt is None: + prompt = adapter.prompt_template + state.get_prefix_text() chunk_request = GenerateReqInput( text=prompt, @@ -202,6 +302,8 @@ async def process_asr_chunk( return "" text = normalize_whitespace(adapter.postprocess_text(ret.get("text", ""))) + if dedupe_against is not None: + text = dedupe_overlap(dedupe_against, text) if is_last: state.full_transcript = text diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py index cd97b42997f9..1120289ec3ff 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py @@ -107,6 +107,20 @@ def chunked_streaming_config(self) -> dict: """ return {} + @property + def realtime_slicing_config(self) -> dict: + """Tuning knobs for the WS realtime slicing path. Only consulted + when ``supports_chunked_streaming`` is True. Override per adapter + when the model's token rate or per-chunk stability differs. + + ``left_overlap_ms``: audio kept across the sliced boundary so + dedupe has context; cover the K-token rollback window. + ``min_audio_sec``: don't slice below this many seconds of + cumulative audio (sliced output diverges from cumulative + on short inputs and dedupe over-matches). + """ + return {"left_overlap_ms": 2000, "min_audio_sec": 16.0} + def postprocess_text(self, text: str) -> str: """Strip model-specific markers from raw decoded text. diff --git a/python/sglang/srt/utils/common.py b/python/sglang/srt/utils/common.py index 20ebd3037b80..7bbaaa40204c 100644 --- a/python/sglang/srt/utils/common.py +++ b/python/sglang/srt/utils/common.py @@ -772,7 +772,9 @@ def set_random_seed(seed: int) -> None: def load_audio( - audio_file: str, sr: Optional[int] = None, mono: bool = True + audio_file: Union[str, bytes, np.ndarray], + sr: Optional[int] = None, + mono: bool = True, ) -> np.ndarray: if sr is None: sr = 16000 diff --git a/test/registered/unit/entrypoints/openai/test_streaming_asr.py b/test/registered/unit/entrypoints/openai/test_streaming_asr.py new file mode 100644 index 000000000000..a73a29e3aebb --- /dev/null +++ b/test/registered/unit/entrypoints/openai/test_streaming_asr.py @@ -0,0 +1,113 @@ +"""Unit tests for realtime ASR slicing-path helpers. + +Edge cases for ``dedupe_overlap`` (normalization rules, CJK fallback, the +suffix-only-history invariant the perf optimization depends on), the +bit-equality invariant for ``_pcm_to_float_samples``, and ``_slice_pcm_from`` +validation. Trivial happy-path assertions that restated Python primitives were +dropped. The slicing trigger logic and its interaction with +``StreamingASRState`` are exercised by the manual GPU suite, not by CI. +""" + +import io +import unittest + +import numpy as np +import soundfile as sf + +from sglang.srt.entrypoints.openai.realtime.session import ( + _pcm_to_float_samples, + _slice_pcm_from, +) +from sglang.srt.entrypoints.openai.streaming_asr import dedupe_overlap +from sglang.test.ci.ci_register import register_cpu_ci +from sglang.test.test_utils import CustomTestCase + +register_cpu_ci(est_time=2, suite="base-a-test-cpu") + + +class TestDedupeOverlap(CustomTestCase): + """Edge cases for the dedupe heuristic. + + Drops trivial happy-path assertions; keeps cases that lock + normalization rules, CJK fallback paths, and the suffix-only-history + invariant that the perf optimization relies on. + """ + + def test_full_candidate_overlaps_returns_empty(self): + # Whole-candidate match must emit empty so StreamingASRState doesn't + # double-record the previous chunk's content. + self.assertEqual(dedupe_overlap("hello world", "hello world"), "") + + def test_empty_committed_returns_candidate_unchanged(self): + self.assertEqual(dedupe_overlap("", "anything goes"), "anything goes") + + def test_empty_candidate_returns_empty(self): + self.assertEqual(dedupe_overlap("anything", ""), "") + + def test_em_dash_normalized_during_match(self): + # Trailing em dash and case differences are stripped during match. + # Regression test for the dedupe rule documented in _DEDUPE_NORM_STRIP. + self.assertEqual( + dedupe_overlap("stew for dinner—", "Dinner: turnips"), "turnips" + ) + + def test_cjk_char_level_fallback(self): + # No whitespace → word-level returns unchanged → CJK fallback engages. + self.assertEqual(dedupe_overlap("你好世界", "世界今天很好"), "今天很好") + + def test_cjk_overlap_with_punctuation(self): + # CJK punctuation in committed_text must not block the char-level + # match on the ideographs that follow. + self.assertEqual(dedupe_overlap("你好,世界", "世界今天很好"), "今天很好") + + def test_long_committed_history_uses_suffix_overlap(self): + # Locks the suffix-only invariant the tail-only optimization + # depends on: a massive committed prefix unrelated to the candidate + # must not change the match outcome. + committed = " ".join(["old"] * 1000 + ["a", "b", "c"]) + self.assertEqual(dedupe_overlap(committed, "b c d"), "d") + + +class TestPcmToFloatSamples(CustomTestCase): + """The bit-equality invariant the PR's perf claim depends on, plus the + one corruption boundary worth catching loudly.""" + + def test_matches_soundfile_round_trip(self): + # The PCM→WAV→sf.read path was the legacy converter; this PR's + # direct conversion must remain bit-equal to it. + rng = np.random.default_rng(42) + ints = rng.integers(-32768, 32768, size=4096, dtype=np.int16) + pcm = ints.tobytes() + + direct = _pcm_to_float_samples(pcm) + + buf = io.BytesIO() + sf.write(buf, ints, 16000, format="WAV") + buf.seek(0) + round_trip, _ = sf.read(buf) + + np.testing.assert_array_equal(direct, round_trip) + + def test_odd_length_pcm_raises(self): + # int16 frames are 2 bytes; an odd-length buffer means upstream + # corruption. Keep the np.frombuffer ValueError loud — silent + # rounding would mask the bug. + with self.assertRaises(ValueError): + _pcm_to_float_samples(b"\x00") + + +class TestSlicePcmFrom(CustomTestCase): + """Only the validation behavior — the trivial slice cases were + Python-built-in tests, not ours.""" + + def test_negative_start_raises(self): + with self.assertRaises(ValueError): + _slice_pcm_from(b"abcdef", -1) + + def test_past_end_raises(self): + with self.assertRaises(ValueError): + _slice_pcm_from(b"abcdef", 7) + + +if __name__ == "__main__": + unittest.main() From 66a15542fa3122e1f0929b8ce89d18fc5cdc7dc8 Mon Sep 17 00:00:00 2001 From: sam huang Date: Sat, 30 May 2026 22:47:21 -0400 Subject: [PATCH 03/10] [Perf] Realtime ASR: bound long-form per-chunk audio with input slicing After a stability gate (8 chunks / ~16s for Qwen3-ASR), the realtime WebSocket path runs inference on a bounded audio tail (the new chunk + a 2s left overlap) instead of the full cumulative PCM buffer, with output-side dedupe. Slicing is opt-in per adapter: the base config keeps it off; Qwen3-ASR enables it. Short audio and non-opting adapters keep the cumulative path unchanged. Also in this changeset: - StreamingASRState.update(): drop the char-level startswith fast path that emitted mid-word fragments ("world" -> "worldly" emitted "ly"); the word-level common-prefix scan now runs unconditionally (matching finalize()). - Convert PCM16 to float directly, skipping the PCM -> WAV -> ndarray round-trip; load_audio accepts a pre-decoded ndarray. - Add a 14-case CPU unit suite (process_asr_chunk integration, slicing-enable guard, update() reconciliation, dedupe rules, PCM/slice helpers). --- .../entrypoints/openai/realtime/session.py | 51 ++-- .../srt/entrypoints/openai/streaming_asr.py | 71 +++-- .../openai/transcription_adapters/base.py | 10 +- .../transcription_adapters/qwen3_asr.py | 7 + .../entrypoints/openai/test_streaming_asr.py | 259 +++++++++++++----- 5 files changed, 274 insertions(+), 124 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index dca5f1f12333..ea8c63b9c9b9 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -13,7 +13,7 @@ import logging import math from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import numpy as np import pybase64 @@ -81,10 +81,8 @@ _SAMPLE_WIDTH = 2 -def _slice_pcm_from(buffer, start: int) -> bytes: - """Immutable snapshot of ``buffer[start:]`` via memoryview — one slice-sized - copy instead of the two ``bytes(bytearray)[start:]`` would do. ``buffer`` is - bytes or bytearray. Raises instead of silently returning empty.""" +def _slice_pcm_from(buffer: Union[bytes, bytearray], start: int) -> bytes: + """Return an immutable ``buffer[start:]`` snapshot with bounds checking.""" if not (0 <= start <= len(buffer)): raise ValueError(f"_slice_pcm_from: start={start} not in [0, {len(buffer)}]") return bytes(memoryview(buffer)[start:]) @@ -145,15 +143,11 @@ class _SessionConfig: @dataclass class _AudioState: - """Per-item audio state. Once the slicing gate is reached (``state.emitted_text`` - non-empty AND ``state.chunk_index >= slicing_min_chunk_index``), inference - switches from the cumulative buffer to a tail slice at - ``pcm_buffer[committed_audio_until_bytes - left_overlap_bytes:]``. The FIRST - gated call still starts at offset 0 because ``committed_audio_until_bytes`` is - initialized to 0; only subsequent calls are bounded to the left overlap plus - newly appended audio. ``emitted_text`` is not injected into the prompt — the - retained acoustic overlap plus output-side dedupe takes the place of a - continuation prefix.""" + """Per-item audio buffer and slicing state. + + After the slicing gate is reached, inference switches from the cumulative + buffer to a tail slice. The first gated call may still start at offset 0; + later calls use ``committed_audio_until_bytes - left_overlap_bytes``.""" max_buffer_bytes: int chunk_size_bytes: int @@ -209,8 +203,9 @@ def __init__( self.config = _SessionConfig() slicing_cfg = adapter.realtime_slicing_config - left_overlap_ms = int(slicing_cfg["left_overlap_ms"]) - min_audio_sec = float(slicing_cfg["min_audio_sec"]) + slicing_opt_in = bool(slicing_cfg.get("enabled", False)) + left_overlap_ms = int(slicing_cfg.get("left_overlap_ms", 0)) + min_audio_sec = float(slicing_cfg.get("min_audio_sec", 0.0)) left_overlap_bytes = int(left_overlap_ms / 1000 * self.bytes_per_second) state = StreamingASRState(**adapter.chunked_streaming_config) @@ -220,11 +215,14 @@ def __init__( f"adapter.chunked_streaming_config produced non-positive " f"chunk_size_sec; got {state.chunk_size_sec!r}" ) - slicing_min_chunk_index = math.ceil(min_audio_sec / state.chunk_size_sec) + slicing_min_chunk_index = ( + math.ceil(min_audio_sec / state.chunk_size_sec) if slicing_opt_in else 0 + ) slicing_enabled = ( - left_overlap_bytes < state.unfixed_chunk_num * chunk_size_bytes + slicing_opt_in + and left_overlap_bytes < state.unfixed_chunk_num * chunk_size_bytes ) - if not slicing_enabled: + if slicing_opt_in and not slicing_enabled: logger.warning( "[realtime] left_overlap=%dms >= unfixed_chunks_duration=%dms; " "audio slicing disabled, falling back to cumulative inference", @@ -580,7 +578,10 @@ async def _on_input_audio_buffer_commit( tail = self.audio.state.finalize() await self._emit_transcription_delta(tail) - # Use emitted_deltas: under slicing, state.full_transcript is the deduped tail. + # Build from emitted_deltas, not state.full_transcript: both paths leave + # full_transcript a partial — prefix injection (cumulative path) leaves + # only the continuation tail, dedupe (slicing path) leaves only the + # deduped tail. emitted_deltas reconstructs the full transcript verbatim. transcript = normalize_whitespace("".join(self.item.emitted_deltas)) await self._send( @@ -615,9 +616,13 @@ async def _on_input_audio_buffer_clear( ) async def _run_inference(self, is_last: bool) -> bool: - """Run ASR on the current cumulative buffer. Returns False on failure: - commit-time emits transcription.failed and rolls the item; append-time - emits a generic error envelope and closes the WebSocket.""" + """Run ASR on the current audio window. + + The cumulative path uses the whole PCM buffer. The slicing path uses a + tail slice with left overlap and trims duplicated text before + ``StreamingASRState`` ingests it. Returns False on failure: commit-time + emits transcription.failed and rolls the item; append-time emits a + generic error envelope and closes the WebSocket.""" # Bare prompt under slicing: emitted_text is not injected as a # continuation prefix; the retained overlap + output dedupe # takes its place. diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index a37f17eeb4b5..35c9d0771e8b 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -41,8 +41,8 @@ class StreamingASRState: unfixed_chunk_num: int unfixed_token_num: int confirmed_text: str = "" - # Monotonic accumulator; used as prompt prefix so the model sees a - # natural continuation point, not the rolled-back ``confirmed_text``. + # Monotonic accumulator. Used as the prompt prefix on cumulative paths and + # as the dedupe prefix on the slicing path. emitted_text: str = "" full_transcript: str = "" chunk_index: int = 0 @@ -68,10 +68,9 @@ def update(self, new_transcript: str) -> str: self.confirmed_text = "" self.full_transcript = new_transcript self.chunk_index += 1 - if self.confirmed_text.startswith(old_confirmed): - return self._record_emit(self.confirmed_text[len(old_confirmed) :].strip()) - # Model revised earlier text, use word level common prefix to avoid - # re-emitting already-sent content and cutting mid-word. + # Token-level common prefix, not char-level startswith: startswith + # sliced mid-word when a confirmed token was extended ("world" -> + # "worldly" emitted "ly"). old_words = old_confirmed.split() new_words = self.confirmed_text.split() common_count = 0 @@ -173,22 +172,23 @@ def _dedupe_norm(word: str) -> str: def _dedupe_word_level(committed_text: str, candidate_out: str) -> str: """Drop the longest prefix of ``candidate_out`` matching the suffix of ``committed_text`` word-for-word (case- and punctuation-insensitive).""" - cand_words = candidate_out.split() - if not cand_words: + candidate_words = candidate_out.split() + if not candidate_words: return candidate_out - c_words = committed_text.split() - if not c_words: + committed_words = committed_text.split() + if not committed_words: return candidate_out - # Longest possible overlap is bounded by candidate length; normalize - # only that tail of committed text instead of scanning the whole history. - # Pre-normalize once instead of O(k²) calls inside the inner loop, then - # compare list slices in C rather than glyph-by-glyph in Python. - max_k = min(len(c_words), len(cand_words)) - c_norm = [_dedupe_norm(w) for w in c_words[-max_k:]] - cand_norm = [_dedupe_norm(w) for w in cand_words] - for k in range(max_k, 0, -1): - if c_norm[-k:] == cand_norm[:k]: - return " ".join(cand_words[k:]) + # The overlap is at most as long as the candidate, so only the last + # `max_overlap` committed words can match. Normalize that committed tail and + # the candidate prefix once, then compare list slices instead of + # re-normalizing inside the loop. + max_overlap = min(len(committed_words), len(candidate_words)) + committed_tail_norm = [_dedupe_norm(w) for w in committed_words[-max_overlap:]] + candidate_norm = [_dedupe_norm(w) for w in candidate_words[:max_overlap]] + # Longest overlap first; the first match wins. + for overlap in range(max_overlap, 0, -1): + if committed_tail_norm[-overlap:] == candidate_norm[:overlap]: + return " ".join(candidate_words[overlap:]) return candidate_out @@ -209,29 +209,28 @@ def _dedupe_cjk_char_level(committed_text: str, candidate_out: str) -> str: """Drop leading CJK glyphs of ``candidate_out`` matching the CJK-tail of ``committed_text``. Non-CJK glyphs are skipped during match, preserved in trimmed output.""" - cand_chars = [c for c in candidate_out if not c.isspace() and _is_cjk(c)] - if not cand_chars: + candidate_chars = [c for c in candidate_out if not c.isspace() and _is_cjk(c)] + if not candidate_chars: return candidate_out - # Longest possible overlap is bounded by candidate CJK length; collect - # only that tail of committed CJK glyphs instead of scanning the whole - # history. We iterate committed_text in reverse and stop once we have - # len(cand_chars) CJK glyphs. - max_cand = len(cand_chars) - c_tail_rev = [] + # The overlap is at most the candidate's CJK length, so collect only that + # many CJK glyphs from the end of committed_text (scanning in reverse and + # stopping early) instead of the whole history. + max_overlap = len(candidate_chars) + committed_tail_rev = [] for c in reversed(committed_text): if c.isspace() or not _is_cjk(c): continue - c_tail_rev.append(c) - if len(c_tail_rev) >= max_cand: + committed_tail_rev.append(c) + if len(committed_tail_rev) >= max_overlap: break - if not c_tail_rev: + if not committed_tail_rev: return candidate_out - c_chars = list(reversed(c_tail_rev)) - max_k = min(len(c_chars), len(cand_chars)) - for k in range(max_k, 0, -1): - if c_chars[-k:] != cand_chars[:k]: + committed_tail_chars = list(reversed(committed_tail_rev)) + # Longest overlap first; the first match wins. + for overlap in range(len(committed_tail_chars), 0, -1): + if committed_tail_chars[-overlap:] != candidate_chars[:overlap]: continue - cut_pos = _find_kth_cjk_pos(candidate_out, k) + cut_pos = _find_kth_cjk_pos(candidate_out, overlap) if cut_pos is None: return "" return candidate_out[cut_pos:].lstrip() diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py index 1120289ec3ff..dafd820bbf91 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py @@ -109,17 +109,19 @@ def chunked_streaming_config(self) -> dict: @property def realtime_slicing_config(self) -> dict: - """Tuning knobs for the WS realtime slicing path. Only consulted - when ``supports_chunked_streaming`` is True. Override per adapter - when the model's token rate or per-chunk stability differs. + """Tuning knobs for the WS realtime slicing path. Adapters opt in + by overriding this with ``enabled=True`` and supplying values + tuned for their model; the base default keeps slicing off so a + new adapter must consciously calibrate the overlap window. + ``enabled``: whether to consult the slicing path at all. ``left_overlap_ms``: audio kept across the sliced boundary so dedupe has context; cover the K-token rollback window. ``min_audio_sec``: don't slice below this many seconds of cumulative audio (sliced output diverges from cumulative on short inputs and dedupe over-matches). """ - return {"left_overlap_ms": 2000, "min_audio_sec": 16.0} + return {"enabled": False, "left_overlap_ms": 0, "min_audio_sec": 0.0} def postprocess_text(self, text: str) -> str: """Strip model-specific markers from raw decoded text. diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py index df686b15aecb..d984b644686d 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py @@ -35,6 +35,13 @@ def chunked_streaming_config(self) -> dict: def prompt_template(self) -> str: return DEFAULT_ASR_PROMPT + @property + def realtime_slicing_config(self) -> dict: + # Tuned for Qwen3-ASR: 2s left overlap covers the K=5 token rollback + # window; 16s min audio keeps slicing off on short inputs, where manual + # fixtures showed sliced output diverging from cumulative. + return {"enabled": True, "left_overlap_ms": 2000, "min_audio_sec": 16.0} + def build_sampling_params(self, request: TranscriptionRequest) -> dict: temperature = request.temperature if temperature == 0.0: diff --git a/test/registered/unit/entrypoints/openai/test_streaming_asr.py b/test/registered/unit/entrypoints/openai/test_streaming_asr.py index a73a29e3aebb..54802d505c2f 100644 --- a/test/registered/unit/entrypoints/openai/test_streaming_asr.py +++ b/test/registered/unit/entrypoints/openai/test_streaming_asr.py @@ -1,85 +1,235 @@ -"""Unit tests for realtime ASR slicing-path helpers. +"""Unit tests for the realtime ASR slicing path: process_asr_chunk's prompt +override + dedupe, update() reconciliation, and the dedupe / PCM helpers.""" -Edge cases for ``dedupe_overlap`` (normalization rules, CJK fallback, the -suffix-only-history invariant the perf optimization depends on), the -bit-equality invariant for ``_pcm_to_float_samples``, and ``_slice_pcm_from`` -validation. Trivial happy-path assertions that restated Python primitives were -dropped. The slicing trigger logic and its interaction with -``StreamingASRState`` are exercised by the manual GPU suite, not by CI. -""" +from sglang.test.test_utils import maybe_stub_sgl_kernel + +maybe_stub_sgl_kernel() # must precede any import that pulls in sgl_kernel import io import unittest +from types import SimpleNamespace import numpy as np import soundfile as sf from sglang.srt.entrypoints.openai.realtime.session import ( + RealtimeConnection, _pcm_to_float_samples, _slice_pcm_from, ) -from sglang.srt.entrypoints.openai.streaming_asr import dedupe_overlap +from sglang.srt.entrypoints.openai.streaming_asr import ( + StreamingASRState, + dedupe_overlap, + process_asr_chunk, +) +from sglang.srt.utils import get_or_create_event_loop from sglang.test.ci.ci_register import register_cpu_ci from sglang.test.test_utils import CustomTestCase -register_cpu_ci(est_time=2, suite="base-a-test-cpu") +register_cpu_ci(est_time=3, suite="base-a-test-cpu") -class TestDedupeOverlap(CustomTestCase): - """Edge cases for the dedupe heuristic. +class _FakeAdapter: + prompt_template = "PROMPT:" - Drops trivial happy-path assertions; keeps cases that lock - normalization rules, CJK fallback paths, and the suffix-only-history - invariant that the perf optimization relies on. - """ + def postprocess_text(self, text: str) -> str: + return text - def test_full_candidate_overlaps_returns_empty(self): - # Whole-candidate match must emit empty so StreamingASRState doesn't - # double-record the previous chunk's content. - self.assertEqual(dedupe_overlap("hello world", "hello world"), "") - def test_empty_committed_returns_candidate_unchanged(self): - self.assertEqual(dedupe_overlap("", "anything goes"), "anything goes") +class _MockTokenizerManager: + """Yields one synthetic transcript and records the request, so tests can + assert on the prompt that was sent.""" + + def __init__(self, transcript: str): + self._transcript = transcript + self.requests = [] + + def generate_request(self, adapted_request, raw_request=None): + self.requests.append(adapted_request) + transcript = self._transcript + + async def gen(): + yield {"text": transcript} + + return gen() + + +def _run(coro): + return get_or_create_event_loop().run_until_complete(coro) + + +_AUDIO = np.zeros(1600, dtype=np.float32) + + +class TestProcessAsrChunkSlicing(CustomTestCase): + def _state(self): + return StreamingASRState( + chunk_size_sec=1.0, unfixed_chunk_num=2, unfixed_token_num=2 + ) + + def test_cumulative_path_injects_prefix_and_skips_dedupe(self): + # prompt=None -> prompt_template + get_prefix_text(), no dedupe. + state = self._state() + state.emitted_text = "hello" + state.chunk_index = 5 # past unfixed_chunk_num, so the prefix is live + tm = _MockTokenizerManager("hello world foo") + + _run( + process_asr_chunk( + tokenizer_manager=tm, + adapter=_FakeAdapter(), + state=state, + audio_data=_AUDIO, + sampling_params={}, + is_last=False, + ) + ) + + self.assertEqual(tm.requests[0].text, "PROMPT:hello") + self.assertEqual(state.full_transcript, "hello world foo") + + def test_slicing_path_uses_bare_prompt_and_dedupes(self): + # Bare prompt (no prefix injection); dedupe trims the overlapping word. + state = self._state() + tm = _MockTokenizerManager("beta gamma") + + _run( + process_asr_chunk( + tokenizer_manager=tm, + adapter=_FakeAdapter(), + state=state, + audio_data=_AUDIO, + sampling_params={}, + is_last=False, + prompt="PROMPT:", + dedupe_against="alpha beta", + ) + ) + + self.assertEqual(tm.requests[0].text, "PROMPT:") + self.assertEqual(state.full_transcript, "gamma") + + def test_is_last_dedupes_then_finalizes(self): + # The final chunk also dedupes before finalize(). + state = self._state() + tm = _MockTokenizerManager("alpha beta gamma") + + out = _run( + process_asr_chunk( + tokenizer_manager=tm, + adapter=_FakeAdapter(), + state=state, + audio_data=_AUDIO, + sampling_params={}, + is_last=True, + prompt="PROMPT:", + dedupe_against="alpha", + ) + ) + + self.assertEqual(state.full_transcript, "beta gamma") + self.assertEqual(out, "beta gamma") + + +class _SlicingAdapter: + """Minimal adapter exposing only what RealtimeConnection.__init__ reads.""" + + model_sample_rate = 16000 - def test_empty_candidate_returns_empty(self): - self.assertEqual(dedupe_overlap("anything", ""), "") + def __init__(self, left_overlap_ms, enabled=True): + self._left_overlap_ms = left_overlap_ms + self._enabled = enabled - def test_em_dash_normalized_during_match(self): - # Trailing em dash and case differences are stripped during match. - # Regression test for the dedupe rule documented in _DEDUPE_NORM_STRIP. + @property + def realtime_slicing_config(self): + return { + "enabled": self._enabled, + "left_overlap_ms": self._left_overlap_ms, + "min_audio_sec": 16.0, + } + + @property + def chunked_streaming_config(self): + # 2s chunks, 2 unfixed chunks -> 4s (=4000ms) unfixed window. + return {"chunk_size_sec": 2.0, "unfixed_chunk_num": 2, "unfixed_token_num": 5} + + +class TestSlicingEnabledGuard(CustomTestCase): + """RealtimeConnection.__init__ guard: slicing only turns on when opted in + AND the left overlap fits inside the unfixed-chunk window. Pure CPU — no + GPU, tokenizer, or websocket I/O is touched in __init__.""" + + def _conn(self, left_overlap_ms, enabled=True): + server_args = SimpleNamespace(asr_max_buffer_seconds=60) + return RealtimeConnection( + object(), object(), _SlicingAdapter(left_overlap_ms, enabled), server_args + ) + + def test_overlap_within_unfixed_window_enables_slicing(self): + # 2s overlap < 4s unfixed window -> slicing turns on. + self.assertTrue(self._conn(left_overlap_ms=2000).audio.slicing_enabled) + + def test_overlap_exceeding_unfixed_window_disables_slicing(self): + # 8s overlap >= 4s window: the dedupe target is unreachable, so the + # guard disables slicing and falls back to cumulative inference. + self.assertFalse(self._conn(left_overlap_ms=8000).audio.slicing_enabled) + + def test_opt_out_disables_slicing(self): + # enabled=False: never slices regardless of overlap. + self.assertFalse( + self._conn(left_overlap_ms=2000, enabled=False).audio.slicing_enabled + ) + + +class TestStreamingASRStateUpdate(CustomTestCase): + def test_extended_word_emits_whole_word_not_fragment(self): + # "world" re-transcribed as "worldly" must emit "worldly", not "ly" + # (regression guard for the removed char-level startswith fast path). + state = StreamingASRState( + chunk_size_sec=1.0, + unfixed_chunk_num=0, + unfixed_token_num=1, + confirmed_text="hello world", + ) + self.assertEqual(state.update("hello worldly test tail"), "worldly test") + + def test_clean_append_emits_only_new_words(self): + state = StreamingASRState( + chunk_size_sec=1.0, + unfixed_chunk_num=0, + unfixed_token_num=1, + confirmed_text="hello", + ) + self.assertEqual(state.update("hello world tail"), "world") + + +class TestDedupeOverlap(CustomTestCase): + def test_full_candidate_overlap_returns_empty(self): + self.assertEqual(dedupe_overlap("hello world", "hello world"), "") + + def test_em_dash_and_case_normalized_during_match(self): + # Trailing em dash and case are stripped before matching. self.assertEqual( dedupe_overlap("stew for dinner—", "Dinner: turnips"), "turnips" ) def test_cjk_char_level_fallback(self): - # No whitespace → word-level returns unchanged → CJK fallback engages. + # No whitespace -> word-level can't match -> CJK char-level fallback. self.assertEqual(dedupe_overlap("你好世界", "世界今天很好"), "今天很好") - def test_cjk_overlap_with_punctuation(self): - # CJK punctuation in committed_text must not block the char-level - # match on the ideographs that follow. - self.assertEqual(dedupe_overlap("你好,世界", "世界今天很好"), "今天很好") - - def test_long_committed_history_uses_suffix_overlap(self): - # Locks the suffix-only invariant the tail-only optimization - # depends on: a massive committed prefix unrelated to the candidate - # must not change the match outcome. + def test_long_history_matches_only_committed_suffix(self): + # A large unrelated prefix must not change the suffix match. committed = " ".join(["old"] * 1000 + ["a", "b", "c"]) self.assertEqual(dedupe_overlap(committed, "b c d"), "d") -class TestPcmToFloatSamples(CustomTestCase): - """The bit-equality invariant the PR's perf claim depends on, plus the - one corruption boundary worth catching loudly.""" - - def test_matches_soundfile_round_trip(self): - # The PCM→WAV→sf.read path was the legacy converter; this PR's - # direct conversion must remain bit-equal to it. +class TestPcmHelpers(CustomTestCase): + def test_pcm_to_float_matches_soundfile_round_trip(self): + # Direct PCM16->float stays bit-equal to the legacy PCM->WAV->sf.read. rng = np.random.default_rng(42) ints = rng.integers(-32768, 32768, size=4096, dtype=np.int16) - pcm = ints.tobytes() - direct = _pcm_to_float_samples(pcm) + direct = _pcm_to_float_samples(ints.tobytes()) buf = io.BytesIO() sf.write(buf, ints, 16000, format="WAV") @@ -88,23 +238,10 @@ def test_matches_soundfile_round_trip(self): np.testing.assert_array_equal(direct, round_trip) - def test_odd_length_pcm_raises(self): - # int16 frames are 2 bytes; an odd-length buffer means upstream - # corruption. Keep the np.frombuffer ValueError loud — silent - # rounding would mask the bug. - with self.assertRaises(ValueError): - _pcm_to_float_samples(b"\x00") - - -class TestSlicePcmFrom(CustomTestCase): - """Only the validation behavior — the trivial slice cases were - Python-built-in tests, not ours.""" - - def test_negative_start_raises(self): + def test_slice_out_of_bounds_start_raises(self): + # Out-of-range start raises rather than returning a wrong window. with self.assertRaises(ValueError): _slice_pcm_from(b"abcdef", -1) - - def test_past_end_raises(self): with self.assertRaises(ValueError): _slice_pcm_from(b"abcdef", 7) From 5d8455da71ac4a109273cfeea2e433c79436cffc Mon Sep 17 00:00:00 2001 From: sam huang Date: Sun, 31 May 2026 03:38:06 -0400 Subject: [PATCH 04/10] Realtime ASR slicing: Unicode-aware output dedupe + entry-point tests Refines the M2 input-slicing output dedupe and its test suite: - Dedupe normalization uses NFKC + Unicode category-P edge stripping (Whisper-style) instead of a hand-listed punctuation set. - Split CJK detection into _is_cjk_no_space (spacing) and _is_cjk_dedupe (dedupe, narrower); use Script_Extensions so the kana marks U+30FC/U+30FB are covered; keep Hangul out (Korean is space-delimited). - CJK dedupe is boundary-only: compare the leading/trailing CJK runs and never skip interior non-CJK content; require a >=2-glyph overlap for letters, allow 1 for punctuation. - Fix spurious over-deletion when lone-punctuation tokens normalize to "" and match each other; require a real word in the matched overlap. - _dedupe_by_word rsplits the committed tail instead of tokenizing the whole growing transcript. - Rewrite the unit tests as entry-point scenarios through process_asr_chunk plus the slicing-enable guard. --- .../entrypoints/openai/realtime/session.py | 2 + .../srt/entrypoints/openai/streaming_asr.py | 201 +++++++++------- .../transcription_adapters/qwen3_asr.py | 7 +- .../entrypoints/openai/test_streaming_asr.py | 216 +++++++----------- 4 files changed, 202 insertions(+), 224 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index ea8c63b9c9b9..d81fe93c9bc7 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -696,6 +696,8 @@ async def _run_inference(self, is_last: bool) -> bool: return False if use_slicing: + # Held-back tokens are re-covered only if their audio span fits the + # left overlap; slower speech can drop the earliest (see known limits). self.audio.committed_audio_until_bytes = len(self.audio.pcm_buffer) self.audio.last_inference_offset = len(self.audio.pcm_buffer) diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index 35c9d0771e8b..b98659af1d2c 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -2,10 +2,12 @@ import io import logging import re +import unicodedata from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union import numpy as np +import regex import soundfile as sf from fastapi import Request @@ -54,9 +56,13 @@ def get_prefix_text(self) -> str: def _record_emit(self, delta: str) -> str: if delta: - self.emitted_text = ( - f"{self.emitted_text} {delta}".strip() if self.emitted_text else delta - ) + if self.emitted_text: + # needs_space avoids a space between adjacent CJK characters; + # this accumulator feeds the prompt prefix and the dedupe target. + sep = " " if needs_space(self.emitted_text, delta) else "" + self.emitted_text = f"{self.emitted_text}{sep}{delta}".strip() + else: + self.emitted_text = delta return delta def update(self, new_transcript: str) -> str: @@ -129,23 +135,43 @@ def normalize_whitespace(text: str) -> str: _NO_SPACE_AFTER = frozenset("([{(【《「『") -def _is_cjk(c: str) -> bool: - """CJK-context glyph that doesn't take inter-word spaces.""" - cp = ord(c) - return ( - 0x3000 <= cp <= 0x303F # CJK Symbols and Punctuation - or 0x3040 <= cp <= 0x309F # Hiragana - or 0x30A0 <= cp <= 0x30FF # Katakana - or 0x3400 <= cp <= 0x4DBF # CJK Unified Ideographs Ext A - or 0x4E00 <= cp <= 0x9FFF # CJK Unified Ideographs - or 0xFF00 <= cp <= 0xFFEF # Halfwidth & Fullwidth Forms - ) +# Two predicates: dedup = genuine CJK script only; spacing also includes the +# fullwidth forms. scx (not Script) so the kana marks ー/・ count; Hangul excluded +# (Korean uses spaces). +_HALFWIDTH_HANGUL = chr(0xFFA0) + "-" + chr(0xFFDC) + +# Dedup set: &&\p{P} adds fullwidth punctuation but not fullwidth ASCII/digits. +_CJK_DEDUPE_RE = regex.compile( + "(?V1)[" + r"\p{Han}\p{scx=Hiragana}\p{scx=Katakana}\p{Bopomofo}" + r"\p{Block=CJK_Symbols_and_Punctuation}" + r"[\p{Block=Halfwidth_and_Fullwidth_Forms}&&\p{P}]]" +) + +# Spacing set: dedup set + fullwidth ASCII/digits (wide glyphs take no space), +# minus the halfwidth Hangul jamo (U+FFA0-FFDC) the block would re-add. +_CJK_NO_SPACE_RE = regex.compile( + "(?V1)[" + r"\p{Han}\p{scx=Hiragana}\p{scx=Katakana}\p{Bopomofo}" + r"\p{Block=CJK_Symbols_and_Punctuation}" + r"[\p{Block=Halfwidth_and_Fullwidth_Forms}--[" + _HALFWIDTH_HANGUL + "]]]" +) + + +def _is_cjk_no_space(c: str) -> bool: + """No-inter-word-space char (CJK script + fullwidth forms); spacing only.""" + return bool(_CJK_NO_SPACE_RE.match(c)) + + +def _is_cjk_dedupe(c: str) -> bool: + """Genuine CJK script char eligible for char-level dedup (no fullwidth ASCII).""" + return bool(_CJK_DEDUPE_RE.match(c)) def needs_space(prev: str, cur: str) -> bool: """Return whether a boundary space is needed between emitted deltas. - Avoid spaces around punctuation and between adjacent CJK-context glyphs. + Avoid spaces around punctuation and between adjacent CJK-context characters. Shared by the realtime WS and HTTP SSE chunked streaming paths. """ if not prev or not cur: @@ -154,103 +180,110 @@ def needs_space(prev: str, cur: str) -> bool: return False if cur[0] in _NO_SPACE_BEFORE or prev[-1] in _NO_SPACE_AFTER: return False - if _is_cjk(prev[-1]) and _is_cjk(cur[0]): + if _is_cjk_no_space(prev[-1]) and _is_cjk_no_space(cur[0]): return False return True -# Trailing punctuation stripped during dedupe match. Includes em dash -# (U+2014), hyphen-minus, and CJK fullwidth equivalents. -_DEDUPE_NORM_STRIP = ",.!?;:—-,。!?;:、" - - def _dedupe_norm(word: str) -> str: - """Lowercase + strip trailing punctuation for dedupe matching.""" - return word.strip(_DEDUPE_NORM_STRIP).lower() - - -def _dedupe_word_level(committed_text: str, candidate_out: str) -> str: + """Lowercase + NFKC-fold + strip edge punctuation (Unicode category P), so + "dinner," == "dinner" and exotic marks (《》«» …) need no hand-listed set. + Strips P only, not S, so "$5" / "3+4" keep their symbols.""" + word = unicodedata.normalize("NFKC", word) + lo, hi = 0, len(word) + while lo < hi and unicodedata.category(word[lo])[0] == "P": + lo += 1 + while hi > lo and unicodedata.category(word[hi - 1])[0] == "P": + hi -= 1 + return word[lo:hi].lower() + + +def _dedupe_by_word(committed_text: str, candidate_out: str) -> str: """Drop the longest prefix of ``candidate_out`` matching the suffix of ``committed_text`` word-for-word (case- and punctuation-insensitive).""" candidate_words = candidate_out.split() if not candidate_words: return candidate_out - committed_words = committed_text.split() - if not committed_words: + # Only the last len(candidate_words) committed words can overlap, so rsplit + # the tail instead of tokenizing the whole (growing) committed transcript. + committed_tail = committed_text.rsplit(maxsplit=len(candidate_words))[ + -len(candidate_words) : + ] + if not committed_tail: return candidate_out - # The overlap is at most as long as the candidate, so only the last - # `max_overlap` committed words can match. Normalize that committed tail and - # the candidate prefix once, then compare list slices instead of - # re-normalizing inside the loop. - max_overlap = min(len(committed_words), len(candidate_words)) - committed_tail_norm = [_dedupe_norm(w) for w in committed_words[-max_overlap:]] + # Normalize the committed tail and candidate prefix once, then compare slices. + max_overlap = min(len(committed_tail), len(candidate_words)) + committed_tail_norm = [_dedupe_norm(w) for w in committed_tail] candidate_norm = [_dedupe_norm(w) for w in candidate_words[:max_overlap]] # Longest overlap first; the first match wins. for overlap in range(max_overlap, 0, -1): - if committed_tail_norm[-overlap:] == candidate_norm[:overlap]: - return " ".join(candidate_words[overlap:]) + if committed_tail_norm[-overlap:] != candidate_norm[:overlap]: + continue + # Skip all-punctuation overlaps: lone "@"/"#" both normalize to "" and + # would match spuriously. + if not any(candidate_norm[:overlap]): + continue + return " ".join(candidate_words[overlap:]) return candidate_out -def _find_kth_cjk_pos(text: str, k: int) -> Optional[int]: - """Return index after the k-th CJK glyph in text, or None if text - contains fewer than k CJK glyphs.""" - seen = 0 - for i, c in enumerate(text): - if c.isspace() or not _is_cjk(c): - continue - seen += 1 - if seen == k: - return i + 1 - return None - - -def _dedupe_cjk_char_level(committed_text: str, candidate_out: str) -> str: - """Drop leading CJK glyphs of ``candidate_out`` matching the CJK-tail of - ``committed_text``. Non-CJK glyphs are skipped during match, preserved - in trimmed output.""" - candidate_chars = [c for c in candidate_out if not c.isspace() and _is_cjk(c)] - if not candidate_chars: - return candidate_out - # The overlap is at most the candidate's CJK length, so collect only that - # many CJK glyphs from the end of committed_text (scanning in reverse and - # stopping early) instead of the whole history. - max_overlap = len(candidate_chars) - committed_tail_rev = [] - for c in reversed(committed_text): - if c.isspace() or not _is_cjk(c): - continue - committed_tail_rev.append(c) - if len(committed_tail_rev) >= max_overlap: +def _is_punctuation(c: str) -> bool: + return unicodedata.category(c)[0] == "P" + + +def _get_leading_cjk_chars(text: str) -> List[str]: + """Return the CJK characters at the start of ``text``, stopping at the first + non-CJK character (leading whitespace is skipped).""" + chars: List[str] = [] + for char in text.lstrip(): + if not _is_cjk_dedupe(char): + break + chars.append(char) + return chars + + +def _get_trailing_cjk_chars(text: str) -> List[str]: + """Return the CJK characters at the end of ``text``, stopping at the first + non-CJK character (trailing whitespace is skipped).""" + chars: List[str] = [] + for char in reversed(text.rstrip()): + if not _is_cjk_dedupe(char): break - if not committed_tail_rev: + chars.append(char) + chars.reverse() + return chars + + +def _dedupe_by_cjk_char(committed_text: str, candidate_out: str) -> str: + """Drop the CJK characters at the start of ``candidate_out`` when they repeat + the CJK characters at the end of ``committed_text``. Only those boundary + characters are compared, so a non-CJK prefix ("today ") is never deleted to + reach a later match.""" + lead = _get_leading_cjk_chars(candidate_out) + tail = _get_trailing_cjk_chars(committed_text) + if not lead or not tail: return candidate_out - committed_tail_chars = list(reversed(committed_tail_rev)) - # Longest overlap first; the first match wins. - for overlap in range(len(committed_tail_chars), 0, -1): - if committed_tail_chars[-overlap:] != candidate_chars[:overlap]: + for overlap in range(min(len(lead), len(tail)), 0, -1): + if tail[-overlap:] != lead[:overlap]: continue - cut_pos = _find_kth_cjk_pos(candidate_out, overlap) - if cut_pos is None: - return "" - return candidate_out[cut_pos:].lstrip() + # Single-glyph matches collide too often for CJK letters; require >=2, + # allow 1 only for punctuation. + if overlap == 1 and not _is_punctuation(lead[0]): + continue + return candidate_out.lstrip()[overlap:].lstrip() return candidate_out def dedupe_overlap(committed_text: str, candidate_out: str) -> str: - """Trim words/CJK glyphs at the start of ``candidate_out`` that - re-transcribe ``committed_text``'s tail. Word-level first, CJK - char-level fallback.""" + """Trim words / CJK characters at the start of ``candidate_out`` that + re-transcribe ``committed_text``'s tail. Matches by whole word first, then + falls back to matching the leading/trailing CJK characters.""" if not committed_text or not candidate_out: return candidate_out - deduped = _dedupe_word_level(committed_text, candidate_out) + deduped = _dedupe_by_word(committed_text, candidate_out) if deduped != candidate_out: return deduped - if any(_is_cjk(c) for c in committed_text) or any( - _is_cjk(c) for c in candidate_out - ): - return _dedupe_cjk_char_level(committed_text, candidate_out) - return candidate_out + return _dedupe_by_cjk_char(committed_text, candidate_out) async def process_asr_chunk( diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py index d984b644686d..c6ecbae4cf8a 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py @@ -37,9 +37,10 @@ def prompt_template(self) -> str: @property def realtime_slicing_config(self) -> dict: - # Tuned for Qwen3-ASR: 2s left overlap covers the K=5 token rollback - # window; 16s min audio keeps slicing off on short inputs, where manual - # fixtures showed sliced output diverging from cumulative. + # Empirically tuned for Qwen3-ASR: in our fixtures a 2s left overlap + # gave enough acoustic context for the K=5 (unfixed_token_num) rollback + # window; 16s min audio keeps slicing off on short inputs, where sliced + # output diverged from cumulative. return {"enabled": True, "left_overlap_ms": 2000, "min_audio_sec": 16.0} def build_sampling_params(self, request: TranscriptionRequest) -> dict: diff --git a/test/registered/unit/entrypoints/openai/test_streaming_asr.py b/test/registered/unit/entrypoints/openai/test_streaming_asr.py index 54802d505c2f..10b4c7f6941d 100644 --- a/test/registered/unit/entrypoints/openai/test_streaming_asr.py +++ b/test/registered/unit/entrypoints/openai/test_streaming_asr.py @@ -1,25 +1,25 @@ -"""Unit tests for the realtime ASR slicing path: process_asr_chunk's prompt -override + dedupe, update() reconciliation, and the dedupe / PCM helpers.""" +"""Unit tests for the realtime ASR slicing path. + +Drives the shared ``process_asr_chunk`` entry point with a mocked +``TokenizerManager`` (same style as ``test_serving_transcription`` / +``test_serving_embedding``) across the real scenarios: the cumulative (M1) and +sliced (M2) inference paths, output dedupe for Latin and CJK, the no-overlap and +empty-response edges, last-chunk finalize, and word reconciliation -- plus the +``RealtimeConnection`` guard that decides whether slicing turns on. +""" from sglang.test.test_utils import maybe_stub_sgl_kernel maybe_stub_sgl_kernel() # must precede any import that pulls in sgl_kernel -import io import unittest from types import SimpleNamespace import numpy as np -import soundfile as sf -from sglang.srt.entrypoints.openai.realtime.session import ( - RealtimeConnection, - _pcm_to_float_samples, - _slice_pcm_from, -) +from sglang.srt.entrypoints.openai.realtime.session import RealtimeConnection from sglang.srt.entrypoints.openai.streaming_asr import ( StreamingASRState, - dedupe_overlap, process_asr_chunk, ) from sglang.srt.utils import get_or_create_event_loop @@ -37,10 +37,10 @@ def postprocess_text(self, text: str) -> str: class _MockTokenizerManager: - """Yields one synthetic transcript and records the request, so tests can - assert on the prompt that was sent.""" + """Records the request and yields one synthetic transcript (or nothing, when + ``transcript`` is None, to exercise the empty-response path).""" - def __init__(self, transcript: str): + def __init__(self, transcript): self._transcript = transcript self.requests = [] @@ -49,7 +49,8 @@ def generate_request(self, adapted_request, raw_request=None): transcript = self._transcript async def gen(): - yield {"text": transcript} + if transcript is not None: + yield {"text": transcript} return gen() @@ -61,74 +62,87 @@ def _run(coro): _AUDIO = np.zeros(1600, dtype=np.float32) -class TestProcessAsrChunkSlicing(CustomTestCase): - def _state(self): - return StreamingASRState( - chunk_size_sec=1.0, unfixed_chunk_num=2, unfixed_token_num=2 - ) - - def test_cumulative_path_injects_prefix_and_skips_dedupe(self): - # prompt=None -> prompt_template + get_prefix_text(), no dedupe. - state = self._state() - state.emitted_text = "hello" - state.chunk_index = 5 # past unfixed_chunk_num, so the prefix is live - tm = _MockTokenizerManager("hello world foo") +class TestProcessAsrChunk(CustomTestCase): + def _state(self, **kwargs): + params = dict(chunk_size_sec=1.0, unfixed_chunk_num=2, unfixed_token_num=2) + params.update(kwargs) + return StreamingASRState(**params) - _run( + def _chunk(self, state, transcript, is_last=False, **kwargs): + tm = _MockTokenizerManager(transcript) + out = _run( process_asr_chunk( tokenizer_manager=tm, adapter=_FakeAdapter(), state=state, audio_data=_AUDIO, sampling_params={}, - is_last=False, + is_last=is_last, + **kwargs, ) ) + return tm, out + def test_cumulative_path_injects_prefix_and_skips_dedupe(self): + # prompt=None -> prompt_template + get_prefix_text(), no dedupe (M1). + state = self._state() + state.emitted_text = "hello" + state.chunk_index = 5 # past unfixed_chunk_num, so the prefix is live + tm, _ = self._chunk(state, "hello world foo") self.assertEqual(tm.requests[0].text, "PROMPT:hello") self.assertEqual(state.full_transcript, "hello world foo") def test_slicing_path_uses_bare_prompt_and_dedupes(self): - # Bare prompt (no prefix injection); dedupe trims the overlapping word. + # Bare prompt (no prefix injection); dedupe trims the word that overlaps + # the committed tail (M2). state = self._state() - tm = _MockTokenizerManager("beta gamma") - - _run( - process_asr_chunk( - tokenizer_manager=tm, - adapter=_FakeAdapter(), - state=state, - audio_data=_AUDIO, - sampling_params={}, - is_last=False, - prompt="PROMPT:", - dedupe_against="alpha beta", - ) + tm, _ = self._chunk( + state, "beta gamma", prompt="PROMPT:", dedupe_against="alpha beta" ) - self.assertEqual(tm.requests[0].text, "PROMPT:") self.assertEqual(state.full_transcript, "gamma") - def test_is_last_dedupes_then_finalizes(self): - # The final chunk also dedupes before finalize(). + def test_slicing_path_dedupes_overlapping_cjk(self): + # Dedupe also trims an overlapping CJK run (no inter-word spaces). state = self._state() - tm = _MockTokenizerManager("alpha beta gamma") + self._chunk(state, "天气很好", prompt="PROMPT:", dedupe_against="今天天气") + self.assertEqual(state.full_transcript, "很好") - out = _run( - process_asr_chunk( - tokenizer_manager=tm, - adapter=_FakeAdapter(), - state=state, - audio_data=_AUDIO, - sampling_params={}, - is_last=True, - prompt="PROMPT:", - dedupe_against="alpha", - ) - ) + def test_slicing_path_keeps_non_overlapping_candidate(self): + # No overlap with the committed tail -> nothing is trimmed. + state = self._state() + self._chunk(state, "gamma delta", prompt="PROMPT:", dedupe_against="alpha beta") + self.assertEqual(state.full_transcript, "gamma delta") - self.assertEqual(state.full_transcript, "beta gamma") + def test_last_chunk_dedupes_then_finalizes(self): + # The final chunk dedupes against the committed tail, then finalize() + # emits the remaining text. + state = self._state() + _, out = self._chunk( + state, + "alpha beta gamma", + is_last=True, + prompt="PROMPT:", + dedupe_against="alpha", + ) self.assertEqual(out, "beta gamma") + self.assertEqual(state.full_transcript, "beta gamma") + + def test_extended_word_emits_whole_word_not_fragment(self): + # "world" re-transcribed as "worldly" must emit "worldly", not "ly" + # (regression guard for the removed char-level startswith fast path). + state = self._state( + unfixed_chunk_num=0, unfixed_token_num=1, confirmed_text="hello world" + ) + _, out = self._chunk(state, "hello worldly test tail") + self.assertEqual(out, "worldly test") + + def test_empty_model_response_emits_nothing(self): + # No model output -> empty delta, no state mutation, no crash. + state = self._state() + _, out = self._chunk(state, None) + self.assertEqual(out, "") + self.assertEqual(state.full_transcript, "") class _SlicingAdapter: @@ -150,101 +164,29 @@ def realtime_slicing_config(self): @property def chunked_streaming_config(self): - # 2s chunks, 2 unfixed chunks -> 4s (=4000ms) unfixed window. + # 2s chunks, 2 unfixed chunks -> 4s unfixed window. return {"chunk_size_sec": 2.0, "unfixed_chunk_num": 2, "unfixed_token_num": 5} class TestSlicingEnabledGuard(CustomTestCase): - """RealtimeConnection.__init__ guard: slicing only turns on when opted in - AND the left overlap fits inside the unfixed-chunk window. Pure CPU — no - GPU, tokenizer, or websocket I/O is touched in __init__.""" - def _conn(self, left_overlap_ms, enabled=True): server_args = SimpleNamespace(asr_max_buffer_seconds=60) return RealtimeConnection( object(), object(), _SlicingAdapter(left_overlap_ms, enabled), server_args ) - def test_overlap_within_unfixed_window_enables_slicing(self): - # 2s overlap < 4s unfixed window -> slicing turns on. + def test_enabled_only_when_overlap_fits_unfixed_window(self): + # 2s overlap fits the 4s window -> slicing on; 8s overlap makes the + # dedupe target unreachable -> guard falls back to cumulative. self.assertTrue(self._conn(left_overlap_ms=2000).audio.slicing_enabled) - - def test_overlap_exceeding_unfixed_window_disables_slicing(self): - # 8s overlap >= 4s window: the dedupe target is unreachable, so the - # guard disables slicing and falls back to cumulative inference. self.assertFalse(self._conn(left_overlap_ms=8000).audio.slicing_enabled) - def test_opt_out_disables_slicing(self): - # enabled=False: never slices regardless of overlap. + def test_disabled_when_adapter_opts_out(self): + # enabled=False (the base-adapter default) -> never slices. self.assertFalse( self._conn(left_overlap_ms=2000, enabled=False).audio.slicing_enabled ) -class TestStreamingASRStateUpdate(CustomTestCase): - def test_extended_word_emits_whole_word_not_fragment(self): - # "world" re-transcribed as "worldly" must emit "worldly", not "ly" - # (regression guard for the removed char-level startswith fast path). - state = StreamingASRState( - chunk_size_sec=1.0, - unfixed_chunk_num=0, - unfixed_token_num=1, - confirmed_text="hello world", - ) - self.assertEqual(state.update("hello worldly test tail"), "worldly test") - - def test_clean_append_emits_only_new_words(self): - state = StreamingASRState( - chunk_size_sec=1.0, - unfixed_chunk_num=0, - unfixed_token_num=1, - confirmed_text="hello", - ) - self.assertEqual(state.update("hello world tail"), "world") - - -class TestDedupeOverlap(CustomTestCase): - def test_full_candidate_overlap_returns_empty(self): - self.assertEqual(dedupe_overlap("hello world", "hello world"), "") - - def test_em_dash_and_case_normalized_during_match(self): - # Trailing em dash and case are stripped before matching. - self.assertEqual( - dedupe_overlap("stew for dinner—", "Dinner: turnips"), "turnips" - ) - - def test_cjk_char_level_fallback(self): - # No whitespace -> word-level can't match -> CJK char-level fallback. - self.assertEqual(dedupe_overlap("你好世界", "世界今天很好"), "今天很好") - - def test_long_history_matches_only_committed_suffix(self): - # A large unrelated prefix must not change the suffix match. - committed = " ".join(["old"] * 1000 + ["a", "b", "c"]) - self.assertEqual(dedupe_overlap(committed, "b c d"), "d") - - -class TestPcmHelpers(CustomTestCase): - def test_pcm_to_float_matches_soundfile_round_trip(self): - # Direct PCM16->float stays bit-equal to the legacy PCM->WAV->sf.read. - rng = np.random.default_rng(42) - ints = rng.integers(-32768, 32768, size=4096, dtype=np.int16) - - direct = _pcm_to_float_samples(ints.tobytes()) - - buf = io.BytesIO() - sf.write(buf, ints, 16000, format="WAV") - buf.seek(0) - round_trip, _ = sf.read(buf) - - np.testing.assert_array_equal(direct, round_trip) - - def test_slice_out_of_bounds_start_raises(self): - # Out-of-range start raises rather than returning a wrong window. - with self.assertRaises(ValueError): - _slice_pcm_from(b"abcdef", -1) - with self.assertRaises(ValueError): - _slice_pcm_from(b"abcdef", 7) - - if __name__ == "__main__": unittest.main() From 57fcd0da05a78404034b727a292218a22f29a1a8 Mon Sep 17 00:00:00 2001 From: sam huang Date: Sun, 31 May 2026 10:40:43 -0400 Subject: [PATCH 05/10] Trim verbose comments/docstrings in realtime ASR slicing Tighten slicing-path comments to one-liners (base adapter config docstring, session.py slicing_enabled / emitted_deltas / _run_inference, streaming_asr predicate header). No logic change. --- .../entrypoints/openai/realtime/session.py | 28 +++++++------------ .../srt/entrypoints/openai/streaming_asr.py | 5 ++-- .../openai/transcription_adapters/base.py | 16 ++++------- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index d81fe93c9bc7..9cabd1a78b14 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -154,10 +154,8 @@ class _AudioState: left_overlap_bytes: int slicing_min_chunk_index: int state: StreamingASRState - # False when left_overlap covers the whole unfixed-chunk window, which - # leaves the K-unfixed dedupe target unreachable; flipped at session - # construction. When False, _run_inference always takes the cumulative - # path even after emitted_text becomes non-empty. + # False when the left overlap covers the whole unfixed-chunk window (the + # K-unfixed dedupe target would be unreachable); set at construction. slicing_enabled: bool = True pcm_buffer: bytearray = field(default_factory=bytearray) last_inference_offset: int = 0 @@ -578,10 +576,8 @@ async def _on_input_audio_buffer_commit( tail = self.audio.state.finalize() await self._emit_transcription_delta(tail) - # Build from emitted_deltas, not state.full_transcript: both paths leave - # full_transcript a partial — prefix injection (cumulative path) leaves - # only the continuation tail, dedupe (slicing path) leaves only the - # deduped tail. emitted_deltas reconstructs the full transcript verbatim. + # Rebuild from emitted_deltas: both paths leave full_transcript only a + # partial tail, while the deltas together are the whole transcript. transcript = normalize_whitespace("".join(self.item.emitted_deltas)) await self._send( @@ -616,16 +612,12 @@ async def _on_input_audio_buffer_clear( ) async def _run_inference(self, is_last: bool) -> bool: - """Run ASR on the current audio window. - - The cumulative path uses the whole PCM buffer. The slicing path uses a - tail slice with left overlap and trims duplicated text before - ``StreamingASRState`` ingests it. Returns False on failure: commit-time - emits transcription.failed and rolls the item; append-time emits a - generic error envelope and closes the WebSocket.""" - # Bare prompt under slicing: emitted_text is not injected as a - # continuation prefix; the retained overlap + output dedupe - # takes its place. + """Run ASR on the current audio window: the whole PCM buffer + (cumulative) or a tail slice with left overlap + output dedupe + (slicing). Returns False on failure -- commit-time emits + transcription.failed and rolls the item; append-time closes the WS.""" + # Slicing uses a bare prompt: the retained overlap + dedupe replace + # injecting emitted_text as a continuation prefix. committed_text = self.audio.state.get_prefix_text() use_slicing = ( self.audio.slicing_enabled diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index b98659af1d2c..4ce3fddfcf7e 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -135,9 +135,8 @@ def normalize_whitespace(text: str) -> str: _NO_SPACE_AFTER = frozenset("([{(【《「『") -# Two predicates: dedup = genuine CJK script only; spacing also includes the -# fullwidth forms. scx (not Script) so the kana marks ー/・ count; Hangul excluded -# (Korean uses spaces). +# Two predicates: dedup = CJK script only; spacing also includes fullwidth forms. +# scx (not Script) so kana marks ー/・ count; Hangul excluded (Korean uses spaces). _HALFWIDTH_HANGUL = chr(0xFFA0) + "-" + chr(0xFFDC) # Dedup set: &&\p{P} adds fullwidth punctuation but not fullwidth ASCII/digits. diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py index dafd820bbf91..ff4e5a2df894 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py @@ -109,17 +109,11 @@ def chunked_streaming_config(self) -> dict: @property def realtime_slicing_config(self) -> dict: - """Tuning knobs for the WS realtime slicing path. Adapters opt in - by overriding this with ``enabled=True`` and supplying values - tuned for their model; the base default keeps slicing off so a - new adapter must consciously calibrate the overlap window. - - ``enabled``: whether to consult the slicing path at all. - ``left_overlap_ms``: audio kept across the sliced boundary so - dedupe has context; cover the K-token rollback window. - ``min_audio_sec``: don't slice below this many seconds of - cumulative audio (sliced output diverges from cumulative - on short inputs and dedupe over-matches). + """Slicing-path tuning knobs, off by default -- an adapter opts in by + overriding with ``enabled=True`` and model-tuned values. + ``left_overlap_ms`` is the audio kept across the sliced boundary for + dedupe context; ``min_audio_sec`` is the floor below which slicing stays + off. """ return {"enabled": False, "left_overlap_ms": 0, "min_audio_sec": 0.0} From 9e8530eb59120d61b57b14d1f8a9338c6e69477d Mon Sep 17 00:00:00 2001 From: sam huang Date: Sun, 31 May 2026 10:48:42 -0400 Subject: [PATCH 06/10] Drop CJK char-level dedupe from realtime ASR slicing (defer to M3) CJK never enters the sliced path -- the slicing gate needs confirmed text, which the whitespace word-split in StreamingASRState.update never produces for space-less scripts -- so the CJK char-level dedupe was unreachable for CJK and only added review surface. dedupe_overlap is now word-level only; the spacing predicate (needs_space) and word-level dedupe (incl. the punctuation-overlap fix) stay. CJK-aware dedupe is deferred to M3, where slicing also engages for CJK. --- .../srt/entrypoints/openai/streaming_asr.py | 84 +++---------------- .../entrypoints/openai/test_streaming_asr.py | 6 -- 2 files changed, 11 insertions(+), 79 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index 4ce3fddfcf7e..93b969c69ec9 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -135,20 +135,10 @@ def normalize_whitespace(text: str) -> str: _NO_SPACE_AFTER = frozenset("([{(【《「『") -# Two predicates: dedup = CJK script only; spacing also includes fullwidth forms. -# scx (not Script) so kana marks ー/・ count; Hangul excluded (Korean uses spaces). +# No-inter-word-space characters, for needs_space() output spacing. scx (not +# Script) so the kana marks ー/・ count; the halfwidth Hangul jamo (U+FFA0-FFDC) +# the fullwidth block would re-add is subtracted, since Korean is space-delimited. _HALFWIDTH_HANGUL = chr(0xFFA0) + "-" + chr(0xFFDC) - -# Dedup set: &&\p{P} adds fullwidth punctuation but not fullwidth ASCII/digits. -_CJK_DEDUPE_RE = regex.compile( - "(?V1)[" - r"\p{Han}\p{scx=Hiragana}\p{scx=Katakana}\p{Bopomofo}" - r"\p{Block=CJK_Symbols_and_Punctuation}" - r"[\p{Block=Halfwidth_and_Fullwidth_Forms}&&\p{P}]]" -) - -# Spacing set: dedup set + fullwidth ASCII/digits (wide glyphs take no space), -# minus the halfwidth Hangul jamo (U+FFA0-FFDC) the block would re-add. _CJK_NO_SPACE_RE = regex.compile( "(?V1)[" r"\p{Han}\p{scx=Hiragana}\p{scx=Katakana}\p{Bopomofo}" @@ -158,15 +148,10 @@ def normalize_whitespace(text: str) -> str: def _is_cjk_no_space(c: str) -> bool: - """No-inter-word-space char (CJK script + fullwidth forms); spacing only.""" + """No-inter-word-space char (CJK script + fullwidth forms); for spacing.""" return bool(_CJK_NO_SPACE_RE.match(c)) -def _is_cjk_dedupe(c: str) -> bool: - """Genuine CJK script char eligible for char-level dedup (no fullwidth ASCII).""" - return bool(_CJK_DEDUPE_RE.match(c)) - - def needs_space(prev: str, cur: str) -> bool: """Return whether a boundary space is needed between emitted deltas. @@ -226,63 +211,16 @@ def _dedupe_by_word(committed_text: str, candidate_out: str) -> str: return candidate_out -def _is_punctuation(c: str) -> bool: - return unicodedata.category(c)[0] == "P" - - -def _get_leading_cjk_chars(text: str) -> List[str]: - """Return the CJK characters at the start of ``text``, stopping at the first - non-CJK character (leading whitespace is skipped).""" - chars: List[str] = [] - for char in text.lstrip(): - if not _is_cjk_dedupe(char): - break - chars.append(char) - return chars - - -def _get_trailing_cjk_chars(text: str) -> List[str]: - """Return the CJK characters at the end of ``text``, stopping at the first - non-CJK character (trailing whitespace is skipped).""" - chars: List[str] = [] - for char in reversed(text.rstrip()): - if not _is_cjk_dedupe(char): - break - chars.append(char) - chars.reverse() - return chars - - -def _dedupe_by_cjk_char(committed_text: str, candidate_out: str) -> str: - """Drop the CJK characters at the start of ``candidate_out`` when they repeat - the CJK characters at the end of ``committed_text``. Only those boundary - characters are compared, so a non-CJK prefix ("today ") is never deleted to - reach a later match.""" - lead = _get_leading_cjk_chars(candidate_out) - tail = _get_trailing_cjk_chars(committed_text) - if not lead or not tail: - return candidate_out - for overlap in range(min(len(lead), len(tail)), 0, -1): - if tail[-overlap:] != lead[:overlap]: - continue - # Single-glyph matches collide too often for CJK letters; require >=2, - # allow 1 only for punctuation. - if overlap == 1 and not _is_punctuation(lead[0]): - continue - return candidate_out.lstrip()[overlap:].lstrip() - return candidate_out - - def dedupe_overlap(committed_text: str, candidate_out: str) -> str: - """Trim words / CJK characters at the start of ``candidate_out`` that - re-transcribe ``committed_text``'s tail. Matches by whole word first, then - falls back to matching the leading/trailing CJK characters.""" + """Trim words at the start of ``candidate_out`` that re-transcribe + ``committed_text``'s tail (word-level, case- and punctuation-insensitive). + + CJK has no inter-word spaces, so the word-level matcher does not help there; + a character-level CJK dedupe is deferred to M3, where slicing also engages + for CJK (today it stays on the cumulative path).""" if not committed_text or not candidate_out: return candidate_out - deduped = _dedupe_by_word(committed_text, candidate_out) - if deduped != candidate_out: - return deduped - return _dedupe_by_cjk_char(committed_text, candidate_out) + return _dedupe_by_word(committed_text, candidate_out) async def process_asr_chunk( diff --git a/test/registered/unit/entrypoints/openai/test_streaming_asr.py b/test/registered/unit/entrypoints/openai/test_streaming_asr.py index 10b4c7f6941d..1b72d277976b 100644 --- a/test/registered/unit/entrypoints/openai/test_streaming_asr.py +++ b/test/registered/unit/entrypoints/openai/test_streaming_asr.py @@ -102,12 +102,6 @@ def test_slicing_path_uses_bare_prompt_and_dedupes(self): self.assertEqual(tm.requests[0].text, "PROMPT:") self.assertEqual(state.full_transcript, "gamma") - def test_slicing_path_dedupes_overlapping_cjk(self): - # Dedupe also trims an overlapping CJK run (no inter-word spaces). - state = self._state() - self._chunk(state, "天气很好", prompt="PROMPT:", dedupe_against="今天天气") - self.assertEqual(state.full_transcript, "很好") - def test_slicing_path_keeps_non_overlapping_candidate(self): # No overlap with the committed tail -> nothing is trimmed. state = self._state() From ae724c8f8876056e2023a16c53d01726b9b18c9a Mon Sep 17 00:00:00 2001 From: sam huang Date: Sun, 31 May 2026 11:01:26 -0400 Subject: [PATCH 07/10] Realtime ASR: revert CJK spacing predicate to codepoint _is_cjk Drop the regex/scx rewrite of the spacing predicate back to the baseline codepoint _is_cjk (removes the new `regex` dependency); add a halfwidth Hangul jamo guard so the function matches its docstring. Fix a stale "token-level" comment in update() (the scan is word-level; token-level rollback is M3) and shorten _dedupe_norm's docstring. --- .../srt/entrypoints/openai/streaming_asr.py | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index 93b969c69ec9..a110defaf0dc 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -7,7 +7,6 @@ from typing import Any, Dict, List, Optional, Union import numpy as np -import regex import soundfile as sf from fastapi import Request @@ -74,8 +73,8 @@ def update(self, new_transcript: str) -> str: self.confirmed_text = "" self.full_transcript = new_transcript self.chunk_index += 1 - # Token-level common prefix, not char-level startswith: startswith - # sliced mid-word when a confirmed token was extended ("world" -> + # Word-level common prefix, not char-level startswith: startswith + # sliced mid-word when a confirmed word was extended ("world" -> # "worldly" emitted "ly"). old_words = old_confirmed.split() new_words = self.confirmed_text.split() @@ -135,21 +134,19 @@ def normalize_whitespace(text: str) -> str: _NO_SPACE_AFTER = frozenset("([{(【《「『") -# No-inter-word-space characters, for needs_space() output spacing. scx (not -# Script) so the kana marks ー/・ count; the halfwidth Hangul jamo (U+FFA0-FFDC) -# the fullwidth block would re-add is subtracted, since Korean is space-delimited. -_HALFWIDTH_HANGUL = chr(0xFFA0) + "-" + chr(0xFFDC) -_CJK_NO_SPACE_RE = regex.compile( - "(?V1)[" - r"\p{Han}\p{scx=Hiragana}\p{scx=Katakana}\p{Bopomofo}" - r"\p{Block=CJK_Symbols_and_Punctuation}" - r"[\p{Block=Halfwidth_and_Fullwidth_Forms}--[" + _HALFWIDTH_HANGUL + "]]]" -) - - -def _is_cjk_no_space(c: str) -> bool: - """No-inter-word-space char (CJK script + fullwidth forms); for spacing.""" - return bool(_CJK_NO_SPACE_RE.match(c)) +def _is_cjk(c: str) -> bool: + """CJK-context character that takes no inter-word space.""" + cp = ord(c) + if 0xFFA0 <= cp <= 0xFFDC: # halfwidth Hangul jamo -- Korean is space-delimited + return False + return ( + 0x3000 <= cp <= 0x303F # CJK Symbols and Punctuation + or 0x3040 <= cp <= 0x309F # Hiragana + or 0x30A0 <= cp <= 0x30FF # Katakana (incl. ー / ・) + or 0x3400 <= cp <= 0x4DBF # CJK Unified Ideographs Ext A + or 0x4E00 <= cp <= 0x9FFF # CJK Unified Ideographs + or 0xFF00 <= cp <= 0xFFEF # Halfwidth & Fullwidth Forms + ) def needs_space(prev: str, cur: str) -> bool: @@ -164,15 +161,14 @@ def needs_space(prev: str, cur: str) -> bool: return False if cur[0] in _NO_SPACE_BEFORE or prev[-1] in _NO_SPACE_AFTER: return False - if _is_cjk_no_space(prev[-1]) and _is_cjk_no_space(cur[0]): + if _is_cjk(prev[-1]) and _is_cjk(cur[0]): return False return True def _dedupe_norm(word: str) -> str: - """Lowercase + NFKC-fold + strip edge punctuation (Unicode category P), so - "dinner," == "dinner" and exotic marks (《》«» …) need no hand-listed set. - Strips P only, not S, so "$5" / "3+4" keep their symbols.""" + """Normalize a word for overlap matching: NFKC, lowercase, strip edge + punctuation (Unicode category P).""" word = unicodedata.normalize("NFKC", word) lo, hi = 0, len(word) while lo < hi and unicodedata.category(word[lo])[0] == "P": From c59d41496c2db7147180da5b436cd58d86ee7bce Mon Sep 17 00:00:00 2001 From: sam huang Date: Sun, 31 May 2026 11:42:28 -0400 Subject: [PATCH 08/10] realtime ASR: rename slicing anchor to last_sliced_buffer_end_bytes "committed_audio_until_bytes" collided with the OpenAI realtime input_audio_buffer.commit concept; the field actually marks the PCM offset the previous sliced inference consumed up to. Rename it across the field declaration, slice-start arithmetic, anchor update, and the per-item reset. Also fix a stale test docstring left over from the CJK-dedupe rollback ("Latin and CJK" -> word-level dedupe). --- .../sglang/srt/entrypoints/openai/realtime/session.py | 10 +++++----- .../unit/entrypoints/openai/test_streaming_asr.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index 9cabd1a78b14..f737f1e8893d 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -147,7 +147,7 @@ class _AudioState: After the slicing gate is reached, inference switches from the cumulative buffer to a tail slice. The first gated call may still start at offset 0; - later calls use ``committed_audio_until_bytes - left_overlap_bytes``.""" + later calls use ``last_sliced_buffer_end_bytes - left_overlap_bytes``.""" max_buffer_bytes: int chunk_size_bytes: int @@ -159,7 +159,7 @@ class _AudioState: slicing_enabled: bool = True pcm_buffer: bytearray = field(default_factory=bytearray) last_inference_offset: int = 0 - committed_audio_until_bytes: int = 0 + last_sliced_buffer_end_bytes: int = 0 @dataclass @@ -629,7 +629,7 @@ async def _run_inference(self, is_last: bool) -> bool: dedupe_against: Optional[str] = committed_text slice_start = max( 0, - self.audio.committed_audio_until_bytes - self.audio.left_overlap_bytes, + self.audio.last_sliced_buffer_end_bytes - self.audio.left_overlap_bytes, ) else: prompt = None @@ -690,7 +690,7 @@ async def _run_inference(self, is_last: bool) -> bool: if use_slicing: # Held-back tokens are re-covered only if their audio span fits the # left overlap; slower speech can drop the earliest (see known limits). - self.audio.committed_audio_until_bytes = len(self.audio.pcm_buffer) + self.audio.last_sliced_buffer_end_bytes = len(self.audio.pcm_buffer) self.audio.last_inference_offset = len(self.audio.pcm_buffer) await self._emit_transcription_delta(delta) @@ -729,7 +729,7 @@ def _reset_inference_state(self) -> None: self.audio.pcm_buffer.clear() # in-place; reuses the buffer's allocation self.item.emitted_deltas.clear() self.audio.last_inference_offset = 0 - self.audio.committed_audio_until_bytes = 0 + self.audio.last_sliced_buffer_end_bytes = 0 def _build_session_info(self) -> TranscriptionSessionConfig: # id / object aren't SDK fields; round-trip via extra='allow' so diff --git a/test/registered/unit/entrypoints/openai/test_streaming_asr.py b/test/registered/unit/entrypoints/openai/test_streaming_asr.py index 1b72d277976b..1932e851f13d 100644 --- a/test/registered/unit/entrypoints/openai/test_streaming_asr.py +++ b/test/registered/unit/entrypoints/openai/test_streaming_asr.py @@ -3,7 +3,7 @@ Drives the shared ``process_asr_chunk`` entry point with a mocked ``TokenizerManager`` (same style as ``test_serving_transcription`` / ``test_serving_embedding``) across the real scenarios: the cumulative (M1) and -sliced (M2) inference paths, output dedupe for Latin and CJK, the no-overlap and +sliced (M2) inference paths, word-level output dedupe, the no-overlap and empty-response edges, last-chunk finalize, and word reconciliation -- plus the ``RealtimeConnection`` guard that decides whether slicing turns on. """ From 36b3322e430037c82a058a52dfe94f227a67c587 Mon Sep 17 00:00:00 2001 From: Sam H <49463080+SammLSH@users.noreply.github.com> Date: Sun, 31 May 2026 12:00:25 -0400 Subject: [PATCH 09/10] Update python/sglang/srt/entrypoints/openai/realtime/session.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- python/sglang/srt/entrypoints/openai/realtime/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index f737f1e8893d..169b5dfe89df 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -636,9 +636,9 @@ async def _run_inference(self, is_last: bool) -> bool: dedupe_against = None slice_start = 0 - pcm_slice = _slice_pcm_from(self.audio.pcm_buffer, slice_start) - audio_samples = await asyncio.to_thread(_pcm_to_float_samples, pcm_slice) try: + pcm_slice = _slice_pcm_from(self.audio.pcm_buffer, slice_start) + audio_samples = await asyncio.to_thread(_pcm_to_float_samples, pcm_slice) delta = await process_asr_chunk( tokenizer_manager=self.tokenizer_manager, adapter=self.adapter, From 31dbc971a20c342d593f88db886e94ddeb81ce5c Mon Sep 17 00:00:00 2001 From: Sam H <49463080+SammLSH@users.noreply.github.com> Date: Sun, 31 May 2026 12:26:44 -0400 Subject: [PATCH 10/10] Update python/sglang/srt/entrypoints/openai/realtime/session.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- python/sglang/srt/entrypoints/openai/realtime/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index 169b5dfe89df..7543611d5870 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -107,7 +107,7 @@ def _resample_to_target_rate(pcm: bytes, src_rate: int, target_rate: int) -> byt def _pcm_to_float_samples(pcm: bytes) -> np.ndarray: # /32768.0 matches soundfile.read's default int16 normalization so the # samples are bit-equal to the prior PCM→WAV→sf.read path. - return np.frombuffer(pcm, dtype=np.int16).astype(np.float64) / 32768.0 + return np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0 _CLIENT_EVENT_TYPES: Dict[str, type] = {