From 2455735a370a615c76e214e55e11681c322a8e2d Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Thu, 26 Jun 2025 11:11:59 +0200 Subject: [PATCH] fix(anthropic): prevent streaming tool calling responses when internal execution is enabled - For streaming, block tool calling ChatResponse unless internal execution is disabled - Add streaming validation test and debug logging Related to #3650 Signed-off-by: Christian Tzolov --- .../ai/anthropic/AnthropicChatModel.java | 45 +++++++++++-------- .../ai/anthropic/api/AnthropicApi.java | 26 +++++++---- .../client/AnthropicChatClientIT.java | 43 ++++++++++++++++-- ...lientMethodInvokingFunctionCallbackIT.java | 2 +- .../application-logging-test.properties | 2 + 5 files changed, 86 insertions(+), 32 deletions(-) diff --git a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java index 270f3bef43d..02407e52c41 100644 --- a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java +++ b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java @@ -260,26 +260,33 @@ public Flux internalStream(Prompt prompt, ChatResponse previousCha Usage accumulatedUsage = UsageCalculator.getCumulativeUsage(currentChatResponseUsage, previousChatResponse); ChatResponse chatResponse = toChatResponse(chatCompletionResponse, accumulatedUsage); - if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(prompt.getOptions(), chatResponse) && chatResponse.hasFinishReasons(Set.of("tool_use"))) { - // FIXME: bounded elastic needs to be used since tool calling - // is currently only synchronous - return Flux.defer(() -> { - var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse); - if (toolExecutionResult.returnDirect()) { - // Return tool execution result directly to the client. - return Flux.just(ChatResponse.builder().from(chatResponse) - .generations(ToolExecutionResult.buildGenerations(toolExecutionResult)) - .build()); - } - else { - // Send the tool execution result back to the model. - return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()), - chatResponse); - } - }).subscribeOn(Schedulers.boundedElastic()); - } + if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(prompt.getOptions(), chatResponse)) { + + if (chatResponse.hasFinishReasons(Set.of("tool_use"))) { + // FIXME: bounded elastic needs to be used since tool calling + // is currently only synchronous + return Flux.defer(() -> { + var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, chatResponse); + if (toolExecutionResult.returnDirect()) { + // Return tool execution result directly to the client. + return Flux.just(ChatResponse.builder().from(chatResponse) + .generations(ToolExecutionResult.buildGenerations(toolExecutionResult)) + .build()); + } + else { + // Send the tool execution result back to the model. + return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()), + chatResponse); + } + }).subscribeOn(Schedulers.boundedElastic()); + } else { + return Mono.empty(); + } - return Mono.just(chatResponse); + } else { + // If internal tool execution is not required, just return the chat response. + return Mono.just(chatResponse); + } }) .doOnError(observation::error) .doFinally(s -> observation.stop()) diff --git a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java index 768690e8f0f..cf410690216 100644 --- a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java +++ b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java @@ -24,15 +24,8 @@ import java.util.function.Consumer; import java.util.function.Predicate; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.ai.anthropic.api.StreamHelper.ChatCompletionResponseBuilder; import org.springframework.ai.model.ApiKey; import org.springframework.ai.model.ChatModelDescription; @@ -52,6 +45,16 @@ import org.springframework.web.client.RestClient; import org.springframework.web.reactive.function.client.WebClient; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * The Anthropic API client. * @@ -67,6 +70,8 @@ */ public final class AnthropicApi { + private static final Logger logger = LoggerFactory.getLogger(AnthropicApi.class); + public static Builder builder() { return new Builder(); } @@ -222,6 +227,9 @@ public Flux chatCompletionStream(ChatCompletionRequest c .filter(event -> event.type() != EventType.PING) // Detect if the chunk is part of a streaming function call. .map(event -> { + + logger.debug("Received event: {}", event); + if (this.streamHelper.isToolUseStart(event)) { isInsideTool.set(true); } diff --git a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java index 13882775d3f..2baf73dd97e 100644 --- a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java +++ b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java @@ -16,6 +16,8 @@ package org.springframework.ai.anthropic.client; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import java.net.URL; import java.util.Arrays; @@ -29,8 +31,6 @@ import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; - import org.springframework.ai.anthropic.AnthropicChatOptions; import org.springframework.ai.anthropic.AnthropicTestConfiguration; import org.springframework.ai.anthropic.api.AnthropicApi; @@ -41,7 +41,9 @@ import org.springframework.ai.chat.model.ChatResponse; import org.springframework.ai.converter.BeanOutputConverter; import org.springframework.ai.converter.ListOutputConverter; +import org.springframework.ai.model.tool.ToolCallingChatOptions; import org.springframework.ai.test.CurlyBracketEscaper; +import org.springframework.ai.tool.annotation.Tool; import org.springframework.ai.tool.function.FunctionToolCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -53,7 +55,7 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.util.MimeTypeUtils; -import static org.assertj.core.api.Assertions.assertThat; +import reactor.core.publisher.Flux; @SpringBootTest(classes = AnthropicTestConfiguration.class, properties = "spring.ai.retry.on-http-codes=429") @EnabledIfEnvironmentVariable(named = "ANTHROPIC_API_KEY", matches = ".+") @@ -343,4 +345,39 @@ record ActorsFilms(String actor, List movies) { } + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "claude-3-7-sonnet-latest", "claude-sonnet-4-0" }) + void streamToolCallingResponseShouldNotContainToolCallMessages(String modelName) { + + ChatClient chatClient = ChatClient.builder(this.chatModel).build(); + + Flux responses = chatClient.prompt() + .options(ToolCallingChatOptions.builder().model(modelName).build()) + .tools(new MyTools()) + .user("Get current weather in Amsterdam and Paris") + // .user("Get current weather in Amsterdam. Please don't explain that you will + // call tools.") + .stream() + .chatResponse(); + + List chatResponses = responses.collectList().block(); + + assertThat(chatResponses).isNotEmpty(); + + // Verify that none of the ChatResponse objects have tool calls + chatResponses.forEach(chatResponse -> { + logger.info("ChatResponse Results: {}", chatResponse.getResults()); + assertThat(chatResponse.hasToolCalls()).isFalse(); + }); + } + + public static class MyTools { + + @Tool(description = "Get the current weather forecast by city name") + String getCurrentDateTime(String cityName) { + return "For " + cityName + " Weather is hot and sunny with a temperature of 20 degrees"; + } + + } + } diff --git a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientMethodInvokingFunctionCallbackIT.java b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientMethodInvokingFunctionCallbackIT.java index 2fa9bccf6bf..56d245a846f 100644 --- a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientMethodInvokingFunctionCallbackIT.java +++ b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientMethodInvokingFunctionCallbackIT.java @@ -290,7 +290,7 @@ void streamingParameterLessTool(String modelName) { .map(cr -> cr.getResult().getOutput().getText()) .collect(Collectors.joining()); - assertThat(content).contains("20 degrees"); + assertThat(content).contains("20"); } public static class ParameterLessTools { diff --git a/models/spring-ai-anthropic/src/test/resources/application-logging-test.properties b/models/spring-ai-anthropic/src/test/resources/application-logging-test.properties index 4466a718052..ee0d793d8b6 100644 --- a/models/spring-ai-anthropic/src/test/resources/application-logging-test.properties +++ b/models/spring-ai-anthropic/src/test/resources/application-logging-test.properties @@ -15,3 +15,5 @@ # logging.level.org.springframework.ai.chat.client.advisor=DEBUG + +logging.level.org.springframework.ai.anthropic.api.AnthropicApi=DEBUG