Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
31a5343
save input messages and stream updates to the continuation token to b…
SergeyMenshykh Dec 10, 2025
16e7b1e
Merge branch 'main' into fix-data-loss
SergeyMenshykh Dec 10, 2025
0fe912e
Update dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentConti…
SergeyMenshykh Dec 10, 2025
84a368d
Update dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentConti…
SergeyMenshykh Dec 10, 2025
238d9de
Update dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatClie…
SergeyMenshykh Dec 10, 2025
2ef8f3a
Update dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentConti…
SergeyMenshykh Dec 10, 2025
4e6a701
Update dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentConti…
SergeyMenshykh Dec 10, 2025
519f453
fix typo
SergeyMenshykh Dec 10, 2025
2384a9d
Merge branch 'fix-data-loss' of https://github.com/SergeyMenshykh/age…
SergeyMenshykh Dec 10, 2025
38e1cae
Merge branch 'main' into fix-data-loss
SergeyMenshykh Dec 10, 2025
451ae90
Merge branch 'main' into fix-data-loss
SergeyMenshykh Dec 10, 2025
622e709
Merge branch 'main' into fix-data-loss
SergeyMenshykh Dec 10, 2025
a60458e
Merge branch 'main' into fix-data-loss
SergeyMenshykh Dec 10, 2025
81fb8e8
Merge branch 'main' into fix-data-loss
SergeyMenshykh Dec 11, 2025
9579eec
Merge branch 'main' into fix-data-loss
SergeyMenshykh Jan 5, 2026
24cd818
init continuation token from chat response
SergeyMenshykh Jan 5, 2026
df5492a
remove unnecessary types for source generation
SergeyMenshykh Jan 5, 2026
e3a0cfe
merge with the latest main
SergeyMenshykh Jan 5, 2026
3e40eb9
remove check for continuation token passed at initial run
SergeyMenshykh Jan 5, 2026
ae6f57d
remove check for continuation token pass at initial run
SergeyMenshykh Jan 5, 2026
07f0bd5
centralize continuation token parsing
SergeyMenshykh Jan 5, 2026
98a7d36
Merge branch 'main' into fix-data-loss
SergeyMenshykh Jan 6, 2026
074ac85
update xml comments
SergeyMenshykh Jan 6, 2026
3989023
Merge branch 'fix-data-loss' of https://github.com/SergeyMenshykh/age…
SergeyMenshykh Jan 6, 2026
bf78569
Merge branch 'main' into fix-data-loss
SergeyMenshykh Jan 6, 2026
7883a29
use readonly collection instead of enumerable
SergeyMenshykh Jan 6, 2026
8b41d2b
Merge branch 'fix-data-loss' of https://github.com/SergeyMenshykh/age…
SergeyMenshykh Jan 6, 2026
06065db
Merge branch 'main' into fix-data-loss
SergeyMenshykh Jan 6, 2026
aa799f3
Merge branch 'main' into fix-data-loss
SergeyMenshykh Jan 8, 2026
3917b43
Merge branch 'main' into fix-data-loss
SergeyMenshykh Jan 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 80 additions & 44 deletions dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ static Task<ChatResponse> GetResponseAsync(IChatClient chatClient, List<ChatMess

static AgentRunResponse CreateResponse(ChatResponse chatResponse)
{
return new AgentRunResponse(chatResponse);
return new AgentRunResponse(chatResponse)
{
ContinuationToken = WrapContinuationToken(chatResponse.ContinuationToken)
};
}

return this.RunCoreAsync(GetResponseAsync, CreateResponse, messages, thread, options, cancellationToken);
Expand Down Expand Up @@ -201,11 +204,14 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami
{
var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();

(ChatClientAgentThread safeThread, ChatOptions? chatOptions, List<ChatMessage> inputMessagesForChatClient, IList<ChatMessage>? aiContextProviderMessages, IList<ChatMessage>? chatMessageStoreMessages) =
(ChatClientAgentThread safeThread,
ChatOptions? chatOptions,
List<ChatMessage> inputMessagesForChatClient,
IList<ChatMessage>? aiContextProviderMessages,
IList<ChatMessage>? chatMessageStoreMessages,
ChatClientAgentContinuationToken? continuationToken) =
await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);

ValidateStreamResumptionAllowed(chatOptions?.ContinuationToken, safeThread);

var chatClient = this.ChatClient;

chatClient = ApplyRunOptionsTransformations(options, chatClient);
Expand All @@ -214,7 +220,7 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami

this._logger.LogAgentChatClientInvokingAgent(nameof(RunStreamingAsync), this.Id, loggingAgentName, this._chatClientType);

List<ChatResponseUpdate> responseUpdates = [];
List<ChatResponseUpdate> responseUpdates = GetResponseUpdates(continuationToken);

IAsyncEnumerator<ChatResponseUpdate> responseUpdatesEnumerator;

Expand All @@ -225,8 +231,8 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami
}
catch (Exception ex)
{
await NotifyMessageStoreOfFailureAsync(safeThread, ex, inputMessages, chatMessageStoreMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyMessageStoreOfFailureAsync(safeThread, ex, GetInputMessages(inputMessages, continuationToken), chatMessageStoreMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, GetInputMessages(inputMessages, continuationToken), aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}

Expand All @@ -240,8 +246,8 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami
}
catch (Exception ex)
{
await NotifyMessageStoreOfFailureAsync(safeThread, ex, inputMessages, chatMessageStoreMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyMessageStoreOfFailureAsync(safeThread, ex, GetInputMessages(inputMessages, continuationToken), chatMessageStoreMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, GetInputMessages(inputMessages, continuationToken), aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}

Expand All @@ -253,7 +259,12 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami
update.AuthorName ??= this.Name;

responseUpdates.Add(update);
yield return new(update) { AgentId = this.Id };

yield return new(update)
{
AgentId = this.Id,
ContinuationToken = WrapContinuationToken(update.ContinuationToken, GetInputMessages(inputMessages, continuationToken), responseUpdates)
};
}

try
Expand All @@ -262,8 +273,8 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami
}
catch (Exception ex)
{
await NotifyMessageStoreOfFailureAsync(safeThread, ex, inputMessages, chatMessageStoreMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyMessageStoreOfFailureAsync(safeThread, ex, GetInputMessages(inputMessages, continuationToken), chatMessageStoreMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, GetInputMessages(inputMessages, continuationToken), aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}
}
Expand All @@ -275,10 +286,10 @@ protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreami
this.UpdateThreadWithTypeAndConversationId(safeThread, chatResponse.ConversationId);

// To avoid inconsistent state we only notify the thread of the input messages if no error occurs after the initial request.
await NotifyMessageStoreOfNewMessagesAsync(safeThread, inputMessages, chatMessageStoreMessages, aiContextProviderMessages, chatResponse.Messages, cancellationToken).ConfigureAwait(false);
await NotifyMessageStoreOfNewMessagesAsync(safeThread, GetInputMessages(inputMessages, continuationToken), chatMessageStoreMessages, aiContextProviderMessages, chatResponse.Messages, cancellationToken).ConfigureAwait(false);

// Notify the AIContextProvider of all new messages.
await NotifyAIContextProviderOfSuccessAsync(safeThread, inputMessages, aiContextProviderMessages, chatResponse.Messages, cancellationToken).ConfigureAwait(false);
await NotifyAIContextProviderOfSuccessAsync(safeThread, GetInputMessages(inputMessages, continuationToken), aiContextProviderMessages, chatResponse.Messages, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -382,7 +393,12 @@ private async Task<TAgentRunResponse> RunCoreAsync<TAgentRunResponse, TChatClien
{
var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection<ChatMessage> ?? messages.ToList();

(ChatClientAgentThread safeThread, ChatOptions? chatOptions, List<ChatMessage> inputMessagesForChatClient, IList<ChatMessage>? aiContextProviderMessages, IList<ChatMessage>? chatMessageStoreMessages) =
(ChatClientAgentThread safeThread,
ChatOptions? chatOptions,
List<ChatMessage> inputMessagesForChatClient,
IList<ChatMessage>? aiContextProviderMessages,
IList<ChatMessage>? chatMessageStoreMessages,
ChatClientAgentContinuationToken? _) =
await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false);

var chatClient = this.ChatClient;
Expand Down Expand Up @@ -474,20 +490,20 @@ await thread.AIContextProvider.InvokedAsync(new(inputMessages, aiContextProvider
/// <param name="runOptions">Optional run options that may include specific chat configuration settings.</param>
/// <returns>A <see cref="ChatOptions"/> object representing the merged chat configuration, or <see langword="null"/> if
/// neither the run options nor the agent's chat options are available.</returns>
private ChatOptions? CreateConfiguredChatOptions(AgentRunOptions? runOptions)
private (ChatOptions?, ChatClientAgentContinuationToken?) CreateConfiguredChatOptions(AgentRunOptions? runOptions)
{
ChatOptions? requestChatOptions = (runOptions as ChatClientAgentRunOptions)?.ChatOptions?.Clone();

// If no agent chat options were provided, return the request chat options as is.
if (this._agentOptions?.ChatOptions is null)
{
return ApplyBackgroundResponsesProperties(requestChatOptions, runOptions);
return GetContinuationTokenAndApplyBackgroundResponsesProperties(requestChatOptions, runOptions);
}

// If no request chat options were provided, use the agent's chat options clone.
if (requestChatOptions is null)
{
return ApplyBackgroundResponsesProperties(this._agentOptions?.ChatOptions.Clone(), runOptions);
return GetContinuationTokenAndApplyBackgroundResponsesProperties(this._agentOptions?.ChatOptions.Clone(), runOptions);
}

// If both are present, we need to merge them.
Expand Down Expand Up @@ -583,19 +599,26 @@ await thread.AIContextProvider.InvokedAsync(new(inputMessages, aiContextProvider
}
}

return ApplyBackgroundResponsesProperties(requestChatOptions, runOptions);
return GetContinuationTokenAndApplyBackgroundResponsesProperties(requestChatOptions, runOptions);

static ChatOptions? ApplyBackgroundResponsesProperties(ChatOptions? chatOptions, AgentRunOptions? agentRunOptions)
static (ChatOptions?, ChatClientAgentContinuationToken?) GetContinuationTokenAndApplyBackgroundResponsesProperties(ChatOptions? chatOptions, AgentRunOptions? agentRunOptions)
{
// If any of the background response properties are set in the run options, we should apply both to the chat options.
if (agentRunOptions?.AllowBackgroundResponses is not null || agentRunOptions?.ContinuationToken is not null)
if (agentRunOptions?.AllowBackgroundResponses is not null)
{
chatOptions ??= new ChatOptions();
chatOptions.AllowBackgroundResponses = agentRunOptions.AllowBackgroundResponses;
chatOptions.ContinuationToken = agentRunOptions.ContinuationToken;
}

return chatOptions;
ChatClientAgentContinuationToken? agentContinuationToken = null;

if ((agentRunOptions?.ContinuationToken ?? chatOptions?.ContinuationToken) is { } continuationToken)
{
agentContinuationToken = ChatClientAgentContinuationToken.FromToken(continuationToken);
chatOptions ??= new ChatOptions();
chatOptions.ContinuationToken = agentContinuationToken!.InnerToken;
}

return (chatOptions, agentContinuationToken);
}
}

Expand All @@ -606,21 +629,22 @@ await thread.AIContextProvider.InvokedAsync(new(inputMessages, aiContextProvider
/// <param name="inputMessages">The input messages to use.</param>
/// <param name="runOptions">Optional parameters for agent invocation.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A tuple containing the thread, chat options, and thread messages.</returns>
/// <returns>A tuple containing the thread, chat options, messages and continuation token.</returns>
private async Task
<(
ChatClientAgentThread AgentThread,
ChatOptions? ChatOptions,
List<ChatMessage> InputMessagesForChatClient,
IList<ChatMessage>? AIContextProviderMessages,
IList<ChatMessage>? ChatMessageStoreMessages
IList<ChatMessage>? ChatMessageStoreMessages,
ChatClientAgentContinuationToken? ContinuationToken
)> PrepareThreadAndMessagesAsync(
AgentThread? thread,
IEnumerable<ChatMessage> inputMessages,
AgentRunOptions? runOptions,
CancellationToken cancellationToken)
{
ChatOptions? chatOptions = this.CreateConfiguredChatOptions(runOptions);
(ChatOptions? chatOptions, ChatClientAgentContinuationToken? continuationToken) = this.CreateConfiguredChatOptions(runOptions);

// Supplying a thread for background responses is required to prevent inconsistent experience
// for callers if they forget to provide the thread for initial or follow-up runs.
Expand All @@ -641,11 +665,6 @@ private async Task
throw new InvalidOperationException("Input messages are not allowed when continuing a background response using a continuation token.");
}

if (chatOptions?.ContinuationToken is not null && typedThread.ConversationId is null && typedThread.MessageStore is null)
{
throw new InvalidOperationException("Continuation tokens are not allowed to be used for initial runs.");
}

List<ChatMessage> inputMessagesForChatClient = [];
IList<ChatMessage>? aiContextProviderMessages = null;
IList<ChatMessage>? chatMessageStoreMessages = null;
Expand Down Expand Up @@ -713,7 +732,7 @@ private async Task
chatOptions.ConversationId = typedThread.ConversationId;
}

return (typedThread, chatOptions, inputMessagesForChatClient, aiContextProviderMessages, chatMessageStoreMessages);
return (typedThread, chatOptions, inputMessagesForChatClient, aiContextProviderMessages, chatMessageStoreMessages, continuationToken);
}

private void UpdateThreadWithTypeAndConversationId(ChatClientAgentThread thread, string? responseConversationId)
Expand Down Expand Up @@ -791,26 +810,43 @@ private static Task NotifyMessageStoreOfNewMessagesAsync(
return Task.CompletedTask;
}

private static void ValidateStreamResumptionAllowed(ResponseContinuationToken? continuationToken, ChatClientAgentThread safeThread)
private static ChatClientAgentContinuationToken? WrapContinuationToken(ResponseContinuationToken? continuationToken, IEnumerable<ChatMessage>? inputMessages = null, List<ChatResponseUpdate>? responseUpdates = null)
{
if (continuationToken is null)
{
return;
return null;
}

// Streaming resumption is only supported with chat history managed by the agent service because, currently, there's no good solution
// to collect updates received in failed runs and pass them to the last successful run so it can store them to the message store.
if (safeThread.ConversationId is null)
return new(continuationToken)
{
throw new NotSupportedException("Streaming resumption is only supported when chat history is stored and managed by the underlying AI service.");
}
// Save input messages to the continuation token so they can be added to the thread and
// provided to the context provider in the last successful streaming resumption run.
// That's necessary for scenarios where initial streaming run is interrupted and streaming is resumed later.
InputMessages = inputMessages?.Any() is true ? inputMessages : null,

// Save all updates received so far to the continuation token so they can be provided to the
// message store and context provider in the last successful streaming resumption run.
// That's necessary for scenarios where a streaming run is interrupted after some updates were received.
ResponseUpdates = responseUpdates?.Count > 0 ? responseUpdates : null
};
}

// Similarly, streaming resumption is not supported when a context provider is used because, currently, there's no good solution
// to collect updates received in failed runs and pass them to the last successful run so it can notify the context provider of the updates.
if (safeThread.AIContextProvider is not null)
private static IEnumerable<ChatMessage> GetInputMessages(IReadOnlyCollection<ChatMessage> inputMessages, ChatClientAgentContinuationToken? token)
{
// First, use input messages if provided.
if (inputMessages.Count > 0)
{
throw new NotSupportedException("Using context provider with streaming resumption is not supported.");
return inputMessages;
}

// Fallback to messages saved in the continuation token if available.
return token?.InputMessages ?? [];
}

private static List<ChatResponseUpdate> GetResponseUpdates(ChatClientAgentContinuationToken? token)
{
// Restore any previously received updates from the continuation token.
return token?.ResponseUpdates?.ToList() ?? [];
}

private string GetLoggingAgentName() => this.Name ?? "UnnamedAgent";
Expand Down
Loading
Loading