Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.AI;
using OpenAI.Responses;
using ChatMessage = Microsoft.Extensions.AI.ChatMessage;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary?


namespace Microsoft.Agents.AI.Hosting.OpenAI.Responses;

Expand Down Expand Up @@ -101,6 +102,7 @@ private async IAsyncEnumerable<SseItem<StreamingResponseEventBase>> GetStreaming
{
var sequenceNumber = 1;
var outputIndex = 1;
Dictionary<string, int> messageIdOutputIndexes = new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
Dictionary<string, int> messageIdOutputIndexes = new();
Dictionary<string, int> messageIdOutputIndexes = [];

AgentThread? agentThread = null;

ResponseItem? lastResponseItem = null;
Expand All @@ -115,6 +117,8 @@ private async IAsyncEnumerable<SseItem<StreamingResponseEventBase>> GetStreaming
continue;
}

// response.created
// response.in_progress
if (sequenceNumber == 1)
{
lastOpenAIResponse = update.AsChatResponse().AsOpenAIResponse();
Expand All @@ -124,6 +128,12 @@ private async IAsyncEnumerable<SseItem<StreamingResponseEventBase>> GetStreaming
Response = lastOpenAIResponse
};
yield return new(responseCreated, responseCreated.Type);

var inProgressResponse = new StreamingInProgressResponse(sequenceNumber++)
{
Response = lastOpenAIResponse
};
yield return new(inProgressResponse, inProgressResponse.Type);
}

if (update.Contents is not { Count: > 0 })
Expand All @@ -139,6 +149,7 @@ private async IAsyncEnumerable<SseItem<StreamingResponseEventBase>> GetStreaming
CreatedAt = update.CreatedAt,
RawRepresentation = update.RawRepresentation
};
var messageId = chatMessage.MessageId ?? "<null>";
Copy link
Member

@stephentoub stephentoub Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this using "<null>"? Is that just an arbitrary sentinel to use to group together all messages without IDs?


foreach (var openAIResponseItem in MicrosoftExtensionsAIResponsesExtensions.AsOpenAIResponseItems([chatMessage]))
{
Expand All @@ -147,21 +158,64 @@ private async IAsyncEnumerable<SseItem<StreamingResponseEventBase>> GetStreaming
openAIResponseItem.SetId(chatMessage.MessageId);
}

lastResponseItem = openAIResponseItem;
if (!messageIdOutputIndexes.TryGetValue(messageId, out var index))
{
messageIdOutputIndexes[messageId] = index = 0;
var responseContentPartAdded = new StreamingContentPartAddedResponse(sequenceNumber++)
{
ItemId = chatMessage.MessageId,
ContentIndex = 0,
OutputIndex = index++,
Part = null!
};
yield return new(responseContentPartAdded, responseContentPartAdded.Type);

var responseOutputItemAddedResponse = new StreamingOutputItemAddedResponse(sequenceNumber++)
{
OutputIndex = index++,
Item = openAIResponseItem
};
yield return new(responseOutputItemAddedResponse, responseOutputItemAddedResponse.Type);
}

var responseOutputItemAdded = new StreamingOutputItemAddedResponse(sequenceNumber++)
lastResponseItem = openAIResponseItem;
var responseOutputTextDeltaResponse = new StreamingOutputTextDeltaResponse(sequenceNumber++)
{
OutputIndex = outputIndex++,
Item = openAIResponseItem
ItemId = chatMessage.MessageId,
ContentIndex = 0,
OutputIndex = index++,
Delta = chatMessage.Text
};
yield return new(responseOutputItemAdded, responseOutputItemAdded.Type);
yield return new(responseOutputTextDeltaResponse, responseOutputTextDeltaResponse.Type);

messageIdOutputIndexes[messageId] = index;
}
}

