Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 13 additions & 105 deletions dotnet/src/Microsoft.Agents.AI.Workflows/MessageMerger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.AI;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows;

Expand All @@ -14,66 +13,31 @@ private sealed class ResponseMergeState(string? responseId)
{
public string? ResponseId { get; } = responseId;

public Dictionary<string, List<AgentResponseUpdate>> UpdatesByMessageId { get; } = [];
public List<AgentResponseUpdate> DanglingUpdates { get; } = [];
public List<AgentResponseUpdate> OrderedUpdates { get; } = [];

public void AddUpdate(AgentResponseUpdate update)
{
if (update.MessageId is null)
{
this.DanglingUpdates.Add(update);
}
else
{
if (!this.UpdatesByMessageId.TryGetValue(update.MessageId, out List<AgentResponseUpdate>? updates))
{
this.UpdatesByMessageId[update.MessageId] = updates = [];
}

updates.Add(update);
}
}

public AgentResponse ComputeMerged(string messageId)
{
if (this.UpdatesByMessageId.TryGetValue(Throw.IfNull(messageId), out List<AgentResponseUpdate>? updates))
{
return updates.ToAgentResponse();
}

throw new KeyNotFoundException($"No updates found for message ID '{messageId}' in response '{this.ResponseId}'.");
this.OrderedUpdates.Add(update);
}

public AgentResponse ComputeDangling()
public AgentResponse ComputeOrdered()
{
if (this.DanglingUpdates.Count == 0)
if (this.OrderedUpdates.Count == 0)
{
throw new InvalidOperationException("No dangling updates to compute a response from.");
throw new InvalidOperationException("No updates to compute a response from.");
}

return this.DanglingUpdates.ToAgentResponse();
return this.OrderedUpdates.ToAgentResponse();
}

public List<ChatMessage> ComputeFlattened()
{
List<ChatMessage> result = this.UpdatesByMessageId.Keys.SelectMany(AggregateUpdatesToMessage).ToList();
if (this.DanglingUpdates.Count > 0)
if (this.OrderedUpdates.Count == 0)
{
result.AddRange(this.ComputeDangling().Messages);
return [];
}

return result;

IList<ChatMessage> AggregateUpdatesToMessage(string messageId)
{
List<AgentResponseUpdate> updates = this.UpdatesByMessageId[messageId];
if (updates.Count == 0)
{
throw new InvalidOperationException($"No updates found for message ID '{messageId}' in response '{this.ResponseId}'.");
}

return updates.Select(oldUpdate => oldUpdate.AsChatResponseUpdate()).ToChatResponse().Messages;
}
return this.ComputeOrdered().Messages.ToList();
}
}

Expand All @@ -84,7 +48,7 @@ public void AddUpdate(AgentResponseUpdate update)
{
if (update.ResponseId is null)
{
this._danglingState.DanglingUpdates.Add(update);
this._danglingState.AddUpdate(update);
}
else
{
Expand All @@ -97,28 +61,6 @@ public void AddUpdate(AgentResponseUpdate update)
}
}

private int CompareByDateTimeOffset(AgentResponse left, AgentResponse right)
{
const int LESS = -1, EQ = 0, GREATER = 1;

if (left.CreatedAt == right.CreatedAt)
{
return EQ;
}

if (!left.CreatedAt.HasValue)
{
return GREATER;
}

if (!right.CreatedAt.HasValue)
{
return LESS;
}

return left.CreatedAt.Value.CompareTo(right.CreatedAt.Value);
}

public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgentId = null, string? primaryAgentName = null)
{
List<ChatMessage> messages = [];
Expand All @@ -130,15 +72,9 @@ public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgen
{
ResponseMergeState mergeState = this._mergeStates[responseId];

List<AgentResponse> responseList = mergeState.UpdatesByMessageId.Keys.Select(mergeState.ComputeMerged).ToList();
if (mergeState.DanglingUpdates.Count > 0)
{
responseList.Add(mergeState.ComputeDangling());
}

responseList.Sort(this.CompareByDateTimeOffset);
responses[responseId] = responseList.Aggregate(MergeResponses);
messages.AddRange(GetMessagesWithCreatedAt(responses[responseId]));
AgentResponse orderedResponse = mergeState.ComputeOrdered();
responses[responseId] = orderedResponse;
messages.AddRange(GetMessagesWithCreatedAt(orderedResponse));
}

UsageDetails? usage = null;
Expand Down Expand Up @@ -194,34 +130,6 @@ public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgen
AdditionalProperties = additionalProperties
};

