From ffd304b776c74e5f5e122030bbd6a94d698b300e Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 15 Jan 2024 15:19:03 -0600 Subject: [PATCH] Azure Service Bus transport improvements for FIFO queues et al. Closes GH-686. Closes GH-680. --- .../OpenApiDemonstrator.csproj | 2 +- .../Internal/AzureServiceBusEnvelope.cs | 34 +++++++++++++++++-- .../BatchedAzureServiceBusListener.cs | 6 ++-- .../Internal/InlineAzureServiceBusListener.cs | 8 ++--- .../Internal/SessionSpecificListener.cs | 14 ++++---- 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/Http/OpenApiDemonstrator/OpenApiDemonstrator.csproj b/src/Http/OpenApiDemonstrator/OpenApiDemonstrator.csproj index 12b47e280..065689404 100644 --- a/src/Http/OpenApiDemonstrator/OpenApiDemonstrator.csproj +++ b/src/Http/OpenApiDemonstrator/OpenApiDemonstrator.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusEnvelope.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusEnvelope.cs index 9546ba169..22429df8d 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusEnvelope.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusEnvelope.cs @@ -4,9 +4,10 @@ namespace Wolverine.AzureServiceBus.Internal; public class AzureServiceBusEnvelope : Envelope { - public AzureServiceBusEnvelope(ServiceBusReceivedMessage message) + public AzureServiceBusEnvelope(ServiceBusReceivedMessage message, ServiceBusSessionReceiver sessionReceiver) { AzureMessage = message; + SessionReceiver = sessionReceiver; } public AzureServiceBusEnvelope(ProcessMessageEventArgs args) @@ -15,9 +16,36 @@ public AzureServiceBusEnvelope(ProcessMessageEventArgs args) AzureMessage = args.Message; } - public ProcessMessageEventArgs Args { get; set; } + public AzureServiceBusEnvelope(ServiceBusReceivedMessage message, ServiceBusReceiver sessionReceiver) + { + AzureMessage = message; + ServiceBusReceiver = sessionReceiver; + } + + public Task CompleteAsync(CancellationToken token) + { + return Args?.CompleteMessageAsync(AzureMessage, token) ?? ServiceBusReceiver?.CompleteMessageAsync(AzureMessage, token) ?? + SessionReceiver?.CompleteMessageAsync(AzureMessage, token) ?? Task.CompletedTask; + } + + public Task DeferAsync(CancellationToken token) + { + return Args?.DeferMessageAsync(AzureMessage, cancellationToken: token) ?? ServiceBusReceiver?.DeferMessageAsync(AzureMessage, cancellationToken: token) ?? + SessionReceiver?.DeferMessageAsync(AzureMessage, cancellationToken: token) ?? Task.CompletedTask; + } + + public Task DeadLetterAsync(CancellationToken token, string? deadLetterReason = null, string? deadLetterErrorDescription = null) + { + return Args?.DeadLetterMessageAsync(AzureMessage, cancellationToken: token, deadLetterReason: deadLetterReason, deadLetterErrorDescription:deadLetterErrorDescription) + ?? ServiceBusReceiver?.DeadLetterMessageAsync(AzureMessage, cancellationToken: token, deadLetterReason: deadLetterReason, deadLetterErrorDescription:deadLetterErrorDescription) + ?? SessionReceiver?.DeadLetterMessageAsync(AzureMessage, cancellationToken: token, deadLetterReason: deadLetterReason, deadLetterErrorDescription:deadLetterErrorDescription) ?? Task.CompletedTask; + } + + private ProcessMessageEventArgs? Args { get; set; } - public ServiceBusReceivedMessage AzureMessage { get; } + private ServiceBusReceivedMessage AzureMessage { get; } + private ServiceBusSessionReceiver? SessionReceiver { get; } + private ServiceBusReceiver? ServiceBusReceiver { get; } public Exception Exception { get; set; } public bool IsCompleted { get; set; } public ServiceBusReceiver? Receiver { get; set; } diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs index 0560c4a51..1a9e5e0ae 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs @@ -37,7 +37,7 @@ public BatchedAzureServiceBusListener(AzureServiceBusEndpoint endpoint, ILogger _complete = new RetryBlock((e, _) => { - return _receiver.CompleteMessageAsync(e.AzureMessage); + return e.CompleteAsync(_cancellation.Token); }, _logger, _cancellation.Token); _defer = new RetryBlock(async (envelope, _) => @@ -46,7 +46,7 @@ public BatchedAzureServiceBusListener(AzureServiceBusEndpoint endpoint, ILogger }, logger, _cancellation.Token); _deadLetter = - new RetryBlock((e, c) => _receiver.DeadLetterMessageAsync(e.AzureMessage, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger, + new RetryBlock((e, c) => e.DeadLetterAsync(_cancellation.Token, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger, _cancellation.Token); } @@ -116,7 +116,7 @@ private async Task listenForMessages() { try { - var envelope = new AzureServiceBusEnvelope(message); + var envelope = new AzureServiceBusEnvelope(message, _receiver); _mapper.MapIncomingToEnvelope(envelope, message); envelopes.Add(envelope); diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/InlineAzureServiceBusListener.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/InlineAzureServiceBusListener.cs index 6444bae97..6cd76e61c 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/InlineAzureServiceBusListener.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/InlineAzureServiceBusListener.cs @@ -36,14 +36,14 @@ public InlineAzureServiceBusListener(AzureServiceBusEndpoint endpoint, _complete = new RetryBlock((e, _) => { - return e.Args.CompleteMessageAsync(e.AzureMessage); + return e.CompleteAsync(_cancellation.Token); }, _logger, _cancellation.Token); _defer = new RetryBlock(async (envelope, _) => { - if (envelope is AzureServiceBusEnvelope e) + if (envelope is { } e) { - await e.Args.CompleteMessageAsync(e.AzureMessage); + await e.CompleteAsync(_cancellation.Token); e.IsCompleted = true; } @@ -51,7 +51,7 @@ public InlineAzureServiceBusListener(AzureServiceBusEndpoint endpoint, }, logger, _cancellation.Token); _deadLetter = - new RetryBlock((e, c) => e.Args.DeadLetterMessageAsync(e.AzureMessage, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger, + new RetryBlock((e, c) => e.DeadLetterAsync(_cancellation.Token, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger, _cancellation.Token); _processor.ProcessMessageAsync += processMessageAsync; diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/SessionSpecificListener.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/SessionSpecificListener.cs index 6afc9770f..e97169848 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/SessionSpecificListener.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/SessionSpecificListener.cs @@ -146,16 +146,13 @@ public SessionSpecificListener(ServiceBusSessionReceiver sessionReceiver, AzureS _mapper = mapper; _logger = logger; - _complete = new RetryBlock((e, _) => - { - return e.Args.CompleteMessageAsync(e.AzureMessage); - }, _logger, _cancellation.Token); + _complete = new RetryBlock((e, _) => e.CompleteAsync(_cancellation.Token), _logger, _cancellation.Token); _defer = new RetryBlock(async (envelope, _) => { - if (envelope is AzureServiceBusEnvelope e) + if (envelope is { } e) { - await e.Args.CompleteMessageAsync(e.AzureMessage); + await e.CompleteAsync(_cancellation.Token); e.IsCompleted = true; } @@ -163,7 +160,7 @@ public SessionSpecificListener(ServiceBusSessionReceiver sessionReceiver, AzureS }, logger, _cancellation.Token); _deadLetter = - new RetryBlock((e, c) => _sessionReceiver.DeadLetterMessageAsync(e.AzureMessage, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger, + new RetryBlock((e, c) => e.DeadLetterAsync(_cancellation.Token, deadLetterReason:e.Exception?.GetType().NameInCode(), deadLetterErrorDescription:e.Exception?.Message), logger, _cancellation.Token); } @@ -176,7 +173,8 @@ public async Task ExecuteAsync(CancellationToken cancellationToken) { try { - var envelope = new AzureServiceBusEnvelope(message); + var envelope = new AzureServiceBusEnvelope(message, _sessionReceiver); + _mapper.MapIncomingToEnvelope(envelope, message); try