Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.Runtime;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.Interop;
using Wolverine.Util;
using Xunit;

namespace CoreTests.Runtime.Interop;
Expand Down Expand Up @@ -85,6 +90,31 @@ public void has_a_sent_time()
theEnvelope.SentAt.ShouldBe(DateTimeOffset.Parse("2020-09-23T06:23:21Z"));
theCloudEventEnvelope.Time.ShouldBe(theEnvelope.SentAt.ToString("O"));
}

[Fact]
public async Task try_deserialize_envelope_unwraps_metadata_when_message_type_is_missing()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.RegisterMessageType(typeof(ApproveOrder), "com.dapr.event.sent");
}).StartAsync();

var runtime = host.Services.GetRequiredService<IWolverineRuntime>();
var serializer = new CloudEventsMapper(runtime.Options.HandlerGraph,
new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase });

var envelope = new Envelope
{
Data = Encoding.UTF8.GetBytes(Json),
Serializer = serializer
};

await runtime.Pipeline.TryDeserializeEnvelope(envelope);

envelope.Message.ShouldBeOfType<ApproveOrder>().OrderId.ShouldBe(1);
envelope.MessageType.ShouldBe(typeof(ApproveOrder).ToMessageTypeName());
}
}

public record ApproveOrder(int OrderId);
public record ApproveOrder(int OrderId);
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

namespace Wolverine.Kafka.Tests;

internal static class CloudEventsKafkaTestConstants
{
public const string ColorMessageTypeAlias = "wolverine.kafka.tests.color";
}

public class end_to_end_with_CloudEvents : IAsyncLifetime
{
private IHost _receiver;
Expand All @@ -25,12 +30,8 @@ public async Task InitializeAsync()
{
//opts.EnableAutomaticFailureAcks = false;
opts.UseKafka("localhost:9092").AutoProvision();
opts.ListenToKafkaTopic("cloudevents")

// You do have to tell Wolverine what the message type
// is that you'll receive here so that it can deserialize the
// incoming data
.InteropWithCloudEvents();
opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias);
opts.ListenToKafkaTopic("cloudevents").InteropWithCloudEvents();

// Include test assembly for handler discovery
opts.Discovery.IncludeAssembly(GetType().Assembly);
Expand All @@ -49,6 +50,7 @@ public async Task InitializeAsync()
{
opts.UseKafka("localhost:9092").AutoProvision();
opts.Policies.DisableConventionalLocalRouting();
opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias);

opts.Services.AddResourceSetupOnStartup();

Expand Down Expand Up @@ -78,6 +80,61 @@ public async Task end_to_end()
}
}

public class inline_end_to_end_with_CloudEvents : IAsyncLifetime
{
private IHost _receiver;
private IHost _sender;

public async Task InitializeAsync()
{
_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();
opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias);
opts.ListenToKafkaTopic("cloudevents-inline")
.InteropWithCloudEvents()
.ProcessInline();

opts.Discovery.IncludeAssembly(GetType().Assembly);
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();
opts.Policies.DisableConventionalLocalRouting();
opts.RegisterMessageType(typeof(ColorMessage), CloudEventsKafkaTestConstants.ColorMessageTypeAlias);

opts.Services.AddResourceSetupOnStartup();

opts.PublishAllMessages().ToKafkaTopic("cloudevents-inline")
.UseInterop((runtime, topic) => new CloudEventsOnlyMapper(new CloudEventsMapper(runtime.Options.HandlerGraph, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase })));
}).StartAsync();
}

public async Task DisposeAsync()
{
await _sender.StopAsync();
_sender.Dispose();
await _receiver.StopAsync();
_receiver.Dispose();
}

[Fact]
public async Task end_to_end_without_default_incoming_message_type()
{
var session = await _sender.TrackActivity()
.AlsoTrack(_receiver)
.WaitForMessageToBeReceivedAt<ColorMessage>(_receiver)
.PublishMessageAndWaitAsync(new ColorMessage("yellow"));

session.Received.SingleMessage<ColorMessage>()
.Color.ShouldBe("yellow");
}
}

internal class CloudEventsOnlyMapper : IKafkaEnvelopeMapper
{
private readonly CloudEventsMapper _cloudEvents;
Expand All @@ -97,4 +154,4 @@ public void MapIncomingToEnvelope(Envelope envelope, Message<string, byte[]> inc
{
throw new NotImplementedException();
}
}
}
10 changes: 8 additions & 2 deletions src/Wolverine/Runtime/HandlerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,20 @@ public async ValueTask<IContinuation> TryDeserializeEnvelope(Envelope envelope)
try
{
var serializer = envelope.Serializer ?? _runtime.Options.DetermineSerializer(envelope);
serializer.UnwrapEnvelopeIfNecessary(envelope);

if (envelope.Data == null)
{
throw new ArgumentOutOfRangeException(nameof(envelope),
"Envelope does not have a message or deserialized message data");
}

if (envelope.MessageType == null)
if (envelope.Message != null)
{
return NullContinuation.Instance;
}

if (string.IsNullOrEmpty(envelope.MessageType))
{
throw new ArgumentOutOfRangeException(nameof(envelope),
"The envelope has no Message or MessageType name");
Expand Down Expand Up @@ -193,4 +199,4 @@ private async Task<IContinuation> executeAsync(MessageContext context, Envelope

return await executor.ExecuteAsync(context, _cancellation);
}
}
}
13 changes: 12 additions & 1 deletion src/Wolverine/Runtime/Serialization/IMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ public interface IUnwrapsMetadataMessageSerializer : IMessageSerializer
void Unwrap(Envelope envelope);
}

public static class MessageSerializerExtensions
{
public static void UnwrapEnvelopeIfNecessary(this IMessageSerializer serializer, Envelope envelope)
{
if (string.IsNullOrEmpty(envelope.MessageType) && serializer is IUnwrapsMetadataMessageSerializer metadataSerializer)
{
metadataSerializer.Unwrap(envelope);
}
}
}

/// <summary>
/// Async version of <seealso cref="IMessageSerializer"/>
/// </summary>
Expand All @@ -34,4 +45,4 @@ public interface IAsyncMessageSerializer : IMessageSerializer
ValueTask<byte[]> WriteAsync(Envelope envelope);

ValueTask<object?> ReadFromDataAsync(Type messageType, Envelope envelope);
}
}
31 changes: 14 additions & 17 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
await EnqueueAsync(envelope);
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; set; }

Expand Down Expand Up @@ -413,26 +413,23 @@
{
try
{
if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer)
try
{
try
envelope.Serializer?.UnwrapEnvelopeIfNecessary(envelope);
}
catch (Exception e)
{
_logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination);

if (envelope.Id == Guid.Empty)
{
serializer.Unwrap(envelope);
envelope.Id = NewId.NextSequentialGuid();
}
catch (Exception e)
{
_logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination);

if (envelope.Id == Guid.Empty)
{
envelope.Id = NewId.NextSequentialGuid();
}

envelope.MessageType ??= $"unknown/{e.GetType().Name}";
envelope.Failure = e;
await _moveToErrors.PostAsync(envelope);
return;
}
envelope.MessageType ??= $"unknown/{e.GetType().Name}";
envelope.Failure = e;
await _moveToErrors.PostAsync(envelope);
return;
}

// Have to do this before moving to the DLQ
Expand Down Expand Up @@ -574,4 +571,4 @@
{
_latched = true;
}
}
}
Loading