Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ protected ServiceBusSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient
public virtual bool IsClosed { get { throw null; } }
public virtual bool IsProcessing { get { throw null; } }
public virtual System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } }
public virtual int MaxConcurrentCallsAcrossAllSessions { get { throw null; } }
public virtual int MaxConcurrentCallsPerSession { get { throw null; } }
public virtual int MaxConcurrentSessions { get { throw null; } }
public virtual int PrefetchCount { get { throw null; } }
Expand All @@ -407,13 +408,14 @@ protected ServiceBusSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient
public virtual System.Threading.Tasks.Task StopProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession) { }
public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCallsAcrossAllSessions = default(int?)) { }
Comment thread
JoshLove-msft marked this conversation as resolved.
}
public partial class ServiceBusSessionProcessorOptions
{
public ServiceBusSessionProcessorOptions() { }
public bool AutoCompleteMessages { get { throw null; } set { } }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
public int? MaxConcurrentCallsAcrossAllSessions { get { throw null; } set { } }
public int MaxConcurrentCallsPerSession { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ internal AmqpClient(
credential,
options.TransportType,
options.WebProxy,
options.EnableCrossEntityTransactions);
options.EnableCrossEntityTransactions,
options.RetryOptions.TryTimeout);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,28 +146,30 @@ public override bool IsDisposed
private string _sendViaReceiverEntityPath;

private readonly object _syncLock = new();
private readonly TimeSpan _operationTimeout;

/// <summary>
/// Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
/// </summary>
///
/// <param name="serviceEndpoint">Endpoint for the Service Bus service to which the scope is associated.</param>
/// <param name="credential">The credential to use for authorization with the Service Bus service.</param>
/// <param name="transport">The transport to use for communication.</param>
/// <param name="proxy">The proxy, if any, to use for communication.</param>
/// <param name="useSingleSession">If true, all links will use a single session.</param>
///
/// <param name="operationTimeout">The timeout for operations associated with the connection.</param>
public AmqpConnectionScope(
Uri serviceEndpoint,
ServiceBusTokenCredential credential,
ServiceBusTransportType transport,
IWebProxy proxy,
bool useSingleSession)
bool useSingleSession,
TimeSpan operationTimeout)
{
Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
Argument.AssertNotNull(credential, nameof(credential));
ValidateTransport(transport);

_operationTimeout = operationTimeout;
ServiceEndpoint = serviceEndpoint;
Transport = transport;
Proxy = proxy;
Expand Down Expand Up @@ -506,6 +508,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
var linkSettings = new AmqpLinkSettings();
linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
linkSettings.OperationTimeout = _operationTimeout;
entityPath += '/' + AmqpClientConstants.ManagementAddress;

// Perform the initial authorization for the link.
Expand Down Expand Up @@ -639,7 +642,8 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
AutoSendFlow = prefetchCount > 0,
SettleType = (receiveMode == ServiceBusReceiveMode.PeekLock) ? SettleMode.SettleOnDispose : SettleMode.SettleOnSend,
Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
Target = new Target { Address = Guid.NewGuid().ToString() }
Target = new Target { Address = Guid.NewGuid().ToString() },
OperationTimeout = _operationTimeout
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

};

var link = new ReceivingAmqpLink(linkSettings);
Expand Down Expand Up @@ -777,7 +781,8 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
Role = false,
InitialDeliveryCount = 0,
Source = new Source { Address = Guid.NewGuid().ToString() },
Target = new Target { Address = destinationEndpoint.AbsolutePath }
Target = new Target { Address = destinationEndpoint.AbsolutePath },
OperationTimeout = _operationTimeout
};

linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
Expand Down Expand Up @@ -1046,40 +1051,9 @@ protected virtual async Task OpenAmqpObjectAsync(
string entityPath = default,
bool isProcessor = default)
{
CancellationTokenRegistration registration;
try
{
var openObjectCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
// Only allow cancelling in-flight opens when it is from a processor.
// This would occur when the processor is stopped or closed by the user.
if (isProcessor)
{
// use a static delegate with tuple state to avoid allocating a closure
registration = cancellationToken.Register(static state =>
{
var (tcs, target) = ((TaskCompletionSource<object>, AmqpObject))state;
if (tcs.TrySetCanceled())
{
target.SafeClose();
}
}, (openObjectCompletionSource, target), useSynchronizationContext: false);
}

static async Task Open(AmqpObject target, TimeSpan timeout, TaskCompletionSource<object> openObjectCompletionSource)
{
try
{
await target.OpenAsync(timeout).ConfigureAwait(false);
openObjectCompletionSource.TrySetResult(null);
}
catch (Exception ex)
{
openObjectCompletionSource.TrySetException(ex);
}
}

_ = Open(target, timeout, openObjectCompletionSource);
await openObjectCompletionSource.Task.ConfigureAwait(false);
await target.OpenAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -1110,13 +1084,6 @@ static async Task Open(AmqpObject target, TimeSpan timeout, TaskCompletionSource
throw;
}
}
finally
{
if (isProcessor)
{
registration.Dispose();
}
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ public virtual ServiceBusProcessor CreateProcessor(
/// <param name="options">The set of <see cref="ServiceBusSessionProcessorOptions"/> to use for configuring the
/// <see cref="ServiceBusSessionProcessor"/>.</param>
/// <returns>A <see cref="ServiceBusSessionProcessor"/> scoped to the specified queue.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// The value specified for <see cref="ServiceBusSessionProcessorOptions.MaxConcurrentCallsAcrossAllSessions"/> was greater than the product of
/// <see cref="ServiceBusSessionProcessorOptions.MaxConcurrentSessions"/> and <see cref="ServiceBusSessionProcessorOptions.MaxConcurrentCallsPerSession"/>.
/// </exception>
public virtual ServiceBusSessionProcessor CreateSessionProcessor(
string queueName,
ServiceBusSessionProcessorOptions options = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ public void ProcessorClientClosedException(string identifier)
}
}

[Event(ProcessorStoppingReceiveCanceledEvent, Level = EventLevel.Verbose, Message = "A receive operation was cancelled while stopping the processor. (Identifier '{0}'). Error Message: '{1}'")]
[Event(ProcessorStoppingReceiveCanceledEvent, Level = EventLevel.Verbose, Message = "A receive operation was cancelled while stopping the processor or scaling down concurrency. (Identifier '{0}'). Error Message: '{1}'")]
public void ProcessorStoppingReceiveCanceled(string identifier, string exception)
{
if (IsEnabled())
Expand All @@ -890,7 +890,7 @@ public void ProcessorStoppingReceiveCanceled(string identifier, string exception
}
}

[Event(ProcessorStoppingAcceptSessionCanceledEvent, Level = EventLevel.Verbose, Message = "An accept session operation was cancelled while stopping the processor. (Namespace '{0}', Entity path '{1}'). Error Message: '{2}'")]
[Event(ProcessorStoppingAcceptSessionCanceledEvent, Level = EventLevel.Verbose, Message = "An accept session operation was cancelled while stopping the processor or scaling down concurrency. (Namespace '{0}', Entity path '{1}'). Error Message: '{2}'")]
public void ProcessorStoppingAcceptSessionCanceled(string fullyQualifiedNamespace, string entityPath, string exception)
{
if (IsEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ public class ServiceBusProcessor : IAsyncDisposable
/// The primitive for ensuring that the service is not overloaded with
/// accept session requests.
/// </summary>
private SemaphoreSlim MaxConcurrentAcceptSessionsSemaphore { get; }
private readonly SemaphoreSlim _maxConcurrentAcceptSessionsSemaphore = new(0, int.MaxValue);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing we don't want to recreate this when concurrency is changed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we use the same approach as for the other semaphore - release/WaitAsync to change available threads


/// <summary>The primitive for synchronizing access during start and close operations.</summary>
private readonly SemaphoreSlim _processingStartStopSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _processingStartStopSemaphore = new(1, 1);

private CancellationTokenSource RunningTaskTokenSource { get; set; }

Expand Down Expand Up @@ -123,6 +123,8 @@ public class ServiceBusProcessor : IAsyncDisposable
internal int MaxConcurrentCallsPerSession => _maxConcurrentCallsPerSession;
private volatile int _maxConcurrentCallsPerSession;

private int _currentAcceptSessions;

internal TimeSpan? MaxReceiveWaitTime { get; }

/// <summary>
Expand Down Expand Up @@ -188,7 +190,6 @@ public virtual bool IsClosed
/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusProcessor"/> class.
/// </summary>
///
/// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with the Service Bus service.</param>
/// <param name="entityPath">The queue name or subscription path to process messages from.</param>
/// <param name="isSessionEntity">Whether or not the processor is associated with a session entity.</param>
Expand All @@ -199,6 +200,7 @@ public virtual bool IsClosed
/// Only applies if isSessionEntity is true.</param>
/// <param name="maxConcurrentCallsPerSession">The max number of concurrent calls per session.
/// Only applies if isSessionEntity is true.</param>
/// <param name="maxConcurrentCallsAcrossAllSessions">The max number of concurrent calls across all sessions.</param>
/// <param name="sessionProcessor">If this is for a session processor, will contain the session processor instance.</param>
internal ServiceBusProcessor(
ServiceBusConnection connection,
Expand All @@ -208,6 +210,7 @@ internal ServiceBusProcessor(
string[] sessionIds = default,
int maxConcurrentSessions = default,
int maxConcurrentCallsPerSession = default,
int? maxConcurrentCallsAcrossAllSessions = default,
ServiceBusSessionProcessor sessionProcessor = default)
{
Argument.AssertNotNullOrWhiteSpace(entityPath, nameof(entityPath));
Expand All @@ -232,16 +235,9 @@ internal ServiceBusProcessor(

if (isSessionEntity)
{
_maxConcurrentCalls = _sessionIds.Length > 0
? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
: _maxConcurrentSessions * _maxConcurrentCallsPerSession;
SetMaxConcurrentCallsAcrossSessions(maxConcurrentCallsAcrossAllSessions);
}

var maxAcceptSessions = Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
MaxConcurrentAcceptSessionsSemaphore = new SemaphoreSlim(
maxAcceptSessions,
maxAcceptSessions);

AutoCompleteMessages = Options.AutoCompleteMessages;

IsSessionProcessor = isSessionEntity;
Expand Down Expand Up @@ -607,7 +603,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
new SessionReceiverManager(
_sessionProcessor,
sessionId,
MaxConcurrentAcceptSessionsSemaphore,
_maxConcurrentAcceptSessionsSemaphore,
_scopeFactory,
KeepOpenOnReceiveTimeout));
}
Expand All @@ -634,7 +630,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
new SessionReceiverManager(
_sessionProcessor,
null,
MaxConcurrentAcceptSessionsSemaphore,
_maxConcurrentAcceptSessionsSemaphore,
_scopeFactory,
KeepOpenOnReceiveTimeout));
}
Expand Down Expand Up @@ -960,22 +956,45 @@ public void UpdateConcurrency(int maxConcurrentCalls)
}
}

internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession)
internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCallsAcrossAllSessions = default)
{
Argument.AssertAtLeast(maxConcurrentSessions, 1, nameof(maxConcurrentSessions));
Argument.AssertAtLeast(maxConcurrentCallsPerSession, 1, nameof(maxConcurrentCallsPerSession));

if (maxConcurrentCallsAcrossAllSessions.HasValue)
{
Argument.AssertAtLeast(maxConcurrentCallsAcrossAllSessions.Value, 1, nameof(maxConcurrentCallsAcrossAllSessions));
}

lock (_maxConcurrencySyncLock)
{
_maxConcurrentCalls = _sessionIds.Length > 0
? Math.Min(_sessionIds.Length, maxConcurrentSessions)
: maxConcurrentSessions * maxConcurrentCallsPerSession;
_maxConcurrentSessions = maxConcurrentSessions;
_maxConcurrentCallsPerSession = maxConcurrentCallsPerSession;

SetMaxConcurrentCallsAcrossSessions(maxConcurrentCallsAcrossAllSessions);
WakeLoop();
}
}

