Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,29 @@ public sealed class DurableJobsOptions
/// </summary>
public TimeSpan OverloadBackoffDelay { get; set; } = TimeSpan.FromSeconds(5);

/// <summary>
/// Gets or sets whether concurrent job slow start is enabled.
/// When enabled, job concurrency is gradually increased during startup to avoid starvation
/// issues that can occur before caches, connection pools, and thread pool sizing have warmed up.
/// Concurrency starts at <see cref="SlowStartInitialConcurrency"/> and doubles every
/// <see cref="SlowStartInterval"/> until <see cref="MaxConcurrentJobsPerSilo"/> is reached.
/// Default: <see langword="true"/>.
/// </summary>
/// <remarks>
/// Slow start only takes effect when <see cref="SlowStartInitialConcurrency"/> is strictly less than
/// <see cref="MaxConcurrentJobsPerSilo"/>; otherwise the full concurrency is available immediately.
/// The ramp-up is started the first time a shard is run, not when the options are configured.
/// </remarks>
public bool ConcurrencySlowStartEnabled { get; set; } = true;

/// <summary>
/// Gets or sets the initial number of concurrent jobs allowed per silo when slow start is enabled.
/// Concurrency will exponentially increase from this value until <see cref="MaxConcurrentJobsPerSilo"/> is reached.
/// Default: <see cref="Environment.ProcessorCount"/>.
/// </summary>
/// <remarks>
/// Must be greater than zero when <see cref="ConcurrencySlowStartEnabled"/> is <see langword="true"/>;
/// the value is not validated when slow start is disabled.
/// A value greater than or equal to <see cref="MaxConcurrentJobsPerSilo"/> means slow start is not applied;
/// a warning is logged during validation when the value exceeds <see cref="MaxConcurrentJobsPerSilo"/>.
/// </remarks>
public int SlowStartInitialConcurrency { get; set; } = Environment.ProcessorCount;

/// <summary>
/// Gets or sets the interval at which concurrency is doubled during slow start ramp-up.
/// Default: 10 seconds.
/// </summary>
/// <remarks>
/// Must be greater than <see cref="TimeSpan.Zero"/> when <see cref="ConcurrencySlowStartEnabled"/> is
/// <see langword="true"/>; the value is not validated when slow start is disabled.
/// </remarks>
public TimeSpan SlowStartInterval { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Gets or sets the function that determines whether a failed job should be retried and when.
/// The function receives the job context and the exception that caused the failure, and returns
Expand Down Expand Up @@ -95,10 +118,25 @@ public void ValidateConfiguration()
{
throw new OrleansConfigurationException("DurableJobsOptions.ShardDuration must be greater than zero.");
}
if (options.ShouldRetry == null)
if (options.ShouldRetry is null)
{
throw new OrleansConfigurationException("DurableJobsOptions.ShouldRetry must not be null.");
}
if (options.ConcurrencySlowStartEnabled && options.SlowStartInitialConcurrency <= 0)
{
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartInitialConcurrency must be greater than zero.");
}
if (options.ConcurrencySlowStartEnabled && options.SlowStartInterval <= TimeSpan.Zero)
{
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartInterval must be greater than zero when slow start is enabled.");
}
if (options.ConcurrencySlowStartEnabled && options.SlowStartInitialConcurrency > options.MaxConcurrentJobsPerSilo)
{
_logger.LogWarning(
"DurableJobsOptions.SlowStartInitialConcurrency ({SlowStartInitialConcurrency}) exceeds MaxConcurrentJobsPerSilo ({MaxConcurrentJobsPerSilo}); slow start will not be applied.",
options.SlowStartInitialConcurrency,
options.MaxConcurrentJobsPerSilo);
}
if (options.MaxAdoptedCount < 0)
{
throw new OrleansConfigurationException("DurableJobsOptions.MaxAdoptedCount must be greater than or equal to zero.");
Expand Down
24 changes: 24 additions & 0 deletions src/Orleans.DurableJobs/ShardExecutor.Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,28 @@ internal sealed partial class ShardExecutor
Message = "Overload cleared for shard {ShardId}, resuming job processing"
)]
private static partial void LogOverloadCleared(ILogger logger, string shardId);

/// <summary>
/// Logs, at Information level, that the slow-start ramp-up has begun, recording the starting
/// concurrency, the target concurrency, and the interval between increases.
/// </summary>
[LoggerMessage(
Level = LogLevel.Information,
Message = "Slow start initiated: initial concurrency {InitialConcurrency}, target {TargetConcurrency}, interval {Interval}"
)]
private static partial void LogSlowStartBegin(ILogger logger, int initialConcurrency, int targetConcurrency, TimeSpan interval);

