Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected override async ValueTask StoreChatHistoryAsync(InvokedContext context,
State state = this._sessionState.GetOrInitializeState(context.Session);

// Add request and response messages to the provider
var allNewMessages = context.RequestMessages.Concat(context.ResponseMessages ?? []);
var allNewMessages = (context.RequestMessages ?? Enumerable.Empty<ChatMessage>()).Concat(context.ResponseMessages ?? []);
Comment thread
alliscode marked this conversation as resolved.
Outdated
state.Messages.AddRange(allNewMessages);

if (this.ReducerTriggerEvent is InMemoryChatHistoryProviderOptions.ChatReducerTriggerEvent.AfterMessageAdded && this.ChatReducer is not null)
Expand Down
93 changes: 53 additions & 40 deletions dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,61 +329,74 @@ protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingA

this._logger.LogAgentChatClientInvokedStreamingAgent(nameof(RunStreamingAsync), this.Id, loggingAgentName, this._chatClientType);

bool hasUpdates;
// Ensure the inner enumerator is always disposed, even if the consumer breaks out early
// (e.g. ToolApprovalAgent does `yield break` after emitting an approval request). Without
// this, downstream decorators like PerServiceCallChatHistoryPersistingChatClient would be
// left suspended at `yield return`, never running their finally blocks, and any in-flight
// FunctionResultContent / FunctionCallContent state would not be persisted before the next
// turn, leaving the next request to the model with dangling tool calls.
try
{
// Ensure we start the streaming request
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false);
throw;
}

while (hasUpdates)
{
var update = responseUpdatesEnumerator.Current;
if (update is not null)
{
update.AuthorName ??= this.Name;

responseUpdates.Add(update);

yield return new(update)
{
AgentId = this.Id,
ContinuationToken = WrapContinuationToken(update.ContinuationToken, GetInputMessages(inputMessages, continuationToken), responseUpdates)
};
}

bool hasUpdates;
try
{
// Re-ensure the run context has the resolved session before each MoveNextAsync.
// The base class RunStreamingAsync restores the original context (potentially with
// null session) after each yield, so we must re-establish it for the decorator.
EnsureRunContextHasSession(safeSession);
// Ensure we start the streaming request
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false);
throw;
}
}

var chatResponse = responseUpdates.ToChatResponse();
while (hasUpdates)
{
var update = responseUpdatesEnumerator.Current;
if (update is not null)
{
update.AuthorName ??= this.Name;

var forceEndOfRunPersistence = continuationToken is not null || chatOptions?.AllowBackgroundResponses is true;
responseUpdates.Add(update);

// We can derive the type of supported session from whether we have a conversation id,
// so let's update it and set the conversation id for the service session case.
this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken, forceUpdate: forceEndOfRunPersistence);
yield return new(update)
{
AgentId = this.Id,
ContinuationToken = WrapContinuationToken(update.ContinuationToken, GetInputMessages(inputMessages, continuationToken), responseUpdates)
};
}

// Notify providers of all new messages unless persistence is handled per-service-call by the decorator.
// When resuming from a continuation token or using background responses, force notification
// to send the combined data (per-service-call persistence is unreliable for these scenarios).
await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken, forceNotify: forceEndOfRunPersistence).ConfigureAwait(false);
try
{
// Re-ensure the run context has the resolved session before each MoveNextAsync.
// The base class RunStreamingAsync restores the original context (potentially with
// null session) after each yield, so we must re-establish it for the decorator.
EnsureRunContextHasSession(safeSession);
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false);
throw;
}
}

var chatResponse = responseUpdates.ToChatResponse();

var forceEndOfRunPersistence = continuationToken is not null || chatOptions?.AllowBackgroundResponses is true;

// We can derive the type of supported session from whether we have a conversation id,
// so let's update it and set the conversation id for the service session case.
this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken, forceUpdate: forceEndOfRunPersistence);

// Notify providers of all new messages unless persistence is handled per-service-call by the decorator.
// When resuming from a continuation token or using background responses, force notification
// to send the combined data (per-service-call persistence is unreliable for these scenarios).
await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken, forceNotify: forceEndOfRunPersistence).ConfigureAwait(false);
}
finally
{
await responseUpdatesEnumerator.DisposeAsync().ConfigureAwait(false);
}
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,14 @@ public override async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseA
|| options?.AllowBackgroundResponses is true;
bool skipSimulation = isServiceManaged || isContinuationOrBackground;

var newMessages = messages as IList<ChatMessage> ?? messages.ToList();
// Snapshot the input messages into a private list. The caller (typically
// FunctionInvokingChatClient) reuses a single mutable buffer across iterations,
// and the streaming path can defer persistence until after the caller has already
// mutated that buffer for the next iteration (e.g. on the catch-path
// PersistInputOnErrorAsync). Aliasing the caller's list would then cause us to
// persist the wrong messages — losing FunctionResultContent and corrupting
// history with dangling FunctionCallContent.
var newMessages = messages.ToList();
Comment thread
alliscode marked this conversation as resolved.

