diff --git a/vllm/entrypoints/anthropic/protocol.py b/vllm/entrypoints/anthropic/protocol.py index af9430e78475..3081e9781034 100644 --- a/vllm/entrypoints/anthropic/protocol.py +++ b/vllm/entrypoints/anthropic/protocol.py @@ -34,7 +34,7 @@ class AnthropicUsage(BaseModel): class AnthropicContentBlock(BaseModel): """Content block in message""" - type: Literal["text", "image", "tool_use", "tool_result"] + type: Literal["text", "image", "tool_use", "tool_result", "thinking"] text: str | None = None # For image content source: dict[str, Any] | None = None @@ -45,6 +45,9 @@ class AnthropicContentBlock(BaseModel): input: dict[str, Any] | None = None content: str | list[dict[str, Any]] | None = None is_error: bool | None = None + # For thinking content + thinking: str | None = None + signature: str | None = None class AnthropicMessage(BaseModel): @@ -118,9 +121,14 @@ def validate_max_tokens(cls, v): class AnthropicDelta(BaseModel): """Delta for streaming responses""" - type: Literal["text_delta", "input_json_delta"] | None = None + type: ( + Literal["text_delta", "input_json_delta", "thinking_delta", "signature_delta"] + | None + ) = None text: str | None = None + thinking: str | None = None partial_json: str | None = None + signature: str | None = None # Message delta stop_reason: ( diff --git a/vllm/entrypoints/anthropic/serving.py b/vllm/entrypoints/anthropic/serving.py index dc037313de33..6318f854a1e4 100644 --- a/vllm/entrypoints/anthropic/serving.py +++ b/vllm/entrypoints/anthropic/serving.py @@ -8,6 +8,7 @@ import json import logging import time +import uuid from collections.abc import AsyncGenerator from typing import Any @@ -112,6 +113,7 @@ def _convert_anthropic_to_openai_request( # Handle complex content blocks content_parts: list[dict[str, Any]] = [] tool_calls: list[dict[str, Any]] = [] + reasoning_parts: list[str] = [] for block in msg.content: if block.type == "text" and block.text: @@ -123,6 +125,8 @@ def _convert_anthropic_to_openai_request( "image_url": {"url": block.source.get("data", "")}, } ) + elif block.type == "thinking" and block.thinking is not None: + reasoning_parts.append(block.thinking) elif block.type == "tool_use": # Convert tool use to function call format tool_call = { @@ -157,6 +161,9 @@ def _convert_anthropic_to_openai_request( } ) + if reasoning_parts: + openai_msg["reasoning"] = "".join(reasoning_parts) + # Add tool calls to the message if any if tool_calls: openai_msg["tool_calls"] = tool_calls # type: ignore @@ -167,7 +174,7 @@ def _convert_anthropic_to_openai_request( openai_msg["content"] = content_parts[0]["text"] else: openai_msg["content"] = content_parts # type: ignore - elif not tool_calls: + elif not tool_calls and not reasoning_parts: continue openai_messages.append(openai_msg) @@ -263,23 +270,32 @@ def messages_full_converter( output_tokens=generator.usage.completion_tokens, ), ) - if generator.choices[0].finish_reason == "stop": + choice = generator.choices[0] + if choice.finish_reason == "stop": result.stop_reason = "end_turn" - elif generator.choices[0].finish_reason == "length": + elif choice.finish_reason == "length": result.stop_reason = "max_tokens" - elif generator.choices[0].finish_reason == "tool_calls": + elif choice.finish_reason == "tool_calls": result.stop_reason = "tool_use" - content: list[AnthropicContentBlock] = [ - AnthropicContentBlock( - type="text", - text=generator.choices[0].message.content - if generator.choices[0].message.content - else "", + content: list[AnthropicContentBlock] = [] + if choice.message.reasoning: + content.append( + AnthropicContentBlock( + type="thinking", + thinking=choice.message.reasoning, + signature=uuid.uuid4().hex, + ) + ) + if choice.message.content: + content.append( + AnthropicContentBlock( + type="text", + text=choice.message.content, + ) ) - ] - for tool_call in generator.choices[0].message.tool_calls: + for tool_call in choice.message.tool_calls: anthropic_tool_call = AnthropicContentBlock( type="tool_use", id=tool_call.id, @@ -297,10 +313,85 @@ async def message_stream_converter( generator: AsyncGenerator[str, None], ) -> AsyncGenerator[str, None]: try: + + class _ActiveBlockState: + def __init__(self) -> None: + self.content_block_index = 0 + self.block_type: str | None = None + self.block_index: int | None = None + self.block_signature: str | None = None + self.signature_emitted: bool = False + self.tool_use_id: str | None = None + + def reset(self) -> None: + self.block_type = None + self.block_index = None + self.block_signature = None + self.signature_emitted = False + self.tool_use_id = None + + def start(self, block: AnthropicContentBlock) -> None: + self.block_type = block.type + self.block_index = self.content_block_index + if block.type == "thinking": + self.block_signature = uuid.uuid4().hex + self.signature_emitted = False + self.tool_use_id = None + elif block.type == "tool_use": + self.block_signature = None + self.signature_emitted = True + self.tool_use_id = block.id + else: + self.block_signature = None + self.signature_emitted = True + self.tool_use_id = None + first_item = True finish_reason = None - content_block_index = 0 - content_block_started = False + state = _ActiveBlockState() + # Map from tool call index to tool_use_id + tool_index_to_id: dict[int, str] = {} + + def stop_active_block(): + events: list[str] = [] + if state.block_type is None: + return events + if ( + state.block_type == "thinking" + and state.block_signature is not None + and not state.signature_emitted + ): + chunk = AnthropicStreamEvent( + index=state.block_index, + type="content_block_delta", + delta=AnthropicDelta( + type="signature_delta", + signature=state.block_signature, + ), + ) + data = chunk.model_dump_json(exclude_unset=True) + events.append(wrap_data_with_event(data, "content_block_delta")) + state.signature_emitted = True + stop_chunk = AnthropicStreamEvent( + index=state.block_index, + type="content_block_stop", + ) + data = stop_chunk.model_dump_json(exclude_unset=True) + events.append(wrap_data_with_event(data, "content_block_stop")) + state.reset() + state.content_block_index += 1 + return events + + def start_block(block: AnthropicContentBlock): + chunk = AnthropicStreamEvent( + index=state.content_block_index, + type="content_block_start", + content_block=block, + ) + data = chunk.model_dump_json(exclude_unset=True) + event = wrap_data_with_event(data, "content_block_start") + state.start(block) + return event async for item in generator: if item.startswith("data:"): @@ -326,6 +417,8 @@ async def message_stream_converter( id=origin_chunk.id, content=[], model=origin_chunk.model, + stop_reason=None, + stop_sequence=None, usage=AnthropicUsage( input_tokens=origin_chunk.usage.prompt_tokens if origin_chunk.usage @@ -341,13 +434,8 @@ async def message_stream_converter( # last chunk including usage info if len(origin_chunk.choices) == 0: - if content_block_started: - stop_chunk = AnthropicStreamEvent( - index=content_block_index, - type="content_block_stop", - ) - data = stop_chunk.model_dump_json(exclude_unset=True) - yield wrap_data_with_event(data, "content_block_stop") + for event in stop_active_block(): + yield event stop_reason = self.stop_reason_map.get( finish_reason or "stop" ) @@ -369,96 +457,139 @@ async def message_stream_converter( if origin_chunk.choices[0].finish_reason is not None: finish_reason = origin_chunk.choices[0].finish_reason - continue + # continue - # content - if origin_chunk.choices[0].delta.content is not None: - if not content_block_started: + # thinking / text content + reasoning_delta = origin_chunk.choices[0].delta.reasoning + if reasoning_delta is not None: + if reasoning_delta == "": + pass + else: + if state.block_type != "thinking": + for event in stop_active_block(): + yield event + start_event = start_block( + AnthropicContentBlock( + type="thinking", thinking="" + ) + ) + yield start_event chunk = AnthropicStreamEvent( - index=content_block_index, - type="content_block_start", - content_block=AnthropicContentBlock( - type="text", text="" + index=( + state.block_index + if state.block_index is not None + else state.content_block_index + ), + type="content_block_delta", + delta=AnthropicDelta( + type="thinking_delta", + thinking=reasoning_delta, ), ) data = chunk.model_dump_json(exclude_unset=True) - yield wrap_data_with_event(data, "content_block_start") - content_block_started = True + yield wrap_data_with_event(data, "content_block_delta") + if origin_chunk.choices[0].delta.content is not None: if origin_chunk.choices[0].delta.content == "": - continue - chunk = AnthropicStreamEvent( - index=content_block_index, - type="content_block_delta", - delta=AnthropicDelta( - type="text_delta", - text=origin_chunk.choices[0].delta.content, - ), - ) - data = chunk.model_dump_json(exclude_unset=True) - yield wrap_data_with_event(data, "content_block_delta") - continue - - # tool calls - elif len(origin_chunk.choices[0].delta.tool_calls) > 0: - tool_call = origin_chunk.choices[0].delta.tool_calls[0] - if tool_call.id is not None: - if content_block_started: - stop_chunk = AnthropicStreamEvent( - index=content_block_index, - type="content_block_stop", - ) - data = stop_chunk.model_dump_json( - exclude_unset=True - ) - yield wrap_data_with_event( - data, "content_block_stop" + pass + else: + if state.block_type != "text": + for event in stop_active_block(): + yield event + start_event = start_block( + AnthropicContentBlock(type="text", text="") ) - content_block_started = False - content_block_index += 1 - + yield start_event chunk = AnthropicStreamEvent( - index=content_block_index, - type="content_block_start", - content_block=AnthropicContentBlock( - type="tool_use", - id=tool_call.id, - name=tool_call.function.name - if tool_call.function - else None, - input={}, + index=( + state.block_index + if state.block_index is not None + else state.content_block_index ), - ) - data = chunk.model_dump_json(exclude_unset=True) - yield wrap_data_with_event(data, "content_block_start") - content_block_started = True - if tool_call.function and tool_call.function.arguments: - chunk = AnthropicStreamEvent( - index=content_block_index, - type="content_block_delta", - delta=AnthropicDelta( - type="input_json_delta", - partial_json=tool_call.function.arguments, - ), - ) - data = chunk.model_dump_json(exclude_unset=True) - yield wrap_data_with_event( - data, "content_block_delta" - ) - - else: - chunk = AnthropicStreamEvent( - index=content_block_index, type="content_block_delta", delta=AnthropicDelta( - type="input_json_delta", - partial_json=tool_call.function.arguments - if tool_call.function - else None, + type="text_delta", + text=origin_chunk.choices[0].delta.content, ), ) data = chunk.model_dump_json(exclude_unset=True) yield wrap_data_with_event(data, "content_block_delta") + + # tool calls - process all tool calls in the delta + if len(origin_chunk.choices[0].delta.tool_calls) > 0: + for tool_call in origin_chunk.choices[0].delta.tool_calls: + if tool_call.id is not None: + # Update mapping for incremental updates + tool_index_to_id[tool_call.index] = tool_call.id + # Only create new block if different tool call + # AND has a name + tool_name = ( + tool_call.function.name + if tool_call.function + else None + ) + if ( + state.tool_use_id != tool_call.id + and tool_name is not None + ): + for event in stop_active_block(): + yield event + start_event = start_block( + AnthropicContentBlock( + type="tool_use", + id=tool_call.id, + name=tool_name, + input={}, + ) + ) + yield start_event + # Handle initial arguments if present + if ( + tool_call.function + and tool_call.function.arguments + and state.tool_use_id == tool_call.id + ): + chunk = AnthropicStreamEvent( + index=( + state.block_index + if state.block_index is not None + else state.content_block_index + ), + type="content_block_delta", + delta=AnthropicDelta( + type="input_json_delta", + partial_json=tool_call.function.arguments, + ), + ) + data = chunk.model_dump_json(exclude_unset=True) + yield wrap_data_with_event( + data, "content_block_delta" + ) + else: + # Incremental update - use index to find tool_use_id + tool_use_id = tool_index_to_id.get(tool_call.index) + if ( + tool_use_id is not None + and tool_call.function + and tool_call.function.arguments + and state.tool_use_id == tool_use_id + ): + chunk = AnthropicStreamEvent( + index=( + state.block_index + if state.block_index is not None + else state.content_block_index + ), + type="content_block_delta", + delta=AnthropicDelta( + type="input_json_delta", + partial_json=tool_call.function.arguments, + ), + ) + data = chunk.model_dump_json(exclude_unset=True) + yield wrap_data_with_event( + data, "content_block_delta" + ) continue else: error_response = AnthropicStreamEvent(