diff --git a/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs b/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs index f2b496613..75cc65b66 100644 --- a/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs +++ b/src/Testing/CoreTests/Runtime/Interop/when_reading_and_writing_CloudEvents_data.cs @@ -1,6 +1,11 @@ +using System.Text; using System.Text.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Runtime; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Interop; +using Wolverine.Util; using Xunit; namespace CoreTests.Runtime.Interop; @@ -85,6 +90,31 @@ public void has_a_sent_time() theEnvelope.SentAt.ShouldBe(DateTimeOffset.Parse("2020-09-23T06:23:21Z")); theCloudEventEnvelope.Time.ShouldBe(theEnvelope.SentAt.ToString("O")); } + + [Fact] + public async Task try_deserialize_envelope_unwraps_metadata_when_message_type_is_missing() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.RegisterMessageType(typeof(ApproveOrder), "com.dapr.event.sent"); + }).StartAsync(); + + var runtime = host.Services.GetRequiredService(); + var serializer = new CloudEventsMapper(runtime.Options.HandlerGraph, + new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); + + var envelope = new Envelope + { + Data = Encoding.UTF8.GetBytes(Json), + Serializer = serializer + }; + + await runtime.Pipeline.TryDeserializeEnvelope(envelope); + + envelope.Message.ShouldBeOfType().OrderId.ShouldBe(1); + envelope.MessageType.ShouldBe(typeof(ApproveOrder).ToMessageTypeName()); + } } -public record ApproveOrder(int OrderId); \ No newline at end of file +public record ApproveOrder(int OrderId); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs index f9e738574..70b62e82a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs @@ -13,6 +13,11 @@ namespace Wolverine.Kafka.Tests; +internal static class CloudEventsKafkaTestConstants +{ + public const string ColorMessageTypeAlias = "wolverine.kafka.tests.color"; +} + public class end_to_end_with_CloudEvents : IAsyncLifetime { private IHost _receiver; @@ -25,12 +30,8 @@ public async Task InitializeAsync() { //opts.EnableAutomaticFailureAcks = false; opts.UseKafka("localhost:9092").AutoProvision(); - opts.ListenToKafkaTopic("cloudevents") - - // You do have to tell Wolverine what the message type - // is that you'll receive here so that it can deserialize the - // incoming data - .InteropWithCloudEvents(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); + opts.ListenToKafkaTopic("cloudevents").InteropWithCloudEvents(); // Include test assembly for handler discovery opts.Discovery.IncludeAssembly(GetType().Assembly); @@ -49,6 +50,7 @@ public async Task InitializeAsync() { opts.UseKafka("localhost:9092").AutoProvision(); opts.Policies.DisableConventionalLocalRouting(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); opts.Services.AddResourceSetupOnStartup(); @@ -78,6 +80,61 @@ public async Task end_to_end() } } +public class inline_end_to_end_with_CloudEvents : IAsyncLifetime +{ + private IHost _receiver; + private IHost _sender; + + public async Task InitializeAsync() + { + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); + opts.ListenToKafkaTopic("cloudevents-inline") + .InteropWithCloudEvents() + .ProcessInline(); + + opts.Discovery.IncludeAssembly(GetType().Assembly); + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + opts.Policies.DisableConventionalLocalRouting(); + opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias); + + opts.Services.AddResourceSetupOnStartup(); + + opts.PublishAllMessages().ToKafkaTopic("cloudevents-inline") + .UseInterop((runtime, topic) => new CloudEventsOnlyMapper(new CloudEventsMapper(runtime.Options.HandlerGraph, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }))); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + _sender.Dispose(); + await _receiver.StopAsync(); + _receiver.Dispose(); + } + + [Fact] + public async Task end_to_end_without_default_incoming_message_type() + { + var session = await _sender.TrackActivity() + .AlsoTrack(_receiver) + .WaitForMessageToBeReceivedAt(_receiver) + .PublishMessageAndWaitAsync(new ColorMessage("yellow")); + + session.Received.SingleMessage() + .Color.ShouldBe("yellow"); + } +} + internal class CloudEventsOnlyMapper : IKafkaEnvelopeMapper { private readonly CloudEventsMapper _cloudEvents; @@ -97,4 +154,4 @@ public void MapIncomingToEnvelope(Envelope envelope, Message inc { throw new NotImplementedException(); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/HandlerPipeline.cs b/src/Wolverine/Runtime/HandlerPipeline.cs index 6e595a39a..27339ee84 100644 --- a/src/Wolverine/Runtime/HandlerPipeline.cs +++ b/src/Wolverine/Runtime/HandlerPipeline.cs @@ -115,6 +115,7 @@ public async ValueTask TryDeserializeEnvelope(Envelope envelope) try { var serializer = envelope.Serializer ?? _runtime.Options.DetermineSerializer(envelope); + serializer.UnwrapEnvelopeIfNecessary(envelope); if (envelope.Data == null) { @@ -122,7 +123,12 @@ public async ValueTask TryDeserializeEnvelope(Envelope envelope) "Envelope does not have a message or deserialized message data"); } - if (envelope.MessageType == null) + if (envelope.Message != null) + { + return NullContinuation.Instance; + } + + if (string.IsNullOrEmpty(envelope.MessageType)) { throw new ArgumentOutOfRangeException(nameof(envelope), "The envelope has no Message or MessageType name"); @@ -193,4 +199,4 @@ private async Task executeAsync(MessageContext context, Envelope return await executor.ExecuteAsync(context, _cancellation); } -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs index 6a5f60df4..5986206eb 100644 --- a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs +++ b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs @@ -26,6 +26,17 @@ public interface IUnwrapsMetadataMessageSerializer : IMessageSerializer void Unwrap(Envelope envelope); } +public static class MessageSerializerExtensions +{ + public static void UnwrapEnvelopeIfNecessary(this IMessageSerializer serializer, Envelope envelope) + { + if (string.IsNullOrEmpty(envelope.MessageType) && serializer is IUnwrapsMetadataMessageSerializer metadataSerializer) + { + metadataSerializer.Unwrap(envelope); + } + } +} + /// /// Async version of /// @@ -34,4 +45,4 @@ public interface IAsyncMessageSerializer : IMessageSerializer ValueTask WriteAsync(Envelope envelope); ValueTask ReadFromDataAsync(Type messageType, Envelope envelope); -} \ No newline at end of file +} diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 392d7f3e0..e4dde43eb 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -413,26 +413,23 @@ await executeWithRetriesAsync(async () => { try { - if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer) + try { - try + envelope.Serializer?.UnwrapEnvelopeIfNecessary(envelope); + } + catch (Exception e) + { + _logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); + + if (envelope.Id == Guid.Empty) { - serializer.Unwrap(envelope); + envelope.Id = NewId.NextSequentialGuid(); } - catch (Exception e) - { - _logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); - - if (envelope.Id == Guid.Empty) - { - envelope.Id = NewId.NextSequentialGuid(); - } - envelope.MessageType ??= $"unknown/{e.GetType().Name}"; - envelope.Failure = e; - await _moveToErrors.PostAsync(envelope); - return; - } + envelope.MessageType ??= $"unknown/{e.GetType().Name}"; + envelope.Failure = e; + await _moveToErrors.PostAsync(envelope); + return; } // Have to do this before moving to the DLQ @@ -574,4 +571,4 @@ public void Latch() { _latched = true; } -} \ No newline at end of file +}