private void SetMaxConcurrentCallsAcrossSessions(int? maxConcurrentCallsAcrossAllSessions)
{
int calculatedMaxConcurrentCalls = _sessionIds.Length > 0
? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
: _maxConcurrentSessions * _maxConcurrentCallsPerSession;
Comment thread
JoshLove-msft marked this conversation as resolved.
if (maxConcurrentCallsAcrossAllSessions.HasValue)
{
if (maxConcurrentCallsAcrossAllSessions.Value > calculatedMaxConcurrentCalls)
{
throw new ArgumentOutOfRangeException(Resources.InvalidMaxConcurrentCalls);
}
_maxConcurrentCalls = maxConcurrentCallsAcrossAllSessions.Value;
}
else
{
_maxConcurrentCalls = calculatedMaxConcurrentCalls;
}
}

private void WakeLoop()
{
// wake up the handler loop
Expand Down Expand Up @@ -1031,6 +1050,25 @@ private async Task ReconcileConcurrencyAsync()
}
}

if (IsSessionProcessor)
{
int maxAcceptSessions = Math.Min(maxConcurrentCalls, 2 * Environment.ProcessorCount);
int diffAcceptSessions = maxAcceptSessions - _currentAcceptSessions;
if (diffAcceptSessions > 0)
{
_maxConcurrentAcceptSessionsSemaphore.Release(diffAcceptSessions);
}
else
{
int diffAcceptLimit = Math.Abs(diffAcceptSessions);
for (int i = 0; i < diffAcceptLimit; i++)
{
await _maxConcurrentAcceptSessionsSemaphore.WaitAsync().ConfigureAwait(false);
}
}
_currentAcceptSessions = maxAcceptSessions;
}

ReconcileReceiverManagers(maxConcurrentSessions);

_currentConcurrentCalls = maxConcurrentCalls;
Expand Down
Loading