Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public ServiceBusProcessorOptions() { }
public int MaxConcurrentCalls { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.SubQueue SubQueue { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down Expand Up @@ -278,7 +279,6 @@ protected ServiceBusReceiver() { }
public virtual bool IsClosed { get { throw null; } }
public virtual int PrefetchCount { get { throw null; } }
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
public virtual string TransactionGroup { get { throw null; } }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was erroneously left over from a previous beta.

public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ internal static class EntityNameFormatter
private const string Transfer = "Transfer";
private const string TransferDeadLetterQueueName = SubQueuePrefix + Transfer + PathDelimiter + DeadLetterQueueName;

/// <summary>
/// Formats the entity path for a receiver or processor taking into account whether using a SubQueue.
/// </summary>
public static string FormatEntityPath(string entityPath, SubQueue subQueue)
{
return subQueue switch
{
SubQueue.None => entityPath,
SubQueue.DeadLetter => FormatDeadLetterPath(entityPath),
SubQueue.TransferDeadLetter => FormatTransferDeadLetterPath(entityPath),
_ => null
};
}

/// <summary>
/// Formats the dead letter path for either a queue, or a subscription.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public ReceiverManager(
{
ReceiveMode = ProcessorOptions.ReceiveMode,
PrefetchCount = ProcessorOptions.PrefetchCount,
// Pass None for subqueue since the subqueue has already
// been taken into account when computing the EntityPath of the processor.
SubQueue = SubQueue.None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not technically necessary (since None is the default), but I included it to help document why the option is not passed to the receiver.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this helpful to follow the rationale.

};
_maxReceiveWaitTime = ProcessorOptions.MaxReceiveWaitTime;
_plugins = plugins;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ internal ServiceBusProcessor(

Options = options?.Clone() ?? new ServiceBusProcessorOptions();
Connection = connection;
EntityPath = entityPath;
EntityPath = EntityNameFormatter.FormatEntityPath(entityPath, Options.SubQueue);
Identifier = DiagnosticUtilities.GenerateIdentifier(EntityPath);

ReceiveMode = Options.ReceiveMode;
Expand Down Expand Up @@ -225,7 +225,6 @@ internal ServiceBusProcessor(

AutoCompleteMessages = Options.AutoCompleteMessages;

EntityPath = entityPath;
IsSessionProcessor = isSessionEntity;
_scopeFactory = new EntityScopeFactory(EntityPath, FullyQualifiedNamespace);
_plugins = plugins;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public int MaxConcurrentCalls
}
private int _maxConcurrentCalls = 1;

/// <summary>
/// Gets or sets the subqueue to connect the processor to. By default, the processor will not connect to a subqueue.
/// </summary>
public SubQueue SubQueue { get; set; } = SubQueue.None;

/// <summary>
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
/// </summary>
Expand Down Expand Up @@ -163,6 +168,7 @@ internal ServiceBusProcessorOptions Clone()
MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration,
MaxReceiveWaitTime = MaxReceiveWaitTime,
MaxConcurrentCalls = MaxConcurrentCalls,
SubQueue = SubQueue
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ public class ServiceBusReceiver : IAsyncDisposable
/// <remarks>Every new client has a unique ID.</remarks>
internal string Identifier { get; }

/// <summary>
/// Gets the transaction group associated with the receiver. This is an
/// arbitrary string that is used to all senders, receivers, and processors that you
/// wish to use in a transaction that spans multiple different queues, topics, or subscriptions.
/// </summary>
public virtual string TransactionGroup { get; }

/// <summary>
/// Indicates whether or not this <see cref="ServiceBusReceiver"/> has been closed.
/// </summary>
Expand Down Expand Up @@ -162,18 +155,7 @@ internal ServiceBusReceiver(
ReceiveMode = options.ReceiveMode;
PrefetchCount = options.PrefetchCount;

switch (options.SubQueue)
{
case SubQueue.None:
EntityPath = entityPath;
break;
case SubQueue.DeadLetter:
EntityPath = EntityNameFormatter.FormatDeadLetterPath(entityPath);
break;
case SubQueue.TransferDeadLetter:
EntityPath = EntityNameFormatter.FormatTransferDeadLetterPath(entityPath);
break;
}
EntityPath = EntityNameFormatter.FormatEntityPath(entityPath, options.SubQueue);

IsSessionReceiver = isSessionEntity;
_innerReceiver = _connection.CreateTransportReceiver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,5 +775,59 @@ Task processErrorAsync(ProcessErrorEventArgs args)
}
}
}

[Test]
public async Task ProcessDlq()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
int messageCount = 10;

// send some messages
await sender.SendMessagesAsync(GetMessages(messageCount));

// receive the messages
var receiver = client.CreateReceiver(scope.QueueName);
int remaining = messageCount;

// move the messages to the DLQ
while (remaining > 0)
{
var messages = await receiver.ReceiveMessagesAsync(remaining);
remaining -= messages.Count;
foreach (ServiceBusReceivedMessage message in messages)
{
await receiver.DeadLetterMessageAsync(message);
}
}

// process the DLQ
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
SubQueue = SubQueue.DeadLetter
});

int receivedCount = 0;
var tcs = new TaskCompletionSource<bool>();

Task ProcessMessage(ProcessMessageEventArgs args)
{
var ct = Interlocked.Increment(ref receivedCount);
if (ct == messageCount)
{
tcs.SetResult(true);
}
return Task.CompletedTask;
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include a cancellation token timeout here to ensure that the test doesn't hang if the right number of messages aren't read?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach we use is to just let the tests timeout after 5 minutes. We could add a shorter timeout for some of these tests but I'm not sure it would be worth it.

await processor.StopProcessingAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public void ProcessorOptionsSetOnClient()
PrefetchCount = 5,
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(60),
MaxReceiveWaitTime = TimeSpan.FromSeconds(10)
MaxReceiveWaitTime = TimeSpan.FromSeconds(10),
SubQueue = SubQueue.DeadLetter
};
var processor = client.CreateProcessor("queueName", options);
Assert.AreEqual(options.AutoCompleteMessages, processor.AutoCompleteMessages);
Expand All @@ -149,6 +150,7 @@ public void ProcessorOptionsSetOnClient()
Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration);
Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime);
Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace);
Assert.AreEqual(EntityNameFormatter.FormatDeadLetterPath("queueName"), processor.EntityPath);
Assert.IsFalse(processor.IsClosed);
Assert.IsFalse(processor.IsProcessing);
}
Expand Down