Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ public sealed partial class AzureStorageJobShardManager : JobShardManager
private readonly ILogger<AzureStorageJobShardManager> _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly AzureStorageJobShardOptions _options;
private readonly DurableJobsOptions _durableJobsOptions;
private long _shardCounter = 0; // For generating unique shard IDs

private const string AdoptedCountKey = "AdoptedCount";
private const string LastAdoptedTimeKey = "LastAdoptedTime";
private const string LegacyStolenCountKey = "StolenCount";
private const string LegacyLastStolenTimeKey = "LastStolenTime";

public AzureStorageJobShardManager(
SiloAddress siloAddress,
BlobServiceClient client,
string containerName,
string blobPrefix,
AzureStorageJobShardOptions options,
IOptions<DurableJobsOptions> durableJobsOptions,
IClusterMembershipService clusterMembership,
ILoggerFactory loggerFactory)
: base(siloAddress)
Expand All @@ -46,14 +53,16 @@ public AzureStorageJobShardManager(
_logger = loggerFactory.CreateLogger<AzureStorageJobShardManager>();
_loggerFactory = loggerFactory;
_options = options;
_durableJobsOptions = durableJobsOptions.Value;
}

public AzureStorageJobShardManager(
ILocalSiloDetails localSiloDetails,
IOptions<AzureStorageJobShardOptions> options,
IOptions<DurableJobsOptions> durableJobsOptions,
IClusterMembershipService clusterMembership,
ILoggerFactory loggerFactory)
: this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, clusterMembership, loggerFactory)
: this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, durableJobsOptions, clusterMembership, loggerFactory)
{
}

Expand Down Expand Up @@ -112,27 +121,27 @@ public AzureStorageJobShardManager(
LogShardStillOwned(_logger, blob.Name, owner!);
continue;
}
else

// Determine if this is an adopted shard (taken from dead owner) vs orphaned (gracefully released)
var isAdopted = owner is not null && ownerStatus == SiloStatus.Dead;

// Try to claim orphaned or adopted shard
LogClaimingShard(_logger, blob.Name, SiloAddress, owner);
var blobClient = _client.GetAppendBlobClient(blob.Name);
var metadata = blob.Metadata;
var orphanedShard = new AzureStorageJobShard(blob.Name, shardStartTime, maxDueTime, blobClient, metadata, blob.Properties.ETag, _options, _loggerFactory.CreateLogger<AzureStorageJobShard>());
if (!await TryTakeOwnership(orphanedShard, metadata, SiloAddress, isAdopted, cancellationToken))
{
// Try to claim orphaned shard
LogClaimingShard(_logger, blob.Name, SiloAddress, owner);
var blobClient = _client.GetAppendBlobClient(blob.Name);
var metadata = blob.Metadata;
var orphanedShard = new AzureStorageJobShard(blob.Name, shardStartTime, maxDueTime, blobClient, metadata, blob.Properties.ETag, _options, _loggerFactory.CreateLogger<AzureStorageJobShard>());
if (!await TryTakeOwnership(orphanedShard, metadata, SiloAddress, cancellationToken))
{
// Someone else took over the shard, dispose and continue
await orphanedShard.DisposeAsync();
LogShardOwnershipConflict(_logger, blob.Name, SiloAddress);
continue;
}
await orphanedShard.InitializeAsync(cancellationToken);
// We don't want to add new jobs to shards that we just took ownership of
await orphanedShard.MarkAsCompleteAsync(cancellationToken);
_jobShardCache[blob.Name] = orphanedShard;
LogShardAssigned(_logger, blob.Name, SiloAddress);
result.Add(orphanedShard);
// Either poisoned shard or someone else took ownership - dispose and continue
await orphanedShard.DisposeAsync();
continue;
}
await orphanedShard.InitializeAsync(cancellationToken);
// We don't want to add new jobs to shards that we just took ownership of
await orphanedShard.MarkAsCompleteAsync(cancellationToken);
_jobShardCache[blob.Name] = orphanedShard;
LogShardAssigned(_logger, blob.Name, SiloAddress);
result.Add(orphanedShard);
}

