2 changes: 1 addition & 1 deletion lib/crewai/src/crewai/events/event_listener.py
@@ -386,7 +386,7 @@ def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):

# Read from the in-memory stream
content = self.text_stream.read()
_printer.print(content, end="", flush=True)
_printer.print(content)
self.next_chunk = self.text_stream.tell()

# ----------- LLM GUARDRAIL EVENTS -----------
263 changes: 195 additions & 68 deletions lib/crewai/src/crewai/llms/providers/anthropic/completion.py
@@ -1,4 +1,3 @@
import json
import logging
import os
from typing import Any
@@ -40,6 +39,7 @@ def __init__(
top_p: float | None = None,
stop_sequences: list[str] | None = None,
stream: bool = False,
client_params: dict[str, Any] | None = None,
**kwargs,
):
"""Initialize Anthropic chat completion client.
@@ -55,19 +55,20 @@ def __init__(
top_p: Nucleus sampling parameter
stop_sequences: Stop sequences (Anthropic uses stop_sequences, not stop)
stream: Enable streaming responses
client_params: Additional parameters for the Anthropic client
**kwargs: Additional parameters
"""
super().__init__(
model=model, temperature=temperature, stop=stop_sequences or [], **kwargs
)

# Initialize Anthropic client
self.client = Anthropic(
api_key=api_key or os.getenv("ANTHROPIC_API_KEY"),
base_url=base_url,
timeout=timeout,
max_retries=max_retries,
)
# Client params
self.client_params = client_params
self.base_url = base_url
self.timeout = timeout
self.max_retries = max_retries

self.client = Anthropic(**self._get_client_params())

# Store completion parameters
self.max_tokens = max_tokens
@@ -79,6 +80,26 @@ def __init__(
self.is_claude_3 = "claude-3" in model.lower()
self.supports_tools = self.is_claude_3 # Claude 3+ supports tool use

def _get_client_params(self) -> dict[str, Any]:
"""Get client parameters."""

if self.api_key is None:
self.api_key = os.getenv("ANTHROPIC_API_KEY")
if self.api_key is None:
raise ValueError("ANTHROPIC_API_KEY is required")

client_params = {
"api_key": self.api_key,
"base_url": self.base_url,
"timeout": self.timeout,
"max_retries": self.max_retries,
}

if self.client_params:
client_params.update(self.client_params)

return client_params
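
The new client_params argument lets callers pass extra keyword arguments straight through to the Anthropic SDK client, overriding the defaults assembled above. A minimal usage sketch (not part of this diff; it assumes the high-level LLM wrapper forwards client_params to this provider, and default_headers is a standard Anthropic SDK client option):

from crewai import LLM

# Sketch only: assumes LLM forwards client_params to the Anthropic provider.
llm = LLM(
    model="anthropic/claude-3-5-sonnet-20241022",
    client_params={"default_headers": {"anthropic-version": "2023-06-01"}},
)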

def call(
self,
messages: str | list[dict[str, str]],
@@ -183,20 +204,34 @@ def _prepare_completion_params(

def _convert_tools_for_interference(self, tools: list[dict]) -> list[dict]:
"""Convert CrewAI tool format to Anthropic tool use format."""
from crewai.llms.providers.utils.common import safe_tool_conversion

anthropic_tools = []

for tool in tools:
name, description, parameters = safe_tool_conversion(tool, "Anthropic")
if "input_schema" in tool and "name" in tool and "description" in tool:
anthropic_tools.append(tool)
continue

try:
from crewai.llms.providers.utils.common import safe_tool_conversion

name, description, parameters = safe_tool_conversion(tool, "Anthropic")
except (ImportError, KeyError, ValueError) as e:
logging.error(f"Error converting tool to Anthropic format: {e}")
raise e

anthropic_tool = {
"name": name,
"description": description,
}

if parameters and isinstance(parameters, dict):
anthropic_tool["input_schema"] = parameters # type: ignore
anthropic_tool["input_schema"] = parameters
else:
anthropic_tool["input_schema"] = {
"type": "object",
"properties": {},
"required": [],
}

anthropic_tools.append(anthropic_tool)
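
For reference, this method maps the OpenAI-style function schema that CrewAI tools use onto Anthropic's tool shape, where the JSON schema sits under input_schema rather than function.parameters. An illustrative before/after pair (example values, not taken from this diff):

crewai_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the weather in a location",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}

anthropic_tool = {
    "name": "get_weather",
    "description": "Get the weather in a location",
    "input_schema": {
        "type": "object",
        "properties": {"location": {"type": "string"}},
        "required": ["location"],
    },
}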

@@ -229,13 +264,11 @@ def _format_messages_for_anthropic(
content = message.get("content", "")

if role == "system":
# Extract system message - Anthropic handles it separately
if system_message:
system_message += f"\n\n{content}"
else:
system_message = content
else:
# Add user/assistant messages - ensure both role and content are str, not None
role_str = role if role is not None else "user"
content_str = content if content is not None else ""
formatted_messages.append({"role": role_str, "content": content_str})
@@ -270,22 +303,22 @@ def _handle_completion(
usage = self._extract_anthropic_token_usage(response)
self._track_token_usage_internal(usage)

# Check if Claude wants to use tools
if response.content and available_functions:
for content_block in response.content:
if isinstance(content_block, ToolUseBlock):
function_name = content_block.name
function_args = content_block.input

result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args, # type: ignore
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)

if result is not None:
return result
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]

if tool_uses:
# Handle tool use conversation flow
return self._handle_tool_use_conversation(
response,
tool_uses,
params,
available_functions,
from_task,
from_agent,
)

# Extract text content
content = ""
Expand Down Expand Up @@ -318,12 +351,14 @@ def _handle_streaming_completion(
) -> str:
"""Handle streaming message completion."""
full_response = ""
tool_uses = {}

# Remove 'stream' parameter as messages.stream() doesn't accept it
# (the SDK sets it internally)
stream_params = {k: v for k, v in params.items() if k != "stream"}

# Make streaming API call
with self.client.messages.stream(**params) as stream:
with self.client.messages.stream(**stream_params) as stream:
for event in stream:
# Handle content delta events
if hasattr(event, "delta") and hasattr(event.delta, "text"):
text_delta = event.delta.text
full_response += text_delta
@@ -333,43 +368,28 @@ def _handle_streaming_completion(
from_agent=from_agent,
)

# Handle tool use events
elif hasattr(event, "delta") and hasattr(event.delta, "partial_json"):
# Tool use streaming - accumulate JSON
tool_id = getattr(event, "index", "default")
if tool_id not in tool_uses:
tool_uses[tool_id] = {
"name": "",
"input": "",
}

if hasattr(event.delta, "name"):
tool_uses[tool_id]["name"] = event.delta.name
if hasattr(event.delta, "partial_json"):
tool_uses[tool_id]["input"] += event.delta.partial_json

# Handle completed tool uses
if tool_uses and available_functions:
for tool_data in tool_uses.values():
function_name = tool_data["name"]

try:
function_args = json.loads(tool_data["input"])
except json.JSONDecodeError as e:
logging.error(f"Failed to parse streamed tool arguments: {e}")
continue

# Execute tool
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
final_message: Message = stream.get_final_message()

usage = self._extract_anthropic_token_usage(final_message)
self._track_token_usage_internal(usage)

if result is not None:
return result
if final_message.content and available_functions:
tool_uses = [
block
for block in final_message.content
if isinstance(block, ToolUseBlock)
]

if tool_uses:
# Handle tool use conversation flow
return self._handle_tool_use_conversation(
final_message,
tool_uses,
params,
available_functions,
from_task,
from_agent,
)

# Apply stop words to full response
full_response = self._apply_stop_words(full_response)
Expand All @@ -385,6 +405,113 @@ def _handle_streaming_completion(

return full_response
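
The reworked streaming path leans on the Anthropic SDK's accumulation helper: after the stream is consumed, stream.get_final_message() returns the fully assembled Message, so tool-use blocks and token usage can be read from it exactly as in the non-streaming path. A standalone sketch of that SDK pattern (independent of this diff):

from anthropic import Anthropic

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=256,
    messages=[{"role": "user", "content": "Say hello"}],
) as stream:
    for text in stream.text_stream:  # incremental text deltas
        print(text, end="", flush=True)
final_message = stream.get_final_message()  # complete Message, with usage
print(final_message.usage)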

def _handle_tool_use_conversation(
self,
initial_response: Message,
tool_uses: list[ToolUseBlock],
params: dict[str, Any],
available_functions: dict[str, Any],
from_task: Any | None = None,
from_agent: Any | None = None,
) -> str:
"""Handle the complete tool use conversation flow.

This implements the proper Anthropic tool use pattern:
1. Claude requests tool use
2. We execute the tools
3. We send tool results back to Claude
4. Claude processes results and generates final response
"""
# Execute all requested tools and collect results
tool_results = []

for tool_use in tool_uses:
function_name = tool_use.name
function_args = tool_use.input

# Execute the tool
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args, # type: ignore
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)

# Create tool result in Anthropic format
tool_result = {
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(result)
if result is not None
else "Tool execution completed",
}
tool_results.append(tool_result)

# Prepare follow-up conversation with tool results
follow_up_params = params.copy()

# Add Claude's tool use response to conversation
assistant_message = {"role": "assistant", "content": initial_response.content}

# Add user message with tool results
user_message = {"role": "user", "content": tool_results}

# Update messages for follow-up call
follow_up_params["messages"] = params["messages"] + [
assistant_message,
user_message,
]
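# Illustrative shape of the two appended turns (example values, not produced
# by this diff): Claude's assistant turn echoes the ToolUseBlock it emitted,
# and the user turn carries the matching tool_result keyed by tool_use_id:
#   {"role": "assistant", "content": [ToolUseBlock(id="toolu_01...",
#       name="get_weather", input={"location": "San Francisco"})]}
#   {"role": "user", "content": [{"type": "tool_result",
#       "tool_use_id": "toolu_01...", "content": "... is sunny"}]}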

try:
# Send tool results back to Claude for final response
final_response: Message = self.client.messages.create(**follow_up_params)

# Track token usage for follow-up call
follow_up_usage = self._extract_anthropic_token_usage(final_response)
self._track_token_usage_internal(follow_up_usage)

# Extract final text content
final_content = ""
if final_response.content:
for content_block in final_response.content:
if hasattr(content_block, "text"):
final_content += content_block.text

final_content = self._apply_stop_words(final_content)

# Emit completion event for the final response
self._emit_call_completed_event(
response=final_content,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=follow_up_params["messages"],
)

# Log combined token usage
total_usage = {
"input_tokens": follow_up_usage.get("input_tokens", 0),
"output_tokens": follow_up_usage.get("output_tokens", 0),
"total_tokens": follow_up_usage.get("total_tokens", 0),
}

if total_usage.get("total_tokens", 0) > 0:
logging.info(f"Anthropic API tool conversation usage: {total_usage}")

return final_content

except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded in tool follow-up: {e}")
raise LLMContextLengthExceededError(str(e)) from e

logging.error(f"Tool follow-up conversation failed: {e}")
# Fallback: return the first tool result if follow-up fails
if tool_results:
return tool_results[0]["content"]
Contributor:

It seems like this could return unexpected results - not sure how to handle that other than raising LLMToolExecutionError("Failed to complete tool use conversation") from e, though.
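
A minimal sketch of that suggestion (assuming LLMToolExecutionError exists in the codebase alongside LLMContextLengthExceededError, as the comment implies):

except Exception as e:
    if is_context_length_exceeded(e):
        raise LLMContextLengthExceededError(str(e)) from e
    logging.error(f"Tool follow-up conversation failed: {e}")
    # Suggested: fail loudly instead of returning a raw tool result
    raise LLMToolExecutionError("Failed to complete tool use conversation") from e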

Collaborator (Author):

This is different from an agent invoking tools; it is for the case of:

import os

from crewai import LLM


def anthropic_tool_use_runner():
    def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny"

    llm = LLM(
        model="anthropic/claude-3-5-sonnet-20241022",
        api_key=os.getenv("ANTHROPIC_API_KEY"),
    )
    result = llm.call(
        messages=[{"role": "user", "content": "What is the weather in San Francisco?"}],
        available_functions={"get_weather": get_weather},
        tools=[
            {
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "description": "Get the weather in a location",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {
                                "type": "string",
                                "description": "The location to get the weather for",
                            }
                        },
                        "required": ["location"],
                    },
                },
            }
        ],
    )
    print("anthropic tool use result", result)


if __name__ == "__main__":
    anthropic_tool_use_runner()

raise e

def supports_function_calling(self) -> bool:
"""Check if the model supports function calling."""
return self.supports_tools