Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync
(ChatClientAgentThread safeThread, ChatOptions? chatOptions, List<ChatMessage> inputMessagesForChatClient, IList<ChatMessage>? aiContextProviderMessages) =
await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);

ValidateStreamResumptionAllowed(chatOptions?.ContinuationToken, safeThread);

var chatClient = this.ChatClient;

chatClient = ApplyRunOptionsTransformations(options, chatClient);
Expand Down Expand Up @@ -621,6 +623,12 @@ await thread.AIContextProvider.InvokedAsync(new(inputMessages, aiContextProvider
{
throw new InvalidOperationException("Input messages are not allowed when continuing a background response using a continuation token.");
}

if (chatOptions?.ContinuationToken is not null && typedThread.ConversationId is null && typedThread.MessageStore is null)
{
throw new InvalidOperationException("Continuation tokens are not allowed to be used for initial runs.");
}

List<ChatMessage> inputMessagesForChatClient = [];
IList<ChatMessage>? aiContextProviderMessages = null;

Expand Down Expand Up @@ -731,6 +739,28 @@ private static Task NotifyMessageStoreOfNewMessagesAsync(ChatClientAgentThread t
return Task.CompletedTask;
}

private static void ValidateStreamResumptionAllowed(ResponseContinuationToken? continuationToken, ChatClientAgentThread safeThread)
{
if (continuationToken is null)
{
return;
}

// Streaming resumption is only supported with threads managed by the agent service because, currently, there's no good solution
// to collect updates received in failed runs and pass them to the last successful run so it can store them to the message store.
if (safeThread.MessageStore is not null)
{
throw new NotSupportedException("Using non-agent service store for chat history with streaming resumption is not supported.");
}

// Similarly, streaming resumption is not supported when a context provider is used because, currently, there's no good solution
// to collect updates received in failed runs and pass them to the last successful run so it can notify the context provider of the updates.
if (safeThread.AIContextProvider is not null)
{
throw new NotSupportedException("Using context provider with streaming resumption is not supported.");
}
}

private string GetLoggingAgentName() => this.Name ?? "UnnamedAgent";
#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -2149,7 +2149,7 @@ public async Task RunAsyncPropagatesBackgroundResponsesPropertiesToChatClientAsy
It.IsAny<ChatOptions>(),
It.IsAny<CancellationToken>()))
.Callback<IEnumerable<ChatMessage>, ChatOptions, CancellationToken>((m, co, ct) => capturedChatOptions = co)
.ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) { ContinuationToken = null });
.ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) { ContinuationToken = null, ConversationId = "conversation-id" });

AgentRunOptions agentRunOptions;

Expand All @@ -2174,7 +2174,7 @@ public async Task RunAsyncPropagatesBackgroundResponsesPropertiesToChatClientAsy

ChatClientAgent agent = new(mockChatClient.Object);

ChatClientAgentThread thread = new();
ChatClientAgentThread thread = new() { ConversationId = "conversation-id" };

// Act
await agent.RunAsync(thread, options: agentRunOptions);
Expand All @@ -2199,7 +2199,7 @@ public async Task RunAsyncPrioritizesBackgroundResponsesPropertiesFromAgentRunOp
It.IsAny<ChatOptions>(),
It.IsAny<CancellationToken>()))
.Callback<IEnumerable<ChatMessage>, ChatOptions, CancellationToken>((m, co, ct) => capturedChatOptions = co)
.ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) { ContinuationToken = null });
.ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) { ContinuationToken = null, ConversationId = "conversation-id" });

ChatOptions chatOptions = new()
{
Expand All @@ -2213,10 +2213,12 @@ public async Task RunAsyncPrioritizesBackgroundResponsesPropertiesFromAgentRunOp
ContinuationToken = continuationToken2
};

ChatClientAgentThread thread = new() { ConversationId = "conversation-id" };

ChatClientAgent agent = new(mockChatClient.Object);