LogAssignmentCompleted(_logger, result.Count, SiloAddress);
Expand All @@ -146,6 +155,11 @@ async Task ReleaseOwnership(string blobName)
var properties = await blobClient.GetPropertiesAsync(cancellationToken: cancellationToken);
var metadata = properties.Value.Metadata;
metadata.Remove("Owner");
// Reset adopted count since we're gracefully releasing
metadata.Remove(AdoptedCountKey);
metadata.Remove(LastAdoptedTimeKey);
metadata.Remove(LegacyStolenCountKey);
metadata.Remove(LegacyLastStolenTimeKey);
await blobClient.SetMetadataAsync(metadata, new BlobRequestConditions { IfMatch = properties.Value.ETag }, cancellationToken);
}
catch (Exception ex)
Expand All @@ -155,10 +169,43 @@ async Task ReleaseOwnership(string blobName)
}
}

async Task<bool> TryTakeOwnership(AzureStorageJobShard shard, IDictionary<string, string> metadata, SiloAddress newOwner, CancellationToken ct)
async Task<bool> TryTakeOwnership(AzureStorageJobShard shard, IDictionary<string, string> metadata, SiloAddress newOwner, bool isAdopted, CancellationToken ct)
{
if (isAdopted)
{
var existingAdoptedCount = GetAdoptedCount(metadata);
if (existingAdoptedCount > _durableJobsOptions.MaxAdoptedCount)
{
// Already marked as poisoned.
return false;
}

// Increment adopted count for shards taken from dead owners.
var adoptedCount = existingAdoptedCount + 1;
if (adoptedCount > _durableJobsOptions.MaxAdoptedCount)
{
// Persist poisoned marker so this shard is not repeatedly re-evaluated as newly poisoned.
SetAdoptedMetadata(metadata, adoptedCount, DateTimeOffset.UtcNow);
try
{
await shard.UpdateBlobMetadata(metadata, ct);
}
catch (RequestFailedException ex)
{
LogOwnershipFailed(_logger, ex, shard.Id, newOwner);
}

LogPoisonedShardDetected(_logger, shard.Id, adoptedCount, _durableJobsOptions.MaxAdoptedCount);
return false;
}

SetAdoptedMetadata(metadata, adoptedCount, DateTimeOffset.UtcNow);
LogShardAdopted(_logger, shard.Id, newOwner, adoptedCount);
}

metadata["Owner"] = newOwner.ToParsableString();
metadata["MembershipVersion"] = _clusterMembership.CurrentSnapshot.Version.Value.ToString();

try
{
await shard.UpdateBlobMetadata(metadata, ct);
Expand All @@ -172,6 +219,28 @@ async Task<bool> TryTakeOwnership(AzureStorageJobShard shard, IDictionary<string
return false;
}
}

static int GetAdoptedCount(IDictionary<string, string> metadata)
{
if (metadata.TryGetValue(AdoptedCountKey, out var countStr)
&& int.TryParse(countStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var adoptedCount))
{
return adoptedCount;
}

return metadata.TryGetValue(LegacyStolenCountKey, out countStr)
&& int.TryParse(countStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var legacyCount)
? legacyCount
: 0;
}

static void SetAdoptedMetadata(IDictionary<string, string> metadata, int adoptedCount, DateTimeOffset adoptedTime)
{
metadata[AdoptedCountKey] = adoptedCount.ToString(CultureInfo.InvariantCulture);
metadata[LastAdoptedTimeKey] = adoptedTime.ToString("o", CultureInfo.InvariantCulture);
metadata.Remove(LegacyStolenCountKey);
metadata.Remove(LegacyLastStolenTimeKey);
}
}

