Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static void ConfigureDeadLetterCommands(DurabilitySettings durability, En
builder.AddParameter(envelope.Id),
builder.AddParameter(envelope.ScheduledTime),
builder.AddParameter(data),
builder.AddParameter(envelope.MessageType),
builder.AddParameter(envelope.MessageType ?? "unknown"),
builder.AddParameter(envelope.Destination?.ToString()),
builder.AddParameter(envelope.Source),
builder.AddParameter(exception?.GetType().FullNameInCode()),
Expand Down
74 changes: 74 additions & 0 deletions src/Testing/CoreTests/ErrorHandling/MoveToErrorQueueTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using NSubstitute;
using Wolverine.ErrorHandling;
using Wolverine.Logging;
using Wolverine.Runtime;
using Wolverine.Runtime.Interop;
using Wolverine.Util;
using Xunit;

namespace CoreTests.ErrorHandling;

public class MoveToErrorQueueTests
{
private readonly IEnvelopeLifecycle _lifecycle;
private readonly IWolverineRuntime _runtime;
private readonly Envelope _envelope;

public MoveToErrorQueueTests()
{
_lifecycle = Substitute.For<IEnvelopeLifecycle>();
_runtime = Substitute.For<IWolverineRuntime>();
_runtime.Options.Returns(new WolverineOptions());
_runtime.MessageTracking.Returns(Substitute.For<IMessageTracker>());

_envelope = new Envelope
{
Destination = new Uri("local://queue")
};
_lifecycle.Envelope.Returns(_envelope);
}

[Fact]
public async Task should_assign_fallback_MessageType_when_Message_and_MessageType_are_null()
{
_envelope.Message = null;
_envelope.MessageType = null;

var exception = new UnknownMessageTypeNameException("test");
var continuation = new MoveToErrorQueue(exception);

await continuation.ExecuteAsync(_lifecycle, _runtime, DateTimeOffset.UtcNow, null);

_envelope.MessageType.ShouldBe("unknown/UnknownMessageTypeNameException");
}

[Fact]
public async Task should_preserve_existing_MessageType_when_Message_is_null()
{
_envelope.Message = null;
_envelope.MessageType = "com.example.orders.placed.v1";

var exception = new UnknownMessageTypeNameException("test");
var continuation = new MoveToErrorQueue(exception);

await continuation.ExecuteAsync(_lifecycle, _runtime, DateTimeOffset.UtcNow, null);

_envelope.MessageType.ShouldBe("com.example.orders.placed.v1");
}

[Fact]
public async Task should_use_dotnet_type_when_Message_is_present()
{
_envelope.Message = new SampleMessage("test");
_envelope.MessageType = "some.old.value";

var exception = new InvalidOperationException("test");
var continuation = new MoveToErrorQueue(exception);

await continuation.ExecuteAsync(_lifecycle, _runtime, DateTimeOffset.UtcNow, null);

_envelope.MessageType.ShouldBe(typeof(SampleMessage).ToMessageTypeName());
}
}

public record SampleMessage(string Name);
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System.Text.Json;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.Interop;
using Xunit;

namespace CoreTests.Runtime.Interop;

public class CloudEventsMapper_unknown_type_tests
{
private readonly HandlerGraph _handlers;
private readonly CloudEventsMapper _mapper;

public CloudEventsMapper_unknown_type_tests()
{
_handlers = new HandlerGraph();
_handlers.RegisterMessageType(typeof(ApproveOrder), "com.dapr.event.sent");

var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
_mapper = new CloudEventsMapper(_handlers, options);
}

[Fact]
public void should_preserve_raw_type_on_envelope_for_unknown_type()
{
var json = """
{
"data": { "orderId": 1 },
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"type": "some.unknown.event.v1",
"source": "test"
}
""";

var envelope = new Envelope();

var ex = Should.Throw<UnknownMessageTypeNameException>(() => _mapper.MapIncoming(envelope, json));

ex.Message.ShouldContain("some.unknown.event.v1");
envelope.MessageType.ShouldBe("some.unknown.event.v1");
}

[Fact]
public void should_not_set_MessageType_when_type_field_missing()
{
var json = """
{
"data": { "orderId": 1 },
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"source": "test"
}
""";

var envelope = new Envelope();
_mapper.MapIncoming(envelope, json);

envelope.MessageType.ShouldBeNull();
}

[Fact]
public void should_overwrite_raw_type_with_resolved_type_on_success()
{
var json = """
{
"data": { "orderId": 1 },
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"type": "com.dapr.event.sent",
"source": "test"
}
""";

var envelope = new Envelope();
_mapper.MapIncoming(envelope, json);

// Should be the resolved .NET type name, not the raw CloudEvent type
envelope.MessageType.ShouldNotBe("com.dapr.event.sent");
envelope.MessageType.ShouldNotBeNull();
envelope.Message.ShouldBeOfType<ApproveOrder>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System.Text;
using Confluent.Kafka;
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Kafka.Internals;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Postgresql;
using Wolverine.Runtime;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;

public class moving_unknown_cloudevents_type_to_dlq : IAsyncLifetime
{
private IHost _receiver;

private readonly string _topicName = $"cloudevents-dlq-{Guid.NewGuid():N}";

public async Task InitializeAsync()
{
_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092")
.AutoProvision()
.ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest);

opts.ListenToKafkaTopic(_topicName)
.InteropWithCloudEvents();

opts.Discovery.IncludeAssembly(GetType().Assembly);

opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "kafka_ce_dlq");

opts.Services.AddResourceSetupOnStartup();

opts.Policies.UseDurableInboxOnAllListeners();

opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue;
}).StartAsync();

await _receiver.RebuildAllEnvelopeStorageAsync();
}

