From adf4a9865c32b5da7c073d757c4359de6bc99977 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 2 Dec 2025 11:01:25 -0600 Subject: [PATCH] Cloud events mapper can handle non-Guid message ids. Closes GH-1889 --- .../Runtime/Interop/CloudEventsMapper.cs | 12 ++++- .../Runtime/WorkerQueues/DurableReceiver.cs | 47 +++++++++++++------ 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs b/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs index a8da0a021..6d3251891 100644 --- a/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs +++ b/src/Wolverine/Runtime/Interop/CloudEventsMapper.cs @@ -2,6 +2,7 @@ using System.Text.Json.Nodes; using System.Text.Json.Serialization; using JasperFx.Core.Reflection; +using MassTransit; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Serialization; using Wolverine.Util; @@ -137,9 +138,16 @@ public void MapIncoming(Envelope envelope, JsonNode? node) envelope.SentAt = time; } - if (node.TryGetValue("id", out var id)) + if (node.TryGetValue("id", out var raw)) { - envelope.Id = id; + if (Guid.TryParse(raw, out var id)) + { + envelope.Id = id; + } + else + { + envelope.Id = NewId.NextSequentialGuid(); + } } if (node.TryGetValue("type", out var cloudEventType)) diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index d94ca0830..34694507d 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -1,5 +1,6 @@ using JasperFx.Blocks; using JasperFx.Core; +using MassTransit; using Microsoft.Extensions.Logging; using Wolverine.Configuration; using Wolverine.Logging; @@ -366,26 +367,26 @@ await executeWithRetriesAsync(async () => serializer.Unwrap(envelope); } + // Have to do this before moving to the DLQ + if (envelope.Id == Guid.Empty) + { + envelope.Id = NewId.NextSequentialGuid(); + } + + if (envelope.MessageType.IsEmpty()) + { + _logger.LogInformation("Empty or missing message type name for Envelope {Id} received at durable {Destination}. Moving to dead letter queue", envelope.Id, envelope.Destination); + await _moveToErrors.PostAsync(envelope); + return; + } + envelope.OwnerId = _settings.AssignedNodeNumber; await _inbox.StoreIncomingAsync(envelope); envelope.WasPersistedInInbox = true; } catch (DuplicateIncomingEnvelopeException e) { - _logger.LogError(e, "Duplicate incoming envelope detected"); - - if (envelope.Listener != null) - { - try - { - await envelope.Listener.CompleteAsync(envelope); - } - catch (Exception exception) - { - _logger.LogError(exception, "Error trying to complete duplicated message {Id} from {Uri}", - envelope.Id, Uri); - } - } + await handleDuplicateIncomingEnvelope(envelope, e); return; } @@ -404,6 +405,24 @@ await executeWithRetriesAsync(async () => } } + private async Task handleDuplicateIncomingEnvelope(Envelope envelope, DuplicateIncomingEnvelopeException e) + { + _logger.LogError(e, "Duplicate incoming envelope detected"); + + if (envelope.Listener != null) + { + try + { + await envelope.Listener.CompleteAsync(envelope); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error trying to complete duplicated message {Id} from {Uri}", + envelope.Id, Uri); + } + } + } + private async Task executeWithRetriesAsync(Func action) { var i = 0;