// When simulating, load history and prepend it. When the service manages
// history (real ConversationId) or this is a continuation/background run,
Expand All @@ -170,6 +177,10 @@ public override async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseA
}
Comment thread
alliscode marked this conversation as resolved.
catch (Exception ex)
{
// The service call could not even be initiated. Persist the input messages
// (e.g. function results from a prior iteration) so that history remains
// consistent and a subsequent run does not see dangling FunctionCallContent.
await PersistInputOnErrorAsync(agent, session, newMessages, options, cancellationToken).ConfigureAwait(false);
await agent.NotifyProvidersOfFailureAsync(session, ex, newMessages, options, cancellationToken).ConfigureAwait(false);
throw;
}
Expand All @@ -181,42 +192,87 @@ public override async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseA
}
catch (Exception ex)
{
await PersistInputOnErrorAsync(agent, session, newMessages, options, cancellationToken).ConfigureAwait(false);
Comment thread
alliscode marked this conversation as resolved.
Outdated
await agent.NotifyProvidersOfFailureAsync(session, ex, newMessages, options, cancellationToken).ConfigureAwait(false);
throw;
}

while (hasUpdates)
bool loopExitedNormally = false;
try
{
var update = enumerator.Current;
responseUpdates.Add(update.Clone());

// If the service returned a real ConversationId on any update, remember that.
// Otherwise stamp our sentinel so FICC treats this as service-managed —
// unless this is a continuation/background run where the agent handles everything.
if (!string.IsNullOrEmpty(update.ConversationId))
{
isServiceManaged = true;
}
else if (!skipSimulation)
{
update.ConversationId = LocalHistoryConversationId;
}

yield return update;

try
while (hasUpdates)
{
hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false);
var update = enumerator.Current;
responseUpdates.Add(update.Clone());

// If the service returned a real ConversationId on any update, remember that.
// Otherwise stamp our sentinel so FICC treats this as service-managed —
// unless this is a continuation/background run where the agent handles everything.
if (!string.IsNullOrEmpty(update.ConversationId))
{
isServiceManaged = true;
}
else if (!skipSimulation)
{
update.ConversationId = LocalHistoryConversationId;
}

yield return update;

try
{
hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await PersistInputOnErrorAsync(agent, session, newMessages, options, cancellationToken).ConfigureAwait(false);
Comment thread
alliscode marked this conversation as resolved.
Outdated
await agent.NotifyProvidersOfFailureAsync(session, ex, newMessages, options, cancellationToken).ConfigureAwait(false);
throw;
}
}
catch (Exception ex)
loopExitedNormally = true;
}
finally
{
// If the iterator was disposed early (e.g. consumer bailed out, or the underlying
// stream ended without throwing but the loop did not complete normally), we still
// need to persist the input messages — otherwise function-call/function-result
// pairings can be lost, leaving the next iteration with dangling FunctionCallContent.
if (!loopExitedNormally)
{
await agent.NotifyProvidersOfFailureAsync(session, ex, newMessages, options, cancellationToken).ConfigureAwait(false);
throw;
try
{
await PersistInputOnErrorAsync(agent, session, newMessages, options, CancellationToken.None).ConfigureAwait(false);
}
catch
{
// Best-effort persistence on the abnormal-exit path; swallow to avoid masking the original error.
}
}
}

var chatResponse = responseUpdates.ToChatResponse();

// If the service emitted an in-stream error (e.g. rate limit) the underlying client
// may complete the enumeration "normally" while leaving us with no usable assistant
// output. Treat that as a failure and persist the input messages without polluting
// history with the error content.
var inStreamError = chatResponse.Messages
.SelectMany(m => m.Contents)
.OfType<ErrorContent>()
.FirstOrDefault();

if (inStreamError is not null)
{
await PersistInputOnErrorAsync(agent, session, newMessages, options, cancellationToken).ConfigureAwait(false);
var errorMessage = string.IsNullOrWhiteSpace(inStreamError.Message)
? "The chat service returned an in-stream error."
: inStreamError.Message;
var streamException = new InvalidOperationException(errorMessage);
await agent.NotifyProvidersOfFailureAsync(session, streamException, newMessages, options, cancellationToken).ConfigureAwait(false);
throw streamException;
}

await agent.NotifyProvidersOfNewMessagesAsync(session, newMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false);

if (isContinuationOrBackground)
Expand All @@ -236,6 +292,29 @@ public override async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseA
}
}

/// <summary>
/// Records the input messages handed to the current iteration (for example
/// <see cref="FunctionResultContent"/> entries produced by the previous iteration) in the
/// agent's chat history, with an empty set of response messages.
/// </summary>
/// <remarks>
/// Intended for error paths: persisting the inputs keeps function-call/function-result
/// pairs together across a failure, so the stored history does not end up with dangling
/// <see cref="FunctionCallContent"/> that would corrupt subsequent runs.
/// Nothing is persisted when <paramref name="newMessages"/> is empty.
/// </remarks>
/// <param name="agent">The agent whose providers are notified of the new messages.</param>
/// <param name="session">The session passed through to the provider notification.</param>
/// <param name="newMessages">The input messages of this iteration to persist.</param>
/// <param name="options">The chat options passed through to the provider notification.</param>
/// <param name="cancellationToken">The token passed through to the provider notification.</param>
/// <returns>A task that completes once the providers have been notified, or immediately when there is nothing to persist.</returns>
private static async Task PersistInputOnErrorAsync(
    ChatClientAgent agent,
    ChatClientAgentSession session,
    List<ChatMessage> newMessages,
    ChatOptions? options,
    CancellationToken cancellationToken)
{
    // Only notify when there is at least one input message; the response side is always empty here.
    if (newMessages.Count > 0)
    {
        await agent.NotifyProvidersOfNewMessagesAsync(session, newMessages, [], options, cancellationToken).ConfigureAwait(false);
    }
}

/// <summary>
/// Sets the sentinel <see cref="LocalHistoryConversationId"/> on the response and session
/// so that <see cref="FunctionInvokingChatClient"/> treats the conversation as service-managed.
Expand Down
Loading
Loading