Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 137 additions & 8 deletions tests/test_anthropic_stream_scrubber.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,11 @@ class TestScrubberParameterTags:
def test_parameter_tag_single_delta(self):
scrubber = _AnthropicStreamScrubber()
result = scrubber.feed("Before <parameter=city>NYC</parameter> After")
# <parameter=...> opens IN_PARAMETER mode, which closes on </parameter>,
# so the tag and its content ("NYC") are suppressed while the surrounding
# text ("Before" / "After") is kept.
result += scrubber.flush()
assert "<parameter=" not in result
assert "NYC" not in result
assert "Before" in result
assert "After" in result

def test_stray_parameter_close_suppressed(self):
"""A stray </parameter> tag should be stripped."""
Expand Down Expand Up @@ -685,10 +681,10 @@ def test_function_back_to_text(self):
scrubber.feed("<function=test>body</function>")
assert scrubber.mode == "TEXT"

def test_text_to_parameter_enters_function_mode(self):
def test_text_to_parameter_enters_parameter_mode(self):
scrubber = _AnthropicStreamScrubber()
scrubber.feed("text<parameter=x>")
assert scrubber.mode == "IN_FUNCTION"
assert scrubber.mode == "IN_PARAMETER"

def test_stray_close_stays_in_text(self):
"""Stray closing tags should not change mode."""
Expand Down Expand Up @@ -1337,3 +1333,136 @@ def test_thinking_disabled_model(self):
thinking=AnthropicThinkingConfig(type="disabled"),
)
assert _is_thinking_enabled(req) is False


# =============================================================================
# Review Fix Tests
# =============================================================================


class TestScrubberCarryLimit:
    """Fix 1: Carry buffer must not grow without bound when '>' never arrives."""

    def test_long_false_prefix_emitted(self):
        """<function= followed by too many chars without '>' is literal text."""
        scrubber = _AnthropicStreamScrubber()
        fake = "<function=" + "x" * (scrubber.MAX_TAG * 3)
        result = scrubber.feed(fake) + scrubber.flush()
        assert "<function=" in result
        assert "x" * 10 in result

    def test_incremental_carry_does_not_grow_unbounded(self):
        """Repeated feeds without '>' must keep the carry bounded between feeds."""
        scrubber = _AnthropicStreamScrubber()
        limit = scrubber.MAX_TAG * 2
        scrubber.feed("<function=longname")
        for _ in range(50):
            scrubber.feed("a" * 100)
            # Check while streaming: flush() always empties the carry, so an
            # assertion placed only after flush() could never fail.
            assert len(scrubber.carry) <= limit
        scrubber.flush()
        # End of stream leaves nothing buffered.
        assert scrubber.carry == ""


class TestScrubberFlushPartialPrefix:
    """Fix 2: flush() must strip partial prefix tags at end of stream."""

    def test_partial_function_prefix_stripped(self):
        """An unterminated ``<function=`` tail is dropped; preceding text stays."""
        pending = _AnthropicStreamScrubber()
        # Seed the state directly rather than going through feed().
        pending.mode = "TEXT"
        pending.carry = "hello<function=myFunc"
        assert pending.flush() == "hello"

    def test_partial_parameter_prefix_stripped(self):
        """An unterminated ``<parameter=`` tail is dropped; preceding text stays."""
        pending = _AnthropicStreamScrubber()
        pending.mode = "TEXT"
        pending.carry = "hello<parameter=name"
        assert pending.flush() == "hello"

    def test_complete_prefix_also_stripped(self):
        """A fully closed prefix tag left in the carry is removed as well."""
        pending = _AnthropicStreamScrubber()
        pending.mode = "TEXT"
        pending.carry = "hello<function=foo>"
        assert pending.flush() == "hello"


class TestScrubberStandaloneParameter:
    """Fix 5: Standalone <parameter=...> must close on </parameter>, not </function>."""

    def test_standalone_parameter_closes_on_parameter_close(self):
        """Only the parameter block is removed from the surrounding text."""
        scrubber = _AnthropicStreamScrubber()
        visible = scrubber.feed("before<parameter=x>hidden</parameter>after")
        visible += scrubber.flush()
        assert visible == "beforeafter"

    def test_standalone_parameter_does_not_eat_until_function_close(self):
        """<parameter=x> must NOT suppress until </function>."""
        scrubber = _AnthropicStreamScrubber()
        visible = scrubber.feed(
            "before<parameter=x>hidden</parameter>middle</function>after"
        )
        visible += scrubber.flush()
        # Text on both sides of the stray </function> must survive.
        for expected in ("before", "middle", "after"):
            assert expected in visible
        assert "hidden" not in visible

    def test_parameter_inside_function_still_suppressed(self):
        """Within <function=...>, everything is suppressed by the outer block."""
        scrubber = _AnthropicStreamScrubber()
        visible = scrubber.feed(
            "ok<function=f><parameter=p>v</parameter></function>done"
        )
        visible += scrubber.flush()
        assert visible == "okdone"


