Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,17 @@ public async Task<IResult> InvokeAsync(HttpContext httpContext)
envelope.Destination = $"http://localhost{httpContext.Request.Path}".ToUri();
envelope.DoNotCascadeResponse = true;
envelope.Serializer = _runtime.Options.FindSerializer(envelope.ContentType);

if (!_runtime.Pipeline.TryDeserializeEnvelope(envelope, out var continuation))

var deserializeResult = await _runtime.Pipeline.TryDeserializeEnvelope(envelope);

if (deserializeResult != NullContinuation.Instance)
{
if (continuation is NoHandlerContinuation)
if (deserializeResult is NoHandlerContinuation)
{
return Results.Problem($"No handler for the requested message type {envelope.MessageType}", statusCode:400);
}

if (continuation is MoveToErrorQueue move)
if (deserializeResult is MoveToErrorQueue move)
{
_logger.LogError(move.Exception, "Error executing message of type {MessageType}", envelope.MessageType);
return Results.Problem($"Execution error for requested message type {envelope.MessageType}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public async Task<bool> PingAsync()

public async ValueTask SendAsync(Envelope envelope)
{
var message = _topic.Mapper.CreateMessage(envelope);
var message = await _topic.Mapper.CreateMessage(envelope);

await _producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message);
_producer.Flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch
{
// TODO -- separate try/catch here!

var message = _topic.Mapper.CreateMessage(envelope);
var message = await _topic.Mapper.CreateMessage(envelope);
await _producer.ProduceAsync(envelope.TopicName ?? _topic.TopicName, message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string
return envelope;
}

internal static Message<string, byte[]> CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope)
internal static async ValueTask<Message<string, byte[]>> CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope)
{
var data = await envelope.GetDataAsync();
var message = new Message<string, byte[]>
{
Key = !string.IsNullOrEmpty(envelope.PartitionKey) ? envelope.PartitionKey : envelope.Id.ToString(),
Value = envelope.Data,
Value = data,
Headers = new Headers()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ namespace Wolverine.RabbitMQ.Internal;
internal class RabbitMqSender : RabbitMqChannelAgent, ISender
{
private readonly RabbitMqEndpoint _endpoint;
private readonly RoutingMode _routingType;
private readonly IWolverineRuntime _runtime;
private readonly CachedString _exchangeName;
private readonly bool _isDurable;
private readonly string _key;
private readonly CachedString _routingKey;
private readonly IRabbitMqEnvelopeMapper _mapper;
private readonly Func<Envelope, CachedString> _toRoutingKey;

public RabbitMqSender(RabbitMqEndpoint endpoint, RabbitMqTransport transport,
RoutingMode routingType, IWolverineRuntime runtime) : base(
Expand All @@ -26,29 +27,40 @@ public RabbitMqSender(RabbitMqEndpoint endpoint, RabbitMqTransport transport,
_isDurable = endpoint.Mode == EndpointMode.Durable;

_exchangeName = new CachedString(endpoint.ExchangeName);
_key = endpoint.RoutingKey();

_toRoutingKey = routingType == RoutingMode.Static ? _ => new CachedString(_key) : x =>

if (routingType == RoutingMode.Static)
{
if (x.TopicName.IsEmpty() && x.Message == null)
{
try
{
runtime.Pipeline.TryDeserializeEnvelope(x, out var _);
}
catch (Exception e)
{
Logger.LogError(e, "Error trying to deserialize an envelope in order to determine the topic name");
}
}

return new CachedString(TopicRouting.DetermineTopicName(x));
};
_routingKey = new CachedString(endpoint.RoutingKey());
}

_mapper = endpoint.BuildMapper(runtime);
_endpoint = endpoint;
_routingType = routingType;
_runtime = runtime;
}

private async ValueTask<CachedString> ToRoutingKeyAsync(Envelope envelope)
{
if(_routingKey != null)
{
return _routingKey;
}

if (envelope.TopicName.IsEmpty() && envelope.Message == null)
{
try
{
await _runtime.Pipeline.TryDeserializeEnvelope(envelope);
}
catch (Exception e)
{
Logger.LogError(e, "Error trying to deserialize an envelope in order to determine the topic name");
}
}
return new CachedString(TopicRouting.DetermineTopicName(envelope));
}


public bool SupportsNativeScheduledSend => false;
public Uri Destination { get; }

Expand All @@ -75,7 +87,7 @@ public async ValueTask SendAsync(Envelope envelope)

_mapper.MapEnvelopeToOutgoing(envelope, props);

var routingKey = _toRoutingKey(envelope);
var routingKey = await ToRoutingKeyAsync(envelope);
await Channel.BasicPublishAsync(_exchangeName, routingKey, false, props, envelope.Data);
}

Expand Down
40 changes: 35 additions & 5 deletions src/Wolverine/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using JasperFx.MultiTenancy;
using MassTransit;
using Wolverine.Attributes;
using Wolverine.Runtime.Serialization;
using Wolverine.Util;

namespace Wolverine;
Expand Down Expand Up @@ -120,6 +121,30 @@ public TimeSpan? ScheduleDelay
get => _scheduleDelay;
}

public async ValueTask<byte[]?> GetDataAsync()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is additive change GetDataAsync can be executed and it fallback to Data

{
if (_data != null)
{
return _data;
}
AssertMessage();

if(Serializer is IAsyncMessageSerializer asyncMessaeSerializer)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use only when serializer support async

{
try
{
_data = await asyncMessaeSerializer.WriteAsync(this);
}
catch (Exception e)
{
throw new WolverineSerializationException(
$"Error trying to serialize message of type {Message.GetType().FullNameInCode()} with serializer {Serializer}", e);
}
}

return Data;
}

/// <summary>
/// The raw, serialized message data
/// </summary>
Expand All @@ -132,10 +157,7 @@ public byte[]? Data
return _data;
}

if (_message == null)
{
throw new WolverineSerializationException($"Cannot ensure data is present when there is no message. The Message Type Name is '{MessageType}'");
}
AssertMessage();

if (Serializer == null)
{
Expand All @@ -144,7 +166,7 @@ public byte[]? Data
_data = serializable.Write();
return _data;
}

throw new WolverineSerializationException($"No data or writer is known for this envelope of message type {_message.GetType().FullNameInCode()}");
}

Expand All @@ -163,6 +185,14 @@ public byte[]? Data
set => _data = value;
}