// Act
await agent.RunAsync(options: agentRunOptions);
await agent.RunAsync(thread, options: agentRunOptions);

// Assert
Assert.NotNull(capturedChatOptions);
Expand All @@ -2232,8 +2234,8 @@ public async Task RunStreamingAsyncPropagatesBackgroundResponsesPropertiesToChat
// Arrange
ChatResponseUpdate[] returnUpdates =
[
new ChatResponseUpdate(role: ChatRole.Assistant, content: "wh"),
new ChatResponseUpdate(role: ChatRole.Assistant, content: "at?"),
new ChatResponseUpdate(role: ChatRole.Assistant, content: "wh") { ConversationId = "conversation-id" },
new ChatResponseUpdate(role: ChatRole.Assistant, content: "at?") { ConversationId = "conversation-id" },
];

var continuationToken = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 });
Expand Down Expand Up @@ -2270,7 +2272,7 @@ public async Task RunStreamingAsyncPropagatesBackgroundResponsesPropertiesToChat

ChatClientAgent agent = new(mockChatClient.Object);

ChatClientAgentThread thread = new();
ChatClientAgentThread thread = new() { ConversationId = "conversation-id" };

// Act
await foreach (var _ in agent.RunStreamingAsync(thread, options: agentRunOptions))
Expand All @@ -2290,7 +2292,7 @@ public async Task RunStreamingAsyncPrioritizesBackgroundResponsesPropertiesFromA
// Arrange
ChatResponseUpdate[] returnUpdates =
[
new ChatResponseUpdate(role: ChatRole.Assistant, content: "wh"),
new ChatResponseUpdate(role: ChatRole.Assistant, content: "wh") { ConversationId = "conversation-id" },
];

var continuationToken1 = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 });
Expand Down Expand Up @@ -2319,8 +2321,10 @@ public async Task RunStreamingAsyncPrioritizesBackgroundResponsesPropertiesFromA

ChatClientAgent agent = new(mockChatClient.Object);

var thread = new ChatClientAgentThread() { ConversationId = "conversation-id" };

// Act
await foreach (var _ in agent.RunStreamingAsync(options: agentRunOptions))
await foreach (var _ in agent.RunStreamingAsync(thread, options: agentRunOptions))
{
}

Expand Down Expand Up @@ -2551,9 +2555,10 @@ public async Task RunStreamingAsyncSkipsThreadMessagePopulationWithContinuationT
AgentRunOptions runOptions = new() { ContinuationToken = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 }) };

// Act
await agent.RunStreamingAsync([], thread, options: runOptions).ToListAsync();
var exception = await Assert.ThrowsAsync<NotSupportedException>(async () => await agent.RunStreamingAsync(thread, options: runOptions).ToListAsync());

// Assert
Assert.Equal("Using non-agent service store for chat history with streaming resumption is not supported.", exception.Message);

// With continuation token, thread message population should be skipped
Assert.Empty(capturedMessages);
Expand Down Expand Up @@ -2623,6 +2628,128 @@ await Assert.ThrowsAsync<InvalidOperationException>(async () =>
Times.Never);
}

[Fact]
public async Task RunAsyncThrowsWhenContinuationTokenProvidedForInitialRunAsync()
{
// Arrange
Mock<IChatClient> mockChatClient = new();

ChatClientAgent agent = new(mockChatClient.Object);

// Create a new thread with no ConversationId and no MessageStore (initial run state)
ChatClientAgentThread thread = new();

AgentRunOptions runOptions = new() { ContinuationToken = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 }) };

// Act & Assert
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => agent.RunAsync(thread: thread, options: runOptions));
Assert.Equal("Continuation tokens are not allowed to be used for initial runs.", exception.Message);

// Verify that the IChatClient was never called due to early validation
mockChatClient.Verify(
c => c.GetResponseAsync(
It.IsAny<IEnumerable<ChatMessage>>(),
It.IsAny<ChatOptions>(),
It.IsAny<CancellationToken>()),
Times.Never);
}

