Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public ServiceBusOptions() { }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
public System.TimeSpan MaxBatchWaitTime { get { throw null; } set { } }
public int MaxConcurrentCalls { get { throw null; } set { } }
Comment thread
JoshLove-msft marked this conversation as resolved.
public int MaxConcurrentCallsPerSession { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int MaxMessageBatchSize { get { throw null; } set { } }
public int MinMessageBatchSize { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public int MaxConcurrentCalls

/// <summary>
/// Gets or sets the maximum number of sessions that can be processed concurrently by a function.
/// The default value is 8. This does not apply for functions that receive a batch of messages.
/// The default value is 8. This applies only to functions that set <see cref="ServiceBusTriggerAttribute.IsSessionsEnabled"/>
/// to <c>true</c>. This does not apply for functions that receive a batch of messages.
/// When <see cref="ConcurrencyOptions.DynamicConcurrencyEnabled"/> is true, this value will be ignored,
/// and concurrency will be increased/decreased dynamically.
/// </summary>
Expand All @@ -127,10 +128,33 @@ public int MaxConcurrentSessions
_maxConcurrentSessions = value;
}
}
// TODO the default value in Track 1 was the default value from the Track 1 SDK, which was 2000.
// Verify that we are okay to diverge here.
private int _maxConcurrentSessions = 8;

/// <summary>
/// Gets or sets the maximum number of concurrent calls to the function per session.
/// Thus the total number of concurrent calls will be equal to MaxConcurrentSessions * MaxConcurrentCallsPerSession.
/// The default value is 1. This applies only to functions that set <see cref="ServiceBusTriggerAttribute.IsSessionsEnabled"/>
/// to <c>true</c>. This does not apply for functions that receive a batch of messages.
/// When <see cref="ConcurrencyOptions.DynamicConcurrencyEnabled"/> is true, this value will be ignored,
/// and concurrency will be increased/decreased dynamically.
/// </summary>
///
/// <value>The maximum number of concurrent calls to the message handler for each session that is being processed.</value>
/// <exception cref="ArgumentOutOfRangeException">
/// A value that is not positive is attempted to be set for the property.
/// </exception>
public int MaxConcurrentCallsPerSession
{
get => _maxConcurrentCallsPerSessions;

set
{
Argument.AssertAtLeast(value, 1, nameof(MaxConcurrentCallsPerSession));
_maxConcurrentCallsPerSessions = value;
}
}
private int _maxConcurrentCallsPerSessions = 1;

/// <summary>
/// Gets or sets an optional error handler that will be invoked if an exception occurs while attempting to process
/// a message. This does not apply for functions that receive a batch of messages.
Expand Down Expand Up @@ -231,6 +255,7 @@ string IOptionsFormatter.Format()
{ nameof(MaxAutoLockRenewalDuration), MaxAutoLockRenewalDuration },
{ nameof(MaxConcurrentCalls), MaxConcurrentCalls },
{ nameof(MaxConcurrentSessions), MaxConcurrentSessions },
{ nameof(MaxConcurrentCallsPerSession), MaxConcurrentCallsPerSession },
{ nameof(MaxMessageBatchSize), MaxMessageBatchSize },
{ nameof(MinMessageBatchSize), MinMessageBatchSize },
{ nameof(MaxBatchWaitTime), MaxBatchWaitTime },
Expand Down Expand Up @@ -306,10 +331,14 @@ internal ServiceBusSessionProcessorOptions ToSessionProcessorOptions(bool autoCo
// when DC is enabled, session concurrency starts at 1 and will be dynamically adjusted over time
// by UpdateConcurrency.
processorOptions.MaxConcurrentSessions = 1;

// Currently dynamic concurrency does not scale the number of concurrent calls per session.
processorOptions.MaxConcurrentCallsPerSession = 1;
}
else
{
processorOptions.MaxConcurrentSessions = MaxConcurrentSessions;
processorOptions.MaxConcurrentCallsPerSession = MaxConcurrentCallsPerSession;
}

return processorOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,8 @@ public void UpdateConcurrency()
var sessionProcessor = _sessionMessageProcessor.Value.Processor;
if (currentConcurrency != sessionProcessor.MaxConcurrentSessions)
{
// Per session call concurrency is limited to 1 meaning sessions are 1:1 with invocations.
// Per session call concurrency is limited to 1 when dynamic concurrency is enabled,
// meaning the number of sessions are 1:1 with invocations.
// So we can scale MaxConcurrentSessions 1:1 with CurrentConcurrency.
sessionProcessor.UpdateConcurrency(currentConcurrency, sessionProcessor.MaxConcurrentCallsPerSession);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public void ConfigureOptions_AppliesValuesCorrectly()

Assert.AreEqual(123, options.PrefetchCount);
Assert.AreEqual(123, options.MaxConcurrentCalls);
Assert.AreEqual(20, options.MaxConcurrentSessions);
Assert.AreEqual(5, options.MaxConcurrentCallsPerSession);
Assert.AreEqual(20, options.MaxMessageBatchSize);
Assert.AreEqual(10, options.MinMessageBatchSize);
Assert.AreEqual(TimeSpan.FromSeconds(1), options.MaxBatchWaitTime);
Expand Down Expand Up @@ -135,7 +137,8 @@ private static ServiceBusOptions CreateOptionsFromConfig()
{ $"{ExtensionPath}:MaxConcurrentCalls", "123" },
{ $"{ExtensionPath}:AutoCompleteMessages", "false" },
{ $"{ExtensionPath}:MaxAutoLockRenewalDuration", "00:00:15" },
{ $"{ExtensionPath}:MaxConcurrentSessions", "123" },
{ $"{ExtensionPath}:MaxConcurrentSessions", "20" },
{ $"{ExtensionPath}:MaxConcurrentCallsPerSession", "5" },
{ $"{ExtensionPath}:TransportType", "AmqpWebSockets" },
{ $"{ExtensionPath}:MaxMessageBatchSize", "20" },
{ $"{ExtensionPath}:MinMessageBatchSize", "10" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public void ToSessionProcessorOptions_ReturnsExpectedValue()
PrefetchCount = 123,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(123),
SessionIdleTimeout = TimeSpan.FromSeconds(123),
MaxConcurrentSessions = 123
MaxConcurrentSessions = 123,
MaxConcurrentCallsPerSession = 5
};

ServiceBusSessionProcessorOptions processorOptions = sbOptions.ToSessionProcessorOptions(true, false);
Expand All @@ -162,6 +163,7 @@ public void ToSessionProcessorOptions_ReturnsExpectedValue()
Assert.AreEqual(sbOptions.MaxAutoLockRenewalDuration, processorOptions.MaxAutoLockRenewalDuration);
Assert.AreEqual(sbOptions.SessionIdleTimeout, processorOptions.SessionIdleTimeout);
Assert.AreEqual(sbOptions.MaxConcurrentSessions, processorOptions.MaxConcurrentSessions);
Assert.AreEqual(sbOptions.MaxConcurrentCallsPerSession, processorOptions.MaxConcurrentCallsPerSession);
}

[Test]
Expand All @@ -174,7 +176,8 @@ public void ToSessionProcessorOptions_DynamicConcurrencyEnabled_ReturnsExpectedV
PrefetchCount = 123,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(123),
SessionIdleTimeout = TimeSpan.FromSeconds(123),
MaxConcurrentSessions = 123
MaxConcurrentSessions = 123,
MaxConcurrentCallsPerSession = 5
};

ServiceBusSessionProcessorOptions processorOptions = sbOptions.ToSessionProcessorOptions(true, true);
Expand All @@ -183,6 +186,7 @@ public void ToSessionProcessorOptions_DynamicConcurrencyEnabled_ReturnsExpectedV
Assert.AreEqual(sbOptions.MaxAutoLockRenewalDuration, processorOptions.MaxAutoLockRenewalDuration);
Assert.AreEqual(sbOptions.SessionIdleTimeout, processorOptions.SessionIdleTimeout);
Assert.AreEqual(1, processorOptions.MaxConcurrentSessions);
Assert.AreEqual(1, processorOptions.MaxConcurrentCallsPerSession);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ private async Task ServiceBusEndToEndInternal<T>(IHost host)
" \"MaxBatchWaitTime\":\"00:00:30\",",
$" \"MaxConcurrentCalls\": {16 * Utility.GetProcessorCount()},",
" \"MaxConcurrentSessions\": 8,",
" \"MaxConcurrentCallsPerSession\": 1,",
" \"MaxMessageBatchSize\": 1000,",
" \"MinMessageBatchSize\":1,",
" \"SessionIdleTimeout\": \"\"",
Expand Down