private void AssertMessage()
{
if (_message == null)
{
throw new WolverineSerializationException($"Cannot ensure data is present when there is no message. The Message Type Name is '{MessageType}'");
}
}

internal int? MessagePayloadSize => _data?.Length;

/// <summary>
Expand Down
38 changes: 21 additions & 17 deletions src/Wolverine/Runtime/HandlerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Wolverine.ErrorHandling;
using Wolverine.Logging;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;

namespace Wolverine.Runtime;
Expand Down Expand Up @@ -106,7 +107,7 @@ public async Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activ
}
}

public bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continuation)
public async ValueTask<IContinuation> TryDeserializeEnvelope(Envelope envelope)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method signature is changing - for me it is even better because it is more function like - one result instead two (bool and IContinuation)

{
// Try to deserialize
try
Expand All @@ -127,49 +128,52 @@ public bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continua

if (_graph.TryFindMessageType(envelope.MessageType, out var messageType))
{
envelope.Message = serializer.ReadFromData(messageType, envelope);
if (serializer is IAsyncMessageSerializer asyncMessageSerializer)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use async if it implement this

{
envelope.Message = await asyncMessageSerializer.ReadFromDataAsync(messageType, envelope);
}
else
{
envelope.Message = serializer.ReadFromData(messageType, envelope);
}
}
else
{
continuation = new NoHandlerContinuation(_runtime.MissingHandlers(), _runtime);
return false;
return new NoHandlerContinuation(_runtime.MissingHandlers(), _runtime);
}

if (envelope.Message == null)
{
continuation = new MoveToErrorQueue(new InvalidOperationException(
return new MoveToErrorQueue(new InvalidOperationException(
"No message body could be de-serialized from the raw data in this envelope"));

return false;
}

continuation = NullContinuation.Instance;
return true;
return NullContinuation.Instance;
}
catch (Exception? e)
{
continuation = new MoveToErrorQueue(e);
return false;
return new MoveToErrorQueue(e);
}
finally
{
Logger.Received(envelope);
}
}

private Task<IContinuation> executeAsync(MessageContext context, Envelope envelope, Activity? activity)
private async Task<IContinuation> executeAsync(MessageContext context, Envelope envelope, Activity? activity)
{
if (envelope.IsExpired())
{
return Task.FromResult<IContinuation>(DiscardEnvelope.Instance);
return DiscardEnvelope.Instance;
}

if (envelope.Message == null)
{
if (!TryDeserializeEnvelope(envelope, out var serializationError))
var deserializationResult = await TryDeserializeEnvelope(envelope);
if(deserializationResult != NullContinuation.Instance)
{
activity?.SetStatus(ActivityStatusCode.Error, "Serialization Failure");
return Task.FromResult(serializationError);
return deserializationResult;
}
}
else
Expand All @@ -180,11 +184,11 @@ private Task<IContinuation> executeAsync(MessageContext context, Envelope envelo
if (envelope.IsResponse)
{
_runtime.Replies.Complete(envelope);
return Task.FromResult<IContinuation>(MessageSucceededContinuation.Instance);
return MessageSucceededContinuation.Instance;
}

var executor = _executors[envelope.Message!.GetType()];

return executor.ExecuteAsync(context, _cancellation);
return await executor.ExecuteAsync(context, _cancellation);
}
}
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/IHandlerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ public interface IHandlerPipeline
{
Task InvokeAsync(Envelope envelope, IChannelCallback channel);
Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activity activity);
bool TryDeserializeEnvelope(Envelope envelope, out IContinuation continuation);
ValueTask<IContinuation> TryDeserializeEnvelope(Envelope envelope);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only one potential breaking change but I'm wonder if this interface should be public? Maybe it should be internal or nobody is executing this so event public we don't break anything

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be part of the public interface. It's just a detail of the HandlerPipeline details

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeremydmiller So I think it should be no problem with change :) As a note in our company we use MS https://www.nuget.org/packages/Microsoft.CodeAnalysis.PublicApiAnalyzers/ to keep public API stable. It reminds to keep internal things (when things are internal your life is easier because analyser is not screaming in your face :) ). You have to use to this analyser especially when adding new API but tooling is quite good (especially "use this for whole project/solution" so you can add new API's in one click.

}
10 changes: 10 additions & 0 deletions src/Wolverine/Runtime/Serialization/IMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,14 @@ public interface IMessageSerializer
object ReadFromData(byte[] data);

byte[] WriteMessage(object message);
}

/// <summary>
/// Async version of <seealso cref="IMessageSerializer"/>
/// </summary>
public interface IAsyncMessageSerializer : IMessageSerializer
Copy link
Contributor Author

@dominikjeske dominikjeske Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We extend standard serializer so nothing is changing in code - envelope serializer type is still IMessageSerializer

{
ValueTask<byte[]> WriteAsync(Envelope envelope);

ValueTask<object?> ReadFromDataAsync(Type messageType, Envelope envelope);
}
Loading