Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions litellm/integrations/custom_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AsyncGenerator,
Dict,
List,
NamedTuple,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -43,6 +44,9 @@
MCPPreCallRequestObject,
MCPPreCallResponseObject,
)
from litellm.types.llms.anthropic_messages.anthropic_response import (
AnthropicMessagesResponse,
)
from litellm.types.router import PreRoutingHookResponse

Span = Union[_Span, Any]
Expand All @@ -56,6 +60,7 @@
MCPDuringCallRequestObject = Any
MCPDuringCallResponseObject = Any
PreRoutingHookResponse = Any
AnthropicMessagesResponse = Any


_BASE64_INLINE_PATTERN = re.compile(
Expand All @@ -64,6 +69,19 @@
)


class ToolCallResult(NamedTuple):
    """Outcome of one tool call handled by ``async_execute_tool_calls``.

    Attributes:
        tool_call_id: The id of the tool_use block that was executed.
        content: Text result to return to the model.
        is_error: Whether this result represents an error.
    """

    tool_call_id: str
    content: str
    is_error: bool


class CustomLogger: # https://docs.litellm.ai/docs/observability/custom_callback#callback-class
# Class variables or attributes
def __init__(
Expand Down Expand Up @@ -533,6 +551,34 @@ async def async_post_mcp_tool_call_hook(
"""
return None

#########################################################
# TOOL EXECUTION HOOKS (simplified tool interception)
#########################################################

async def async_execute_tool_calls(
    self,
    response: Union["AnthropicMessagesResponse", ModelResponse],
    kwargs: Dict,
) -> List[ToolCallResult]:
    """
    Inspect ``response`` for tool calls this callback can run, execute
    them, and return one ``ToolCallResult`` per handled tool_use block.

    Simplified alternative to the two-step
    async_should_run_agentic_loop / async_run_agentic_loop pattern: a
    callback only detects and executes tools, while the framework takes
    care of message construction, thinking-block preservation,
    max_tokens adjustment, kwargs cleanup, and the follow-up request.

    Args:
        response: Model response (AnthropicMessagesResponse dict, or
            ModelResponse).
        kwargs: Full request kwargs (includes custom_llm_provider,
            tools, etc.).

    Returns:
        ToolCallResult entries for the tool calls this callback handled;
        an empty list means nothing was handled and this callback is
        skipped.
    """
    # Base implementation handles nothing; subclasses override.
    return []

#########################################################
# AGENTIC LOOP HOOKS (for litellm.messages + future completion support)
#########################################################
Expand Down
263 changes: 262 additions & 1 deletion litellm/llms/custom_httpx/llm_http_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,22 @@ async def async_anthropic_messages_handler(
logging_obj=logging_obj,
)

# Call agentic completion hooks
# Call simplified tool execution hooks first (new pattern)
tool_exec_response = await self._call_tool_execution_hooks(
response=initial_response,
model=model,
messages=messages,
anthropic_messages_provider_config=anthropic_messages_provider_config,
anthropic_messages_optional_request_params=anthropic_messages_optional_request_params,
logging_obj=logging_obj,
stream=stream or False,
custom_llm_provider=custom_llm_provider,
kwargs=kwargs,
)
if tool_exec_response is not None:
return tool_exec_response

# Call legacy agentic completion hooks (old two-step pattern)
final_response = await self._call_agentic_completion_hooks(
response=initial_response,
model=model,
Expand Down Expand Up @@ -4390,6 +4405,252 @@ def _prepare_fake_stream_request(
return stream, data
return stream, data

async def _call_tool_execution_hooks(
    self,
    response: Any,
    model: str,
    messages: List[Dict],
    anthropic_messages_provider_config: "BaseAnthropicMessagesConfig",
    anthropic_messages_optional_request_params: Dict,
    logging_obj: "LiteLLMLoggingObj",
    stream: bool,
    custom_llm_provider: str,
    kwargs: Dict,
) -> Optional[Any]:
    """
    Offer the response to every registered callback via the simplified
    ``async_execute_tool_calls`` hook and aggregate the results.

    Each callback sees only the tool_use blocks that earlier callbacks
    have not already claimed, so several callbacks can each handle a
    subset of the tool calls in one response. A callback that raises is
    logged and skipped; it does not abort the loop.

    Returns:
        The final response produced by ``_complete_tool_execution_loop``
        after tool execution, or ``None`` when no callback handled any
        tool call (caller falls back to the legacy two-step hooks).
    """
    from litellm.integrations.custom_logger import CustomLogger, ToolCallResult

    hook_candidates = litellm.callbacks + (
        logging_obj.dynamic_success_callbacks or []
    )

    collected: List[ToolCallResult] = []
    remaining_response = response  # tool_use blocks removed as they are claimed

    # Callbacks receive the provider name alongside the original kwargs.
    hook_kwargs = dict(kwargs) if kwargs else {}
    hook_kwargs["custom_llm_provider"] = custom_llm_provider

    for hook in hook_candidates:
        if not isinstance(hook, CustomLogger):
            continue
        try:
            handled = await hook.async_execute_tool_calls(
                response=remaining_response,
                kwargs=hook_kwargs,
            )
            if not handled:
                continue
            collected.extend(handled)
            # Strip claimed tool_use blocks so the next callback only
            # sees the ones still unhandled.
            claimed_ids = {item.tool_call_id for item in handled}
            remaining_response = self._filter_handled_tool_calls(
                remaining_response, claimed_ids
            )
        except Exception as e:
            verbose_logger.exception(
                f"LiteLLM.ToolExecutionHookError: Exception in tool execution hooks: {str(e)}"
            )

    if not collected:
        return None

    # Framework builds messages and makes follow-up request
    return await self._complete_tool_execution_loop(
        response=response,  # original unfiltered response
        results=collected,
        model=model,
        messages=messages,
        anthropic_messages_optional_request_params=anthropic_messages_optional_request_params,
        logging_obj=logging_obj,
        kwargs=kwargs,
    )

@staticmethod
def _filter_handled_tool_calls(
response: Any,
handled_ids: set,
) -> Any:
"""
Return a copy of ``response`` with tool_use blocks whose ids are in
``handled_ids`` removed. This lets subsequent callbacks only see
unhandled tool calls.
"""
if isinstance(response, dict):
content = response.get("content")
if content is None:
return response
filtered = [
b for b in content
if not (
(isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id") in handled_ids)
or (hasattr(b, "type") and getattr(b, "type", None) == "tool_use" and getattr(b, "id", None) in handled_ids)
)
]
return {**response, "content": filtered}
# Object-style response
if hasattr(response, "content") and response.content is not None:
import copy

filtered = [
b for b in response.content
if not (
getattr(b, "type", None) == "tool_use"
and getattr(b, "id", None) in handled_ids
)
]
new_resp = copy.copy(response)
new_resp.content = filtered
return new_resp
return response

async def _complete_tool_execution_loop(
    self,
    response: Any,
    results: List,
    model: str,
    messages: List[Dict],
    anthropic_messages_optional_request_params: Dict,
    logging_obj: "LiteLLMLoggingObj",
    kwargs: Dict,
) -> Any:
    """
    Framework handles ALL the plumbing after callbacks return ToolCallResults:
    1. Extract thinking blocks + tool_use blocks from original response
    2. Build assistant message (thinking + tool_use)
    3. Build tool_result user message (matching results by tool_call_id)
    4. Adjust max_tokens for thinking token usage
    5. Clean kwargs of internal keys
    6. Make follow-up API request
    7. Return final response

    Args:
        response: Original (unfiltered) model response; dict-style or
            attribute-style content blocks are both accepted.
        results: ToolCallResult entries produced by callbacks; only
            tool_use blocks whose ids appear here are echoed back.
        model: Model name used as fallback when logging_obj carries no
            "agentic_loop_params" override.
        messages: Conversation so far; the assistant/tool_result pair is
            appended (the original list is not mutated).
        anthropic_messages_optional_request_params: Optional request
            params; its "max_tokens" (fallback: kwargs, then 1024) is
            passed explicitly and excluded from the re-spread params.
        logging_obj: Logging object; its model_call_details may carry the
            fully-qualified model name under "agentic_loop_params".
        kwargs: Original request kwargs, cleaned via
            filter_internal_params before being re-sent.

    Returns:
        The response from the follow-up ``anthropic_messages.acreate``
        call.
    """
    from litellm.anthropic_interface import messages as anthropic_messages
    from litellm.litellm_core_utils.core_helpers import filter_internal_params

    # ---- 1. Extract thinking blocks and tool_use blocks from response ----
    if isinstance(response, dict):
        content = response.get("content", [])
    else:
        content = getattr(response, "content", None) or []

    thinking_blocks: List[Dict] = []
    tool_use_blocks: List[Dict] = []
    handled_ids = {r.tool_call_id for r in results}

    for block in content:
        # Blocks may be dicts or objects; read type/id accordingly.
        if isinstance(block, dict):
            btype = block.get("type")
            bid = block.get("id")
        else:
            btype = getattr(block, "type", None)
            bid = getattr(block, "id", None)

        if btype in ("thinking", "redacted_thinking"):
            if isinstance(block, dict):
                thinking_blocks.append(block)
            else:
                # Normalize object-style thinking blocks to dicts,
                # keeping only the attributes the API round-trips.
                normalized: Dict[str, Any] = {"type": btype}
                for attr in ("thinking", "data", "signature"):
                    if hasattr(block, attr):
                        normalized[attr] = getattr(block, attr)
                thinking_blocks.append(normalized)

        elif btype == "tool_use" and bid in handled_ids:
            if isinstance(block, dict):
                tool_use_blocks.append(block)
            else:
                tool_use_blocks.append({
                    "type": "tool_use",
                    "id": bid,
                    "name": getattr(block, "name", ""),
                    "input": getattr(block, "input", {}),
                })

    # ---- 2. Build assistant message (thinking first, then tool_use) ----
    # Thinking blocks must precede tool_use so the follow-up request
    # preserves the model's reasoning context.
    assistant_content: List[Dict] = []
    if thinking_blocks:
        assistant_content.extend(thinking_blocks)
    assistant_content.extend(tool_use_blocks)

    assistant_message = {"role": "assistant", "content": assistant_content}

    # ---- 3. Build tool_result user message ----
    results_by_id = {r.tool_call_id: r for r in results}
    tool_result_blocks = []
    for tu in tool_use_blocks:
        tc_id = tu["id"]
        r = results_by_id.get(tc_id)
        # Every echoed tool_use gets a matching tool_result; empty
        # content if no result was found for its id.
        block: Dict[str, Any] = {
            "type": "tool_result",
            "tool_use_id": tc_id,
            "content": r.content if r else "",
        }
        if r and r.is_error:
            block["is_error"] = True
        tool_result_blocks.append(block)

    user_message = {"role": "user", "content": tool_result_blocks}

    # ---- 4. Prepare max_tokens (subtract thinking usage if present) ----
    max_tokens = anthropic_messages_optional_request_params.get(
        "max_tokens",
        kwargs.get("max_tokens", 1024),
    )

    # If thinking is enabled, subtract thinking token usage from max_tokens
    # NOTE: this step is currently a no-op — usage is extracted but
    # max_tokens is never adjusted (see the `pass` below).
    if isinstance(response, dict):
        usage = response.get("usage", {})
    else:
        usage = getattr(response, "usage", None)
        if usage and not isinstance(usage, dict):
            usage = getattr(usage, "__dict__", {})
    if usage:
        # cache_creation_input_tokens is used in some responses; thinking tokens
        # are not currently reported separately, but this is where we'd adjust.
        pass

    # ---- 5. Clean kwargs for follow-up request ----
    kwargs_for_followup = filter_internal_params(kwargs)

    # ---- 6. Resolve full model name ----
    # Prefer the fully-qualified model recorded by the agentic loop
    # (e.g. provider-prefixed) over the bare `model` argument.
    full_model_name = model
    if logging_obj is not None:
        agentic_params = logging_obj.model_call_details.get(
            "agentic_loop_params", {}
        )
        full_model_name = agentic_params.get("model", model)

    # max_tokens is passed explicitly to acreate, so drop it here to
    # avoid a duplicate-keyword error.
    optional_params_without_max_tokens = {
        k: v
        for k, v in anthropic_messages_optional_request_params.items()
        if k != "max_tokens"
    }

    follow_up_messages = messages + [assistant_message, user_message]

    verbose_logger.debug(
        f"ToolExecutionLoop: Making follow-up request with "
        f"{len(results)} tool result(s), model={full_model_name}"
    )

    # ---- 7. Make follow-up API request ----
    final_response = await anthropic_messages.acreate(
        max_tokens=max_tokens,
        messages=follow_up_messages,
        model=full_model_name,
        **optional_params_without_max_tokens,
        **kwargs_for_followup,
    )
    return final_response

async def _call_agentic_completion_hooks(
self,
response: Any,
Expand Down
Loading