diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs index 10b11926f312..709c04c6102d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs @@ -228,6 +228,7 @@ public ServiceBusProcessorOptions() { } public int MaxConcurrentCalls { get { throw null; } set { } } public int PrefetchCount { get { throw null; } set { } } public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } } + public Azure.Messaging.ServiceBus.SubQueue SubQueue { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] @@ -278,7 +279,6 @@ protected ServiceBusReceiver() { } public virtual bool IsClosed { get { throw null; } } public virtual int PrefetchCount { get { throw null; } } public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } } - public virtual string TransactionGroup { get { throw null; } } public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/EntityNameFormatter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/EntityNameFormatter.cs index f643d6f2d79a..90f8e1b66e68 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/EntityNameFormatter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/EntityNameFormatter.cs @@ -20,6 +20,20 @@ internal static class EntityNameFormatter private const string Transfer = "Transfer"; private const string TransferDeadLetterQueueName = SubQueuePrefix + Transfer + PathDelimiter + DeadLetterQueueName; + /// + /// Formats the entity path for a receiver or processor taking into account whether using a SubQueue. + /// + public static string FormatEntityPath(string entityPath, SubQueue subQueue) + { + return subQueue switch + { + SubQueue.None => entityPath, + SubQueue.DeadLetter => FormatDeadLetterPath(entityPath), + SubQueue.TransferDeadLetter => FormatTransferDeadLetterPath(entityPath), + _ => null + }; + } + /// /// Formats the dead letter path for either a queue, or a subscription. /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs index 6c2501849bab..4464b02ff2f6 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs @@ -41,6 +41,9 @@ public ReceiverManager( { ReceiveMode = ProcessorOptions.ReceiveMode, PrefetchCount = ProcessorOptions.PrefetchCount, + // Pass None for subqueue since the subqueue has already + // been taken into account when computing the EntityPath of the processor. + SubQueue = SubQueue.None }; _maxReceiveWaitTime = ProcessorOptions.MaxReceiveWaitTime; _plugins = plugins; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs index d032fbca54a6..ed6f755f9f56 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs @@ -197,7 +197,7 @@ internal ServiceBusProcessor( Options = options?.Clone() ?? new ServiceBusProcessorOptions(); Connection = connection; - EntityPath = entityPath; + EntityPath = EntityNameFormatter.FormatEntityPath(entityPath, Options.SubQueue); Identifier = DiagnosticUtilities.GenerateIdentifier(EntityPath); ReceiveMode = Options.ReceiveMode; @@ -225,7 +225,6 @@ internal ServiceBusProcessor( AutoCompleteMessages = Options.AutoCompleteMessages; - EntityPath = entityPath; IsSessionProcessor = isSessionEntity; _scopeFactory = new EntityScopeFactory(EntityPath, FullyQualifiedNamespace); _plugins = plugins; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs index 0226fe82b9a1..8880ba49dc9a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs @@ -120,6 +120,11 @@ public int MaxConcurrentCalls } private int _maxConcurrentCalls = 1; + /// + /// Gets or sets the subqueue to connect the processor to. By default, the processor will not connect to a subqueue. + /// + public SubQueue SubQueue { get; set; } = SubQueue.None; + /// /// Determines whether the specified is equal to this instance. /// @@ -163,6 +168,7 @@ internal ServiceBusProcessorOptions Clone() MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration, MaxReceiveWaitTime = MaxReceiveWaitTime, MaxConcurrentCalls = MaxConcurrentCalls, + SubQueue = SubQueue }; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs index fd223b9c1f26..483cf6ad927e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs @@ -60,13 +60,6 @@ public class ServiceBusReceiver : IAsyncDisposable /// Every new client has a unique ID. internal string Identifier { get; } - /// - /// Gets the transaction group associated with the receiver. This is an - /// arbitrary string that is used to all senders, receivers, and processors that you - /// wish to use in a transaction that spans multiple different queues, topics, or subscriptions. - /// - public virtual string TransactionGroup { get; } - /// /// Indicates whether or not this has been closed. /// @@ -162,18 +155,7 @@ internal ServiceBusReceiver( ReceiveMode = options.ReceiveMode; PrefetchCount = options.PrefetchCount; - switch (options.SubQueue) - { - case SubQueue.None: - EntityPath = entityPath; - break; - case SubQueue.DeadLetter: - EntityPath = EntityNameFormatter.FormatDeadLetterPath(entityPath); - break; - case SubQueue.TransferDeadLetter: - EntityPath = EntityNameFormatter.FormatTransferDeadLetterPath(entityPath); - break; - } + EntityPath = EntityNameFormatter.FormatEntityPath(entityPath, options.SubQueue); IsSessionReceiver = isSessionEntity; _innerReceiver = _connection.CreateTransportReceiver( diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index 6b90e1976ebe..f42509d5ac73 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -775,5 +775,59 @@ Task processErrorAsync(ProcessErrorEventArgs args) } } } + + [Test] + public async Task ProcessDlq() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var client = CreateClient(); + var sender = client.CreateSender(scope.QueueName); + int messageCount = 10; + + // send some messages + await sender.SendMessagesAsync(GetMessages(messageCount)); + + // receive the messages + var receiver = client.CreateReceiver(scope.QueueName); + int remaining = messageCount; + + // move the messages to the DLQ + while (remaining > 0) + { + var messages = await receiver.ReceiveMessagesAsync(remaining); + remaining -= messages.Count; + foreach (ServiceBusReceivedMessage message in messages) + { + await receiver.DeadLetterMessageAsync(message); + } + } + + // process the DLQ + await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions + { + SubQueue = SubQueue.DeadLetter + }); + + int receivedCount = 0; + var tcs = new TaskCompletionSource(); + + Task ProcessMessage(ProcessMessageEventArgs args) + { + var ct = Interlocked.Increment(ref receivedCount); + if (ct == messageCount) + { + tcs.SetResult(true); + } + return Task.CompletedTask; + } + processor.ProcessMessageAsync += ProcessMessage; + processor.ProcessErrorAsync += ExceptionHandler; + + await processor.StartProcessingAsync(); + await tcs.Task; + await processor.StopProcessingAsync(); + } + } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs index d586c1184848..5156eced7662 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs @@ -139,7 +139,8 @@ public void ProcessorOptionsSetOnClient() PrefetchCount = 5, ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete, MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(60), - MaxReceiveWaitTime = TimeSpan.FromSeconds(10) + MaxReceiveWaitTime = TimeSpan.FromSeconds(10), + SubQueue = SubQueue.DeadLetter }; var processor = client.CreateProcessor("queueName", options); Assert.AreEqual(options.AutoCompleteMessages, processor.AutoCompleteMessages); @@ -149,6 +150,7 @@ public void ProcessorOptionsSetOnClient() Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime); Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace); + Assert.AreEqual(EntityNameFormatter.FormatDeadLetterPath("queueName"), processor.EntityPath); Assert.IsFalse(processor.IsClosed); Assert.IsFalse(processor.IsProcessing); }