public override async Task<Orleans.DurableJobs.IJobShard> CreateShardAsync(DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary<string, string> metadata, CancellationToken cancellationToken)
Expand Down Expand Up @@ -244,9 +313,14 @@ public override async Task UnregisterShardAsync(Orleans.DurableJobs.IJobShard sh

if (count > 0)
{
// There are still jobs in the shard, unregister it
// There are still jobs in the shard, release ownership gracefully.
metadata.Remove("Owner");
var response = await azureShard.BlobClient.SetMetadataAsync(metadata, conditions, cancellationToken);
// Reset adopted count since we're gracefully releasing (not crashing)
metadata.Remove(AdoptedCountKey);
metadata.Remove(LastAdoptedTimeKey);
metadata.Remove(LegacyStolenCountKey);
metadata.Remove(LegacyLastStolenTimeKey);
await azureShard.BlobClient.SetMetadataAsync(metadata, conditions, cancellationToken);
_jobShardCache.TryRemove(shard.Id, out _);
LogShardOwnershipReleased(_logger, shard.Id, SiloAddress, count);
}
Expand Down Expand Up @@ -414,4 +488,16 @@ private static (SiloAddress? owner, MembershipVersion membershipVersion, DateTim
Message = "Deleted shard '{ShardId}' by silo {SiloAddress} (no jobs remaining)"
)]
private static partial void LogShardDeleted(ILogger logger, string shardId, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Poisoned shard detected: '{ShardId}' has been adopted {AdoptedCount} times (max allowed: {MaxAdoptedCount}). Shard will not be assigned."
)]
private static partial void LogPoisonedShardDetected(ILogger logger, string shardId, int adoptedCount, int maxAdoptedCount);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Shard '{ShardId}' adopted by silo {SiloAddress} (adopted count: {AdoptedCount})"
)]
private static partial void LogShardAdopted(ILogger logger, string shardId, SiloAddress siloAddress, int adoptedCount);
}
4 changes: 3 additions & 1 deletion src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration.Internal;
using Orleans.Runtime;
using Orleans.DurableJobs;
Expand Down Expand Up @@ -72,7 +73,8 @@ internal static IServiceCollection UseInMemoryDurableJobs(this IServiceCollectio
{
var siloDetails = sp.GetRequiredService<ILocalSiloDetails>();
var membershipService = sp.GetRequiredService<IClusterMembershipService>();
return new InMemoryJobShardManager(siloDetails.SiloAddress, membershipService);
var durableJobsOptions = sp.GetRequiredService<IOptions<DurableJobsOptions>>();
return new InMemoryJobShardManager(siloDetails.SiloAddress, membershipService, durableJobsOptions.Value.MaxAdoptedCount);
});
services.AddFromExisting<JobShardManager, InMemoryJobShardManager>();
return services;
Expand Down
22 changes: 22 additions & 0 deletions src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ public sealed class DurableJobsOptions
/// </summary>
public Func<IJobRunContext, Exception, DateTimeOffset?> ShouldRetry { get; set; } = DefaultShouldRetry;

/// <summary>
/// Gets or sets the maximum number of times a shard can be adopted from a dead owner before
/// being marked as poisoned. A shard that repeatedly causes silos to crash will exceed this
/// threshold as it bounces between owners. When the next adoption would cause the adopted count
/// to exceed this value, the shard is considered poisoned and will no longer be assigned to any silo.
/// Default: 3.
/// </summary>
/// <remarks>
/// <para>
/// The adopted count is only incremented when a shard is taken from a dead silo (i.e., the previous
/// owner crashed). It is NOT incremented when a silo gracefully shuts down and releases ownership.
/// </para>
/// <para>
/// When a shard completes successfully (all jobs processed), the adopted count is reset to 0.
/// </para>
/// </remarks>
public int MaxAdoptedCount { get; set; } = 3;

private static DateTimeOffset? DefaultShouldRetry(IJobRunContext jobContext, Exception ex)
{
// Default retry logic: retry up to 5 times with exponential backoff
Expand Down Expand Up @@ -81,6 +99,10 @@ public void ValidateConfiguration()
{
throw new OrleansConfigurationException("DurableJobsOptions.ShouldRetry must not be null.");
}
if (options.MaxAdoptedCount < 0)
{
throw new OrleansConfigurationException("DurableJobsOptions.MaxAdoptedCount must be greater than or equal to zero.");
}
_logger.LogInformation("DurableJobsOptions validated: ShardDuration={ShardDuration}", options.ShardDuration);
}
}
66 changes: 57 additions & 9 deletions src/Orleans.DurableJobs/JobShardManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,20 @@ internal class InMemoryJobShardManager : JobShardManager
private static readonly Dictionary<string, ShardOwnership> _globalShardStore = new();
private static readonly SemaphoreSlim _asyncLock = new(1, 1);
private readonly IClusterMembershipService? _membershipService;
private readonly int _maxAdoptedCount;

