diff --git a/python/sglang/srt/entrypoints/openai/realtime/session.py b/python/sglang/srt/entrypoints/openai/realtime/session.py index c5951993e25d..7543611d5870 100644 --- a/python/sglang/srt/entrypoints/openai/realtime/session.py +++ b/python/sglang/srt/entrypoints/openai/realtime/session.py @@ -9,16 +9,14 @@ from __future__ import annotations import asyncio -import io import json import logging import math from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import numpy as np import pybase64 -import soundfile as sf from fastapi import WebSocket, WebSocketDisconnect from openai.types.realtime import ( ConversationItemCreatedEvent, @@ -83,6 +81,13 @@ _SAMPLE_WIDTH = 2 +def _slice_pcm_from(buffer: Union[bytes, bytearray], start: int) -> bytes: + """Return an immutable ``buffer[start:]`` snapshot with bounds checking.""" + if not (0 <= start <= len(buffer)): + raise ValueError(f"_slice_pcm_from: start={start} not in [0, {len(buffer)}]") + return bytes(memoryview(buffer)[start:]) + + def _resample_to_target_rate(pcm: bytes, src_rate: int, target_rate: int) -> bytes: if src_rate == target_rate or not pcm: return pcm @@ -99,11 +104,10 @@ def _resample_to_target_rate(pcm: bytes, src_rate: int, target_rate: int) -> byt return (np.clip(samples, -1.0, 1.0) * 32767.0).astype(np.int16).tobytes() -def _pcm_to_wav(pcm: bytes, sample_rate: int) -> bytes: - samples = np.frombuffer(pcm, dtype=np.int16) - buf = io.BytesIO() - sf.write(buf, samples, sample_rate, format="WAV") - return buf.getvalue() +def _pcm_to_float_samples(pcm: bytes) -> np.ndarray: + # /32768.0 matches soundfile.read's default int16 normalization so the + # samples are bit-equal to the prior PCM→WAV→sf.read path. + return np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0 _CLIENT_EVENT_TYPES: Dict[str, type] = { @@ -139,17 +143,23 @@ class _SessionConfig: @dataclass class _AudioState: - """Per-item audio state: PCM buffer accumulated from - input_audio_buffer.append, the chunked ASR rollback state, and the - static buffer-size limits set at __init__. pcm_buffer / state / - last_inference_offset reset on commit-roll and clear; the size limits - stay constant for the session's lifetime.""" + """Per-item audio buffer and slicing state. + + After the slicing gate is reached, inference switches from the cumulative + buffer to a tail slice. The first gated call may still start at offset 0; + later calls use ``last_sliced_buffer_end_bytes - left_overlap_bytes``.""" max_buffer_bytes: int chunk_size_bytes: int + left_overlap_bytes: int + slicing_min_chunk_index: int state: StreamingASRState + # False when the left overlap covers the whole unfixed-chunk window (the + # K-unfixed dedupe target would be unreachable); set at construction. + slicing_enabled: bool = True pcm_buffer: bytearray = field(default_factory=bytearray) last_inference_offset: int = 0 + last_sliced_buffer_end_bytes: int = 0 @dataclass @@ -190,6 +200,12 @@ def __init__( self.config = _SessionConfig() + slicing_cfg = adapter.realtime_slicing_config + slicing_opt_in = bool(slicing_cfg.get("enabled", False)) + left_overlap_ms = int(slicing_cfg.get("left_overlap_ms", 0)) + min_audio_sec = float(slicing_cfg.get("min_audio_sec", 0.0)) + left_overlap_bytes = int(left_overlap_ms / 1000 * self.bytes_per_second) + state = StreamingASRState(**adapter.chunked_streaming_config) chunk_size_bytes = int(state.chunk_size_sec * self.bytes_per_second) if chunk_size_bytes <= 0: @@ -197,10 +213,27 @@ def __init__( f"adapter.chunked_streaming_config produced non-positive " f"chunk_size_sec; got {state.chunk_size_sec!r}" ) + slicing_min_chunk_index = ( + math.ceil(min_audio_sec / state.chunk_size_sec) if slicing_opt_in else 0 + ) + slicing_enabled = ( + slicing_opt_in + and left_overlap_bytes < state.unfixed_chunk_num * chunk_size_bytes + ) + if slicing_opt_in and not slicing_enabled: + logger.warning( + "[realtime] left_overlap=%dms >= unfixed_chunks_duration=%dms; " + "audio slicing disabled, falling back to cumulative inference", + left_overlap_ms, + state.unfixed_chunk_num * int(state.chunk_size_sec * 1000), + ) self.audio = _AudioState( max_buffer_bytes=self.max_buffer_seconds * self.bytes_per_second, chunk_size_bytes=chunk_size_bytes, state=state, + left_overlap_bytes=left_overlap_bytes, + slicing_min_chunk_index=slicing_min_chunk_index, + slicing_enabled=slicing_enabled, ) self.item = _ItemState(current_item_id=f"item_{random_uuid()}") @@ -543,8 +576,8 @@ async def _on_input_audio_buffer_commit( tail = self.audio.state.finalize() await self._emit_transcription_delta(tail) - # Build from emitted_deltas, not state.full_transcript: prefix injection - # means the last chunk's full_transcript is only the continuation tail. + # Rebuild from emitted_deltas: both paths leave full_transcript only a + # partial tail, while the deltas together are the whole transcript. transcript = normalize_whitespace("".join(self.item.emitted_deltas)) await self._send( @@ -579,20 +612,42 @@ async def _on_input_audio_buffer_clear( ) async def _run_inference(self, is_last: bool) -> bool: - """Run ASR on the current cumulative buffer. Returns False on failure: - commit-time emits transcription.failed and rolls the item; append-time - emits a generic error envelope and closes the WebSocket.""" - wav_data = await asyncio.to_thread( - _pcm_to_wav, bytes(self.audio.pcm_buffer), self.model_sample_rate + """Run ASR on the current audio window: the whole PCM buffer + (cumulative) or a tail slice with left overlap + output dedupe + (slicing). Returns False on failure -- commit-time emits + transcription.failed and rolls the item; append-time closes the WS.""" + # Slicing uses a bare prompt: the retained overlap + dedupe replace + # injecting emitted_text as a continuation prefix. + committed_text = self.audio.state.get_prefix_text() + use_slicing = ( + self.audio.slicing_enabled + and bool(committed_text) + and self.audio.state.chunk_index >= self.audio.slicing_min_chunk_index ) + if use_slicing: + prompt: Optional[str] = self.adapter.prompt_template + dedupe_against: Optional[str] = committed_text + slice_start = max( + 0, + self.audio.last_sliced_buffer_end_bytes - self.audio.left_overlap_bytes, + ) + else: + prompt = None + dedupe_against = None + slice_start = 0 + try: + pcm_slice = _slice_pcm_from(self.audio.pcm_buffer, slice_start) + audio_samples = await asyncio.to_thread(_pcm_to_float_samples, pcm_slice) delta = await process_asr_chunk( tokenizer_manager=self.tokenizer_manager, adapter=self.adapter, state=self.audio.state, - audio_data=wav_data, + audio_data=audio_samples, sampling_params=self.config.sampling_params, is_last=is_last, + prompt=prompt, + dedupe_against=dedupe_against, ) except Exception: logger.exception( @@ -632,6 +687,11 @@ async def _run_inference(self, is_last: bool) -> bool: ) return False + if use_slicing: + # Held-back tokens are re-covered only if their audio span fits the + # left overlap; slower speech can drop the earliest (see known limits). + self.audio.last_sliced_buffer_end_bytes = len(self.audio.pcm_buffer) + self.audio.last_inference_offset = len(self.audio.pcm_buffer) await self._emit_transcription_delta(delta) return True @@ -669,6 +729,7 @@ def _reset_inference_state(self) -> None: self.audio.pcm_buffer.clear() # in-place; reuses the buffer's allocation self.item.emitted_deltas.clear() self.audio.last_inference_offset = 0 + self.audio.last_sliced_buffer_end_bytes = 0 def _build_session_info(self) -> TranscriptionSessionConfig: # id / object aren't SDK fields; round-trip via extra='allow' so diff --git a/python/sglang/srt/entrypoints/openai/streaming_asr.py b/python/sglang/srt/entrypoints/openai/streaming_asr.py index a347cc8f3e33..a110defaf0dc 100644 --- a/python/sglang/srt/entrypoints/openai/streaming_asr.py +++ b/python/sglang/srt/entrypoints/openai/streaming_asr.py @@ -2,9 +2,11 @@ import io import logging import re +import unicodedata from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +import numpy as np import soundfile as sf from fastapi import Request @@ -40,8 +42,8 @@ class StreamingASRState: unfixed_chunk_num: int unfixed_token_num: int confirmed_text: str = "" - # Monotonic accumulator; used as prompt prefix so the model sees a - # natural continuation point, not the rolled-back ``confirmed_text``. + # Monotonic accumulator. Used as the prompt prefix on cumulative paths and + # as the dedupe prefix on the slicing path. emitted_text: str = "" full_transcript: str = "" chunk_index: int = 0 @@ -53,9 +55,13 @@ def get_prefix_text(self) -> str: def _record_emit(self, delta: str) -> str: if delta: - self.emitted_text = ( - f"{self.emitted_text} {delta}".strip() if self.emitted_text else delta - ) + if self.emitted_text: + # needs_space avoids a space between adjacent CJK characters; + # this accumulator feeds the prompt prefix and the dedupe target. + sep = " " if needs_space(self.emitted_text, delta) else "" + self.emitted_text = f"{self.emitted_text}{sep}{delta}".strip() + else: + self.emitted_text = delta return delta def update(self, new_transcript: str) -> str: @@ -67,10 +73,9 @@ def update(self, new_transcript: str) -> str: self.confirmed_text = "" self.full_transcript = new_transcript self.chunk_index += 1 - if self.confirmed_text.startswith(old_confirmed): - return self._record_emit(self.confirmed_text[len(old_confirmed) :].strip()) - # Model revised earlier text, use word level common prefix to avoid - # re-emitting already-sent content and cutting mid-word. + # Word-level common prefix, not char-level startswith: startswith + # sliced mid-word when a confirmed word was extended ("world" -> + # "worldly" emitted "ly"). old_words = old_confirmed.split() new_words = self.confirmed_text.split() common_count = 0 @@ -130,25 +135,24 @@ def normalize_whitespace(text: str) -> str: def _is_cjk(c: str) -> bool: - """Whether char is a CJK-context glyph that doesn't take inter-word - spaces — ideographs, Japanese kana, CJK punctuation, fullwidth forms. - Excludes Hangul / Devanagari / Arabic etc., which are non-ASCII but - space-separated and need the normal boundary space.""" + """CJK-context character that takes no inter-word space.""" cp = ord(c) + if 0xFFA0 <= cp <= 0xFFDC: # halfwidth Hangul jamo -- Korean is space-delimited + return False return ( - 0x3000 <= cp <= 0x303F # CJK Symbols and Punctuation (,。、《》「」…) + 0x3000 <= cp <= 0x303F # CJK Symbols and Punctuation or 0x3040 <= cp <= 0x309F # Hiragana - or 0x30A0 <= cp <= 0x30FF # Katakana + or 0x30A0 <= cp <= 0x30FF # Katakana (incl. ー / ・) or 0x3400 <= cp <= 0x4DBF # CJK Unified Ideographs Ext A or 0x4E00 <= cp <= 0x9FFF # CJK Unified Ideographs - or 0xFF00 <= cp <= 0xFFEF # Halfwidth & Fullwidth Forms (fullwidth ASCII) + or 0xFF00 <= cp <= 0xFFEF # Halfwidth & Fullwidth Forms ) def needs_space(prev: str, cur: str) -> bool: """Return whether a boundary space is needed between emitted deltas. - Avoid spaces around punctuation and between adjacent CJK-context glyphs. + Avoid spaces around punctuation and between adjacent CJK-context characters. Shared by the realtime WS and HTTP SSE chunked streaming paths. """ if not prev or not cur: @@ -162,18 +166,79 @@ def needs_space(prev: str, cur: str) -> bool: return True +def _dedupe_norm(word: str) -> str: + """Normalize a word for overlap matching: NFKC, lowercase, strip edge + punctuation (Unicode category P).""" + word = unicodedata.normalize("NFKC", word) + lo, hi = 0, len(word) + while lo < hi and unicodedata.category(word[lo])[0] == "P": + lo += 1 + while hi > lo and unicodedata.category(word[hi - 1])[0] == "P": + hi -= 1 + return word[lo:hi].lower() + + +def _dedupe_by_word(committed_text: str, candidate_out: str) -> str: + """Drop the longest prefix of ``candidate_out`` matching the suffix of + ``committed_text`` word-for-word (case- and punctuation-insensitive).""" + candidate_words = candidate_out.split() + if not candidate_words: + return candidate_out + # Only the last len(candidate_words) committed words can overlap, so rsplit + # the tail instead of tokenizing the whole (growing) committed transcript. + committed_tail = committed_text.rsplit(maxsplit=len(candidate_words))[ + -len(candidate_words) : + ] + if not committed_tail: + return candidate_out + # Normalize the committed tail and candidate prefix once, then compare slices. + max_overlap = min(len(committed_tail), len(candidate_words)) + committed_tail_norm = [_dedupe_norm(w) for w in committed_tail] + candidate_norm = [_dedupe_norm(w) for w in candidate_words[:max_overlap]] + # Longest overlap first; the first match wins. + for overlap in range(max_overlap, 0, -1): + if committed_tail_norm[-overlap:] != candidate_norm[:overlap]: + continue + # Skip all-punctuation overlaps: lone "@"/"#" both normalize to "" and + # would match spuriously. + if not any(candidate_norm[:overlap]): + continue + return " ".join(candidate_words[overlap:]) + return candidate_out + + +def dedupe_overlap(committed_text: str, candidate_out: str) -> str: + """Trim words at the start of ``candidate_out`` that re-transcribe + ``committed_text``'s tail (word-level, case- and punctuation-insensitive). + + CJK has no inter-word spaces, so the word-level matcher does not help there; + a character-level CJK dedupe is deferred to M3, where slicing also engages + for CJK (today it stays on the cumulative path).""" + if not committed_text or not candidate_out: + return candidate_out + return _dedupe_by_word(committed_text, candidate_out) + + async def process_asr_chunk( tokenizer_manager: TokenizerManager, adapter: TranscriptionAdapter, state: StreamingASRState, - audio_data: bytes, + audio_data: Union[bytes, np.ndarray], sampling_params: Dict[str, Any], is_last: bool, raw_request: Optional[Request] = None, routing_key: Optional[str] = None, + prompt: Optional[str] = None, + dedupe_against: Optional[str] = None, ) -> str: - """Run inference on one audio chunk. Shared by the HTTP and WebSocket paths.""" - prompt = adapter.prompt_template + state.get_prefix_text() + """Run inference on one audio chunk. Shared by the HTTP and WS paths. + + ``audio_data`` accepts WAV bytes or pre-decoded float samples. + ``prompt`` overrides the default ``adapter.prompt_template + state.get_prefix_text()``. + ``dedupe_against`` triggers ``dedupe_overlap`` on raw model output before ``state`` ingests it. + """ + if prompt is None: + prompt = adapter.prompt_template + state.get_prefix_text() chunk_request = GenerateReqInput( text=prompt, @@ -202,6 +267,8 @@ async def process_asr_chunk( return "" text = normalize_whitespace(adapter.postprocess_text(ret.get("text", ""))) + if dedupe_against is not None: + text = dedupe_overlap(dedupe_against, text) if is_last: state.full_transcript = text diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py index cd97b42997f9..ff4e5a2df894 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/base.py @@ -107,6 +107,16 @@ def chunked_streaming_config(self) -> dict: """ return {} + @property + def realtime_slicing_config(self) -> dict: + """Slicing-path tuning knobs, off by default -- an adapter opts in by + overriding with ``enabled=True`` and model-tuned values. + ``left_overlap_ms`` is the audio kept across the sliced boundary for + dedupe context; ``min_audio_sec`` is the floor below which slicing stays + off. + """ + return {"enabled": False, "left_overlap_ms": 0, "min_audio_sec": 0.0} + def postprocess_text(self, text: str) -> str: """Strip model-specific markers from raw decoded text. diff --git a/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py b/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py index df686b15aecb..c6ecbae4cf8a 100644 --- a/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py +++ b/python/sglang/srt/entrypoints/openai/transcription_adapters/qwen3_asr.py @@ -35,6 +35,14 @@ def chunked_streaming_config(self) -> dict: def prompt_template(self) -> str: return DEFAULT_ASR_PROMPT + @property + def realtime_slicing_config(self) -> dict: + # Empirically tuned for Qwen3-ASR: in our fixtures a 2s left overlap + # gave enough acoustic context for the K=5 (unfixed_token_num) rollback + # window; 16s min audio keeps slicing off on short inputs, where sliced + # output diverged from cumulative. + return {"enabled": True, "left_overlap_ms": 2000, "min_audio_sec": 16.0} + def build_sampling_params(self, request: TranscriptionRequest) -> dict: temperature = request.temperature if temperature == 0.0: diff --git a/python/sglang/srt/utils/common.py b/python/sglang/srt/utils/common.py index feb505d5dd6b..7bbaaa40204c 100644 --- a/python/sglang/srt/utils/common.py +++ b/python/sglang/srt/utils/common.py @@ -772,11 +772,20 @@ def set_random_seed(seed: int) -> None: def load_audio( - audio_file: str, sr: Optional[int] = None, mono: bool = True + audio_file: Union[str, bytes, np.ndarray], + sr: Optional[int] = None, + mono: bool = True, ) -> np.ndarray: if sr is None: sr = 16000 + # Caller must pre-resample to `sr`. Multi-channel layout assumed + # (n_samples, n_channels) per soundfile.read. + if isinstance(audio_file, np.ndarray): + if mono and audio_file.ndim > 1: + return np.mean(audio_file, axis=1) + return audio_file + # Normalize input: resolve URL / base64 / file:// to bytes or path if isinstance(audio_file, bytes): source = audio_file diff --git a/test/registered/unit/entrypoints/openai/test_streaming_asr.py b/test/registered/unit/entrypoints/openai/test_streaming_asr.py new file mode 100644 index 000000000000..1932e851f13d --- /dev/null +++ b/test/registered/unit/entrypoints/openai/test_streaming_asr.py @@ -0,0 +1,186 @@ +"""Unit tests for the realtime ASR slicing path. + +Drives the shared ``process_asr_chunk`` entry point with a mocked +``TokenizerManager`` (same style as ``test_serving_transcription`` / +``test_serving_embedding``) across the real scenarios: the cumulative (M1) and +sliced (M2) inference paths, word-level output dedupe, the no-overlap and +empty-response edges, last-chunk finalize, and word reconciliation -- plus the +``RealtimeConnection`` guard that decides whether slicing turns on. +""" + +from sglang.test.test_utils import maybe_stub_sgl_kernel + +maybe_stub_sgl_kernel() # must precede any import that pulls in sgl_kernel + +import unittest +from types import SimpleNamespace + +import numpy as np + +from sglang.srt.entrypoints.openai.realtime.session import RealtimeConnection +from sglang.srt.entrypoints.openai.streaming_asr import ( + StreamingASRState, + process_asr_chunk, +) +from sglang.srt.utils import get_or_create_event_loop +from sglang.test.ci.ci_register import register_cpu_ci +from sglang.test.test_utils import CustomTestCase + +register_cpu_ci(est_time=3, suite="base-a-test-cpu") + + +class _FakeAdapter: + prompt_template = "PROMPT:" + + def postprocess_text(self, text: str) -> str: + return text + + +class _MockTokenizerManager: + """Records the request and yields one synthetic transcript (or nothing, when + ``transcript`` is None, to exercise the empty-response path).""" + + def __init__(self, transcript): + self._transcript = transcript + self.requests = [] + + def generate_request(self, adapted_request, raw_request=None): + self.requests.append(adapted_request) + transcript = self._transcript + + async def gen(): + if transcript is not None: + yield {"text": transcript} + + return gen() + + +def _run(coro): + return get_or_create_event_loop().run_until_complete(coro) + + +_AUDIO = np.zeros(1600, dtype=np.float32) + + +class TestProcessAsrChunk(CustomTestCase): + def _state(self, **kwargs): + params = dict(chunk_size_sec=1.0, unfixed_chunk_num=2, unfixed_token_num=2) + params.update(kwargs) + return StreamingASRState(**params) + + def _chunk(self, state, transcript, is_last=False, **kwargs): + tm = _MockTokenizerManager(transcript) + out = _run( + process_asr_chunk( + tokenizer_manager=tm, + adapter=_FakeAdapter(), + state=state, + audio_data=_AUDIO, + sampling_params={}, + is_last=is_last, + **kwargs, + ) + ) + return tm, out + + def test_cumulative_path_injects_prefix_and_skips_dedupe(self): + # prompt=None -> prompt_template + get_prefix_text(), no dedupe (M1). + state = self._state() + state.emitted_text = "hello" + state.chunk_index = 5 # past unfixed_chunk_num, so the prefix is live + tm, _ = self._chunk(state, "hello world foo") + self.assertEqual(tm.requests[0].text, "PROMPT:hello") + self.assertEqual(state.full_transcript, "hello world foo") + + def test_slicing_path_uses_bare_prompt_and_dedupes(self): + # Bare prompt (no prefix injection); dedupe trims the word that overlaps + # the committed tail (M2). + state = self._state() + tm, _ = self._chunk( + state, "beta gamma", prompt="PROMPT:", dedupe_against="alpha beta" + ) + self.assertEqual(tm.requests[0].text, "PROMPT:") + self.assertEqual(state.full_transcript, "gamma") + + def test_slicing_path_keeps_non_overlapping_candidate(self): + # No overlap with the committed tail -> nothing is trimmed. + state = self._state() + self._chunk(state, "gamma delta", prompt="PROMPT:", dedupe_against="alpha beta") + self.assertEqual(state.full_transcript, "gamma delta") + + def test_last_chunk_dedupes_then_finalizes(self): + # The final chunk dedupes against the committed tail, then finalize() + # emits the remaining text. + state = self._state() + _, out = self._chunk( + state, + "alpha beta gamma", + is_last=True, + prompt="PROMPT:", + dedupe_against="alpha", + ) + self.assertEqual(out, "beta gamma") + self.assertEqual(state.full_transcript, "beta gamma") + + def test_extended_word_emits_whole_word_not_fragment(self): + # "world" re-transcribed as "worldly" must emit "worldly", not "ly" + # (regression guard for the removed char-level startswith fast path). + state = self._state( + unfixed_chunk_num=0, unfixed_token_num=1, confirmed_text="hello world" + ) + _, out = self._chunk(state, "hello worldly test tail") + self.assertEqual(out, "worldly test") + + def test_empty_model_response_emits_nothing(self): + # No model output -> empty delta, no state mutation, no crash. + state = self._state() + _, out = self._chunk(state, None) + self.assertEqual(out, "") + self.assertEqual(state.full_transcript, "") + + +class _SlicingAdapter: + """Minimal adapter exposing only what RealtimeConnection.__init__ reads.""" + + model_sample_rate = 16000 + + def __init__(self, left_overlap_ms, enabled=True): + self._left_overlap_ms = left_overlap_ms + self._enabled = enabled + + @property + def realtime_slicing_config(self): + return { + "enabled": self._enabled, + "left_overlap_ms": self._left_overlap_ms, + "min_audio_sec": 16.0, + } + + @property + def chunked_streaming_config(self): + # 2s chunks, 2 unfixed chunks -> 4s unfixed window. + return {"chunk_size_sec": 2.0, "unfixed_chunk_num": 2, "unfixed_token_num": 5} + + +class TestSlicingEnabledGuard(CustomTestCase): + def _conn(self, left_overlap_ms, enabled=True): + server_args = SimpleNamespace(asr_max_buffer_seconds=60) + return RealtimeConnection( + object(), object(), _SlicingAdapter(left_overlap_ms, enabled), server_args + ) + + def test_enabled_only_when_overlap_fits_unfixed_window(self): + # 2s overlap fits the 4s window -> slicing on; 8s overlap makes the + # dedupe target unreachable -> guard falls back to cumulative. + self.assertTrue(self._conn(left_overlap_ms=2000).audio.slicing_enabled) + self.assertFalse(self._conn(left_overlap_ms=8000).audio.slicing_enabled) + + def test_disabled_when_adapter_opts_out(self): + # enabled=False (the base-adapter default) -> never slices. + self.assertFalse( + self._conn(left_overlap_ms=2000, enabled=False).audio.slicing_enabled + ) + + +if __name__ == "__main__": + unittest.main()