Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System.Text;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.RabbitMQ.Internal;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Wolverine.Util;
using Xunit;

namespace Wolverine.RabbitMQ.Tests;

public class masstransit_interop_serializer_on_retry
{
// Reproduces the MassTransit interop "double path" bug: UseMassTransitInterop
// registers its serializer only on the listener endpoint. At first receipt the
// envelope carries that serializer, but on a replay (scheduled retry, durable
// recovery) the runtime-only Serializer reference is gone. The deserialization
// path then has to re-resolve the serializer, and resolving from the global
// content-type registry (which never saw "application/vnd.masstransit+json")
// falls back to the default JSON serializer. That deserializes the *un-unwrapped*
// MassTransit envelope root, so every field on the real message comes back as a
// default value (here, OrderId 0).
[Fact]
public async Task replayed_masstransit_envelope_is_unwrapped_on_the_retry_path()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.DisableConventionalDiscovery();
opts.IncludeType<OrderPlacedHandler>();

// Bogus host name on purpose: stubbed transports must never connect.
opts.UseRabbitMq(x => x.HostName = Guid.NewGuid().ToString());
opts.ListenToRabbitQueue("orders").UseMassTransitInterop();

opts.StubAllExternalTransports();
}).StartAsync();

var runtime = host.GetRuntime();

var endpoint = runtime.Endpoints.EndpointFor(new Uri("rabbitmq://queue/orders"))
.ShouldNotBeNull()
.ShouldBeAssignableTo<RabbitMqEndpoint>();

// Building the envelope mapper is what applies UseMassTransitInterop and wires
// the MassTransit serializer onto the endpoint. A live listener does this at
// startup; StubAllExternalTransports() skips listener startup, so trigger it
// here to reach the same post-startup endpoint state.
endpoint.BuildMapper(runtime);

// The wire payload a MassTransit producer actually sends: the real message
// lives under "message", not at the root.
var json = $$"""
{
"messageId": "{{Guid.NewGuid()}}",
"messageType": ["urn:message:Orders:OrderPlaced"],
"message": { "orderId": 92883 }
}
""";

// Rebuild the envelope the way durable storage hands it back on a retry:
// ContentType + Destination + Data are persisted, but Serializer is not.
var replayed = new Envelope
{
Data = Encoding.UTF8.GetBytes(json),
ContentType = "application/vnd.masstransit+json",
MessageType = typeof(OrderPlaced).ToMessageTypeName(),
Destination = endpoint.Uri
};

// The replay precondition: no Serializer is attached, so resolution must go
// through serializerFor(envelope) — the code path under test.
replayed.Serializer.ShouldBeNull();

var continuation = await runtime.Pipeline.TryDeserializeEnvelope(replayed);

// A clean deserialization yields NullContinuation. Asserting it here surfaces a
// deserialization failure (MoveToErrorQueue) directly instead of as a confusing
// null Message on the next assertion.
continuation.ShouldBeOfType<NullContinuation>();

replayed.Message.ShouldBeOfType<OrderPlaced>()
.OrderId.ShouldBe(92883);
}

public record OrderPlaced(int OrderId);

public class OrderPlacedHandler
{
// Presence registers OrderPlaced in the HandlerGraph so the pipeline can
// resolve its message type during deserialization.
public void Handle(OrderPlaced order)
{
}
}
}
39 changes: 36 additions & 3 deletions src/Wolverine/Runtime/HandlerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal HandlerPipeline(WolverineRuntime runtime, IExecutorFactory executorFact

_executors = new LightweightCache<Type, IExecutor>(executorFactory.BuildFor);
}

internal HandlerPipeline(WolverineRuntime runtime, IExecutorFactory executorFactory, Endpoint endpoint)
{
_graph = runtime.Handlers;
Expand Down Expand Up @@ -133,7 +133,7 @@ public async ValueTask<IContinuation> TryDeserializeEnvelope(Envelope envelope)
return new MoveToErrorQueue(new EncryptionPolicyViolationException(envelope));
}

var serializer = envelope.Serializer ?? _runtime.Options.DetermineSerializer(envelope);
var serializer = envelope.Serializer ?? serializerFor(envelope);
serializer.UnwrapEnvelopeIfNecessary(envelope);

if (envelope.Data == null)
Expand Down Expand Up @@ -190,6 +190,39 @@ public async ValueTask<IContinuation> TryDeserializeEnvelope(Envelope envelope)
}
}

// Resolve the serializer for an envelope whose runtime-only Serializer reference
// is no longer set. This happens on replay paths (scheduled retry, durable
// recovery) where the envelope is rehydrated from storage: ContentType and
// Destination survive, but Serializer does not. Resolve from the originating
// endpoint first so endpoint-scoped serializers are honored on replay exactly
// as they are at first receipt. This matters most for the MassTransit and
// NServiceBus interop serializers wired by UseMassTransitInterop() /
// UseNServiceBusInterop(): they register only on the listener endpoint, never in
// the global content-type registry. Without this, DetermineSerializer falls back
// to the default JSON serializer, which deserializes the un-unwrapped interop
// envelope root and yields an all-default message. Mirrors DeadLetterEnvelope.TryReadData.
private IMessageSerializer serializerFor(Envelope envelope)
{
if (envelope.ContentType.IsNotEmpty())
{
// _endpoint is set on per-listener pipelines and is the most reliable
// source; fall back to the persisted Destination for the global pipeline.
var endpoint = _endpoint ?? endpointFor(envelope.Destination);
var serializer = endpoint?.TryFindSerializer(envelope.ContentType);
if (serializer != null)
{
return serializer;
}
}

return _runtime.Options.DetermineSerializer(envelope);
}

private Endpoint? endpointFor(Uri? destination)
{
return destination == null ? null : _runtime.Endpoints.EndpointFor(destination);
}

private bool RequiresEncryption(Envelope envelope)
{
var options = _runtime.Options;
Expand All @@ -216,7 +249,7 @@ private async Task<IContinuation> executeAsync(MessageContext context, Envelope
if (envelope.Message == null)
{
var deserializationResult = await TryDeserializeEnvelope(envelope).ConfigureAwait(false);
if(deserializationResult is not NullContinuation)
if (deserializationResult is not NullContinuation)
{
activity?.SetStatus(ActivityStatusCode.Error, "Serialization Failure");
return deserializationResult;
Expand Down
Loading