diff --git a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs index 4dd33a7ebea..e8a4d82f925 100644 --- a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs +++ b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs @@ -39,6 +39,29 @@ public sealed class DurableJobsOptions /// public TimeSpan OverloadBackoffDelay { get; set; } = TimeSpan.FromSeconds(5); + /// + /// Gets or sets whether concurrent job slow start is enabled. + /// When enabled, job concurrency is gradually increased during startup to avoid starvation + /// issues that can occur before caches, connection pools, and thread pool sizing have warmed up. + /// Concurrency starts at and doubles every + /// until is reached. + /// Default: . + /// + public bool ConcurrencySlowStartEnabled { get; set; } = true; + + /// + /// Gets or sets the initial number of concurrent jobs allowed per silo when slow start is enabled. + /// Concurrency will exponentially increase from this value until is reached. + /// Default: . + /// + public int SlowStartInitialConcurrency { get; set; } = Environment.ProcessorCount; + + /// + /// Gets or sets the interval at which concurrency is doubled during slow start ramp-up. + /// Default: 10 seconds. + /// + public TimeSpan SlowStartInterval { get; set; } = TimeSpan.FromSeconds(10); + /// /// Gets or sets the function that determines whether a failed job should be retried and when. /// The function receives the job context and the exception that caused the failure, and returns @@ -95,10 +118,25 @@ public void ValidateConfiguration() { throw new OrleansConfigurationException("DurableJobsOptions.ShardDuration must be greater than zero."); } - if (options.ShouldRetry == null) + if (options.ShouldRetry is null) { throw new OrleansConfigurationException("DurableJobsOptions.ShouldRetry must not be null."); } + if (options.ConcurrencySlowStartEnabled && options.SlowStartInitialConcurrency <= 0) + { + throw new OrleansConfigurationException("DurableJobsOptions.SlowStartInitialConcurrency must be greater than zero."); + } + if (options.ConcurrencySlowStartEnabled && options.SlowStartInterval <= TimeSpan.Zero) + { + throw new OrleansConfigurationException("DurableJobsOptions.SlowStartInterval must be greater than zero when slow start is enabled."); + } + if (options.ConcurrencySlowStartEnabled && options.SlowStartInitialConcurrency > options.MaxConcurrentJobsPerSilo) + { + _logger.LogWarning( + "DurableJobsOptions.SlowStartInitialConcurrency ({SlowStartInitialConcurrency}) exceeds MaxConcurrentJobsPerSilo ({MaxConcurrentJobsPerSilo}); slow start will not be applied.", + options.SlowStartInitialConcurrency, + options.MaxConcurrentJobsPerSilo); + } if (options.MaxAdoptedCount < 0) { throw new OrleansConfigurationException("DurableJobsOptions.MaxAdoptedCount must be greater than or equal to zero."); diff --git a/src/Orleans.DurableJobs/ShardExecutor.Log.cs b/src/Orleans.DurableJobs/ShardExecutor.Log.cs index 3844c75d51a..b230ffd7609 100644 --- a/src/Orleans.DurableJobs/ShardExecutor.Log.cs +++ b/src/Orleans.DurableJobs/ShardExecutor.Log.cs @@ -83,4 +83,28 @@ internal sealed partial class ShardExecutor Message = "Overload cleared for shard {ShardId}, resuming job processing" )] private static partial void LogOverloadCleared(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Slow start initiated: initial concurrency {InitialConcurrency}, target {TargetConcurrency}, interval {Interval}" + )] + private static partial void LogSlowStartBegin(ILogger logger, int initialConcurrency, int targetConcurrency, TimeSpan interval); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Slow start: concurrency increased to {CurrentConcurrency} (target: {TargetConcurrency})" + )] + private static partial void LogSlowStartConcurrencyIncreased(ILogger logger, int currentConcurrency, int targetConcurrency); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Slow start complete: concurrency reached target {TargetConcurrency}" + )] + private static partial void LogSlowStartComplete(ILogger logger, int targetConcurrency); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Slow start ramp-up failed; all remaining concurrency has been released" + )] + private static partial void LogSlowStartError(ILogger logger, Exception exception); } diff --git a/src/Orleans.DurableJobs/ShardExecutor.cs b/src/Orleans.DurableJobs/ShardExecutor.cs index b56bd4b1863..0c98e2c599b 100644 --- a/src/Orleans.DurableJobs/ShardExecutor.cs +++ b/src/Orleans.DurableJobs/ShardExecutor.cs @@ -21,6 +21,8 @@ internal sealed partial class ShardExecutor private readonly DurableJobsOptions _options; private readonly SemaphoreSlim _jobConcurrencyLimiter; private readonly IOverloadDetector _overloadDetector; + private int _currentCapacity; + private int _slowStartRampUpStarted; /// /// Initializes a new instance of the class. @@ -38,8 +40,13 @@ public ShardExecutor( _grainFactory = grainFactory; _logger = logger; _options = options.Value; - _jobConcurrencyLimiter = new SemaphoreSlim(_options.MaxConcurrentJobsPerSilo); _overloadDetector = overloadDetector; + + _currentCapacity = _options.ConcurrencySlowStartEnabled && _options.SlowStartInitialConcurrency < _options.MaxConcurrentJobsPerSilo + ? _options.SlowStartInitialConcurrency + : _options.MaxConcurrentJobsPerSilo; + + _jobConcurrencyLimiter = new SemaphoreSlim(_currentCapacity); } /// @@ -52,6 +59,12 @@ public async Task RunShardAsync(IJobShard shard, CancellationToken cancellationT { await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding | ConfigureAwaitOptions.ContinueOnCapturedContext); + if (Volatile.Read(ref _currentCapacity) < _options.MaxConcurrentJobsPerSilo + && Interlocked.CompareExchange(ref _slowStartRampUpStarted, 1, 0) == 0) + { + _ = Task.Run(SlowStartRampUpAsync); + } + var tasks = new ConcurrentDictionary(); try { @@ -99,6 +112,59 @@ public async Task RunShardAsync(IJobShard shard, CancellationToken cancellationT } } + private async Task SlowStartRampUpAsync() + { + var targetCapacity = _options.MaxConcurrentJobsPerSilo; + LogSlowStartBegin(_logger, Volatile.Read(ref _currentCapacity), targetCapacity, _options.SlowStartInterval); + + try + { + while (Volatile.Read(ref _currentCapacity) < targetCapacity) + { + await Task.Delay(_options.SlowStartInterval); + + while (true) + { + var currentCapacity = Volatile.Read(ref _currentCapacity); + if (currentCapacity >= targetCapacity) + { + break; + } + + var newCapacity = (int)Math.Min((long)currentCapacity * 2, targetCapacity); + var toRelease = newCapacity - currentCapacity; + if (toRelease <= 0) + { + break; + } + + if (Interlocked.CompareExchange(ref _currentCapacity, newCapacity, currentCapacity) == currentCapacity) + { + _jobConcurrencyLimiter.Release(toRelease); + LogSlowStartConcurrencyIncreased(_logger, newCapacity, targetCapacity); + break; + } + } + } + } + catch (Exception ex) + { + // If the ramp-up fails for any reason, release all remaining capacity to avoid being stuck at low concurrency. + var currentCapacity = Volatile.Read(ref _currentCapacity); + var remaining = targetCapacity - currentCapacity; + if (remaining > 0) + { + _jobConcurrencyLimiter.Release(remaining); + Interlocked.Exchange(ref _currentCapacity, targetCapacity); + } + + LogSlowStartError(_logger, ex); + return; + } + + LogSlowStartComplete(_logger, Volatile.Read(ref _currentCapacity)); + } + private async Task RunJobAsync( IJobRunContext jobContext, IJobShard shard, diff --git a/test/NonSilo.Tests/DurableJobs/ShardExecutorTests.cs b/test/NonSilo.Tests/DurableJobs/ShardExecutorTests.cs index 81e5de27704..ca355780ded 100644 --- a/test/NonSilo.Tests/DurableJobs/ShardExecutorTests.cs +++ b/test/NonSilo.Tests/DurableJobs/ShardExecutorTests.cs @@ -371,18 +371,150 @@ public async Task RunShardAsync_WhenExecutionIsCanceled_DoesNotRetryOrRemove() await shard.DidNotReceive().RemoveJobAsync(Arg.Any(), Arg.Any()); } + [Fact] + public async Task RunShardAsync_WithSlowStart_GraduallyIncreasesConcurrency() + { + var initialConcurrency = 2; + var maxConcurrency = 16; + var options = CreateOptions( + maxConcurrentJobs: maxConcurrency, + concurrencySlowStartEnabled: true, + slowStartInitialConcurrency: initialConcurrency, + slowStartInterval: TimeSpan.FromMilliseconds(100)); + var overloadDetector = CreateOverloadDetector(isOverloaded: false); + var grainFactory = CreateGrainFactory(); + var executor = new ShardExecutor(grainFactory, options, overloadDetector, NullLogger.Instance); + + var currentConcurrent = 0; + var maxObservedConcurrent = 0; + var concurrentLock = new object(); + var releaseJobs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var concurrencyIncreased = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // Enough jobs to exercise the slow start ramp-up + var jobs = CreateJobs(20); + var shard = CreateJobShard(jobs); + + ConfigureGrainFactoryWithSlowJobExecution(grainFactory, async () => + { + lock (concurrentLock) + { + currentConcurrent++; + if (currentConcurrent > maxObservedConcurrent) + { + maxObservedConcurrent = currentConcurrent; + if (maxObservedConcurrent > initialConcurrency) + { + concurrencyIncreased.TrySetResult(); + } + } + } + + await releaseJobs.Task; + + lock (concurrentLock) + { + currentConcurrent--; + } + }); + + var runTask = executor.RunShardAsync(shard, CancellationToken.None); + try + { + var completedTask = await Task.WhenAny(concurrencyIncreased.Task, Task.Delay(TimeSpan.FromSeconds(10))); + Assert.Same(concurrencyIncreased.Task, completedTask); + } + finally + { + releaseJobs.TrySetResult(); + await runTask; + } + + // Slow start should limit initial concurrency, then ramp up + Assert.True(maxObservedConcurrent <= maxConcurrency, + $"Max concurrent jobs was {maxObservedConcurrent}, but limit was {maxConcurrency}"); + Assert.True(maxObservedConcurrent > initialConcurrency, + $"Expected concurrency to increase beyond initial {initialConcurrency}, but max observed was {maxObservedConcurrent}"); + } + + [Fact] + public async Task RunShardAsync_WithSlowStartDisabled_UsesFullConcurrencyImmediately() + { + var maxConcurrency = 5; + var options = CreateOptions( + maxConcurrentJobs: maxConcurrency, + concurrencySlowStartEnabled: false); + var overloadDetector = CreateOverloadDetector(isOverloaded: false); + var grainFactory = CreateGrainFactory(); + var executor = new ShardExecutor(grainFactory, options, overloadDetector, NullLogger.Instance); + + var currentConcurrent = 0; + var maxObservedConcurrent = 0; + var concurrentLock = new object(); + + var jobs = CreateJobs(10); + var shard = CreateJobShard(jobs); + + ConfigureGrainFactoryWithSlowJobExecution(grainFactory, async () => + { + lock (concurrentLock) + { + currentConcurrent++; + if (currentConcurrent > maxObservedConcurrent) + { + maxObservedConcurrent = currentConcurrent; + } + } + + await Task.Delay(100); + + lock (concurrentLock) + { + currentConcurrent--; + } + }); + + await executor.RunShardAsync(shard, CancellationToken.None); + + // Without slow start, all concurrency slots should be available immediately + Assert.True(maxObservedConcurrent <= maxConcurrency, + $"Max concurrent jobs was {maxObservedConcurrent}, but limit was {maxConcurrency}"); + Assert.Equal(maxConcurrency, maxObservedConcurrent); + } + + [Fact] + public void ValidateConfiguration_WithSlowStartDisabled_AllowsNonPositiveInitialConcurrency() + { + var options = Options.Create(new DurableJobsOptions + { + ConcurrencySlowStartEnabled = false, + SlowStartInitialConcurrency = 0 + }); + var validator = new Orleans.Hosting.DurableJobsOptionsValidator( + NullLogger.Instance, + options); + + validator.ValidateConfiguration(); + } + // Helper methods private static IOptions CreateOptions( int maxConcurrentJobs = 10, TimeSpan? overloadBackoffDelay = null, - Func shouldRetry = null) + Func shouldRetry = null, + bool concurrencySlowStartEnabled = false, + int? slowStartInitialConcurrency = null, + TimeSpan? slowStartInterval = null) { var options = new DurableJobsOptions { MaxConcurrentJobsPerSilo = maxConcurrentJobs, OverloadBackoffDelay = overloadBackoffDelay ?? TimeSpan.FromMilliseconds(100), - ShouldRetry = shouldRetry ?? ((_, _) => null) // Default: no retry + ShouldRetry = shouldRetry ?? ((_, _) => null), // Default: no retry + ConcurrencySlowStartEnabled = concurrencySlowStartEnabled, + SlowStartInitialConcurrency = slowStartInitialConcurrency ?? Environment.ProcessorCount, + SlowStartInterval = slowStartInterval ?? TimeSpan.FromSeconds(10) }; return Options.Create(options); }