diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/masstransit_interop_serializer_on_retry.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/masstransit_interop_serializer_on_retry.cs new file mode 100644 index 000000000..c7efb60ab --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/masstransit_interop_serializer_on_retry.cs @@ -0,0 +1,97 @@ +using System.Text; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.RabbitMQ.Internal; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class masstransit_interop_serializer_on_retry +{ + // Reproduces the MassTransit interop "double path" bug: UseMassTransitInterop + // registers its serializer only on the listener endpoint. At first receipt the + // envelope carries that serializer, but on a replay (scheduled retry, durable + // recovery) the runtime-only Serializer reference is gone. The deserialization + // path then has to re-resolve the serializer, and resolving from the global + // content-type registry (which never saw "application/vnd.masstransit+json") + // falls back to the default JSON serializer. That deserializes the *un-unwrapped* + // MassTransit envelope root, so every field on the real message comes back as a + // default value (here, OrderId 0). + [Fact] + public async Task replayed_masstransit_envelope_is_unwrapped_on_the_retry_path() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.DisableConventionalDiscovery(); + opts.IncludeType(); + + // Bogus host name on purpose: stubbed transports must never connect. + opts.UseRabbitMq(x => x.HostName = Guid.NewGuid().ToString()); + opts.ListenToRabbitQueue("orders").UseMassTransitInterop(); + + opts.StubAllExternalTransports(); + }).StartAsync(); + + var runtime = host.GetRuntime(); + + var endpoint = runtime.Endpoints.EndpointFor(new Uri("rabbitmq://queue/orders")) + .ShouldNotBeNull() + .ShouldBeAssignableTo(); + + // Building the envelope mapper is what applies UseMassTransitInterop and wires + // the MassTransit serializer onto the endpoint. A live listener does this at + // startup; StubAllExternalTransports() skips listener startup, so trigger it + // here to reach the same post-startup endpoint state. + endpoint.BuildMapper(runtime); + + // The wire payload a MassTransit producer actually sends: the real message + // lives under "message", not at the root. + var json = $$""" + { + "messageId": "{{Guid.NewGuid()}}", + "messageType": ["urn:message:Orders:OrderPlaced"], + "message": { "orderId": 92883 } + } + """; + + // Rebuild the envelope the way durable storage hands it back on a retry: + // ContentType + Destination + Data are persisted, but Serializer is not. + var replayed = new Envelope + { + Data = Encoding.UTF8.GetBytes(json), + ContentType = "application/vnd.masstransit+json", + MessageType = typeof(OrderPlaced).ToMessageTypeName(), + Destination = endpoint.Uri + }; + + // The replay precondition: no Serializer is attached, so resolution must go + // through serializerFor(envelope) — the code path under test. + replayed.Serializer.ShouldBeNull(); + + var continuation = await runtime.Pipeline.TryDeserializeEnvelope(replayed); + + // A clean deserialization yields NullContinuation. Asserting it here surfaces a + // deserialization failure (MoveToErrorQueue) directly instead of as a confusing + // null Message on the next assertion. + continuation.ShouldBeOfType(); + + replayed.Message.ShouldBeOfType() + .OrderId.ShouldBe(92883); + } + + public record OrderPlaced(int OrderId); + + public class OrderPlacedHandler + { + // Presence registers OrderPlaced in the HandlerGraph so the pipeline can + // resolve its message type during deserialization. + public void Handle(OrderPlaced order) + { + } + } +} diff --git a/src/Wolverine/Runtime/HandlerPipeline.cs b/src/Wolverine/Runtime/HandlerPipeline.cs index 64b93b8a9..cfc8f4453 100644 --- a/src/Wolverine/Runtime/HandlerPipeline.cs +++ b/src/Wolverine/Runtime/HandlerPipeline.cs @@ -34,7 +34,7 @@ internal HandlerPipeline(WolverineRuntime runtime, IExecutorFactory executorFact _executors = new LightweightCache(executorFactory.BuildFor); } - + internal HandlerPipeline(WolverineRuntime runtime, IExecutorFactory executorFactory, Endpoint endpoint) { _graph = runtime.Handlers; @@ -133,7 +133,7 @@ public async ValueTask TryDeserializeEnvelope(Envelope envelope) return new MoveToErrorQueue(new EncryptionPolicyViolationException(envelope)); } - var serializer = envelope.Serializer ?? _runtime.Options.DetermineSerializer(envelope); + var serializer = envelope.Serializer ?? serializerFor(envelope); serializer.UnwrapEnvelopeIfNecessary(envelope); if (envelope.Data == null) @@ -190,6 +190,39 @@ public async ValueTask TryDeserializeEnvelope(Envelope envelope) } } + // Resolve the serializer for an envelope whose runtime-only Serializer reference + // is no longer set. This happens on replay paths (scheduled retry, durable + // recovery) where the envelope is rehydrated from storage: ContentType and + // Destination survive, but Serializer does not. Resolve from the originating + // endpoint first so endpoint-scoped serializers are honored on replay exactly + // as they are at first receipt. This matters most for the MassTransit and + // NServiceBus interop serializers wired by UseMassTransitInterop() / + // UseNServiceBusInterop(): they register only on the listener endpoint, never in + // the global content-type registry. Without this, DetermineSerializer falls back + // to the default JSON serializer, which deserializes the un-unwrapped interop + // envelope root and yields an all-default message. Mirrors DeadLetterEnvelope.TryReadData. + private IMessageSerializer serializerFor(Envelope envelope) + { + if (envelope.ContentType.IsNotEmpty()) + { + // _endpoint is set on per-listener pipelines and is the most reliable + // source; fall back to the persisted Destination for the global pipeline. + var endpoint = _endpoint ?? endpointFor(envelope.Destination); + var serializer = endpoint?.TryFindSerializer(envelope.ContentType); + if (serializer != null) + { + return serializer; + } + } + + return _runtime.Options.DetermineSerializer(envelope); + } + + private Endpoint? endpointFor(Uri? destination) + { + return destination == null ? null : _runtime.Endpoints.EndpointFor(destination); + } + private bool RequiresEncryption(Envelope envelope) { var options = _runtime.Options; @@ -216,7 +249,7 @@ private async Task executeAsync(MessageContext context, Envelope if (envelope.Message == null) { var deserializationResult = await TryDeserializeEnvelope(envelope).ConfigureAwait(false); - if(deserializationResult is not NullContinuation) + if (deserializationResult is not NullContinuation) { activity?.SetStatus(ActivityStatusCode.Error, "Serialization Failure"); return deserializationResult;