/// <summary>
/// Logs, at Debug level, each ramp-up step: the new concurrency and the target it is heading towards.
/// </summary>
[LoggerMessage(
Level = LogLevel.Debug,
Message = "Slow start: concurrency increased to {CurrentConcurrency} (target: {TargetConcurrency})"
)]
private static partial void LogSlowStartConcurrencyIncreased(ILogger logger, int currentConcurrency, int targetConcurrency);

/// <summary>
/// Logs, at Information level, that the ramp-up has finished and reports the concurrency reached.
/// </summary>
[LoggerMessage(
Level = LogLevel.Information,
Message = "Slow start complete: concurrency reached target {TargetConcurrency}"
)]
private static partial void LogSlowStartComplete(ILogger logger, int targetConcurrency);

/// <summary>
/// Logs, at Error level, the exception that aborted the slow-start ramp-up. The ramp-up calls this
/// after releasing whatever concurrency was still being held back.
/// </summary>
[LoggerMessage(
Level = LogLevel.Error,
Message = "Slow start ramp-up failed; all remaining concurrency has been released"
)]
private static partial void LogSlowStartError(ILogger logger, Exception exception);
}
68 changes: 67 additions & 1 deletion src/Orleans.DurableJobs/ShardExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal sealed partial class ShardExecutor
private readonly DurableJobsOptions _options;
private readonly SemaphoreSlim _jobConcurrencyLimiter;
private readonly IOverloadDetector _overloadDetector;
private int _currentCapacity;
private int _slowStartRampUpStarted;

/// <summary>
/// Initializes a new instance of the <see cref="ShardExecutor"/> class.
Expand All @@ -38,8 +40,13 @@ public ShardExecutor(
_grainFactory = grainFactory;
_logger = logger;
_options = options.Value;
_jobConcurrencyLimiter = new SemaphoreSlim(_options.MaxConcurrentJobsPerSilo);
_overloadDetector = overloadDetector;

_currentCapacity = _options.ConcurrencySlowStartEnabled && _options.SlowStartInitialConcurrency < _options.MaxConcurrentJobsPerSilo
? _options.SlowStartInitialConcurrency
: _options.MaxConcurrentJobsPerSilo;

_jobConcurrencyLimiter = new SemaphoreSlim(_currentCapacity);
}

/// <summary>
Expand All @@ -52,6 +59,12 @@ public async Task RunShardAsync(IJobShard shard, CancellationToken cancellationT
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding | ConfigureAwaitOptions.ContinueOnCapturedContext);

if (Volatile.Read(ref _currentCapacity) < _options.MaxConcurrentJobsPerSilo
&& Interlocked.CompareExchange(ref _slowStartRampUpStarted, 1, 0) == 0)
{
_ = Task.Run(SlowStartRampUpAsync);
}

var tasks = new ConcurrentDictionary<string, Task>();
try
{
Expand Down Expand Up @@ -99,6 +112,59 @@ public async Task RunShardAsync(IJobShard shard, CancellationToken cancellationT
}
}

