diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs index 5c5b035bf..7d6d3720a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/end_to_end_with_CloudEvents.cs @@ -1,8 +1,14 @@ +using System.Text; +using System.Text.Json; +using Confluent.Kafka; using IntegrationTests; +using JasperFx.Core.Reflection; using JasperFx.Resources; using Microsoft.Extensions.Hosting; using Shouldly; using Wolverine.Postgresql; +using Wolverine.Runtime; +using Wolverine.Runtime.Interop; using Wolverine.Tracking; namespace Wolverine.Kafka.Tests; @@ -44,7 +50,7 @@ public async Task InitializeAsync() opts.Services.AddResourceSetupOnStartup(); opts.PublishAllMessages().ToKafkaTopic("cloudevents") - .InteropWithCloudEvents(); + .UseInterop((runtime, topic) => new CloudEventsOnlyMapper(new CloudEventsMapper(runtime.Options.HandlerGraph, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }))); }).StartAsync(); } @@ -65,4 +71,25 @@ public async Task end_to_end() session.Received.SingleMessage() .Color.ShouldBe("yellow"); } +} + +internal class CloudEventsOnlyMapper : IKafkaEnvelopeMapper +{ + private readonly CloudEventsMapper _cloudEvents; + + public CloudEventsOnlyMapper(CloudEventsMapper cloudEvents) + { + _cloudEvents = cloudEvents; + } + + public void MapEnvelopeToOutgoing(Envelope envelope, Message outgoing) + { + outgoing.Key = envelope.GroupId; + outgoing.Value = _cloudEvents.WriteToBytes(envelope); + } + + public void MapIncomingToEnvelope(Envelope envelope, Message incoming) + { + throw new NotImplementedException(); + } } \ No newline at end of file diff --git a/src/Wolverine/AssemblyAttributes.cs b/src/Wolverine/AssemblyAttributes.cs index 643124d86..7e2f1fb67 100644 --- a/src/Wolverine/AssemblyAttributes.cs +++ b/src/Wolverine/AssemblyAttributes.cs @@ -33,4 +33,5 @@ [assembly: InternalsVisibleTo("MassTransitInteropTests")] [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] [assembly: InternalsVisibleTo("Wolverine.Http")] -[assembly: InternalsVisibleTo("Wolverine.Http.Tests")] \ No newline at end of file +[assembly: InternalsVisibleTo("Wolverine.Http.Tests")] +[assembly: InternalsVisibleTo("Wolverine.Kafka.Tests")] \ No newline at end of file diff --git a/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs b/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs index 24f82c81a..a8da0a021 100644 --- a/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs +++ b/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs @@ -76,7 +76,7 @@ public CloudEventsEnvelope(Envelope envelope) public string TraceParent { get; set; } } -public class CloudEventsMapper : IMessageSerializer +public class CloudEventsMapper : IUnwrapsMetadataMessageSerializer { private readonly HandlerGraph _handlers; private readonly JsonSerializerOptions _options; @@ -188,6 +188,12 @@ public object ReadFromData(Type messageType, Envelope envelope) return envelope.Message; } + public void Unwrap(Envelope envelope) + { + var node = JsonNode.Parse(envelope.Data); + MapIncoming(envelope, node); + } + public object ReadFromData(byte[] data) { throw new NotSupportedException(); diff --git a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs index e8cab699c..6a5f60df4 100644 --- a/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs +++ b/src/Wolverine/Runtime/Serialization/IMessageSerializer.cs @@ -16,6 +16,16 @@ public interface IMessageSerializer byte[] WriteMessage(object message); } +/// +/// Marker interface that lets Wolverine know that the message serializer +/// may also unwrap metadata on the Envelope. For protocols like CloudEvents +/// that smuggle message metadata through the transport's message body +/// +public interface IUnwrapsMetadataMessageSerializer : IMessageSerializer +{ + void Unwrap(Envelope envelope); +} + /// /// Async version of /// diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 21683ca27..d94ca0830 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -5,6 +5,7 @@ using Wolverine.Logging; using Wolverine.Persistence.Durability; using Wolverine.Runtime.Partitioning; +using Wolverine.Runtime.Serialization; using Wolverine.Transports; using Wolverine.Transports.Sending; @@ -360,6 +361,11 @@ await executeWithRetriesAsync(async () => { try { + if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer) + { + serializer.Unwrap(envelope); + } + envelope.OwnerId = _settings.AssignedNodeNumber; await _inbox.StoreIncomingAsync(envelope); envelope.WasPersistedInInbox = true;