From 56513a54fdd65c1cfec1dc4f592f076f3ce1a531 Mon Sep 17 00:00:00 2001 From: Marko Lahma Date: Mon, 16 Mar 2026 14:29:10 +0200 Subject: [PATCH] Fix null message_type causing NOT NULL violation when dead-lettering CloudEvents with unregistered types When a Kafka CloudEvents message arrives with an unregistered `type`, CloudEventsMapper throws UnknownMessageTypeNameException before setting envelope.MessageType. The subsequent dead-letter INSERT hits a NOT NULL violation on message_type, causing an infinite retry loop. Layered fix: - CloudEventsMapper: preserve raw CloudEvent type on envelope before attempting resolution, so the original type identity survives for dead-letter persistence - MoveToErrorQueue: add fallback guard that assigns a sentinel message type (unknown/{ExceptionName}) when Message is null and MessageType was never set - DatabasePersistence: null-coalesce MessageType to "unknown" in the dead-letter INSERT as a belt-and-suspenders defense --- .../Wolverine.RDBMS/DatabasePersistence.cs | 2 +- .../ErrorHandling/MoveToErrorQueueTests.cs | 74 +++++++++++++ .../CloudEventsMapper_unknown_type_tests.cs | 82 ++++++++++++++ .../moving_unknown_cloudevents_type_to_dlq.cs | 100 ++++++++++++++++++ .../ErrorHandling/MoveToErrorQueue.cs | 4 + .../Runtime/Interop/CloudEventsMapper.cs | 5 + .../Runtime/WorkerQueues/DurableReceiver.cs | 21 +++- 7 files changed, 285 insertions(+), 3 deletions(-) create mode 100644 src/Testing/CoreTests/ErrorHandling/MoveToErrorQueueTests.cs create mode 100644 src/Testing/CoreTests/Runtime/Interop/CloudEventsMapper_unknown_type_tests.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/moving_unknown_cloudevents_type_to_dlq.cs diff --git a/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs b/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs index 291ef4725..d23e22e41 100644 --- a/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs +++ b/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs @@ -162,7 +162,7 @@ public static void ConfigureDeadLetterCommands(DurabilitySettings durability, En builder.AddParameter(envelope.Id), builder.AddParameter(envelope.ScheduledTime), builder.AddParameter(data), - builder.AddParameter(envelope.MessageType), + builder.AddParameter(envelope.MessageType ?? "unknown"), builder.AddParameter(envelope.Destination?.ToString()), builder.AddParameter(envelope.Source), builder.AddParameter(exception?.GetType().FullNameInCode()), diff --git a/src/Testing/CoreTests/ErrorHandling/MoveToErrorQueueTests.cs b/src/Testing/CoreTests/ErrorHandling/MoveToErrorQueueTests.cs new file mode 100644 index 000000000..6b9968eb5 --- /dev/null +++ b/src/Testing/CoreTests/ErrorHandling/MoveToErrorQueueTests.cs @@ -0,0 +1,74 @@ +using NSubstitute; +using Wolverine.ErrorHandling; +using Wolverine.Logging; +using Wolverine.Runtime; +using Wolverine.Runtime.Interop; +using Wolverine.Util; +using Xunit; + +namespace CoreTests.ErrorHandling; + +public class MoveToErrorQueueTests +{ + private readonly IEnvelopeLifecycle _lifecycle; + private readonly IWolverineRuntime _runtime; + private readonly Envelope _envelope; + + public MoveToErrorQueueTests() + { + _lifecycle = Substitute.For(); + _runtime = Substitute.For(); + _runtime.Options.Returns(new WolverineOptions()); + _runtime.MessageTracking.Returns(Substitute.For()); + + _envelope = new Envelope + { + Destination = new Uri("local://queue") + }; + _lifecycle.Envelope.Returns(_envelope); + } + + [Fact] + public async Task should_assign_fallback_MessageType_when_Message_and_MessageType_are_null() + { + _envelope.Message = null; + _envelope.MessageType = null; + + var exception = new UnknownMessageTypeNameException("test"); + var continuation = new MoveToErrorQueue(exception); + + await continuation.ExecuteAsync(_lifecycle, _runtime, DateTimeOffset.UtcNow, null); + + _envelope.MessageType.ShouldBe("unknown/UnknownMessageTypeNameException"); + } + + [Fact] + public async Task should_preserve_existing_MessageType_when_Message_is_null() + { + _envelope.Message = null; + _envelope.MessageType = "com.example.orders.placed.v1"; + + var exception = new UnknownMessageTypeNameException("test"); + var continuation = new MoveToErrorQueue(exception); + + await continuation.ExecuteAsync(_lifecycle, _runtime, DateTimeOffset.UtcNow, null); + + _envelope.MessageType.ShouldBe("com.example.orders.placed.v1"); + } + + [Fact] + public async Task should_use_dotnet_type_when_Message_is_present() + { + _envelope.Message = new SampleMessage("test"); + _envelope.MessageType = "some.old.value"; + + var exception = new InvalidOperationException("test"); + var continuation = new MoveToErrorQueue(exception); + + await continuation.ExecuteAsync(_lifecycle, _runtime, DateTimeOffset.UtcNow, null); + + _envelope.MessageType.ShouldBe(typeof(SampleMessage).ToMessageTypeName()); + } +} + +public record SampleMessage(string Name); diff --git a/src/Testing/CoreTests/Runtime/Interop/CloudEventsMapper_unknown_type_tests.cs b/src/Testing/CoreTests/Runtime/Interop/CloudEventsMapper_unknown_type_tests.cs new file mode 100644 index 000000000..e8b915e39 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Interop/CloudEventsMapper_unknown_type_tests.cs @@ -0,0 +1,82 @@ +using System.Text.Json; +using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Interop; +using Xunit; + +namespace CoreTests.Runtime.Interop; + +public class CloudEventsMapper_unknown_type_tests +{ + private readonly HandlerGraph _handlers; + private readonly CloudEventsMapper _mapper; + + public CloudEventsMapper_unknown_type_tests() + { + _handlers = new HandlerGraph(); + _handlers.RegisterMessageType(typeof(ApproveOrder), "com.dapr.event.sent"); + + var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; + _mapper = new CloudEventsMapper(_handlers, options); + } + + [Fact] + public void should_preserve_raw_type_on_envelope_for_unknown_type() + { + var json = """ + { + "data": { "orderId": 1 }, + "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565", + "specversion": "1.0", + "type": "some.unknown.event.v1", + "source": "test" + } + """; + + var envelope = new Envelope(); + + var ex = Should.Throw(() => _mapper.MapIncoming(envelope, json)); + + ex.Message.ShouldContain("some.unknown.event.v1"); + envelope.MessageType.ShouldBe("some.unknown.event.v1"); + } + + [Fact] + public void should_not_set_MessageType_when_type_field_missing() + { + var json = """ + { + "data": { "orderId": 1 }, + "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565", + "specversion": "1.0", + "source": "test" + } + """; + + var envelope = new Envelope(); + _mapper.MapIncoming(envelope, json); + + envelope.MessageType.ShouldBeNull(); + } + + [Fact] + public void should_overwrite_raw_type_with_resolved_type_on_success() + { + var json = """ + { + "data": { "orderId": 1 }, + "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565", + "specversion": "1.0", + "type": "com.dapr.event.sent", + "source": "test" + } + """; + + var envelope = new Envelope(); + _mapper.MapIncoming(envelope, json); + + // Should be the resolved .NET type name, not the raw CloudEvent type + envelope.MessageType.ShouldNotBe("com.dapr.event.sent"); + envelope.MessageType.ShouldNotBeNull(); + envelope.Message.ShouldBeOfType(); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/moving_unknown_cloudevents_type_to_dlq.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/moving_unknown_cloudevents_type_to_dlq.cs new file mode 100644 index 000000000..2564d1c9e --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/moving_unknown_cloudevents_type_to_dlq.cs @@ -0,0 +1,100 @@ +using System.Text; +using Confluent.Kafka; +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Kafka.Internals; +using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Postgresql; +using Wolverine.Runtime; +using Wolverine.Tracking; + +namespace Wolverine.Kafka.Tests; + +public class moving_unknown_cloudevents_type_to_dlq : IAsyncLifetime +{ + private IHost _receiver; + + private readonly string _topicName = $"cloudevents-dlq-{Guid.NewGuid():N}"; + + public async Task InitializeAsync() + { + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092") + .AutoProvision() + .ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest); + + opts.ListenToKafkaTopic(_topicName) + .InteropWithCloudEvents(); + + opts.Discovery.IncludeAssembly(GetType().Assembly); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "kafka_ce_dlq"); + + opts.Services.AddResourceSetupOnStartup(); + + opts.Policies.UseDurableInboxOnAllListeners(); + + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + }).StartAsync(); + + await _receiver.RebuildAllEnvelopeStorageAsync(); + } + + public async Task DisposeAsync() + { + await _receiver.StopAsync(); + _receiver.Dispose(); + } + + [Fact] + public async Task cloudevents_message_with_unknown_type_should_be_dead_lettered() + { + var cloudEventsJson = """ + { + "data": { "orderId": 99 }, + "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "specversion": "1.0", + "datacontenttype": "application/json; charset=utf-8", + "source": "integration-test", + "type": "com.test.unregistered.event.v1", + "time": "2026-01-01T00:00:00Z" + } + """; + + var transport = _receiver.GetRuntime().Options.Transports.GetOrCreate(); + var producerBuilder = new ProducerBuilder(transport.ProducerConfig); + using var producer = producerBuilder.Build(); + + await producer.ProduceAsync(_topicName, new Message + { + Value = Encoding.UTF8.GetBytes(cloudEventsJson) + }); + producer.Flush(); + + // Poll until the message appears in the dead letter queue + var storage = _receiver.GetRuntime().Storage; + var deadline = DateTimeOffset.UtcNow.Add(2.Minutes()); + DeadLetterEnvelopeResults deadLetters = null!; + + while (DateTimeOffset.UtcNow < deadline) + { + deadLetters = await storage.DeadLetters.QueryAsync( + new DeadLetterEnvelopeQuery(TimeRange.AllTime()), + CancellationToken.None); + + if (deadLetters.Envelopes.Any()) break; + + await Task.Delay(1.Seconds()); + } + + deadLetters.Envelopes.ShouldNotBeEmpty(); + var envelope = deadLetters.Envelopes.First(); + envelope.MessageType.ShouldBe("com.test.unregistered.event.v1"); + envelope.ExceptionType.ShouldContain("UnknownMessageTypeNameException"); + } +} diff --git a/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs b/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs index e7615ab0f..c41aed4e1 100644 --- a/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs +++ b/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs @@ -41,6 +41,10 @@ await lifecycle.SendFailureAcknowledgementAsync( { lifecycle.Envelope.MessageType = lifecycle.Envelope.Message.GetType().ToMessageTypeName(); } + else + { + lifecycle.Envelope.MessageType ??= $"unknown/{Exception.GetType().Name}"; + } await lifecycle.MoveToDeadLetterQueueAsync(Exception); diff --git a/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs b/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs index 8305419b5..ca76a7348 100644 --- a/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs +++ b/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs @@ -161,6 +161,10 @@ public void MapIncoming(Envelope envelope, JsonNode? node) if (node.TryGetValue("type", out var cloudEventType)) { + // Preserve the raw CloudEvent type on the envelope before resolution. + // If resolution fails, the raw type survives for dead-letter persistence. + envelope.MessageType = cloudEventType; + if (_handlers.TryFindMessageType(cloudEventType, out var messageType)) { var data = node["data"]; @@ -169,6 +173,7 @@ public void MapIncoming(Envelope envelope, JsonNode? node) envelope.Message = data.Deserialize(messageType, _options); } + // Overwrite with the canonical Wolverine message type name envelope.MessageType = messageType.ToMessageTypeName(); } else diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index a60110062..392d7f3e0 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -415,9 +415,26 @@ await executeWithRetriesAsync(async () => { if (envelope.MessageType.IsEmpty() && envelope.Serializer is IUnwrapsMetadataMessageSerializer serializer) { - serializer.Unwrap(envelope); + try + { + serializer.Unwrap(envelope); + } + catch (Exception e) + { + _logger.LogInformation(e, "Failed to unwrap metadata for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); + + if (envelope.Id == Guid.Empty) + { + envelope.Id = NewId.NextSequentialGuid(); + } + + envelope.MessageType ??= $"unknown/{e.GetType().Name}"; + envelope.Failure = e; + await _moveToErrors.PostAsync(envelope); + return; + } } - + // Have to do this before moving to the DLQ if (envelope.Id == Guid.Empty) {