Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ private async Task tryRecoverIncomingMessages()
using var session = _store.OpenAsyncSession();
var listeners = await session.Query<IncomingMessage>()
.Where(x => x.OwnerId == 0)
.Select(x => x.ReceivedAt)
.Select(x => new { x.ReceivedAt })
.Distinct()
.ToListAsync();

foreach (var listener in listeners)
foreach (var listener in listeners.Where(x => x.ReceivedAt != null))
{
var circuit = _runtime.Endpoints.FindListenerCircuit(listener);
var receivedAt = listener.ReceivedAt!;
var circuit = _runtime.Endpoints.FindListenerCircuit(receivedAt);
if (circuit.Status != ListeningStatus.Accepting)
{
continue;
}

// Harden around this!
await recoverMessagesForListener(listener, circuit);
await recoverMessagesForListener(receivedAt, circuit);
}
}
catch (Exception e)
Expand All @@ -55,5 +56,5 @@ private async Task recoverMessagesForListener(Uri listener, IListenerCircuit cir
_logger.LogError(e, "Error trying to recover messages from the inbox for listener {Uri}", listener);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ private async Task tryRecoverOutgoingMessagesAsync()

var senders = (await session.Query<OutgoingMessage>().Customize(x => x.WaitForNonStaleResults())
.Where(x => x.OwnerId == 0).ToListAsync())
.Select(x => x.Destination)
.Select(x => new { x.Destination })
.Distinct()
.ToList();

foreach (var sender in senders)
foreach (var sender in senders.Where(x => x.Destination != null))
{
await tryRecoverOutgoingMessagesToSenderAsync(sender);
await tryRecoverOutgoingMessagesToSenderAsync(sender.Destination!);
}
}
catch (Exception e)
Expand All @@ -34,7 +34,7 @@ private async Task tryRecoverOutgoingMessagesToSenderAsync(Uri sender)
{
var sendingAgent = _runtime.Endpoints.GetOrBuildSendingAgent(sender);
if (sendingAgent.Latched) return;

var outgoing = await _parent.Outbox.LoadOutgoingAsync(sendingAgent.Destination);
var expiredMessages = outgoing.Where(x => x.IsExpired()).ToArray();
var good = outgoing.Where(x => !x.IsExpired()).ToArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public OutgoingMessage(Envelope envelope)

public string Id { get; set; }
public int OwnerId { get; set; }
public Uri Destination { get; set; }
public Uri? Destination { get; set; }
public DateTimeOffset? DeliverBy { get; set; }
public byte[] Body { get; set; } = [];
public int Attempts { get; set; }
Expand Down
Loading