Skip to content

Commit 6d924c9

Browse files
Include SubQueue option for Processor (#20719)
* Include SubQueue option for Processor * Remove duplicate line
1 parent 37088a4 commit 6d924c9

File tree

8 files changed

+83
-23
lines changed

8 files changed

+83
-23
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ public ServiceBusProcessorOptions() { }
228228
public int MaxConcurrentCalls { get { throw null; } set { } }
229229
public int PrefetchCount { get { throw null; } set { } }
230230
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
231+
public Azure.Messaging.ServiceBus.SubQueue SubQueue { get { throw null; } set { } }
231232
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
232233
public override bool Equals(object obj) { throw null; }
233234
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -278,7 +279,6 @@ protected ServiceBusReceiver() { }
278279
public virtual bool IsClosed { get { throw null; } }
279280
public virtual int PrefetchCount { get { throw null; } }
280281
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
281-
public virtual string TransactionGroup { get { throw null; } }
282282
public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
283283
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
284284
public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }

sdk/servicebus/Azure.Messaging.ServiceBus/src/EntityNameFormatter.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@ internal static class EntityNameFormatter
2020
private const string Transfer = "Transfer";
2121
private const string TransferDeadLetterQueueName = SubQueuePrefix + Transfer + PathDelimiter + DeadLetterQueueName;
2222

23+
/// <summary>
24+
/// Formats the entity path for a receiver or processor taking into account whether using a SubQueue.
25+
/// </summary>
26+
public static string FormatEntityPath(string entityPath, SubQueue subQueue)
27+
{
28+
return subQueue switch
29+
{
30+
SubQueue.None => entityPath,
31+
SubQueue.DeadLetter => FormatDeadLetterPath(entityPath),
32+
SubQueue.TransferDeadLetter => FormatTransferDeadLetterPath(entityPath),
33+
_ => null
34+
};
35+
}
36+
2337
/// <summary>
2438
/// Formats the dead letter path for either a queue, or a subscription.
2539
/// </summary>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public ReceiverManager(
4141
{
4242
ReceiveMode = ProcessorOptions.ReceiveMode,
4343
PrefetchCount = ProcessorOptions.PrefetchCount,
44+
// Pass None for subqueue since the subqueue has already
45+
// been taken into account when computing the EntityPath of the processor.
46+
SubQueue = SubQueue.None
4447
};
4548
_maxReceiveWaitTime = ProcessorOptions.MaxReceiveWaitTime;
4649
_plugins = plugins;

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ internal ServiceBusProcessor(
197197

198198
Options = options?.Clone() ?? new ServiceBusProcessorOptions();
199199
Connection = connection;
200-
EntityPath = entityPath;
200+
EntityPath = EntityNameFormatter.FormatEntityPath(entityPath, Options.SubQueue);
201201
Identifier = DiagnosticUtilities.GenerateIdentifier(EntityPath);
202202

203203
ReceiveMode = Options.ReceiveMode;
@@ -225,7 +225,6 @@ internal ServiceBusProcessor(
225225

226226
AutoCompleteMessages = Options.AutoCompleteMessages;
227227

228-
EntityPath = entityPath;
229228
IsSessionProcessor = isSessionEntity;
230229
_scopeFactory = new EntityScopeFactory(EntityPath, FullyQualifiedNamespace);
231230
_plugins = plugins;

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ public int MaxConcurrentCalls
120120
}
121121
private int _maxConcurrentCalls = 1;
122122

123+
/// <summary>
124+
/// Gets or sets the subqueue to connect the processor to. By default, the processor will not connect to a subqueue.
125+
/// </summary>
126+
public SubQueue SubQueue { get; set; } = SubQueue.None;
127+
123128
/// <summary>
124129
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
125130
/// </summary>
@@ -163,6 +168,7 @@ internal ServiceBusProcessorOptions Clone()
163168
MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration,
164169
MaxReceiveWaitTime = MaxReceiveWaitTime,
165170
MaxConcurrentCalls = MaxConcurrentCalls,
171+
SubQueue = SubQueue
166172
};
167173
}
168174
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,6 @@ public class ServiceBusReceiver : IAsyncDisposable
6060
/// <remarks>Every new client has a unique ID.</remarks>
6161
internal string Identifier { get; }
6262

