From 58660836182aa44352989b0472712b7504899fcc Mon Sep 17 00:00:00 2001 From: Marko Lahma Date: Mon, 16 Mar 2026 18:28:39 +0200 Subject: [PATCH] Fix CloudEvents unwrap for inline listeners Allow the handler pipeline to unwrap metadata-aware serializers before requiring Envelope.MessageType so inline CloudEvents receivers can deserialize messages without DefaultIncomingMessage(). Reuse the same helper in durable receivers and add runtime and Kafka regression coverage for inline CloudEvents handling. --- ...en_reading_and_writing_CloudEvents_data.cs | 32 ++++++++- .../end_to_end_with_CloudEvents.cs | 71 +++++++++++++++++-- src/Wolverine/Runtime/HandlerPipeline.cs | 10 ++- .../Serialization/IMessageSerializer.cs | 13 +++- .../Runtime/WorkerQueues/DurableReceiver.cs | 31 ++++---- 5 files changed, 129 insertions(+), 28 deletions(-) diff --git a/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs b/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs index f2b496613..75cc65b66 100644 --- a/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs +++ b/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs @@ -1,6 +1,11 @@ +using System.Text; using System.Text.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Runtime; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Interop; +using Wolverine.Util; using Xunit; namespace CoreTests.Runtime.Interop; @@ -85,6 +90,31 @@ public void has_a_sent_time() theEnvelope.SentAt.ShouldBe(DateTimeOffset.Parse("2020-09-23T06:23:21Z")); theCloudEventEnvelope.Time.ShouldBe(theEnvelope.SentAt.ToString("O")); } + + [Fact] + public async Task try_deserialize_envelope_unwraps_metadata_when_message_type_is_missing() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.RegisterMessageType(typeof(ApproveOrder), "com.dapr.event.sent"); + }).StartAsync(); + + var runtime = host.Services.GetRequiredService(); + var serializer = new CloudEventsMapper(runtime.Options.HandlerGraph, + new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); + + var envelope = new Envelope + { + Data = Encoding.UTF8.GetBytes(Json), + Serializer = serializer + }; + + await runtime.Pipeline.TryDeserializeEnvelope(envelope); + + envelope.Message.ShouldBeOfType().OrderId.ShouldBe(1); + envelope.MessageType.ShouldBe(typeof(ApproveOrder).ToMessageTypeName()); + } } -public record ApproveOrder(int OrderId); \ No newline at end of file +public record ApproveOrder(int OrderId); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs index f9e738574..70b62e82a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs @@ -13,6 +13,11 @@ namespace Wolverine.Kafka.Tests; +internal static class CloudEventsKafkaTestConstants +{ + public const string ColorMessageTypeAlias = "wolverine.kafka.tests.color"; +} + public class end_to_end_with_CloudEvents : IAsyncLifetime { private IHost _receiver; @@ -25,12 +30,8 @@ public async Task InitializeAsync() { //opts.EnableAutomaticFailureAcks = false; opts.UseKafka("localhost:9092").AutoProvision(); - opts.ListenToKafkaTopic("cloudevents") - - // You do have to tell Wolverine what the message type - // is that you'll receive here so that it can deserialize the - // incoming data - .InteropWithCloudEvents(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); + opts.ListenToKafkaTopic("cloudevents").InteropWithCloudEvents(); // Include test assembly for handler discovery opts.Discovery.IncludeAssembly(GetType().Assembly); @@ -49,6 +50,7 @@ public async Task InitializeAsync() { opts.UseKafka("localhost:9092").AutoProvision(); opts.Policies.DisableConventionalLocalRouting(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); opts.Services.AddResourceSetupOnStartup(); @@ -78,6 +80,61 @@ public async Task end_to_end() } } +public class inline_end_to_end_with_CloudEvents : IAsyncLifetime +{ + private IHost _receiver; + private IHost _sender; + + public async Task InitializeAsync() + { + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); + opts.ListenToKafkaTopic("cloudevents-inline") + .InteropWithCloudEvents() + .ProcessInline(); + + opts.Discovery.IncludeAssembly(GetType().Assembly); + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); + + opts.Services.AddResourceSetupOnStartup(); + + opts.PublishAllMessages().ToKafkaTopic("cloudevents-inline") + .UseInterop((runtime, topic) => new CloudEventsOnlyMapper(new CloudEventsMapper(runtime.Options.HandlerGraph, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }))); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + _sender.Dispose(); + await _receiver.StopAsync(); + _receiver.Dispose(); + } + + [Fact] + public async Task end_to_end_without_default_incoming_message_type() + { + var session = await _sender.TrackActivity() + .AlsoTrack(_receiver) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new ColorMessage("yellow")); + + session.Received.SingleMessage() + .Color.ShouldBe("yellow"); + } +} + internal class CloudEventsOnlyMapper : IKafkaEnvelopeMapper { private readonly CloudEventsMapper _cloudEvents; @@ -97,4 +154,4 @@ public void MapIncomingToEnvelope(Envelope envelope, Message inc { throw new NotImplementedException(); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/HandlerPipeline.cs b/src/Wolverine/Runtime/HandlerPipeline.cs index 6e595a39a..27339ee84 100644 --- a/src/Wolverine/Runtime/HandlerPipeline.cs +++ b/src/Wolverine/Runtime/HandlerPipeline.cs @@ -115,6 +115,7 @@ public async ValueTask TryDeserializeEnvelope(Envelope envelope) try { var serializer = envelope.Serializer ?? _runtime.Options.DetermineSerializer(envelope); + serializer.UnwrapEnvelopeIfNecessary(envelope); if (envelope.Data == null) { @@ -122,7 +123,12 @@ public async ValueTask TryDeserializeEnvelope(Envelope envelope) "Envelope does not have a message or deserialized message data"); } - if (envelope.MessageType == null) + if (envelope.Message != null) + { + return NullContinuation.Instance; + } + + if (string.IsNullOrEmpty(envelope.MessageType)) { throw new ArgumentOutOfRangeException(nameof(envelope), "The envelope has no Message or MessageType name"); @@ -193,4 +199,4 @@ private async Task executeAsync(MessageContext context, Envelope return await executor.ExecuteAsync(context, _cancellation); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs index 6a5f60df4..5986206eb 100644 --- a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs +++ b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs @@ -26,6 +26,17 @@ public interface IUnwrapsMetadataMessageSerializer : IMessageSerializer void Unwrap(Envelope envelope); } +public static class MessageSerializerExtensions +{ + public static void UnwrapEnvelopeIfNecessary(this IMessageSerializer serializer, Envelope envelope) + { + if (string.IsNullOrEmpty(envelope.MessageType) && serializer is IUnwrapsMetadataMessageSerializer metadataSerializer) + { + metadataSerializer.Unwrap(envelope); + } + } +} + /// /// Async version of /// @@ -34,4 +45,4 @@ public interface IAsyncMessageSerializer : IMessageSerializer ValueTask WriteAsync(Envelope envelope); ValueTask ReadFromDataAsync(Type messageType, Envelope envelope); -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 392d7f3e0..e4dde43eb 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -413,26 +413,23 @@ await executeWithRetriesAsync(async () => { try { - if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer) + try { - try + envelope.Serializer?.UnwrapEnvelopeIfNecessary(envelope); + } + catch (Exception e) + { + _logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); + + if (envelope.Id == Guid.Empty) { - serializer.Unwrap(envelope); + envelope.Id = NewId.NextSequentialGuid(); } - catch (Exception e) - { - _logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); - - if (envelope.Id == Guid.Empty) - { - envelope.Id = NewId.NextSequentialGuid(); - } - envelope.MessageType ??= $"unknown/{e.GetType().Name}"; - envelope.Failure = e; - await _moveToErrors.PostAsync(envelope); - return; - } + envelope.MessageType ??= $"unknown/{e.GetType().Name}"; + envelope.Failure = e; + await _moveToErrors.PostAsync(envelope); + return; } // Have to do this before moving to the DLQ @@ -574,4 +571,4 @@ public void Latch() { _latched = true; } -} \ No newline at end of file +}