public InMemoryJobShardManager(SiloAddress siloAddress) : base(siloAddress)
public InMemoryJobShardManager(SiloAddress siloAddress) : this(siloAddress, null, 3)
{
}

public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService membershipService) : base(siloAddress)
public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService? membershipService) : this(siloAddress, membershipService, 3)
{
}

public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService? membershipService, int maxAdoptedCount) : base(siloAddress)
{
_membershipService = membershipService;
_maxAdoptedCount = maxAdoptedCount;
}

/// <summary>
Expand All @@ -86,10 +92,30 @@ internal static async Task ClearAllShardsAsync()
}
}

/// <summary>
/// Gets ownership info for a shard. For testing purposes only.
/// </summary>
internal static async Task<(string? Owner, int AdoptedCount)?> GetOwnershipInfoAsync(string shardId)
{
await _asyncLock.WaitAsync();
try
{
if (_globalShardStore.TryGetValue(shardId, out var ownership))
{
return (ownership.OwnerSiloAddress, ownership.AdoptedCount);
}
return null;
}
finally
{
_asyncLock.Release();
}
}

public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken)
{
var alreadyOwnedShards = new List<IJobShard>();
var stolenShards = new List<IJobShard>();
var adoptedShards = new List<IJobShard>();

await _asyncLock.WaitAsync(cancellationToken);
try
Expand Down Expand Up @@ -121,14 +147,33 @@ public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset
{
alreadyOwnedShards.Add(ownership.Shard);
}
continue;
}
// Take over orphaned shards or shards from dead silos
else if (ownership.OwnerSiloAddress is null || deadSilos.Contains(ownership.OwnerSiloAddress))

// Check if this is an orphaned shard (gracefully released) or adopted (from dead silo)
var isOrphaned = ownership.OwnerSiloAddress is null;
var ownerAddress = ownership.OwnerSiloAddress;
var isFromDeadSilo = ownerAddress is not null && deadSilos.Contains(ownerAddress);

if (isOrphaned || isFromDeadSilo)
{
if (ownership.Shard.StartTime <= maxDueTime)
{
// If adopted from dead silo, increment adopted count
if (isFromDeadSilo)
{
ownership.AdoptedCount++;

// Check if shard is poisoned
if (ownership.AdoptedCount > _maxAdoptedCount)
{
// Shard is poisoned - don't assign it
continue;
}
}

ownership.OwnerSiloAddress = SiloAddress.ToString();
stolenShards.Add(ownership.Shard);
adoptedShards.Add(ownership.Shard);
}
}
}
Expand All @@ -138,13 +183,13 @@ public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset
_asyncLock.Release();
}

foreach (var shard in stolenShards)
foreach (var shard in adoptedShards)
{
// Mark stolen shards as complete
// Mark adopted shards as complete
await shard.MarkAsCompleteAsync(CancellationToken.None);
}

return [.. alreadyOwnedShards, .. stolenShards];
return [.. alreadyOwnedShards, .. adoptedShards];
}

public override async Task<IJobShard> CreateShardAsync(DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary<string, string> metadata, CancellationToken cancellationToken)
Expand Down Expand Up @@ -187,6 +232,8 @@ public override async Task UnregisterShardAsync(IJobShard shard, CancellationTok
{
// Mark as unowned so another silo can pick it up
ownership.OwnerSiloAddress = null;
// Reset adopted count since we're gracefully releasing (not crashing)
ownership.AdoptedCount = 0;
}
}
}
Expand All @@ -200,5 +247,6 @@ private sealed class ShardOwnership
{
public required IJobShard Shard { get; init; }
public string? OwnerSiloAddress { get; set; }
public int AdoptedCount { get; set; }
}
}
Loading
Loading