[Fact]
public async Task RunStreamingAsyncThrowsWhenContinuationTokenProvidedForInitialRunAsync()
{
// Arrange
Mock<IChatClient> mockChatClient = new();

ChatClientAgent agent = new(mockChatClient.Object);

// Create a new thread with no ConversationId and no MessageStore (initial run state)
ChatClientAgentThread thread = new();

AgentRunOptions runOptions = new() { ContinuationToken = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 }) };

// Act & Assert
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () => await agent.RunStreamingAsync(thread: thread, options: runOptions).ToListAsync());
Assert.Equal("Continuation tokens are not allowed to be used for initial runs.", exception.Message);

// Verify that the IChatClient was never called due to early validation
mockChatClient.Verify(
c => c.GetStreamingResponseAsync(
It.IsAny<IEnumerable<ChatMessage>>(),
It.IsAny<ChatOptions>(),
It.IsAny<CancellationToken>()),
Times.Never);
}

[Fact]
public async Task RunStreamingAsyncThrowsWhenContinuationTokenUsedWithMessageStoreAsync()
{
// Arrange
Mock<IChatClient> mockChatClient = new();

ChatClientAgent agent = new(mockChatClient.Object);

// Create a thread with a MessageStore
ChatClientAgentThread thread = new()
{
MessageStore = new InMemoryChatMessageStore()
};

// Create run options with a continuation token
AgentRunOptions runOptions = new() { ContinuationToken = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 }) };

// Act & Assert
var exception = await Assert.ThrowsAsync<NotSupportedException>(async () => await agent.RunStreamingAsync(thread: thread, options: runOptions).ToListAsync());
Assert.Equal("Using non-agent service store for chat history with streaming resumption is not supported.", exception.Message);

// Verify that the IChatClient was never called due to early validation
mockChatClient.Verify(
c => c.GetStreamingResponseAsync(
It.IsAny<IEnumerable<ChatMessage>>(),
It.IsAny<ChatOptions>(),
It.IsAny<CancellationToken>()),
Times.Never);
}

[Fact]
public async Task RunStreamingAsyncThrowsWhenContinuationTokenUsedWithAIContextProviderAsync()
{
// Arrange
Mock<IChatClient> mockChatClient = new();

ChatClientAgent agent = new(mockChatClient.Object);

// Create a mock AIContextProvider
var mockContextProvider = new Mock<AIContextProvider>();
mockContextProvider
.Setup(p => p.InvokingAsync(It.IsAny<AIContextProvider.InvokingContext>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new AIContext());
mockContextProvider
.Setup(p => p.InvokedAsync(It.IsAny<AIContextProvider.InvokedContext>(), It.IsAny<CancellationToken>()))
.Returns(new ValueTask());

// Create a thread with an AIContextProvider and conversation ID to simulate non-initial run
ChatClientAgentThread thread = new()
{
ConversationId = "existing-conversation-id",
AIContextProvider = mockContextProvider.Object
};

AgentRunOptions runOptions = new() { ContinuationToken = ResponseContinuationToken.FromBytes(new byte[] { 1, 2, 3 }) };

// Act & Assert
var exception = await Assert.ThrowsAsync<NotSupportedException>(async () => await agent.RunStreamingAsync(thread: thread, options: runOptions).ToListAsync());

Assert.Equal("Using context provider with streaming resumption is not supported.", exception.Message);

// Verify that the IChatClient was never called due to early validation
mockChatClient.Verify(
c => c.GetStreamingResponseAsync(
It.IsAny<IEnumerable<ChatMessage>>(),
It.IsAny<ChatOptions>(),
It.IsAny<CancellationToken>()),
Times.Never);
}

#endregion

private static async IAsyncEnumerable<T> ToAsyncEnumerableAsync<T>(IEnumerable<T> values)
Expand Down
Loading