diff --git a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs index 0f53bf58a..8ec21b112 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs @@ -15,20 +15,21 @@ private async Task tryRecoverIncomingMessages() using var session = _store.OpenAsyncSession(); var listeners = await session.Query() .Where(x => x.OwnerId == 0) - .Select(x => x.ReceivedAt) + .Select(x => new { x.ReceivedAt }) .Distinct() .ToListAsync(); - foreach (var listener in listeners) + foreach (var listener in listeners.Where(x => x.ReceivedAt != null)) { - var circuit = _runtime.Endpoints.FindListenerCircuit(listener); + var receivedAt = listener.ReceivedAt!; + var circuit = _runtime.Endpoints.FindListenerCircuit(receivedAt); if (circuit.Status != ListeningStatus.Accepting) { continue; } // Harden around this! - await recoverMessagesForListener(listener, circuit); + await recoverMessagesForListener(receivedAt, circuit); } } catch (Exception e) @@ -55,5 +56,5 @@ private async Task recoverMessagesForListener(Uri listener, IListenerCircuit cir _logger.LogError(e, "Error trying to recover messages from the inbox for listener {Uri}", listener); } } - + } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Outgoing.cs b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Outgoing.cs index e140d2113..288d24cf8 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Outgoing.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Outgoing.cs @@ -13,13 +13,13 @@ private async Task tryRecoverOutgoingMessagesAsync() var senders = (await session.Query().Customize(x => x.WaitForNonStaleResults()) .Where(x => x.OwnerId == 0).ToListAsync()) - .Select(x => x.Destination) + .Select(x => new { x.Destination }) .Distinct() .ToList(); - foreach (var sender in senders) + foreach (var sender in senders.Where(x => x.Destination != null)) { - await tryRecoverOutgoingMessagesToSenderAsync(sender); + await tryRecoverOutgoingMessagesToSenderAsync(sender.Destination!); } } catch (Exception e) @@ -34,7 +34,7 @@ private async Task tryRecoverOutgoingMessagesToSenderAsync(Uri sender) { var sendingAgent = _runtime.Endpoints.GetOrBuildSendingAgent(sender); if (sendingAgent.Latched) return; - + var outgoing = await _parent.Outbox.LoadOutgoingAsync(sendingAgent.Destination); var expiredMessages = outgoing.Where(x => x.IsExpired()).ToArray(); var good = outgoing.Where(x => !x.IsExpired()).ToArray(); diff --git a/src/Persistence/Wolverine.RavenDb/Internals/OutgoingMessage.cs b/src/Persistence/Wolverine.RavenDb/Internals/OutgoingMessage.cs index c1476439b..1098d0e14 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/OutgoingMessage.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/OutgoingMessage.cs @@ -22,7 +22,7 @@ public OutgoingMessage(Envelope envelope) public string Id { get; set; } public int OwnerId { get; set; } - public Uri Destination { get; set; } + public Uri? Destination { get; set; } public DateTimeOffset? DeliverBy { get; set; } public byte[] Body { get; set; } = []; public int Attempts { get; set; }