static AgentResponse MergeResponses(AgentResponse? current, AgentResponse incoming)
{
if (current is null)
{
return incoming;
}

if (current.ResponseId != incoming.ResponseId)
{
throw new InvalidOperationException($"Cannot merge responses with different IDs: '{current.ResponseId}' and '{incoming.ResponseId}'.");
}

List<object?> rawRepresentation = current.RawRepresentation as List<object?> ?? [];
rawRepresentation.Add(incoming.RawRepresentation);

return new()
{
AgentId = incoming.AgentId ?? current.AgentId,
AdditionalProperties = MergeProperties(current.AdditionalProperties, incoming.AdditionalProperties),
CreatedAt = incoming.CreatedAt ?? current.CreatedAt,
FinishReason = incoming.FinishReason ?? current.FinishReason,
Messages = current.Messages.Concat(incoming.Messages).ToList(),
ResponseId = current.ResponseId,
RawRepresentation = rawRepresentation,
Usage = MergeUsage(current.Usage, incoming.Usage),
};
}

static IEnumerable<ChatMessage> GetMessagesWithCreatedAt(AgentResponse response)
{
if (response.Messages.Count == 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Linq;
using FluentAssertions;
using Microsoft.Extensions.AI;

Expand Down Expand Up @@ -69,4 +70,74 @@ public void Test_MessageMerger_PropagatesFinishReasonFromUpdates()
// Assert - FinishReason from the update should propagate through
response.FinishReason.Should().Be(ChatFinishReason.ContentFilter);
}

[Fact]
public void Test_MessageMerger_NullMessageId_CoalescesWithAdjacentUpdates()
{
// Simulates the MEAI OpenAIResponsesChatClient behavior where reasoning
// content streams with MessageId = null while text gets a proper MessageId.
// The merger should preserve insertion order so that ToChatResponse()
// coalesces null-MessageId updates with the surrounding message.
DateTimeOffset creationTime = DateTimeOffset.UtcNow;
string responseId = Guid.NewGuid().ToString("N");
string textMessageId = Guid.NewGuid().ToString("N");

MessageMerger merger = new();

// Reasoning updates with null MessageId (the MEAI bug)
merger.AddUpdate(new AgentResponseUpdate
{
Role = ChatRole.Assistant,
AuthorName = TestAuthorName1,
Contents = [new TextReasoningContent("Let me think...")],
ResponseId = responseId,
AgentId = TestAgentId1,
CreatedAt = creationTime,
MessageId = null,
});

merger.AddUpdate(new AgentResponseUpdate
{
Contents = [new TextReasoningContent(" 2 + 2 = 4.")],
ResponseId = responseId,
AgentId = TestAgentId1,
CreatedAt = creationTime,
MessageId = null,
});

// Text updates with a proper MessageId
merger.AddUpdate(new AgentResponseUpdate
{
Role = ChatRole.Assistant,
AuthorName = TestAuthorName1,
Contents = [new TextContent("The answer is ")],
ResponseId = responseId,
AgentId = TestAgentId1,
CreatedAt = creationTime,
MessageId = textMessageId,
});

merger.AddUpdate(new AgentResponseUpdate
{
Contents = [new TextContent("4.")],
ResponseId = responseId,
AgentId = TestAgentId1,
CreatedAt = creationTime,
MessageId = textMessageId,
});

AgentResponse response = merger.ComputeMerged(responseId);

// Reasoning and text should be coalesced into the same message
// because null MessageId is treated as "same message" by ToChatResponse().
response.Messages.Should().HaveCount(1);
response.Messages[0].Role.Should().Be(ChatRole.Assistant);
response.Messages[0].AuthorName.Should().Be(TestAuthorName1);

var reasoningContents = response.Messages[0].Contents.OfType<TextReasoningContent>().ToList();
var textContents = response.Messages[0].Contents.OfType<TextContent>().ToList();

reasoningContents.Should().NotBeEmpty("reasoning content should be in the message");
textContents.Should().NotBeEmpty("text content should be in the message");
}
}