From 1df8f41aac7a7ce79ae3e36577804430b05d76c7 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Fri, 17 Oct 2025 13:19:10 +0200 Subject: [PATCH 1/2] reformat event sequence --- .../Responses/AIAgentResponsesProcessor.cs | 61 ++++++++-- .../Responses/Model/ResponseStreamEvent.cs | 106 ++++++++++++++++++ 2 files changed, 160 insertions(+), 7 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs index 0eefa37f2c..5361bbc6e0 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs @@ -17,6 +17,7 @@ using Microsoft.AspNetCore.Http.Features; using Microsoft.Extensions.AI; using OpenAI.Responses; +using ChatMessage = Microsoft.Extensions.AI.ChatMessage; namespace Microsoft.Agents.AI.Hosting.OpenAI.Responses; @@ -101,6 +102,7 @@ private async IAsyncEnumerable> GetStreaming { var sequenceNumber = 1; var outputIndex = 1; + Dictionary messageIdOutputIndexes = new(); AgentThread? agentThread = null; ResponseItem? lastResponseItem = null; @@ -115,6 +117,8 @@ private async IAsyncEnumerable> GetStreaming continue; } + // response.created + // response.in_progress if (sequenceNumber == 1) { lastOpenAIResponse = update.AsChatResponse().AsOpenAIResponse(); @@ -124,6 +128,12 @@ private async IAsyncEnumerable> GetStreaming Response = lastOpenAIResponse }; yield return new(responseCreated, responseCreated.Type); + + var inProgressResponse = new StreamingInProgressResponse(sequenceNumber++) + { + Response = lastOpenAIResponse + }; + yield return new(inProgressResponse, inProgressResponse.Type); } if (update.Contents is not { Count: > 0 }) @@ -139,6 +149,7 @@ private async IAsyncEnumerable> GetStreaming CreatedAt = update.CreatedAt, RawRepresentation = update.RawRepresentation }; + var messageId = chatMessage.MessageId ?? ""; foreach (var openAIResponseItem in MicrosoftExtensionsAIResponsesExtensions.AsOpenAIResponseItems([chatMessage])) { @@ -147,21 +158,57 @@ private async IAsyncEnumerable> GetStreaming openAIResponseItem.SetId(chatMessage.MessageId); } - lastResponseItem = openAIResponseItem; + if (!messageIdOutputIndexes.TryGetValue(messageId, out var index)) + { + messageIdOutputIndexes[messageId] = index = 0; + var responseContentPartAdded = new StreamingContentPartAddedResponse(sequenceNumber++) + { + ItemId = chatMessage.MessageId, + ContentIndex = 0, + OutputIndex = index++, + Part = null! + }; + yield return new(responseContentPartAdded, responseContentPartAdded.Type); + } - var responseOutputItemAdded = new StreamingOutputItemAddedResponse(sequenceNumber++) + lastResponseItem = openAIResponseItem; + var responseOutputTextDeltaResponse = new StreamingOutputTextDeltaResponse(sequenceNumber++) { - OutputIndex = outputIndex++, - Item = openAIResponseItem + ItemId = chatMessage.MessageId, + ContentIndex = 0, + OutputIndex = index++, + Delta = chatMessage.Text }; - yield return new(responseOutputItemAdded, responseOutputItemAdded.Type); + yield return new(responseOutputTextDeltaResponse, responseOutputTextDeltaResponse.Type); + + messageIdOutputIndexes[messageId] = index; } } if (lastResponseItem is not null) { - // we were streaming "response.output_item.added" before - // so we should complete it now via "response.output_item.done" + // here goes a sequence of completions for earlier started events: + + // "response.output_text.delta" should be completed with "response.output_text.done" + var index = messageIdOutputIndexes[lastResponseItem.Id]; + var responseOutputTextDeltaResponse = new StreamingOutputTextDoneResponse(sequenceNumber++) + { + ItemId = lastResponseItem.Id, + ContentIndex = 0, + OutputIndex = index++, + }; + yield return new(responseOutputTextDeltaResponse, responseOutputTextDeltaResponse.Type); + + // then "response.content_part.added" should be completed with "response.content_part.done" + var streamingContentPartDoneResponse = new StreamingContentPartDoneResponse(sequenceNumber++) + { + ItemId = lastResponseItem.Id, + ContentIndex = 0, + OutputIndex = index++, + }; + yield return new(streamingContentPartDoneResponse, streamingContentPartDoneResponse.Type); + + // then "response.output_item.added" should be completed with "response.output_item.done" var responseOutputDoneAdded = new StreamingOutputItemDoneResponse(sequenceNumber++) { OutputIndex = outputIndex++, diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs index b9bc0ed51c..be0f5ee4d6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs @@ -12,6 +12,12 @@ namespace Microsoft.Agents.AI.Hosting.OpenAI.Responses.Model; [JsonPolymorphic(UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FailSerialization)] [JsonDerivedType(typeof(StreamingOutputItemAddedResponse), StreamingOutputItemAddedResponse.EventType)] [JsonDerivedType(typeof(StreamingOutputItemDoneResponse), StreamingOutputItemDoneResponse.EventType)] +[JsonDerivedType(typeof(StreamingInProgressResponse), StreamingInProgressResponse.EventType)] +[JsonDerivedType(typeof(StreamingOutputTextDeltaResponse), StreamingOutputTextDeltaResponse.EventType)] +[JsonDerivedType(typeof(StreamingOutputTextDoneResponse), StreamingOutputTextDoneResponse.EventType)] +[JsonDerivedType(typeof(StreamingInProgressResponse), StreamingInProgressResponse.EventType)] +[JsonDerivedType(typeof(StreamingContentPartAddedResponse), StreamingContentPartAddedResponse.EventType)] +[JsonDerivedType(typeof(StreamingContentPartDoneResponse), StreamingContentPartDoneResponse.EventType)] [JsonDerivedType(typeof(StreamingCreatedResponse), StreamingCreatedResponse.EventType)] [JsonDerivedType(typeof(StreamingCompletedResponse), StreamingCompletedResponse.EventType)] internal abstract class StreamingResponseEventBase @@ -43,6 +49,106 @@ public StreamingResponseEventBase(string type, int sequenceNumber) } } +internal sealed class StreamingContentPartAddedResponse : StreamingResponseEventBase +{ + public const string EventType = "response.content_part.added"; + + public StreamingContentPartAddedResponse(int sequenceNumber) : base(EventType, sequenceNumber) + { + } + + [JsonPropertyName("content_index")] + public required int ContentIndex { get; set; } + + [JsonPropertyName("item_id")] + public required string? ItemId { get; set; } + + [JsonPropertyName("output_index")] + public required int OutputIndex { get; set; } + + [JsonPropertyName("part")] + public ResponseContentPart? Part { get; set; } +} + +internal sealed class StreamingContentPartDoneResponse : StreamingResponseEventBase +{ + public const string EventType = "response.content_part.done"; + + public StreamingContentPartDoneResponse(int sequenceNumber) : base(EventType, sequenceNumber) + { + } + + [JsonPropertyName("content_index")] + public required int ContentIndex { get; set; } + + [JsonPropertyName("item_id")] + public required string ItemId { get; set; } + + [JsonPropertyName("output_index")] + public required int OutputIndex { get; set; } + + [JsonPropertyName("part")] + public ResponseContentPart? Part { get; set; } +} + +internal sealed class StreamingInProgressResponse : StreamingResponseEventBase +{ + public const string EventType = "response.in_progress"; + + public StreamingInProgressResponse(int sequenceNumber) : base(EventType, sequenceNumber) + { + } + + /// + /// Gets or sets the response item that was added to the output. + /// This contains the actual content or data produced by the AI agent. + /// + [JsonPropertyName("response")] + public required OpenAIResponse Response { get; set; } +} + +internal sealed class StreamingOutputTextDeltaResponse : StreamingResponseEventBase +{ + public const string EventType = "response.output_text.delta"; + + public StreamingOutputTextDeltaResponse(int sequenceNumber) : base(EventType, sequenceNumber) + { + } + + [JsonPropertyName("content_index")] + public required int ContentIndex { get; set; } + + [JsonPropertyName("item_id")] + public string? ItemId { get; set; } + + [JsonPropertyName("output_index")] + public int OutputIndex { get; set; } + + [JsonPropertyName("delta")] + public required string Delta { get; set; } +} + +internal sealed class StreamingOutputTextDoneResponse : StreamingResponseEventBase +{ + public const string EventType = "response.output_text.done"; + + public StreamingOutputTextDoneResponse(int sequenceNumber) : base(EventType, sequenceNumber) + { + } + + [JsonPropertyName("content_index")] + public required int ContentIndex { get; set; } + + [JsonPropertyName("item_id")] + public required string ItemId { get; set; } + + [JsonPropertyName("output_index")] + public int OutputIndex { get; set; } + + [JsonPropertyName("text")] + public string? Text { get; set; } +} + /// /// Represents a streaming response event indicating that a new output item has been added to the response. /// This event is sent when the AI agent produces a new piece of content during streaming. From 0ccf00c3a7d7f86a2f76c75b6ad6fc7d61bd8b42 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Fri, 17 Oct 2025 13:27:13 +0200 Subject: [PATCH 2/2] fixes --- .../Responses/AIAgentResponsesProcessor.cs | 9 ++++++++- .../Responses/Model/ResponseStreamEvent.cs | 7 +++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs index 5361bbc6e0..90009b4135 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/AIAgentResponsesProcessor.cs @@ -169,6 +169,13 @@ private async IAsyncEnumerable> GetStreaming Part = null! }; yield return new(responseContentPartAdded, responseContentPartAdded.Type); + + var responseOutputItemAddedResponse = new StreamingOutputItemAddedResponse(sequenceNumber++) + { + OutputIndex = index++, + Item = openAIResponseItem + }; + yield return new(responseOutputItemAddedResponse, responseOutputItemAddedResponse.Type); } lastResponseItem = openAIResponseItem; @@ -190,7 +197,7 @@ private async IAsyncEnumerable> GetStreaming // here goes a sequence of completions for earlier started events: // "response.output_text.delta" should be completed with "response.output_text.done" - var index = messageIdOutputIndexes[lastResponseItem.Id]; + var index = messageIdOutputIndexes[lastResponseItem.Id ?? ""]; var responseOutputTextDeltaResponse = new StreamingOutputTextDoneResponse(sequenceNumber++) { ItemId = lastResponseItem.Id, diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs index be0f5ee4d6..6b8ffd06fb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.OpenAI/Responses/Model/ResponseStreamEvent.cs @@ -15,7 +15,6 @@ namespace Microsoft.Agents.AI.Hosting.OpenAI.Responses.Model; [JsonDerivedType(typeof(StreamingInProgressResponse), StreamingInProgressResponse.EventType)] [JsonDerivedType(typeof(StreamingOutputTextDeltaResponse), StreamingOutputTextDeltaResponse.EventType)] [JsonDerivedType(typeof(StreamingOutputTextDoneResponse), StreamingOutputTextDoneResponse.EventType)] -[JsonDerivedType(typeof(StreamingInProgressResponse), StreamingInProgressResponse.EventType)] [JsonDerivedType(typeof(StreamingContentPartAddedResponse), StreamingContentPartAddedResponse.EventType)] [JsonDerivedType(typeof(StreamingContentPartDoneResponse), StreamingContentPartDoneResponse.EventType)] [JsonDerivedType(typeof(StreamingCreatedResponse), StreamingCreatedResponse.EventType)] @@ -61,7 +60,7 @@ public StreamingContentPartAddedResponse(int sequenceNumber) : base(EventType, s public required int ContentIndex { get; set; } [JsonPropertyName("item_id")] - public required string? ItemId { get; set; } + public string? ItemId { get; set; } [JsonPropertyName("output_index")] public required int OutputIndex { get; set; } @@ -82,7 +81,7 @@ public StreamingContentPartDoneResponse(int sequenceNumber) : base(EventType, se public required int ContentIndex { get; set; } [JsonPropertyName("item_id")] - public required string ItemId { get; set; } + public string? ItemId { get; set; } [JsonPropertyName("output_index")] public required int OutputIndex { get; set; } @@ -140,7 +139,7 @@ public StreamingOutputTextDoneResponse(int sequenceNumber) : base(EventType, seq public required int ContentIndex { get; set; } [JsonPropertyName("item_id")] - public required string ItemId { get; set; } + public string? ItemId { get; set; } [JsonPropertyName("output_index")] public int OutputIndex { get; set; }