Initial sketch of an SSE JSON serializer helper. #5557

Draft
wants to merge 1 commit into
base: main
1 change: 1 addition & 0 deletions eng/Versions.props
@@ -61,6 +61,7 @@
<SystemSecurityCryptographyPkcsVersion>9.0.0-rc.2.24473.5</SystemSecurityCryptographyPkcsVersion>
<SystemSecurityCryptographyXmlVersion>9.0.0-rc.2.24473.5</SystemSecurityCryptographyXmlVersion>
<SystemTextEncodingsWebVersion>9.0.0-rc.2.24473.5</SystemTextEncodingsWebVersion>
<SystemNetServerSentEventsVersion>9.0.0-rc.2.24473.5</SystemNetServerSentEventsVersion>
<SystemNumericsTensorsVersion>9.0.0-rc.2.24473.5</SystemNumericsTensorsVersion>
<SystemTextJsonVersion>9.0.0-rc.2.24473.5</SystemTextJsonVersion>
<!-- Dependencies from https://github.com/aspnet/AspNetCore -->
1 change: 1 addition & 0 deletions eng/packages/TestOnly.props
@@ -16,6 +16,7 @@
<PackageVersion Include="Polly.Testing" Version="8.4.2" />
<PackageVersion Include="StrongNamer" Version="0.2.5" />
<PackageVersion Include="System.Configuration.ConfigurationManager" Version="$(SystemConfigurationConfigurationManagerVersion)" />
<PackageVersion Include="System.Net.ServerSentEvents" Version="$(SystemNetServerSentEventsVersion)" />
<PackageVersion Include="System.Numerics.Tensors" Version="$(SystemNumericsTensorsVersion)" />
<PackageVersion Include="Verify.Xunit" Version="20.4.0" />
<PackageVersion Include="Xunit.Combinatorial" Version="1.6.24" />
@@ -23,6 +23,7 @@
<PropertyGroup>
<InjectSharedEmptyCollections>true</InjectSharedEmptyCollections>
<InjectStringSyntaxAttributeOnLegacy>true</InjectStringSyntaxAttributeOnLegacy>
<InjectSharedBufferWriterPool>true</InjectSharedBufferWriterPool>
<DisableMicrosoftExtensionsLoggingSourceGenerator>false</DisableMicrosoftExtensionsLoggingSourceGenerator>
</PropertyGroup>

@@ -0,0 +1,105 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization.Metadata;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
using Microsoft.Shared.Pools;

#pragma warning disable SA1114 // Parameter list should follow declaration

namespace Microsoft.Extensions.AI;

public static partial class AIJsonUtilities
{
    private static readonly byte[] _sseEventFieldPrefix = "event: "u8.ToArray();
    private static readonly byte[] _sseDataFieldPrefix = "data: "u8.ToArray();
    private static readonly byte[] _sseIdFieldPrefix = "id: "u8.ToArray();
    private static readonly byte[] _sseLineBreak = Encoding.UTF8.GetBytes(Environment.NewLine);
Member Author

Using the OS-specific line break felt appropriate given that all variants are supported by the spec.

Member

Most implementations I've seen always use '\n' regardless of OS.
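
For illustration, here is a minimal sketch of the fixed-terminator alternative mentioned above. `SseFraming` and `WriteDataEvent` are hypothetical names, not part of this PR; the sketch only assumes that the payload is already single-line JSON.

```csharp
using System.IO;
using System.Text;

// Hypothetical helper, not part of this PR: frames one SSE event
// using "\n" as the line terminator on every platform.
internal static class SseFraming
{
    private static readonly byte[] DataPrefix = Encoding.UTF8.GetBytes("data: ");
    private static readonly byte[] LineFeed = { (byte)'\n' };

    // Assumes `json` contains no line breaks (as compact JSON output does not).
    public static void WriteDataEvent(Stream stream, string json)
    {
        byte[] payload = Encoding.UTF8.GetBytes(json);

        stream.Write(DataPrefix, 0, DataPrefix.Length);
        stream.Write(payload, 0, payload.Length);
        stream.Write(LineFeed, 0, LineFeed.Length); // terminates the data field
        stream.Write(LineFeed, 0, LineFeed.Length); // blank line ends the event
    }
}
```

With a fixed terminator the emitted bytes are identical on Windows and Unix, which would let a test compare against a literal expected string without depending on `Environment.NewLine`.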


    /// <summary>
    /// Serializes the specified server-sent events to the provided stream as JSON data.
    /// </summary>
    /// <typeparam name="T">Specifies the type of data payload in the event.</typeparam>
    /// <param name="stream">The UTF-8 stream to write the server-sent events to.</param>
    /// <param name="sseEvents">The events to serialize to the stream.</param>
    /// <param name="options">The options configuring serialization.</param>
    /// <param name="cancellationToken">The token that can be used to cancel the write operation.</param>
    /// <returns>A task representing the asynchronous write operation.</returns>
    public static async ValueTask SerializeAsSseAsync<T>(
Member

If we want this as a public API, I'd rather it be in System.Net.ServerSentEvents. For now it can be a non-public implementation detail from anything that needs to use it. I don't think we should be exposing this publicly from M.E.AI.

Member Author
@eiriktsarpalis Oct 25, 2024

> For now it can be a non-public implementation detail from anything that needs to use it.

Do you have any particular use case in mind? Like a specific streamer for IAsyncEnumerable<StreamingChatCompletionUpdate> or something else?

> I don't think we should be exposing this publicly from M.E.AI.

Should I file a proposal in runtime following this shape?

Member

> Do you have any particular use case in mind? Like a specific streamer for IAsyncEnumerable<StreamingChatCompletionUpdate> or something else?

The use case I've been talking about all along: being able to write out the M.E.AI object model as an OpenAI-compatible response, in both non-streaming and streaming varieties (this case being the latter).

> Should I file a proposal in runtime following this shape?

Sure. But in runtime I'd ideally want it to be something ASP.NET would rely on, so it'd be good to understand what its needs would be.

        Stream stream,
        IAsyncEnumerable<SseEvent<T>> sseEvents,
Member Author

Should we consider an overload accepting IAsyncEnumerable<T>?
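
One way such an overload could be layered on top, sketched under assumptions: the `SseEvent<T>` struct below is a stand-in that mirrors the shape added in this PR so the snippet compiles on its own, and `SseEventAdapters.AsSseEvents` is a hypothetical helper name, not an existing API.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;

// Stand-in mirroring the SseEvent<T> shape from this PR (illustration only).
public readonly struct SseEvent<T>
{
    public SseEvent(T data) => Data = data;
    public T Data { get; }
    public string? EventType { get; init; }
    public string? Id { get; init; }
}

public static class SseEventAdapters
{
    // Hypothetical adapter: wraps each payload in an event with no type and no id.
    public static async IAsyncEnumerable<SseEvent<T>> AsSseEvents<T>(
        this IAsyncEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            yield return new SseEvent<T>(item);
        }
    }
}
```

An `IAsyncEnumerable<T>` overload could then presumably forward to the existing method by passing `values.AsSseEvents(cancellationToken)`, so events without an explicit type or id would serialize as bare `data:` fields.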

        JsonSerializerOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        _ = Throw.IfNull(stream);
        _ = Throw.IfNull(sseEvents);

        options ??= DefaultOptions;
        options.MakeReadOnly();
        var typeInfo = (JsonTypeInfo<T>)options.GetTypeInfo(typeof(T));

        BufferWriter<byte> bufferWriter = BufferWriterPool.SharedBufferWriterPool.Get();

        try
        {
            // Build a custom Utf8JsonWriter that ignores indentation configuration from JsonSerializerOptions.
            using Utf8JsonWriter writer = new(bufferWriter);

            await foreach (var sseEvent in sseEvents.WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                JsonSerializer.Serialize(writer, sseEvent.Data, typeInfo);
#pragma warning disable CA1849 // Call async methods when in an async method
                writer.Flush();
#pragma warning restore CA1849 // Call async methods when in an async method
                Debug.Assert(bufferWriter.WrittenSpan.IndexOf((byte)'\n') == -1, "The buffer writer should not contain any newline characters.");

                if (sseEvent.EventType is { } eventType)
                {
                    await stream.WriteAsync(_sseEventFieldPrefix, cancellationToken).ConfigureAwait(false);
                    await stream.WriteAsync(Encoding.UTF8.GetBytes(eventType), cancellationToken).ConfigureAwait(false);
                    await stream.WriteAsync(_sseLineBreak, cancellationToken).ConfigureAwait(false);
                }

                await stream.WriteAsync(_sseDataFieldPrefix, cancellationToken).ConfigureAwait(false);
                await stream.WriteAsync(
#if NET
                    bufferWriter.WrittenMemory,
#else
                    bufferWriter.WrittenMemory.ToArray(),
#endif
                    cancellationToken).ConfigureAwait(false);

                await stream.WriteAsync(_sseLineBreak, cancellationToken).ConfigureAwait(false);

                if (sseEvent.Id is { } id)
                {
                    await stream.WriteAsync(_sseIdFieldPrefix, cancellationToken).ConfigureAwait(false);
                    await stream.WriteAsync(Encoding.UTF8.GetBytes(id), cancellationToken).ConfigureAwait(false);
                    await stream.WriteAsync(_sseLineBreak, cancellationToken).ConfigureAwait(false);
                }

                await stream.WriteAsync(_sseLineBreak, cancellationToken).ConfigureAwait(false);

                bufferWriter.Reset();
                writer.Reset();
            }
        }
        finally
        {
            BufferWriterPool.SharedBufferWriterPool.Return(bufferWriter);
        }
    }

#if !NET
    private static Task WriteAsync(this Stream stream, byte[] buffer, CancellationToken cancellationToken = default)
        => stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
#endif
}
28 changes: 28 additions & 0 deletions src/Libraries/Microsoft.Extensions.AI/Utilities/SseEvent.cs
@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#pragma warning disable SA1114 // Parameter list should follow declaration
#pragma warning disable CA1815 // Override equals and operator equals on value types

namespace Microsoft.Extensions.AI;

/// <summary>Represents a server-sent event.</summary>
/// <typeparam name="T">Specifies the type of data payload in the event.</typeparam>
public readonly struct SseEvent<T>
Member Author

This largely replicates the SseItem<T> type present in System.Net.ServerSentEvents. I feel we might be able to avoid duplication but that would require taking a dependency on one more NuGet package.

{
    /// <summary>Initializes a new instance of the <see cref="SseEvent{T}"/> struct.</summary>
    /// <param name="data">The event's payload.</param>
    public SseEvent(T data)
    {
        Data = data;
    }

    /// <summary>Gets the event's payload.</summary>
    public T Data { get; }

    /// <summary>Gets the event's type.</summary>
    public string? EventType { get; init; }

    /// <summary>Gets the event's identifier.</summary>
    public string? Id { get; init; }
}
@@ -1,11 +1,18 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Net.ServerSentEvents;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Xunit;

#pragma warning disable S2197 // Modulus results should not be checked for direct equality

namespace Microsoft.Extensions.AI;

public static class AIJsonUtilitiesTests
@@ -142,4 +149,81 @@ public enum MyEnumValue
        A = 1,
        B = 2
    }

    [Fact]
    public static async Task SerializeAsSseAsync_HasExpectedOutput()
    {
        using MemoryStream stream = new();
        await AIJsonUtilities.SerializeAsSseAsync(stream, CreateEvents());
        string output = Encoding.UTF8.GetString(stream.ToArray());

        Assert.Equal("""
            data: {"value":1}

            event: eventType1
            data: {"value":2}

            data: {"value":3}
            id: 3

            event: eventType2
            data: {"value":4}
            id: 4


            """,
            output);

        static async IAsyncEnumerable<SseEvent<SseValue>> CreateEvents()
        {
            yield return new SseEvent<SseValue>(new SseValue(1));
            yield return new SseEvent<SseValue>(new SseValue(2)) { EventType = "eventType1" };
            await Task.CompletedTask;
            yield return new SseEvent<SseValue>(new SseValue(3)) { Id = "3" };
            yield return new SseEvent<SseValue>(new SseValue(4)) { Id = "4", EventType = "eventType2" };
        }
    }

    [Fact]
    public static async Task SerializeAsSseAsync_CanRoundtripValues()
    {
        using MemoryStream stream = new();
        await AIJsonUtilities.SerializeAsSseAsync(stream, CreateEvents(100), cancellationToken: default);
        stream.Position = 0;

        var parser = SseParser.Create(stream, (_, data) => JsonSerializer.Deserialize<SseValue>(data, AIJsonUtilities.DefaultOptions)!);
        string expectedLastEventId = "";
        int i = 0;

        foreach (var parsedEvent in parser.Enumerate())
        {
            Assert.NotNull(parsedEvent.Data);
            Assert.Equal(i, parsedEvent.Data.Value);
            Assert.Equal((i % 7) switch { 3 => "A", 5 => "B", _ => "message" }, parsedEvent.EventType);

            if (i % 10 == 9)
            {
                expectedLastEventId = i.ToString();
            }

            Assert.Equal(expectedLastEventId, parser.LastEventId);
            i++;
        }

        static async IAsyncEnumerable<SseEvent<SseValue>> CreateEvents(int count)
        {
            for (int i = 0; i < count; i++)
            {
                yield return new SseEvent<SseValue>(new SseValue(i))
                {
                    Id = i % 10 == 9 ? i.ToString() : null,
                    EventType = (i % 7) switch { 3 => "A", 5 => "B", _ => null },
                };
            }

            await Task.CompletedTask;
        }
    }

    public record SseValue(int Value);
}
@@ -15,6 +15,7 @@

<ItemGroup>
<PackageReference Include="System.Memory.Data" />
<PackageReference Include="System.Net.ServerSentEvents" />
</ItemGroup>

<ItemGroup>