Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
using System.Text;
using System.Text.Json;
using Confluent.Kafka;
using IntegrationTests;
using JasperFx.Core.Reflection;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Postgresql;
using Wolverine.Runtime;
using Wolverine.Runtime.Interop;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;
Expand Down Expand Up @@ -44,7 +50,7 @@ public async Task InitializeAsync()
opts.Services.AddResourceSetupOnStartup();

opts.PublishAllMessages().ToKafkaTopic("cloudevents")
.InteropWithCloudEvents();
.UseInterop((runtime, topic) => new CloudEventsOnlyMapper(new CloudEventsMapper(runtime.Options.HandlerGraph, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase })));
}).StartAsync();
}

Expand All @@ -65,4 +71,25 @@ public async Task end_to_end()
session.Received.SingleMessage<ColorMessage>()
.Color.ShouldBe("yellow");
}
}

internal class CloudEventsOnlyMapper : IKafkaEnvelopeMapper
{
private readonly CloudEventsMapper _cloudEvents;

public CloudEventsOnlyMapper(CloudEventsMapper cloudEvents)
{
_cloudEvents = cloudEvents;
}

public void MapEnvelopeToOutgoing(Envelope envelope, Message<string, byte[]> outgoing)
{
outgoing.Key = envelope.GroupId;
outgoing.Value = _cloudEvents.WriteToBytes(envelope);
}

public void MapIncomingToEnvelope(Envelope envelope, Message<string, byte[]> incoming)
{
throw new NotImplementedException();
}
}
3 changes: 2 additions & 1 deletion src/Wolverine/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@
[assembly: InternalsVisibleTo("MassTransitInteropTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
[assembly: InternalsVisibleTo("Wolverine.Http")]
[assembly: InternalsVisibleTo("Wolverine.Http.Tests")]
[assembly: InternalsVisibleTo("Wolverine.Http.Tests")]
[assembly: InternalsVisibleTo("Wolverine.Kafka.Tests")]
8 changes: 7 additions & 1 deletion src/Wolverine/Runtime/Interop/CloudEventsMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CloudEventsEnvelope(Envelope envelope)
public string TraceParent { get; set; }
}

public class CloudEventsMapper : IMessageSerializer
public class CloudEventsMapper : IUnwrapsMetadataMessageSerializer
{
private readonly HandlerGraph _handlers;
private readonly JsonSerializerOptions _options;
Expand Down Expand Up @@ -188,6 +188,12 @@ public object ReadFromData(Type messageType, Envelope envelope)
return envelope.Message;
}

public void Unwrap(Envelope envelope)
{
var node = JsonNode.Parse(envelope.Data);
MapIncoming(envelope, node);
}

public object ReadFromData(byte[] data)
{
throw new NotSupportedException();
Expand Down
10 changes: 10 additions & 0 deletions src/Wolverine/Runtime/Serialization/IMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ public interface IMessageSerializer
byte[] WriteMessage(object message);
}

/// <summary>
/// Marker interface that lets Wolverine know that the message serializer
/// may also unwrap metadata on the Envelope. For protocols like CloudEvents
/// that smuggle message metadata through the transport's message body
/// </summary>
public interface IUnwrapsMetadataMessageSerializer : IMessageSerializer
{
void Unwrap(Envelope envelope);
}

/// <summary>
/// Async version of <seealso cref="IMessageSerializer"/>
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime.Partitioning;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Sending;

Expand Down Expand Up @@ -360,6 +361,11 @@ await executeWithRetriesAsync(async () =>
{
try
{
if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer)
{
serializer.Unwrap(envelope);
}

envelope.OwnerId = _settings.AssignedNodeNumber;
await _inbox.StoreIncomingAsync(envelope);
envelope.WasPersistedInInbox = true;
Expand Down
Loading