if (lastResponseItem is not null)
{
// we were streaming "response.output_item.added" before
// so we should complete it now via "response.output_item.done"
// here goes a sequence of completions for earlier started events:

// "response.output_text.delta" should be completed with "response.output_text.done"
var index = messageIdOutputIndexes[lastResponseItem.Id ?? "<null>"];
var responseOutputTextDeltaResponse = new StreamingOutputTextDoneResponse(sequenceNumber++)
{
ItemId = lastResponseItem.Id,
ContentIndex = 0,
OutputIndex = index++,
};
yield return new(responseOutputTextDeltaResponse, responseOutputTextDeltaResponse.Type);

// then "response.content_part.added" should be completed with "response.content_part.done"
var streamingContentPartDoneResponse = new StreamingContentPartDoneResponse(sequenceNumber++)
{
ItemId = lastResponseItem.Id,
ContentIndex = 0,
OutputIndex = index++,
};
yield return new(streamingContentPartDoneResponse, streamingContentPartDoneResponse.Type);

// then "response.output_item.added" should be completed with "response.output_item.done"
var responseOutputDoneAdded = new StreamingOutputItemDoneResponse(sequenceNumber++)
{
OutputIndex = outputIndex++,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ namespace Microsoft.Agents.AI.Hosting.OpenAI.Responses.Model;
[JsonPolymorphic(UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FailSerialization)]
[JsonDerivedType(typeof(StreamingOutputItemAddedResponse), StreamingOutputItemAddedResponse.EventType)]
[JsonDerivedType(typeof(StreamingOutputItemDoneResponse), StreamingOutputItemDoneResponse.EventType)]
[JsonDerivedType(typeof(StreamingInProgressResponse), StreamingInProgressResponse.EventType)]
[JsonDerivedType(typeof(StreamingOutputTextDeltaResponse), StreamingOutputTextDeltaResponse.EventType)]
[JsonDerivedType(typeof(StreamingOutputTextDoneResponse), StreamingOutputTextDoneResponse.EventType)]
[JsonDerivedType(typeof(StreamingContentPartAddedResponse), StreamingContentPartAddedResponse.EventType)]
[JsonDerivedType(typeof(StreamingContentPartDoneResponse), StreamingContentPartDoneResponse.EventType)]
[JsonDerivedType(typeof(StreamingCreatedResponse), StreamingCreatedResponse.EventType)]
[JsonDerivedType(typeof(StreamingCompletedResponse), StreamingCompletedResponse.EventType)]
internal abstract class StreamingResponseEventBase
Expand Down Expand Up @@ -43,6 +48,106 @@ public StreamingResponseEventBase(string type, int sequenceNumber)
}
}

internal sealed class StreamingContentPartAddedResponse : StreamingResponseEventBase
{
public const string EventType = "response.content_part.added";

public StreamingContentPartAddedResponse(int sequenceNumber) : base(EventType, sequenceNumber)
{
}

[JsonPropertyName("content_index")]
public required int ContentIndex { get; set; }

[JsonPropertyName("item_id")]
public string? ItemId { get; set; }

[JsonPropertyName("output_index")]
public required int OutputIndex { get; set; }

[JsonPropertyName("part")]
public ResponseContentPart? Part { get; set; }
}

internal sealed class StreamingContentPartDoneResponse : StreamingResponseEventBase
{
public const string EventType = "response.content_part.done";

public StreamingContentPartDoneResponse(int sequenceNumber) : base(EventType, sequenceNumber)
{
}

[JsonPropertyName("content_index")]
public required int ContentIndex { get; set; }

[JsonPropertyName("item_id")]
public string? ItemId { get; set; }

[JsonPropertyName("output_index")]
public required int OutputIndex { get; set; }

[JsonPropertyName("part")]
public ResponseContentPart? Part { get; set; }
}

internal sealed class StreamingInProgressResponse : StreamingResponseEventBase
{
public const string EventType = "response.in_progress";

public StreamingInProgressResponse(int sequenceNumber) : base(EventType, sequenceNumber)
{
}

/// <summary>
/// Gets or sets the response item that was added to the output.
/// This contains the actual content or data produced by the AI agent.
/// </summary>
[JsonPropertyName("response")]
public required OpenAIResponse Response { get; set; }
}

internal sealed class StreamingOutputTextDeltaResponse : StreamingResponseEventBase
{
public const string EventType = "response.output_text.delta";

public StreamingOutputTextDeltaResponse(int sequenceNumber) : base(EventType, sequenceNumber)
{
}

[JsonPropertyName("content_index")]
public required int ContentIndex { get; set; }

[JsonPropertyName("item_id")]
public string? ItemId { get; set; }

[JsonPropertyName("output_index")]
public int OutputIndex { get; set; }

[JsonPropertyName("delta")]
public required string Delta { get; set; }
}

internal sealed class StreamingOutputTextDoneResponse : StreamingResponseEventBase
{
public const string EventType = "response.output_text.done";

public StreamingOutputTextDoneResponse(int sequenceNumber) : base(EventType, sequenceNumber)
{
}

[JsonPropertyName("content_index")]
public required int ContentIndex { get; set; }

[JsonPropertyName("item_id")]
public string? ItemId { get; set; }

[JsonPropertyName("output_index")]
public int OutputIndex { get; set; }

[JsonPropertyName("text")]
public string? Text { get; set; }
}

/// <summary>
/// Represents a streaming response event indicating that a new output item has been added to the response.
/// This event is sent when the AI agent produces a new piece of content during streaming.
Expand Down
Loading