63-
/// <summary>
64-
/// Gets the transaction group associated with the receiver. This is an
65-
/// arbitrary string that is used to all senders, receivers, and processors that you
66-
/// wish to use in a transaction that spans multiple different queues, topics, or subscriptions.
67-
/// </summary>
68-
public virtual string TransactionGroup { get; }
69-
7063
/// <summary>
7164
/// Indicates whether or not this <see cref="ServiceBusReceiver"/> has been closed.
7265
/// </summary>
@@ -162,18 +155,7 @@ internal ServiceBusReceiver(
162155
ReceiveMode = options.ReceiveMode;
163156
PrefetchCount = options.PrefetchCount;
164157

165-
switch (options.SubQueue)
166-
{
167-
case SubQueue.None:
168-
EntityPath = entityPath;
169-
break;
170-
case SubQueue.DeadLetter:
171-
EntityPath = EntityNameFormatter.FormatDeadLetterPath(entityPath);
172-
break;
173-
case SubQueue.TransferDeadLetter:
174-
EntityPath = EntityNameFormatter.FormatTransferDeadLetterPath(entityPath);
175-
break;
176-
}
158+
EntityPath = EntityNameFormatter.FormatEntityPath(entityPath, options.SubQueue);
177159

178160
IsSessionReceiver = isSessionEntity;
179161
_innerReceiver = _connection.CreateTransportReceiver(

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,5 +775,59 @@ Task processErrorAsync(ProcessErrorEventArgs args)
775775
}
776776
}
777777
}
778+
779+
[Test]
780+
public async Task ProcessDlq()
781+
{
782+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
783+
{
784+
await using var client = CreateClient();
785+
var sender = client.CreateSender(scope.QueueName);
786+
int messageCount = 10;
787+
788+
// send some messages
789+
await sender.SendMessagesAsync(GetMessages(messageCount));
790+
791+
// receive the messages
792+
var receiver = client.CreateReceiver(scope.QueueName);
793+
int remaining = messageCount;
794+
795+
// move the messages to the DLQ
796+
while (remaining > 0)
797+
{
798+
var messages = await receiver.ReceiveMessagesAsync(remaining);
799+
remaining -= messages.Count;
800+
foreach (ServiceBusReceivedMessage message in messages)
801+
{
802+
await receiver.DeadLetterMessageAsync(message);
803+
}
804+
}
805+
806+
// process the DLQ
807+
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
808+
{
809+
SubQueue = SubQueue.DeadLetter
810+
});
811+
812+
int receivedCount = 0;
813+
var tcs = new TaskCompletionSource<bool>();
814+
815+
Task ProcessMessage(ProcessMessageEventArgs args)
816+
{
817+
var ct = Interlocked.Increment(ref receivedCount);
818+
if (ct == messageCount)
819+
{
820+
tcs.SetResult(true);
821+
}
822+
return Task.CompletedTask;
823+
}
824+
processor.ProcessMessageAsync += ProcessMessage;
825+
processor.ProcessErrorAsync += ExceptionHandler;
826+
827+
await processor.StartProcessingAsync();
828+
await tcs.Task;
829+
await processor.StopProcessingAsync();
830+
}
831+
}
778832
}
779833
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ public void ProcessorOptionsSetOnClient()
139139
PrefetchCount = 5,
140140
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
141141
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(60),
142-
MaxReceiveWaitTime = TimeSpan.FromSeconds(10)
142+
MaxReceiveWaitTime = TimeSpan.FromSeconds(10),
143+
SubQueue = SubQueue.DeadLetter
143144
};
144145
var processor = client.CreateProcessor("queueName", options);
145146
Assert.AreEqual(options.AutoCompleteMessages, processor.AutoCompleteMessages);
@@ -149,6 +150,7 @@ public void ProcessorOptionsSetOnClient()
149150
Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration);
150151
Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime);
151152
Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace);
153+
Assert.AreEqual(EntityNameFormatter.FormatDeadLetterPath("queueName"), processor.EntityPath);
152154
Assert.IsFalse(processor.IsClosed);
153155
Assert.IsFalse(processor.IsProcessing);
154156
}

0 commit comments

Comments
 (0)