class TestRouterCarryLimit:
    """Fix 1: Router carry buffer limit."""

    def test_long_false_prefix_emitted(self):
        """An over-long ``<function=`` run without '>' comes out as text."""
        router = _AnthropicStreamRouter()
        padding = "x" * (router.MAX_TAG * 3)
        emitted = router.feed("<function=" + padding)
        emitted = emitted + router.flush()
        # Keep only the user-facing text pieces.
        visible = "".join([chunk for kind, chunk in emitted if kind == "text"])
        assert "<function=" in visible


class TestRouterFlushPartialPrefix:
    """Fix 2: Router flush() strips partial prefixes."""

    def test_partial_function_prefix_stripped(self):
        """An unterminated ``<function=`` tail in the carry is not emitted."""
        router = _AnthropicStreamRouter()
        # Seed the state directly rather than going through feed().
        router.mode = "TEXT"
        router.carry = "hello<function=myFunc"
        text_chunks = [chunk for kind, chunk in router.flush() if kind == "text"]
        assert "".join(text_chunks) == "hello"

    def test_partial_parameter_prefix_stripped(self):
        """An unterminated ``<parameter=`` tail in the carry is not emitted."""
        router = _AnthropicStreamRouter()
        router.mode = "TEXT"
        router.carry = "hello<parameter=name"
        text_chunks = [chunk for kind, chunk in router.flush() if kind == "text"]
        assert "".join(text_chunks) == "hello"


class TestRouterStandaloneParameter:
    """Fix 5: Router standalone parameter handling."""

    def test_standalone_parameter(self):
        """A lone <parameter=...>...</parameter> block is removed from the text."""
        router = _AnthropicStreamRouter()
        emitted = router.feed("before<parameter=x>hidden</parameter>after")
        emitted.extend(router.flush())
        # Keep only the user-facing text pieces.
        visible = "".join([chunk for kind, chunk in emitted if kind == "text"])
        assert visible == "beforeafter"


class TestRouterDeadCode:
    """Fix 4: _implicit_think attribute should not exist."""

    def test_no_implicit_think_attribute(self):
        """A default-constructed router has no ``_implicit_think``."""
        default_router = _AnthropicStreamRouter()
        has_attr = hasattr(default_router, "_implicit_think")
        assert not has_attr

    def test_no_implicit_think_when_start_true(self):
        """Starting in thinking mode does not bring the attribute back."""
        thinking_router = _AnthropicStreamRouter(start_in_thinking=True)
        has_attr = hasattr(thinking_router, "_implicit_think")
        assert not has_attr