/// <summary>
/// Background loop that widens the job concurrency limiter from its current capacity up to
/// <see cref="DurableJobsOptions.MaxConcurrentJobsPerSilo"/>, doubling the capacity once per
/// <see cref="DurableJobsOptions.SlowStartInterval"/>.
/// </summary>
/// <remarks>
/// If anything throws while ramping up, the permits still being held back are released in a single
/// step so the executor is not left stuck at reduced concurrency; the error is logged and the
/// completion message is skipped.
/// </remarks>
/// <returns>A task that completes when the target capacity is reached or the ramp-up has failed.</returns>
private async Task SlowStartRampUpAsync()
{
    var ceiling = _options.MaxConcurrentJobsPerSilo;
    LogSlowStartBegin(_logger, Volatile.Read(ref _currentCapacity), ceiling, _options.SlowStartInterval);

    try
    {
        while (Volatile.Read(ref _currentCapacity) < ceiling)
        {
            await Task.Delay(_options.SlowStartInterval);

            // Retry the doubling step until the capacity swap succeeds or there is nothing left to add.
            for (var observed = Volatile.Read(ref _currentCapacity);
                 observed < ceiling;
                 observed = Volatile.Read(ref _currentCapacity))
            {
                // Double in 64-bit arithmetic so a large capacity cannot overflow before clamping.
                var doubled = (int)Math.Min((long)observed * 2, ceiling);
                var extraPermits = doubled - observed;
                if (extraPermits <= 0)
                {
                    break;
                }

                if (Interlocked.CompareExchange(ref _currentCapacity, doubled, observed) != observed)
                {
                    // The capacity changed underneath us; re-read and try again.
                    continue;
                }

                // Capacity is published first, then the matching permits are handed to the limiter.
                _jobConcurrencyLimiter.Release(extraPermits);
                LogSlowStartConcurrencyIncreased(_logger, doubled, ceiling);
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Fail open: hand out every permit not yet released rather than staying throttled.
        var shortfall = ceiling - Volatile.Read(ref _currentCapacity);
        if (shortfall > 0)
        {
            _jobConcurrencyLimiter.Release(shortfall);
            Interlocked.Exchange(ref _currentCapacity, ceiling);
        }

        LogSlowStartError(_logger, ex);
        return;
    }

    LogSlowStartComplete(_logger, Volatile.Read(ref _currentCapacity));
}

private async Task RunJobAsync(
IJobRunContext jobContext,
IJobShard shard,
Expand Down
136 changes: 134 additions & 2 deletions test/NonSilo.Tests/DurableJobs/ShardExecutorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -371,18 +371,150 @@ public async Task RunShardAsync_WhenExecutionIsCanceled_DoesNotRetryOrRemove()
await shard.DidNotReceive().RemoveJobAsync(Arg.Any<string>(), Arg.Any<CancellationToken>());
}

/// <summary>
/// Verifies that with slow start enabled the executor eventually runs more jobs concurrently than
/// the initial slow-start limit, while never exceeding the configured per-silo maximum.
/// </summary>
[Fact]
public async Task RunShardAsync_WithSlowStart_GraduallyIncreasesConcurrency()
{
    var initialConcurrency = 2;
    var maxConcurrency = 16;
    var options = CreateOptions(
        maxConcurrentJobs: maxConcurrency,
        concurrencySlowStartEnabled: true,
        slowStartInitialConcurrency: initialConcurrency,
        slowStartInterval: TimeSpan.FromMilliseconds(100));
    var overloadDetector = CreateOverloadDetector(isOverloaded: false);
    var grainFactory = CreateGrainFactory();
    var executor = new ShardExecutor(grainFactory, options, overloadDetector, NullLogger<ShardExecutor>.Instance);

    var currentConcurrent = 0;
    var maxObservedConcurrent = 0;
    var concurrentLock = new object();

    // Jobs block on releaseJobs so that in-flight work piles up against the concurrency limiter.
    var releaseJobs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    // Signalled the first time more jobs are in flight than the initial slow-start limit allows.
    var concurrencyIncreased = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    // Enough jobs to exercise the slow start ramp-up
    var jobs = CreateJobs(20);
    var shard = CreateJobShard(jobs);

    ConfigureGrainFactoryWithSlowJobExecution(grainFactory, async () =>
    {
        lock (concurrentLock)
        {
            currentConcurrent++;
            if (currentConcurrent > maxObservedConcurrent)
            {
                maxObservedConcurrent = currentConcurrent;
                if (maxObservedConcurrent > initialConcurrency)
                {
                    concurrencyIncreased.TrySetResult();
                }
            }
        }

        await releaseJobs.Task;

        lock (concurrentLock)
        {
            currentConcurrent--;
        }
    });

    var runTask = executor.RunShardAsync(shard, CancellationToken.None);
    try
    {
        // Wait for the ramp-up to be observed, bounded by a timeout so a regression cannot hang the test.
        var completedTask = await Task.WhenAny(concurrencyIncreased.Task, Task.Delay(TimeSpan.FromSeconds(10)));
        Assert.Same(concurrencyIncreased.Task, completedTask);
    }
    finally
    {
        // Always unblock the jobs and drain the shard, even when the assertion above fails.
        releaseJobs.TrySetResult();
        await runTask;
    }

    // Slow start should limit initial concurrency, then ramp up
    Assert.True(maxObservedConcurrent <= maxConcurrency,
        $"Max concurrent jobs was {maxObservedConcurrent}, but limit was {maxConcurrency}");
    Assert.True(maxObservedConcurrent > initialConcurrency,
        $"Expected concurrency to increase beyond initial {initialConcurrency}, but max observed was {maxObservedConcurrent}");
}

/// <summary>
/// Verifies that with slow start turned off the executor reaches the full per-silo concurrency
/// limit straight away, and never goes above it.
/// </summary>
[Fact]
public async Task RunShardAsync_WithSlowStartDisabled_UsesFullConcurrencyImmediately()
{
    const int maxConcurrency = 5;
    var executorOptions = CreateOptions(
        maxConcurrentJobs: maxConcurrency,
        concurrencySlowStartEnabled: false);
    var detector = CreateOverloadDetector(isOverloaded: false);
    var grainFactory = CreateGrainFactory();
    var executor = new ShardExecutor(grainFactory, executorOptions, detector, NullLogger<ShardExecutor>.Instance);

    // Counters shared by all job callbacks; every access goes through the gate.
    var gate = new object();
    var inFlight = 0;
    var maxObservedConcurrent = 0;

    var shard = CreateJobShard(CreateJobs(10));

    ConfigureGrainFactoryWithSlowJobExecution(grainFactory, async () =>
    {
        lock (gate)
        {
            inFlight++;
            maxObservedConcurrent = Math.Max(maxObservedConcurrent, inFlight);
        }

        // Hold the slot long enough for the other jobs to overlap with this one.
        await Task.Delay(100);

        lock (gate)
        {
            inFlight--;
        }
    });

    await executor.RunShardAsync(shard, CancellationToken.None);

    // With no ramp-up, the peak must hit the configured limit exactly.
    Assert.True(maxObservedConcurrent <= maxConcurrency,
        $"Max concurrent jobs was {maxObservedConcurrent}, but limit was {maxConcurrency}");
    Assert.Equal(maxConcurrency, maxObservedConcurrent);
}

/// <summary>
/// Verifies that a zero <c>SlowStartInitialConcurrency</c> passes validation when slow start is
/// disabled. The test passes as long as validation does not throw.
/// </summary>
[Fact]
public void ValidateConfiguration_WithSlowStartDisabled_AllowsNonPositiveInitialConcurrency()
{
    var jobOptions = new DurableJobsOptions
    {
        ConcurrencySlowStartEnabled = false,
        SlowStartInitialConcurrency = 0
    };
    var wrappedOptions = Options.Create(jobOptions);
    var validatorLogger = NullLogger<Orleans.Hosting.DurableJobsOptionsValidator>.Instance;
    var sut = new Orleans.Hosting.DurableJobsOptionsValidator(validatorLogger, wrappedOptions);

    // No assertion needed: an invalid configuration would surface as an exception here.
    sut.ValidateConfiguration();
}

// Helper methods

/// <summary>
/// Builds an <see cref="IOptions{TOptions}"/> wrapper around a <see cref="DurableJobsOptions"/>
/// instance configured for tests.
/// </summary>
/// <param name="maxConcurrentJobs">Value for <c>MaxConcurrentJobsPerSilo</c>.</param>
/// <param name="overloadBackoffDelay">Overload back-off delay; 100 ms when omitted.</param>
/// <param name="shouldRetry">Retry policy; when omitted, a policy that never retries is used.</param>
/// <param name="concurrencySlowStartEnabled">
/// Whether slow start is enabled. Defaults to <see langword="false"/> here so tests opt in explicitly.
/// </param>
/// <param name="slowStartInitialConcurrency">
/// Initial slow-start concurrency; <see cref="Environment.ProcessorCount"/> when omitted.
/// </param>
/// <param name="slowStartInterval">Slow-start doubling interval; 10 seconds when omitted.</param>
/// <returns>The configured options wrapped with <c>Options.Create</c>.</returns>
private static IOptions<DurableJobsOptions> CreateOptions(
    int maxConcurrentJobs = 10,
    TimeSpan? overloadBackoffDelay = null,
    Func<IJobRunContext, Exception, DateTimeOffset?> shouldRetry = null,
    bool concurrencySlowStartEnabled = false,
    int? slowStartInitialConcurrency = null,
    TimeSpan? slowStartInterval = null)
{
    var options = new DurableJobsOptions
    {
        MaxConcurrentJobsPerSilo = maxConcurrentJobs,
        OverloadBackoffDelay = overloadBackoffDelay ?? TimeSpan.FromMilliseconds(100),
        ShouldRetry = shouldRetry ?? ((_, _) => null), // Default: no retry
        ConcurrencySlowStartEnabled = concurrencySlowStartEnabled,
        SlowStartInitialConcurrency = slowStartInitialConcurrency ?? Environment.ProcessorCount,
        SlowStartInterval = slowStartInterval ?? TimeSpan.FromSeconds(10)
    };
    return Options.Create(options);
}
Expand Down
Loading