Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions src/Wolverine/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
catch (Exception e)
{
throw new WolverineSerializationException(
$"Error trying to serialize message of type {Message.GetType().FullNameInCode()} with serializer {Serializer}", e);

Check warning on line 144 in src/Wolverine/Envelope.cs

View workflow job for this annotation

GitHub Actions / test

Dereference of a possibly null reference.
}
}

Expand Down Expand Up @@ -170,7 +170,7 @@
return _data;
}

throw new WolverineSerializationException($"No data or writer is known for this envelope of message type {_message.GetType().FullNameInCode()}");

Check warning on line 173 in src/Wolverine/Envelope.cs

View workflow job for this annotation

GitHub Actions / test

Dereference of a possibly null reference.
}

try
Expand All @@ -180,7 +180,7 @@
catch (Exception e)
{
throw new WolverineSerializationException(
$"Error trying to serialize message of type {Message.GetType().FullNameInCode()} with serializer {Serializer}", e);

Check warning on line 183 in src/Wolverine/Envelope.cs

View workflow job for this annotation

GitHub Actions / test

Dereference of a possibly null reference.
}

return _data;
Expand Down Expand Up @@ -292,34 +292,10 @@
/// </summary>
public string? TenantId { get; set; }

private string?[] _acceptedContentTypes = ["application/json"];
private string? _acceptedContentTypesJoined;

/// <summary>
/// Specifies the accepted content types for the requested reply
/// </summary>
public string?[] AcceptedContentTypes
{
get => _acceptedContentTypes;
set
{
_acceptedContentTypes = value;
_acceptedContentTypesJoined = null; // Invalidate cache
}
}

/// <summary>
/// Returns the AcceptedContentTypes as a comma-separated string.
/// This value is cached to avoid repeated allocations during serialization.
/// </summary>
internal string? AcceptedContentTypesJoined
{
get
{
if (_acceptedContentTypes.Length == 0) return null;
return _acceptedContentTypesJoined ??= string.Join(",", _acceptedContentTypes);
}
}
public string?[] AcceptedContentTypes { get; set; } = ["application/json"];

/// <summary>
/// Specific message id for this envelope
Expand Down
133 changes: 29 additions & 104 deletions src/Wolverine/Runtime/Serialization/EnvelopeSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,41 +1,10 @@
using System.Buffers;
using System.Globalization;
using System.Xml;

namespace Wolverine.Runtime.Serialization;

public static class EnvelopeSerializer
{
// Initial buffer size - will grow as needed
private const int InitialBufferSize = 4096;

// Thread-local buffer to avoid repeated rentals in tight loops
[ThreadStatic]
private static byte[]? t_buffer;

private static byte[] RentBuffer(int minimumSize)
{
var buffer = t_buffer;
if (buffer != null && buffer.Length >= minimumSize)
{
t_buffer = null;
return buffer;
}
return ArrayPool<byte>.Shared.Rent(Math.Max(minimumSize, InitialBufferSize));
}

private static void ReturnBuffer(byte[] buffer)
{
if (buffer.Length <= InitialBufferSize * 4)
{
t_buffer = buffer;
}
else
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void ReadDataElement(Envelope env, string key, string value)
{
try
Expand Down Expand Up @@ -234,89 +203,42 @@ private static Envelope readSingle(BinaryReader br)

public static byte[] Serialize(IList<Envelope> messages)
{
// Estimate size: 4 bytes for count + ~500 bytes per message average
var estimatedSize = 4 + (messages.Count * 500);
var buffer = RentBuffer(estimatedSize);
try
{
using var stream = new MemoryStream(buffer, 0, buffer.Length, writable: true, publiclyVisible: true);
using var writer = new BinaryWriter(stream);
writer.Write(messages.Count);
foreach (var message in messages) writeSingle(writer, message);
writer.Flush();

var length = (int)stream.Position;
var result = new byte[length];
Buffer.BlockCopy(buffer, 0, result, 0, length);
return result;
}
catch (NotSupportedException)
{
// Buffer was too small, fall back to expandable stream
ReturnBuffer(buffer);
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writer.Write(messages.Count);
foreach (var message in messages) writeSingle(writer, message);
writer.Flush();
return stream.ToArray();
}
finally
{
ReturnBuffer(buffer);
}
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writer.Write(messages.Count);
foreach (var message in messages) writeSingle(writer, message);
writer.Flush();
return stream.ToArray();
}

public static byte[] Serialize(Envelope env)
{
// Estimate size based on data length + headers (~200 bytes overhead)
var dataLength = env.Data?.Length ?? 0;
var estimatedSize = dataLength + 512;
var buffer = RentBuffer(estimatedSize);
try
{
using var stream = new MemoryStream(buffer, 0, buffer.Length, writable: true, publiclyVisible: true);
using var writer = new BinaryWriter(stream);
writeSingle(writer, env);
writer.Flush();

var length = (int)stream.Position;
var result = new byte[length];
Buffer.BlockCopy(buffer, 0, result, 0, length);
return result;
}
catch (NotSupportedException)
{
// Buffer was too small, fall back to expandable stream
ReturnBuffer(buffer);
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writeSingle(writer, env);
writer.Flush();
return stream.ToArray();
}
finally
{
ReturnBuffer(buffer);
}
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writeSingle(writer, env);
writer.Flush();
return stream.ToArray();
}

private static void writeSingle(BinaryWriter writer, Envelope env)
{
writer.Write(env.SentAt.UtcDateTime.ToBinary());

// Write placeholder for header count, remember position
var countPosition = writer.BaseStream.Position;
writer.Write(0); // placeholder
writer.Flush();

using (var headerData = new MemoryStream())
{
using (var headerWriter = new BinaryWriter(headerData))
{
var count = writeHeaders(headerWriter, env);
headerWriter.Flush();

// Write headers directly to the stream
var count = writeHeaders(writer, env);
writer.Write(count);

// Go back and write the actual count
var currentPosition = writer.BaseStream.Position;
writer.BaseStream.Position = countPosition;
writer.Write(count);
writer.BaseStream.Position = currentPosition;
headerData.Position = 0;
headerData.CopyTo(writer.BaseStream);
}
}

writer.Write(env.Data!.Length);
writer.Write(env.Data);
Expand All @@ -339,8 +261,11 @@ private static int writeHeaders(BinaryWriter writer, Envelope env)
writer.WriteProp(ref count, EnvelopeConstants.TopicNameKey, env.TopicName);


// Use cached joined string to avoid allocation on every serialization
writer.WriteProp(ref count, EnvelopeConstants.AcceptedContentTypesKey, env.AcceptedContentTypesJoined);
if (env.AcceptedContentTypes.Length != 0)
{
writer.WriteProp(ref count, EnvelopeConstants.AcceptedContentTypesKey,
string.Join(",", env.AcceptedContentTypes));
}

writer.WriteProp(ref count, EnvelopeConstants.IdKey, env.Id);
writer.WriteProp(ref count, EnvelopeConstants.ReplyRequestedKey, env.ReplyRequested);
Expand Down
Loading