40 changes: 31 additions & 9 deletions vllm_mlx/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import json
import logging
import os
import re
import secrets
import tempfile
import threading
Expand Down Expand Up @@ -1690,6 +1691,7 @@ class _AnthropicStreamScrubber:
* **IN_THINK** – suppress until ``</think>``.
* **IN_TOOLCALL** – suppress until ``</tool_call>``.
* **IN_FUNCTION** – suppress until ``</function>``.
* **IN_PARAMETER** – suppress until ``</parameter>``.
"""

# --- Fixed (exact-match) tags ----------------------------------------
Expand Down Expand Up @@ -1729,14 +1731,15 @@ class _AnthropicStreamScrubber:
THINK_OPEN: "IN_THINK",
TOOL_OPEN: "IN_TOOLCALL",
FUNC_PREFIX: "IN_FUNCTION",
PARAM_PREFIX: "IN_FUNCTION", # parameters inside function blocks
PARAM_PREFIX: "IN_PARAMETER",
}

# Map from suppression mode → closing tag
_CLOSE_MAP = {
"IN_THINK": THINK_CLOSE,
"IN_TOOLCALL": TOOL_CLOSE,
"IN_FUNCTION": FUNC_CLOSE,
"IN_PARAMETER": PARAM_CLOSE,
}

def __init__(self) -> None:
Expand Down Expand Up @@ -1817,9 +1820,16 @@ def feed(self, delta: str) -> str:

if consume < 0:
# Prefix tag found but closing '>' missing – truncated.
carry_candidate = s[pos:]
if len(carry_candidate) > self.MAX_TAG * 2:
# Carry grew too large — this is a literal '<',
# not a real tag. Emit everything and move on.
out.append(s[i : pos + len(marker)])
i = pos + len(marker)
continue
if pos > i:
out.append(s[i:pos])
self.carry = s[pos:]
self.carry = carry_candidate
return "".join(out)

tag_end = pos + consume
Expand Down Expand Up @@ -1870,10 +1880,11 @@ def flush(self) -> str:
for tag in self._EXACT_TAGS:
result = result.replace(tag, "")
# Strip any residual prefix tags (e.g. ``<function=foo>``).
import re

result = re.sub(r"<function=[^>]*>", "", result)
result = re.sub(r"<parameter=[^>]*>", "", result)
# Strip partial prefix tags at end of stream (no closing '>').
result = re.sub(r"<function=[^>]*$", "", result)
result = re.sub(r"<parameter=[^>]*$", "", result)
self.carry = ""
return result
self.carry = ""
Expand Down Expand Up @@ -1912,6 +1923,7 @@ class _AnthropicStreamRouter:

_EXACT_TAGS = _AnthropicStreamScrubber._EXACT_TAGS
_PREFIX_TAGS = _AnthropicStreamScrubber._PREFIX_TAGS
MAX_TAG = _AnthropicStreamScrubber.MAX_TAG
CARRY_N = _AnthropicStreamScrubber.CARRY_N

_MODE_MAP = _AnthropicStreamScrubber._MODE_MAP
Expand All @@ -1923,7 +1935,6 @@ def __init__(self, start_in_thinking: bool = False) -> None:
# <think> into the prompt, so the first output IS thinking content).
self.mode: str = "IN_THINK" if start_in_thinking else "TEXT"
self.carry: str = ""
self._implicit_think = start_in_thinking
# Delegate marker scanning to a scrubber instance.
self._scanner = _AnthropicStreamScrubber()

Expand Down Expand Up @@ -1959,9 +1970,15 @@ def feed(self, delta: str) -> list[tuple[str, str]]:
pos, marker, consume = hit

if consume < 0:
carry_candidate = s[pos:]
if len(carry_candidate) > self.MAX_TAG * 2:
# Carry grew too large — emit as literal text.
pieces.append(("text", s[i : pos + len(marker)]))
i = pos + len(marker)
continue
if pos > i:
pieces.append(("text", s[i:pos]))
self.carry = s[pos:]
self.carry = carry_candidate
return pieces

tag_end = pos + consume
Expand Down Expand Up @@ -2001,7 +2018,7 @@ def feed(self, delta: str) -> list[tuple[str, str]]:
self.mode = "TEXT"

else:
# IN_TOOLCALL or IN_FUNCTION – suppress content.
# IN_TOOLCALL, IN_FUNCTION, or IN_PARAMETER – suppress.
close_tag = self._CLOSE_MAP[self.mode]
close_pos = s.find(close_tag, i)
if close_pos == -1:
Expand All @@ -2025,10 +2042,11 @@ def flush(self) -> list[tuple[str, str]]:
result = self.carry
for tag in self._EXACT_TAGS:
result = result.replace(tag, "")
import re

result = re.sub(r"<function=[^>]*>", "", result)
result = re.sub(r"<parameter=[^>]*>", "", result)
# Strip partial prefix tags at end of stream (no closing '>').
result = re.sub(r"<function=[^>]*$", "", result)
result = re.sub(r"<parameter=[^>]*$", "", result)
if result:
pieces.append(("text", result))
# IN_TOOLCALL/IN_FUNCTION/IN_PARAMETER – discard.
Expand Down Expand Up @@ -2116,6 +2134,10 @@ async def _stream_anthropic_messages(
if thinking_enabled:
# Use the stream router which yields typed (kind, text) pieces
# that separate thinking content from user-facing text.
# start_in_thinking=True is correct for current models (Qwen3/3.5
# with --reasoning-parser qwen3): the chat template injects <think>
# into the prompt, so the model's first output is thinking content
# WITHOUT an explicit <think> tag.
router: _AnthropicStreamRouter | None = _AnthropicStreamRouter(
start_in_thinking=True
)
Expand Down