public async Task DisposeAsync()
{
await _receiver.StopAsync();
_receiver.Dispose();
}

[Fact]
public async Task cloudevents_message_with_unknown_type_should_be_dead_lettered()
{
var cloudEventsJson = """
{
"data": { "orderId": 99 },
"id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "integration-test",
"type": "com.test.unregistered.event.v1",
"time": "2026-01-01T00:00:00Z"
}
""";

var transport = _receiver.GetRuntime().Options.Transports.GetOrCreate<KafkaTransport>();
var producerBuilder = new ProducerBuilder<string, byte[]>(transport.ProducerConfig);
using var producer = producerBuilder.Build();

await producer.ProduceAsync(_topicName, new Message<string, byte[]>
{
Value = Encoding.UTF8.GetBytes(cloudEventsJson)
});
producer.Flush();

// Poll until the message appears in the dead letter queue
var storage = _receiver.GetRuntime().Storage;
var deadline = DateTimeOffset.UtcNow.Add(2.Minutes());
DeadLetterEnvelopeResults deadLetters = null!;

while (DateTimeOffset.UtcNow < deadline)
{
deadLetters = await storage.DeadLetters.QueryAsync(
new DeadLetterEnvelopeQuery(TimeRange.AllTime()),
CancellationToken.None);

if (deadLetters.Envelopes.Any()) break;

await Task.Delay(1.Seconds());
}

deadLetters.Envelopes.ShouldNotBeEmpty();
var envelope = deadLetters.Envelopes.First();
envelope.MessageType.ShouldBe("com.test.unregistered.event.v1");
envelope.ExceptionType.ShouldContain("UnknownMessageTypeNameException");
}
}
4 changes: 4 additions & 0 deletions src/Wolverine/ErrorHandling/MoveToErrorQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ await lifecycle.SendFailureAcknowledgementAsync(
{
lifecycle.Envelope.MessageType = lifecycle.Envelope.Message.GetType().ToMessageTypeName();
}
else
{
lifecycle.Envelope.MessageType ??= $"unknown/{Exception.GetType().Name}";
}

await lifecycle.MoveToDeadLetterQueueAsync(Exception);

Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/Runtime/Interop/CloudEventsMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ public void MapIncoming(Envelope envelope, JsonNode? node)

if (node.TryGetValue<string>("type", out var cloudEventType))
{
// Preserve the raw CloudEvent type on the envelope before resolution.
// If resolution fails, the raw type survives for dead-letter persistence.
envelope.MessageType = cloudEventType;

if (_handlers.TryFindMessageType(cloudEventType, out var messageType))
{
var data = node["data"];
Expand All @@ -169,6 +173,7 @@ public void MapIncoming(Envelope envelope, JsonNode? node)
envelope.Message = data.Deserialize(messageType, _options);
}

// Overwrite with the canonical Wolverine message type name
envelope.MessageType = messageType.ToMessageTypeName();
}
else
Expand Down
21 changes: 19 additions & 2 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
await EnqueueAsync(envelope);
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; set; }

Expand Down Expand Up @@ -415,9 +415,26 @@
{
if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer)
{
serializer.Unwrap(envelope);
try
{
serializer.Unwrap(envelope);
}
catch (Exception e)
{
_logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination);

if (envelope.Id == Guid.Empty)
{
envelope.Id = NewId.NextSequentialGuid();
}

envelope.MessageType ??= $"unknown/{e.GetType().Name}";
envelope.Failure = e;
await _moveToErrors.PostAsync(envelope);
return;
}
}

// Have to do this before moving to the DLQ
if (envelope.Id == Guid.Empty)
{
Expand Down
Loading