From f8d03324e7b24334b826cc85e8494eabfdc72a13 Mon Sep 17 00:00:00 2001 From: Benjamin Petit Date: Thu, 12 Feb 2026 13:52:06 +0100 Subject: [PATCH 1/5] feat: implement poisoned shard detection and handling in AzureStorageJobShardManager --- .../AzureStorageJobShardManager.cs | 94 +++++-- .../Hosting/DurableJobsOptions.cs | 22 ++ src/Orleans.DurableJobs/JobShardManager.cs | 96 ++++++- .../InMemoryJobShardManagerTests.cs | 244 ++++++++++++++++++ 4 files changed, 429 insertions(+), 27 deletions(-) create mode 100644 test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs diff --git a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs index 4137af0419c..04e89a613f3 100644 --- a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs +++ b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs @@ -27,14 +27,19 @@ public sealed partial class AzureStorageJobShardManager : JobShardManager private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; private readonly AzureStorageJobShardOptions _options; + private readonly DurableJobsOptions _durableJobsOptions; private long _shardCounter = 0; // For generating unique shard IDs + private const string StolenCountKey = "StolenCount"; + private const string LastStolenTimeKey = "LastStolenTime"; + public AzureStorageJobShardManager( SiloAddress siloAddress, BlobServiceClient client, string containerName, string blobPrefix, AzureStorageJobShardOptions options, + DurableJobsOptions durableJobsOptions, IClusterMembershipService clusterMembership, ILoggerFactory loggerFactory) : base(siloAddress) @@ -46,14 +51,16 @@ public AzureStorageJobShardManager( _logger = loggerFactory.CreateLogger(); _loggerFactory = loggerFactory; _options = options; + _durableJobsOptions = durableJobsOptions; } public AzureStorageJobShardManager( ILocalSiloDetails localSiloDetails, IOptions options, + IOptions durableJobsOptions, IClusterMembershipService clusterMembership, ILoggerFactory loggerFactory) - : this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, clusterMembership, loggerFactory) + : this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, durableJobsOptions.Value, clusterMembership, loggerFactory) { } @@ -112,27 +119,27 @@ public AzureStorageJobShardManager( LogShardStillOwned(_logger, blob.Name, owner!); continue; } - else + + // Determine if this is a stolen shard (taken from dead owner) vs orphaned (gracefully released) + var isStolen = owner is not null && ownerStatus == SiloStatus.Dead; + + // Try to claim orphaned or stolen shard + LogClaimingShard(_logger, blob.Name, SiloAddress, owner); + var blobClient = _client.GetAppendBlobClient(blob.Name); + var metadata = blob.Metadata; + var orphanedShard = new AzureStorageJobShard(blob.Name, shardStartTime, maxDueTime, blobClient, metadata, blob.Properties.ETag, _options, _loggerFactory.CreateLogger()); + if (!await TryTakeOwnership(orphanedShard, metadata, SiloAddress, isStolen, cancellationToken)) { - // Try to claim orphaned shard - LogClaimingShard(_logger, blob.Name, SiloAddress, owner); - var blobClient = _client.GetAppendBlobClient(blob.Name); - var metadata = blob.Metadata; - var orphanedShard = new AzureStorageJobShard(blob.Name, shardStartTime, maxDueTime, blobClient, metadata, blob.Properties.ETag, _options, _loggerFactory.CreateLogger()); - if (!await TryTakeOwnership(orphanedShard, metadata, SiloAddress, cancellationToken)) - { - // Someone else took over the shard, dispose and continue - await orphanedShard.DisposeAsync(); - LogShardOwnershipConflict(_logger, blob.Name, SiloAddress); - continue; - } - await orphanedShard.InitializeAsync(cancellationToken); - // We don't want to add new jobs to shards that we just took ownership of - await orphanedShard.MarkAsCompleteAsync(cancellationToken); - _jobShardCache[blob.Name] = orphanedShard; - LogShardAssigned(_logger, blob.Name, SiloAddress); - result.Add(orphanedShard); + // Either poisoned shard or someone else took ownership - dispose and continue + await orphanedShard.DisposeAsync(); + continue; } + await orphanedShard.InitializeAsync(cancellationToken); + // We don't want to add new jobs to shards that we just took ownership of + await orphanedShard.MarkAsCompleteAsync(cancellationToken); + _jobShardCache[blob.Name] = orphanedShard; + LogShardAssigned(_logger, blob.Name, SiloAddress); + result.Add(orphanedShard); } LogAssignmentCompleted(_logger, result.Count, SiloAddress); @@ -146,6 +153,9 @@ async Task ReleaseOwnership(string blobName) var properties = await blobClient.GetPropertiesAsync(cancellationToken: cancellationToken); var metadata = properties.Value.Metadata; metadata.Remove("Owner"); + // Reset stolen count since we're gracefully releasing + metadata.Remove(StolenCountKey); + metadata.Remove(LastStolenTimeKey); await blobClient.SetMetadataAsync(metadata, new BlobRequestConditions { IfMatch = properties.Value.ETag }, cancellationToken); } catch (Exception ex) @@ -155,10 +165,28 @@ async Task ReleaseOwnership(string blobName) } } - async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata, SiloAddress newOwner, CancellationToken ct) + async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata, SiloAddress newOwner, bool isStolen, CancellationToken ct) { + if (isStolen) + { + // Increment stolen count for shards taken from dead owners + var stolenCount = GetStolenCount(metadata) + 1; + + if (stolenCount > _durableJobsOptions.MaxStolenCount) + { + // Shard is poisoned - don't claim it + LogPoisonedShardDetected(_logger, shard.Id, stolenCount, _durableJobsOptions.MaxStolenCount); + return false; + } + + metadata[StolenCountKey] = stolenCount.ToString(CultureInfo.InvariantCulture); + metadata[LastStolenTimeKey] = DateTimeOffset.UtcNow.ToString("o"); + LogShardStolen(_logger, shard.Id, newOwner, stolenCount); + } + metadata["Owner"] = newOwner.ToParsableString(); metadata["MembershipVersion"] = _clusterMembership.CurrentSnapshot.Version.Value.ToString(); + try { await shard.UpdateBlobMetadata(metadata, ct); @@ -172,6 +200,11 @@ async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata) + { + return metadata.TryGetValue(StolenCountKey, out var countStr) && int.TryParse(countStr, out var count) ? count : 0; + } } public override async Task CreateShardAsync(DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary metadata, CancellationToken cancellationToken) @@ -244,9 +277,12 @@ public override async Task UnregisterShardAsync(Orleans.DurableJobs.IJobShard sh if (count > 0) { - // There are still jobs in the shard, unregister it + // There are still jobs in the shard, release ownership gracefully. metadata.Remove("Owner"); - var response = await azureShard.BlobClient.SetMetadataAsync(metadata, conditions, cancellationToken); + // Reset stolen count since we're gracefully releasing (not crashing) + metadata.Remove(StolenCountKey); + metadata.Remove(LastStolenTimeKey); + await azureShard.BlobClient.SetMetadataAsync(metadata, conditions, cancellationToken); _jobShardCache.TryRemove(shard.Id, out _); LogShardOwnershipReleased(_logger, shard.Id, SiloAddress, count); } @@ -414,4 +450,16 @@ private static (SiloAddress? owner, MembershipVersion membershipVersion, DateTim Message = "Deleted shard '{ShardId}' by silo {SiloAddress} (no jobs remaining)" )] private static partial void LogShardDeleted(ILogger logger, string shardId, SiloAddress siloAddress); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Poisoned shard detected: '{ShardId}' has been stolen {StolenCount} times (max allowed: {MaxStolenCount}). Shard will not be assigned." + )] + private static partial void LogPoisonedShardDetected(ILogger logger, string shardId, int stolenCount, int maxStolenCount); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Shard '{ShardId}' stolen by silo {SiloAddress} (stolen count: {StolenCount})" + )] + private static partial void LogShardStolen(ILogger logger, string shardId, SiloAddress siloAddress, int stolenCount); } diff --git a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs index f271a2b1d32..8544fa6712d 100644 --- a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs +++ b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs @@ -47,6 +47,24 @@ public sealed class DurableJobsOptions /// public Func ShouldRetry { get; set; } = DefaultShouldRetry; + /// + /// Gets or sets the maximum number of times a shard can be stolen from a dead owner before + /// being marked as poisoned. A shard that repeatedly causes silos to crash will exceed this + /// threshold as it bounces between owners. When the stolen count is reached, the shard is + /// considered poisoned and will no longer be assigned to any silo. + /// Default: 3. + /// + /// + /// + /// The stolen count is only incremented when a shard is taken from a dead silo (i.e., the previous + /// owner crashed). It is NOT incremented when a silo gracefully shuts down and releases ownership. + /// + /// + /// When a shard completes successfully (all jobs processed), the stolen count is reset to 0. + /// + /// + public int MaxStolenCount { get; set; } = 3; + private static DateTimeOffset? DefaultShouldRetry(IJobRunContext jobContext, Exception ex) { // Default retry logic: retry up to 5 times with exponential backoff @@ -81,6 +99,10 @@ public void ValidateConfiguration() { throw new OrleansConfigurationException("DurableJobsOptions.ShouldRetry must not be null."); } + if (options.MaxStolenCount < 0) + { + throw new OrleansConfigurationException("DurableJobsOptions.MaxStolenCount must be greater than or equal to zero."); + } _logger.LogInformation("DurableJobsOptions validated: ShardDuration={ShardDuration}", options.ShardDuration); } } diff --git a/src/Orleans.DurableJobs/JobShardManager.cs b/src/Orleans.DurableJobs/JobShardManager.cs index b7ab58063aa..71535a272b8 100644 --- a/src/Orleans.DurableJobs/JobShardManager.cs +++ b/src/Orleans.DurableJobs/JobShardManager.cs @@ -60,14 +60,20 @@ internal class InMemoryJobShardManager : JobShardManager private static readonly Dictionary _globalShardStore = new(); private static readonly SemaphoreSlim _asyncLock = new(1, 1); private readonly IClusterMembershipService? _membershipService; + private readonly int _maxStolenCount; - public InMemoryJobShardManager(SiloAddress siloAddress) : base(siloAddress) + public InMemoryJobShardManager(SiloAddress siloAddress) : this(siloAddress, null, 3) { } - public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService membershipService) : base(siloAddress) + public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService? membershipService) : this(siloAddress, membershipService, 3) + { + } + + public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService? membershipService, int maxStolenCount) : base(siloAddress) { _membershipService = membershipService; + _maxStolenCount = maxStolenCount; } /// @@ -86,6 +92,66 @@ internal static async Task ClearAllShardsAsync() } } + /// + /// Gets ownership info for a shard. For testing purposes only. + /// + internal static async Task<(string? Owner, int StolenCount)?> GetOwnershipInfoAsync(string shardId) + { + await _asyncLock.WaitAsync(); + try + { + if (_globalShardStore.TryGetValue(shardId, out var ownership)) + { + return (ownership.OwnerSiloAddress, ownership.StolenCount); + } + return null; + } + finally + { + _asyncLock.Release(); + } + } + + /// + /// Gets debug info for testing. For testing purposes only. + /// + internal static async Task GetDebugInfoAsync(string shardId, IClusterMembershipService? membershipService) + { + await _asyncLock.WaitAsync(); + try + { + var sb = new System.Text.StringBuilder(); + + if (!_globalShardStore.TryGetValue(shardId, out var ownership)) + { + return "Shard not found in store"; + } + + sb.AppendLine($"Owner: '{ownership.OwnerSiloAddress}'"); + sb.AppendLine($"StolenCount: {ownership.StolenCount}"); + + var snapshot = membershipService?.CurrentSnapshot; + if (snapshot is null) + { + sb.AppendLine("Snapshot is null"); + } + else + { + sb.AppendLine($"Snapshot has {snapshot.Members.Count} members"); + foreach (var member in snapshot.Members.Values) + { + sb.AppendLine($" Member: '{member.SiloAddress}' ({member.SiloAddress.ToString()}) Status: {member.Status}"); + } + } + + return sb.ToString(); + } + finally + { + _asyncLock.Release(); + } + } + public override async Task> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken) { var alreadyOwnedShards = new List(); @@ -121,12 +187,31 @@ public override async Task> AssignJobShardsAsync(DateTimeOffset { alreadyOwnedShards.Add(ownership.Shard); } + continue; } - // Take over orphaned shards or shards from dead silos - else if (ownership.OwnerSiloAddress is null || deadSilos.Contains(ownership.OwnerSiloAddress)) + + // Check if this is an orphaned shard (gracefully released) or stolen (from dead silo) + var isOrphaned = ownership.OwnerSiloAddress is null; + var ownerAddress = ownership.OwnerSiloAddress; + var isFromDeadSilo = ownerAddress is not null && deadSilos.Contains(ownerAddress); + + if (isOrphaned || isFromDeadSilo) { if (ownership.Shard.StartTime <= maxDueTime) { + // If stolen from dead silo, increment stolen count + if (isFromDeadSilo) + { + ownership.StolenCount++; + + // Check if shard is poisoned + if (ownership.StolenCount > _maxStolenCount) + { + // Shard is poisoned - don't assign it + continue; + } + } + ownership.OwnerSiloAddress = SiloAddress.ToString(); stolenShards.Add(ownership.Shard); } @@ -187,6 +272,8 @@ public override async Task UnregisterShardAsync(IJobShard shard, CancellationTok { // Mark as unowned so another silo can pick it up ownership.OwnerSiloAddress = null; + // Reset stolen count since we're gracefully releasing (not crashing) + ownership.StolenCount = 0; } } } @@ -200,5 +287,6 @@ private sealed class ShardOwnership { public required IJobShard Shard { get; init; } public string? OwnerSiloAddress { get; set; } + public int StolenCount { get; set; } } } diff --git a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs new file mode 100644 index 00000000000..46c7b10b7de --- /dev/null +++ b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs @@ -0,0 +1,244 @@ +#nullable enable + +using System.Collections.Immutable; +using System.Net; +using Orleans.DurableJobs; +using Orleans.Runtime; +using NSubstitute; +using Xunit; + +namespace NonSilo.Tests.DurableJobs; + +[TestCategory("DurableJobs")] +public class InMemoryJobShardManagerTests : IAsyncLifetime +{ + private static readonly SiloAddress Silo1 = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5001), 1); + private static readonly SiloAddress Silo2 = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5002), 2); + private static readonly SiloAddress Silo3 = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5003), 3); + private static readonly SiloAddress Silo4 = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5004), 4); + + public Task InitializeAsync() => InMemoryJobShardManager.ClearAllShardsAsync(); + + public Task DisposeAsync() => InMemoryJobShardManager.ClearAllShardsAsync(); + + [Fact] + public async Task CreateShardAsync_CreatesShardOwnedBySilo() + { + var manager = new InMemoryJobShardManager(Silo1); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var shard = await manager.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + Assert.NotNull(shard); + Assert.Equal(minDueTime, shard.StartTime); + Assert.Equal(maxDueTime, shard.EndTime); + } + + [Fact] + public async Task AssignJobShardsAsync_ReturnsOwnedShards() + { + var manager = new InMemoryJobShardManager(Silo1); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var createdShard = await manager.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + var assignedShards = await manager.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + + Assert.Single(assignedShards); + Assert.Equal(createdShard.Id, assignedShards[0].Id); + } + + [Fact] + public async Task AssignJobShardsAsync_OrphanedShard_IsAssignedWithoutIncrementingStolenCount() + { + // Silo1 creates a shard and gracefully releases it + var manager1 = new InMemoryJobShardManager(Silo1); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Schedule a job so the shard isn't deleted on unregister + await shard.TryScheduleJobAsync(GrainId.Create("test", "grain1"), "TestJob", minDueTime.AddMinutes(30), null, CancellationToken.None); + + // Gracefully unregister (sets owner to null) + await manager1.UnregisterShardAsync(shard, CancellationToken.None); + + // Silo2 picks up the orphaned shard + var manager2 = new InMemoryJobShardManager(Silo2); + var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + + Assert.Single(assignedShards); + Assert.Equal(shard.Id, assignedShards[0].Id); + } + + [Fact] + public async Task AssignJobShardsAsync_StolenFromDeadSilo_IncrementsStolenCount() + { + // Setup membership service that reports Silo1 as dead + var membershipService = CreateMembershipService(deadSilos: [Silo1]); + + // Silo1 creates a shard (simulating it was created before death) + var manager1 = new InMemoryJobShardManager(Silo1, membershipService); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Silo2 steals the shard from dead Silo1 + var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 3); + var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + + // Shard should be assigned (stolen count = 1, under threshold) + Assert.Single(assignedShards); + Assert.Equal(shard.Id, assignedShards[0].Id); + } + + [Fact] + public async Task AssignJobShardsAsync_PoisonedShard_IsNotAssigned() + { + // Setup membership service + var membershipService = Substitute.For(); + var snapshot = CreateMembershipSnapshot(deadSilos: [Silo1, Silo2, Silo3]); + membershipService.CurrentSnapshot.Returns(snapshot); + + // Silo1 creates a shard + var manager1 = new InMemoryJobShardManager(Silo1, membershipService, maxStolenCount: 2); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Silo2 steals from dead Silo1 (stolen count = 1) + var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 2); + var shards2 = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + Assert.Single(shards2); + + // Silo3 steals from dead Silo2 (stolen count = 2) + var manager3 = new InMemoryJobShardManager(Silo3, membershipService, maxStolenCount: 2); + var shards3 = await manager3.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + Assert.Single(shards3); + + // Silo4 tries to steal from dead Silo3 (stolen count would be 3, exceeds max of 2) + var manager4 = new InMemoryJobShardManager(Silo4, membershipService, maxStolenCount: 2); + var shards4 = await manager4.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + + // Shard is poisoned and should not be assigned + Assert.Empty(shards4); + } + + [Fact] + public async Task AssignJobShardsAsync_MaxStolenCountOfZero_NeverAssignsStolenShards() + { + // Setup membership service that reports Silo1 as dead + var membershipService = CreateMembershipService(deadSilos: [Silo1]); + + // Silo1 creates a shard + var manager1 = new InMemoryJobShardManager(Silo1, membershipService, maxStolenCount: 0); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Silo2 tries to steal from dead Silo1 with maxStolenCount=0 + var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 0); + var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + + // Shard should not be assigned (stolen count would be 1, exceeds max of 0) + Assert.Empty(assignedShards); + } + + [Fact] + public async Task AssignJobShardsAsync_ShardFromActiveSilo_IsNotAssigned() + { + // Setup membership service that reports Silo1 as active + var membershipService = CreateMembershipService(activeSilos: [Silo1]); + + // Silo1 creates a shard + var manager1 = new InMemoryJobShardManager(Silo1, membershipService); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Silo2 tries to get shards - should not get Silo1's shard since Silo1 is active + var manager2 = new InMemoryJobShardManager(Silo2, membershipService); + var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + + Assert.Empty(assignedShards); + } + + [Fact] + public async Task UnregisterShardAsync_WithNoJobsRemaining_RemovesShard() + { + var manager = new InMemoryJobShardManager(Silo1); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var shard = await manager.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Unregister with no jobs + await manager.UnregisterShardAsync(shard, CancellationToken.None); + + // Shard should be removed, not reassignable + var assignedShards = await manager.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + Assert.Empty(assignedShards); + } + + [Fact] + public async Task UnregisterShardAsync_WithJobsRemaining_MarksShardAsOrphaned() + { + var manager1 = new InMemoryJobShardManager(Silo1); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + // Add a job + await shard.TryScheduleJobAsync(GrainId.Create("test", "grain1"), "TestJob", minDueTime.AddMinutes(30), null, CancellationToken.None); + + // Unregister with jobs remaining + await manager1.UnregisterShardAsync(shard, CancellationToken.None); + + // Shard should be orphaned and available for another silo + var manager2 = new InMemoryJobShardManager(Silo2); + var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + Assert.Single(assignedShards); + } + + private static IClusterMembershipService CreateMembershipService( + SiloAddress[]? activeSilos = null, + SiloAddress[]? deadSilos = null) + { + var membershipService = Substitute.For(); + var snapshot = CreateMembershipSnapshot(activeSilos, deadSilos); + membershipService.CurrentSnapshot.Returns(snapshot); + return membershipService; + } + + private static ClusterMembershipSnapshot CreateMembershipSnapshot( + SiloAddress[]? activeSilos = null, + SiloAddress[]? deadSilos = null) + { + var builder = ImmutableDictionary.CreateBuilder(); + + if (activeSilos is not null) + { + foreach (var silo in activeSilos) + { + builder[silo] = new ClusterMember(silo, SiloStatus.Active, silo.ToString()); + } + } + + if (deadSilos is not null) + { + foreach (var silo in deadSilos) + { + builder[silo] = new ClusterMember(silo, SiloStatus.Dead, silo.ToString()); + } + } + + return new ClusterMembershipSnapshot(builder.ToImmutable(), new MembershipVersion(1)); + } +} From d8bd6f9228c4d221438e55380045a2672ddaeec0 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Sun, 15 Feb 2026 14:30:34 -0800 Subject: [PATCH 2/5] Fix in-memory durable jobs MaxStolenCount wiring Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Hosting/DurableJobsExtensions.cs | 4 ++- .../InMemoryJobShardManagerTests.cs | 28 +++++++++++++++++++ .../InMemoryJobShardManagerTestFixture.cs | 7 +++-- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs b/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs index ea9319a4b42..20bb549c860 100644 --- a/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs +++ b/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs @@ -1,6 +1,7 @@ using System.Linq; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Orleans.Configuration.Internal; using Orleans.Runtime; using Orleans.DurableJobs; @@ -72,7 +73,8 @@ internal static IServiceCollection UseInMemoryDurableJobs(this IServiceCollectio { var siloDetails = sp.GetRequiredService(); var membershipService = sp.GetRequiredService(); - return new InMemoryJobShardManager(siloDetails.SiloAddress, membershipService); + var durableJobsOptions = sp.GetRequiredService>(); + return new InMemoryJobShardManager(siloDetails.SiloAddress, membershipService, durableJobsOptions.Value.MaxStolenCount); }); services.AddFromExisting(); return services; diff --git a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs index 46c7b10b7de..63eefa02c68 100644 --- a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs +++ b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs @@ -2,7 +2,9 @@ using System.Collections.Immutable; using System.Net; +using Microsoft.Extensions.DependencyInjection; using Orleans.DurableJobs; +using Orleans.Hosting; using Orleans.Runtime; using NSubstitute; using Xunit; @@ -149,6 +151,32 @@ public async Task AssignJobShardsAsync_MaxStolenCountOfZero_NeverAssignsStolenSh Assert.Empty(assignedShards); } + [Fact] + public async Task UseInMemoryDurableJobs_ConfiguredMaxStolenCount_IsApplied() + { + var membershipService = CreateMembershipService(deadSilos: [Silo2]); + var minDueTime = DateTimeOffset.UtcNow; + var maxDueTime = minDueTime.AddHours(1); + + var ownerManager = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 3); + await ownerManager.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + + var localSiloDetails = Substitute.For(); + localSiloDetails.SiloAddress.Returns(Silo1); + + var services = new ServiceCollection(); + services.AddSingleton(localSiloDetails); + services.AddSingleton(membershipService); + services.Configure(options => options.MaxStolenCount = 0); + services.UseInMemoryDurableJobs(); + + using var serviceProvider = services.BuildServiceProvider(); + var manager = serviceProvider.GetRequiredService(); + + var assignedShards = await manager.AssignJobShardsAsync(maxDueTime, CancellationToken.None); + Assert.Empty(assignedShards); + } + [Fact] public async Task AssignJobShardsAsync_ShardFromActiveSilo_IsNotAssigned() { diff --git a/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs b/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs index e1115183215..f21a7376fc8 100644 --- a/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs +++ b/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs @@ -10,15 +10,18 @@ namespace Tester.DurableJobs; /// internal sealed class InMemoryJobShardManagerTestFixture : IJobShardManagerTestFixture { - public InMemoryJobShardManagerTestFixture() + private readonly int _maxStolenCount; + + public InMemoryJobShardManagerTestFixture(int maxStolenCount = 3) { + _maxStolenCount = maxStolenCount; // Clear any state from previous tests InMemoryJobShardManager.ClearAllShardsAsync().GetAwaiter().GetResult(); } public JobShardManager CreateManager(ILocalSiloDetails localSiloDetails, IClusterMembershipService membershipService) { - return new InMemoryJobShardManager(localSiloDetails.SiloAddress, membershipService); + return new InMemoryJobShardManager(localSiloDetails.SiloAddress, membershipService, _maxStolenCount); } public async ValueTask DisposeAsync() From 0608bce697b3f2f75596ce38e172be875ef9cef7 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Sun, 15 Feb 2026 14:47:00 -0800 Subject: [PATCH 3/5] Address PR 9907 review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../AzureStorageJobShardManager.cs | 49 +++++++++++++++++-- .../Hosting/DurableJobsOptions.cs | 4 +- src/Orleans.DurableJobs/JobShardManager.cs | 40 --------------- .../InMemoryJobShardManagerTests.cs | 12 ++++- 4 files changed, 58 insertions(+), 47 deletions(-) diff --git a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs index 04e89a613f3..0d5b4cce515 100644 --- a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs +++ b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs @@ -33,6 +33,18 @@ public sealed partial class AzureStorageJobShardManager : JobShardManager private const string StolenCountKey = "StolenCount"; private const string LastStolenTimeKey = "LastStolenTime"; + public AzureStorageJobShardManager( + SiloAddress siloAddress, + BlobServiceClient client, + string containerName, + string blobPrefix, + AzureStorageJobShardOptions options, + IClusterMembershipService clusterMembership, + ILoggerFactory loggerFactory) + : this(siloAddress, client, containerName, blobPrefix, options, new DurableJobsOptions(), clusterMembership, loggerFactory) + { + } + public AzureStorageJobShardManager( SiloAddress siloAddress, BlobServiceClient client, @@ -54,6 +66,15 @@ public AzureStorageJobShardManager( _durableJobsOptions = durableJobsOptions; } + public AzureStorageJobShardManager( + ILocalSiloDetails localSiloDetails, + IOptions options, + IClusterMembershipService clusterMembership, + ILoggerFactory loggerFactory) + : this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, clusterMembership, loggerFactory) + { + } + public AzureStorageJobShardManager( ILocalSiloDetails localSiloDetails, IOptions options, @@ -169,12 +190,29 @@ async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary _durableJobsOptions.MaxStolenCount) + { + // Already marked as poisoned. + return false; + } + // Increment stolen count for shards taken from dead owners. + var stolenCount = existingStolenCount + 1; if (stolenCount > _durableJobsOptions.MaxStolenCount) { - // Shard is poisoned - don't claim it + // Persist poisoned marker so this shard is not repeatedly re-evaluated as newly poisoned. + metadata[StolenCountKey] = stolenCount.ToString(CultureInfo.InvariantCulture); + metadata[LastStolenTimeKey] = DateTimeOffset.UtcNow.ToString("o", CultureInfo.InvariantCulture); + try + { + await shard.UpdateBlobMetadata(metadata, ct); + } + catch (RequestFailedException ex) + { + LogOwnershipFailed(_logger, ex, shard.Id, newOwner); + } + LogPoisonedShardDetected(_logger, shard.Id, stolenCount, _durableJobsOptions.MaxStolenCount); return false; } @@ -203,7 +241,10 @@ async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata) { - return metadata.TryGetValue(StolenCountKey, out var countStr) && int.TryParse(countStr, out var count) ? count : 0; + return metadata.TryGetValue(StolenCountKey, out var countStr) + && int.TryParse(countStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var count) + ? count + : 0; } } diff --git a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs index 8544fa6712d..498bea34cdb 100644 --- a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs +++ b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs @@ -50,8 +50,8 @@ public sealed class DurableJobsOptions /// /// Gets or sets the maximum number of times a shard can be stolen from a dead owner before /// being marked as poisoned. A shard that repeatedly causes silos to crash will exceed this - /// threshold as it bounces between owners. When the stolen count is reached, the shard is - /// considered poisoned and will no longer be assigned to any silo. + /// threshold as it bounces between owners. When the next steal would cause the stolen count + /// to exceed this value, the shard is considered poisoned and will no longer be assigned to any silo. /// Default: 3. /// /// diff --git a/src/Orleans.DurableJobs/JobShardManager.cs b/src/Orleans.DurableJobs/JobShardManager.cs index 71535a272b8..ca3df69ec36 100644 --- a/src/Orleans.DurableJobs/JobShardManager.cs +++ b/src/Orleans.DurableJobs/JobShardManager.cs @@ -112,46 +112,6 @@ internal static async Task ClearAllShardsAsync() } } - /// - /// Gets debug info for testing. For testing purposes only. - /// - internal static async Task GetDebugInfoAsync(string shardId, IClusterMembershipService? membershipService) - { - await _asyncLock.WaitAsync(); - try - { - var sb = new System.Text.StringBuilder(); - - if (!_globalShardStore.TryGetValue(shardId, out var ownership)) - { - return "Shard not found in store"; - } - - sb.AppendLine($"Owner: '{ownership.OwnerSiloAddress}'"); - sb.AppendLine($"StolenCount: {ownership.StolenCount}"); - - var snapshot = membershipService?.CurrentSnapshot; - if (snapshot is null) - { - sb.AppendLine("Snapshot is null"); - } - else - { - sb.AppendLine($"Snapshot has {snapshot.Members.Count} members"); - foreach (var member in snapshot.Members.Values) - { - sb.AppendLine($" Member: '{member.SiloAddress}' ({member.SiloAddress.ToString()}) Status: {member.Status}"); - } - } - - return sb.ToString(); - } - finally - { - _asyncLock.Release(); - } - } - public override async Task> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken) { var alreadyOwnedShards = new List(); diff --git a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs index 63eefa02c68..bcb2e8d6d40 100644 --- a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs +++ b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs @@ -73,6 +73,11 @@ public async Task AssignJobShardsAsync_OrphanedShard_IsAssignedWithoutIncrementi Assert.Single(assignedShards); Assert.Equal(shard.Id, assignedShards[0].Id); + + var ownershipInfo = await InMemoryJobShardManager.GetOwnershipInfoAsync(shard.Id); + Assert.True(ownershipInfo.HasValue); + Assert.Equal(Silo2.ToString(), ownershipInfo.Value.Owner); + Assert.Equal(0, ownershipInfo.Value.StolenCount); } [Fact] @@ -95,6 +100,11 @@ public async Task AssignJobShardsAsync_StolenFromDeadSilo_IncrementsStolenCount( // Shard should be assigned (stolen count = 1, under threshold) Assert.Single(assignedShards); Assert.Equal(shard.Id, assignedShards[0].Id); + + var ownershipInfo = await InMemoryJobShardManager.GetOwnershipInfoAsync(shard.Id); + Assert.True(ownershipInfo.HasValue); + Assert.Equal(Silo2.ToString(), ownershipInfo.Value.Owner); + Assert.Equal(1, ownershipInfo.Value.StolenCount); } [Fact] @@ -110,7 +120,7 @@ public async Task AssignJobShardsAsync_PoisonedShard_IsNotAssigned() var minDueTime = DateTimeOffset.UtcNow; var maxDueTime = minDueTime.AddHours(1); - var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); + await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); // Silo2 steals from dead Silo1 (stolen count = 1) var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 2); From f15138a3359d222878d48a162fe33d8328fc468e Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Sun, 15 Feb 2026 15:11:51 -0800 Subject: [PATCH 4/5] Use IOptions in Azure shard manager Replace direct DurableJobsOptions constructor usage with IOptions and remove redundant constructor overloads. Update Azure durable jobs tests to pass options via IOptions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../AzureStorageJobShardManager.cs | 27 +++---------------- .../AzureStorageJobShardBatchingTests.cs | 4 ++- .../AzureStorageJobShardManagerTestFixture.cs | 3 +++ 3 files changed, 9 insertions(+), 25 deletions(-) diff --git a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs index 0d5b4cce515..b83e0aba18b 100644 --- a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs +++ b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs @@ -39,19 +39,7 @@ public AzureStorageJobShardManager( string containerName, string blobPrefix, AzureStorageJobShardOptions options, - IClusterMembershipService clusterMembership, - ILoggerFactory loggerFactory) - : this(siloAddress, client, containerName, blobPrefix, options, new DurableJobsOptions(), clusterMembership, loggerFactory) - { - } - - public AzureStorageJobShardManager( - SiloAddress siloAddress, - BlobServiceClient client, - string containerName, - string blobPrefix, - AzureStorageJobShardOptions options, - DurableJobsOptions durableJobsOptions, + IOptions durableJobsOptions, IClusterMembershipService clusterMembership, ILoggerFactory loggerFactory) : base(siloAddress) @@ -63,16 +51,7 @@ public AzureStorageJobShardManager( _logger = loggerFactory.CreateLogger(); _loggerFactory = loggerFactory; _options = options; - _durableJobsOptions = durableJobsOptions; - } - - public AzureStorageJobShardManager( - ILocalSiloDetails localSiloDetails, - IOptions options, - IClusterMembershipService clusterMembership, - ILoggerFactory loggerFactory) - : this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, clusterMembership, loggerFactory) - { + _durableJobsOptions = durableJobsOptions.Value; } public AzureStorageJobShardManager( @@ -81,7 +60,7 @@ public AzureStorageJobShardManager( IOptions durableJobsOptions, IClusterMembershipService clusterMembership, ILoggerFactory loggerFactory) - : this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, durableJobsOptions.Value, clusterMembership, loggerFactory) + : this(localSiloDetails.SiloAddress, options.Value.BlobServiceClient, options.Value.ContainerName, localSiloDetails.ClusterId, options.Value, durableJobsOptions, clusterMembership, loggerFactory) { } diff --git a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs index b3e2846efe7..c00b32f9777 100644 --- a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs +++ b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs @@ -32,11 +32,13 @@ public class AzureStorageJobShardBatchingTests : AzureStorageBasicTests, IAsyncD internal InMemoryClusterMembershipService MembershipService { get; } internal IOptions StorageOptions { get; } + internal IOptions DurableJobsOptions { get; } public AzureStorageJobShardBatchingTests() { MembershipService = new InMemoryClusterMembershipService(); StorageOptions = Options.Create(new AzureStorageJobShardOptions()); + DurableJobsOptions = Options.Create(new DurableJobsOptions()); StorageOptions.Value.ConfigureTestDefaults(); StorageOptions.Value.ContainerName = "test-batch-container-" + Guid.NewGuid().ToString("N"); } @@ -70,7 +72,7 @@ public TestLocalSiloDetails(SiloAddress siloAddress) internal AzureStorageJobShardManager CreateManager(SiloAddress siloAddress) { var localSiloDetails = new TestLocalSiloDetails(siloAddress); - return new AzureStorageJobShardManager(localSiloDetails, StorageOptions, MembershipService, NullLoggerFactory.Instance); + return new AzureStorageJobShardManager(localSiloDetails, StorageOptions, DurableJobsOptions, MembershipService, NullLoggerFactory.Instance); } internal void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) diff --git a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTestFixture.cs b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTestFixture.cs index c73aee7238e..bfbae50ba93 100644 --- a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTestFixture.cs +++ b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTestFixture.cs @@ -19,10 +19,12 @@ namespace Orleans.Tests.DurableJobs.AzureStorage; internal sealed class AzureStorageJobShardManagerTestFixture : IJobShardManagerTestFixture { private readonly IOptions _storageOptions; + private readonly IOptions _durableJobsOptions; public AzureStorageJobShardManagerTestFixture() { _storageOptions = Options.Create(new AzureStorageJobShardOptions()); + _durableJobsOptions = Options.Create(new DurableJobsOptions()); _storageOptions.Value.ConfigureTestDefaults(); _storageOptions.Value.ContainerName = "test-container-" + Guid.NewGuid().ToString("N"); } @@ -32,6 +34,7 @@ public JobShardManager CreateManager(ILocalSiloDetails localSiloDetails, ICluste return new AzureStorageJobShardManager( localSiloDetails, _storageOptions, + _durableJobsOptions, membershipService, NullLoggerFactory.Instance); } From cbe7eb9f747a9f62070a8880fdf164d2491cfb57 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Sun, 15 Feb 2026 15:57:07 -0800 Subject: [PATCH 5/5] Rename stolen shard terminology to adopted Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../AzureStorageJobShardManager.cs | 84 +++++++++++-------- .../Hosting/DurableJobsExtensions.cs | 2 +- .../Hosting/DurableJobsOptions.cs | 14 ++-- src/Orleans.DurableJobs/JobShardManager.cs | 34 ++++---- .../InMemoryJobShardManagerTests.cs | 44 +++++----- .../InMemoryJobShardManagerTestFixture.cs | 8 +- 6 files changed, 102 insertions(+), 84 deletions(-) diff --git a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs index b83e0aba18b..73eb69694c3 100644 --- a/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs +++ b/src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs @@ -30,8 +30,10 @@ public sealed partial class AzureStorageJobShardManager : JobShardManager private readonly DurableJobsOptions _durableJobsOptions; private long _shardCounter = 0; // For generating unique shard IDs - private const string StolenCountKey = "StolenCount"; - private const string LastStolenTimeKey = "LastStolenTime"; + private const string AdoptedCountKey = "AdoptedCount"; + private const string LastAdoptedTimeKey = "LastAdoptedTime"; + private const string LegacyStolenCountKey = "StolenCount"; + private const string LegacyLastStolenTimeKey = "LastStolenTime"; public AzureStorageJobShardManager( SiloAddress siloAddress, @@ -120,15 +122,15 @@ public AzureStorageJobShardManager( continue; } - // Determine if this is a stolen shard (taken from dead owner) vs orphaned (gracefully released) - var isStolen = owner is not null && ownerStatus == SiloStatus.Dead; + // Determine if this is an adopted shard (taken from dead owner) vs orphaned (gracefully released) + var isAdopted = owner is not null && ownerStatus == SiloStatus.Dead; - // Try to claim orphaned or stolen shard + // Try to claim orphaned or adopted shard LogClaimingShard(_logger, blob.Name, SiloAddress, owner); var blobClient = _client.GetAppendBlobClient(blob.Name); var metadata = blob.Metadata; var orphanedShard = new AzureStorageJobShard(blob.Name, shardStartTime, maxDueTime, blobClient, metadata, blob.Properties.ETag, _options, _loggerFactory.CreateLogger()); - if (!await TryTakeOwnership(orphanedShard, metadata, SiloAddress, isStolen, cancellationToken)) + if (!await TryTakeOwnership(orphanedShard, metadata, SiloAddress, isAdopted, cancellationToken)) { // Either poisoned shard or someone else took ownership - dispose and continue await orphanedShard.DisposeAsync(); @@ -153,9 +155,11 @@ async Task ReleaseOwnership(string blobName) var properties = await blobClient.GetPropertiesAsync(cancellationToken: cancellationToken); var metadata = properties.Value.Metadata; metadata.Remove("Owner"); - // Reset stolen count since we're gracefully releasing - metadata.Remove(StolenCountKey); - metadata.Remove(LastStolenTimeKey); + // Reset adopted count since we're gracefully releasing + metadata.Remove(AdoptedCountKey); + metadata.Remove(LastAdoptedTimeKey); + metadata.Remove(LegacyStolenCountKey); + metadata.Remove(LegacyLastStolenTimeKey); await blobClient.SetMetadataAsync(metadata, new BlobRequestConditions { IfMatch = properties.Value.ETag }, cancellationToken); } catch (Exception ex) @@ -165,24 +169,23 @@ async Task ReleaseOwnership(string blobName) } } - async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata, SiloAddress newOwner, bool isStolen, CancellationToken ct) + async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata, SiloAddress newOwner, bool isAdopted, CancellationToken ct) { - if (isStolen) + if (isAdopted) { - var existingStolenCount = GetStolenCount(metadata); - if (existingStolenCount > _durableJobsOptions.MaxStolenCount) + var existingAdoptedCount = GetAdoptedCount(metadata); + if (existingAdoptedCount > _durableJobsOptions.MaxAdoptedCount) { // Already marked as poisoned. return false; } - // Increment stolen count for shards taken from dead owners. - var stolenCount = existingStolenCount + 1; - if (stolenCount > _durableJobsOptions.MaxStolenCount) + // Increment adopted count for shards taken from dead owners. + var adoptedCount = existingAdoptedCount + 1; + if (adoptedCount > _durableJobsOptions.MaxAdoptedCount) { // Persist poisoned marker so this shard is not repeatedly re-evaluated as newly poisoned. - metadata[StolenCountKey] = stolenCount.ToString(CultureInfo.InvariantCulture); - metadata[LastStolenTimeKey] = DateTimeOffset.UtcNow.ToString("o", CultureInfo.InvariantCulture); + SetAdoptedMetadata(metadata, adoptedCount, DateTimeOffset.UtcNow); try { await shard.UpdateBlobMetadata(metadata, ct); @@ -192,13 +195,12 @@ async Task TryTakeOwnership(AzureStorageJobShard shard, IDictionary TryTakeOwnership(AzureStorageJobShard shard, IDictionary metadata) + static int GetAdoptedCount(IDictionary metadata) { - return metadata.TryGetValue(StolenCountKey, out var countStr) - && int.TryParse(countStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var count) - ? count + if (metadata.TryGetValue(AdoptedCountKey, out var countStr) + && int.TryParse(countStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var adoptedCount)) + { + return adoptedCount; + } + + return metadata.TryGetValue(LegacyStolenCountKey, out countStr) + && int.TryParse(countStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var legacyCount) + ? legacyCount : 0; } + + static void SetAdoptedMetadata(IDictionary metadata, int adoptedCount, DateTimeOffset adoptedTime) + { + metadata[AdoptedCountKey] = adoptedCount.ToString(CultureInfo.InvariantCulture); + metadata[LastAdoptedTimeKey] = adoptedTime.ToString("o", CultureInfo.InvariantCulture); + metadata.Remove(LegacyStolenCountKey); + metadata.Remove(LegacyLastStolenTimeKey); + } } public override async Task CreateShardAsync(DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary metadata, CancellationToken cancellationToken) @@ -299,9 +315,11 @@ public override async Task UnregisterShardAsync(Orleans.DurableJobs.IJobShard sh { // There are still jobs in the shard, release ownership gracefully. metadata.Remove("Owner"); - // Reset stolen count since we're gracefully releasing (not crashing) - metadata.Remove(StolenCountKey); - metadata.Remove(LastStolenTimeKey); + // Reset adopted count since we're gracefully releasing (not crashing) + metadata.Remove(AdoptedCountKey); + metadata.Remove(LastAdoptedTimeKey); + metadata.Remove(LegacyStolenCountKey); + metadata.Remove(LegacyLastStolenTimeKey); await azureShard.BlobClient.SetMetadataAsync(metadata, conditions, cancellationToken); _jobShardCache.TryRemove(shard.Id, out _); LogShardOwnershipReleased(_logger, shard.Id, SiloAddress, count); @@ -473,13 +491,13 @@ private static (SiloAddress? owner, MembershipVersion membershipVersion, DateTim [LoggerMessage( Level = LogLevel.Warning, - Message = "Poisoned shard detected: '{ShardId}' has been stolen {StolenCount} times (max allowed: {MaxStolenCount}). Shard will not be assigned." + Message = "Poisoned shard detected: '{ShardId}' has been adopted {AdoptedCount} times (max allowed: {MaxAdoptedCount}). Shard will not be assigned." )] - private static partial void LogPoisonedShardDetected(ILogger logger, string shardId, int stolenCount, int maxStolenCount); + private static partial void LogPoisonedShardDetected(ILogger logger, string shardId, int adoptedCount, int maxAdoptedCount); [LoggerMessage( Level = LogLevel.Information, - Message = "Shard '{ShardId}' stolen by silo {SiloAddress} (stolen count: {StolenCount})" + Message = "Shard '{ShardId}' adopted by silo {SiloAddress} (adopted count: {AdoptedCount})" )] - private static partial void LogShardStolen(ILogger logger, string shardId, SiloAddress siloAddress, int stolenCount); + private static partial void LogShardAdopted(ILogger logger, string shardId, SiloAddress siloAddress, int adoptedCount); } diff --git a/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs b/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs index 20bb549c860..f0cd24844a3 100644 --- a/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs +++ b/src/Orleans.DurableJobs/Hosting/DurableJobsExtensions.cs @@ -74,7 +74,7 @@ internal static IServiceCollection UseInMemoryDurableJobs(this IServiceCollectio var siloDetails = sp.GetRequiredService(); var membershipService = sp.GetRequiredService(); var durableJobsOptions = sp.GetRequiredService>(); - return new InMemoryJobShardManager(siloDetails.SiloAddress, membershipService, durableJobsOptions.Value.MaxStolenCount); + return new InMemoryJobShardManager(siloDetails.SiloAddress, membershipService, durableJobsOptions.Value.MaxAdoptedCount); }); services.AddFromExisting(); return services; diff --git a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs index 498bea34cdb..4dd33a7ebea 100644 --- a/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs +++ b/src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs @@ -48,22 +48,22 @@ public sealed class DurableJobsOptions public Func ShouldRetry { get; set; } = DefaultShouldRetry; /// - /// Gets or sets the maximum number of times a shard can be stolen from a dead owner before + /// Gets or sets the maximum number of times a shard can be adopted from a dead owner before /// being marked as poisoned. A shard that repeatedly causes silos to crash will exceed this - /// threshold as it bounces between owners. When the next steal would cause the stolen count + /// threshold as it bounces between owners. When the next adoption would cause the adopted count /// to exceed this value, the shard is considered poisoned and will no longer be assigned to any silo. /// Default: 3. /// /// /// - /// The stolen count is only incremented when a shard is taken from a dead silo (i.e., the previous + /// The adopted count is only incremented when a shard is taken from a dead silo (i.e., the previous /// owner crashed). It is NOT incremented when a silo gracefully shuts down and releases ownership. /// /// - /// When a shard completes successfully (all jobs processed), the stolen count is reset to 0. + /// When a shard completes successfully (all jobs processed), the adopted count is reset to 0. /// /// - public int MaxStolenCount { get; set; } = 3; + public int MaxAdoptedCount { get; set; } = 3; private static DateTimeOffset? DefaultShouldRetry(IJobRunContext jobContext, Exception ex) { @@ -99,9 +99,9 @@ public void ValidateConfiguration() { throw new OrleansConfigurationException("DurableJobsOptions.ShouldRetry must not be null."); } - if (options.MaxStolenCount < 0) + if (options.MaxAdoptedCount < 0) { - throw new OrleansConfigurationException("DurableJobsOptions.MaxStolenCount must be greater than or equal to zero."); + throw new OrleansConfigurationException("DurableJobsOptions.MaxAdoptedCount must be greater than or equal to zero."); } _logger.LogInformation("DurableJobsOptions validated: ShardDuration={ShardDuration}", options.ShardDuration); } diff --git a/src/Orleans.DurableJobs/JobShardManager.cs b/src/Orleans.DurableJobs/JobShardManager.cs index ca3df69ec36..29dc7a65c5d 100644 --- a/src/Orleans.DurableJobs/JobShardManager.cs +++ b/src/Orleans.DurableJobs/JobShardManager.cs @@ -60,7 +60,7 @@ internal class InMemoryJobShardManager : JobShardManager private static readonly Dictionary _globalShardStore = new(); private static readonly SemaphoreSlim _asyncLock = new(1, 1); private readonly IClusterMembershipService? _membershipService; - private readonly int _maxStolenCount; + private readonly int _maxAdoptedCount; public InMemoryJobShardManager(SiloAddress siloAddress) : this(siloAddress, null, 3) { @@ -70,10 +70,10 @@ public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipServic { } - public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService? membershipService, int maxStolenCount) : base(siloAddress) + public InMemoryJobShardManager(SiloAddress siloAddress, IClusterMembershipService? membershipService, int maxAdoptedCount) : base(siloAddress) { _membershipService = membershipService; - _maxStolenCount = maxStolenCount; + _maxAdoptedCount = maxAdoptedCount; } /// @@ -95,14 +95,14 @@ internal static async Task ClearAllShardsAsync() /// /// Gets ownership info for a shard. For testing purposes only. /// - internal static async Task<(string? Owner, int StolenCount)?> GetOwnershipInfoAsync(string shardId) + internal static async Task<(string? Owner, int AdoptedCount)?> GetOwnershipInfoAsync(string shardId) { await _asyncLock.WaitAsync(); try { if (_globalShardStore.TryGetValue(shardId, out var ownership)) { - return (ownership.OwnerSiloAddress, ownership.StolenCount); + return (ownership.OwnerSiloAddress, ownership.AdoptedCount); } return null; } @@ -115,7 +115,7 @@ internal static async Task ClearAllShardsAsync() public override async Task> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken) { var alreadyOwnedShards = new List(); - var stolenShards = new List(); + var adoptedShards = new List(); await _asyncLock.WaitAsync(cancellationToken); try @@ -150,7 +150,7 @@ public override async Task> AssignJobShardsAsync(DateTimeOffset continue; } - // Check if this is an orphaned shard (gracefully released) or stolen (from dead silo) + // Check if this is an orphaned shard (gracefully released) or adopted (from dead silo) var isOrphaned = ownership.OwnerSiloAddress is null; var ownerAddress = ownership.OwnerSiloAddress; var isFromDeadSilo = ownerAddress is not null && deadSilos.Contains(ownerAddress); @@ -159,13 +159,13 @@ public override async Task> AssignJobShardsAsync(DateTimeOffset { if (ownership.Shard.StartTime <= maxDueTime) { - // If stolen from dead silo, increment stolen count + // If adopted from dead silo, increment adopted count if (isFromDeadSilo) { - ownership.StolenCount++; + ownership.AdoptedCount++; // Check if shard is poisoned - if (ownership.StolenCount > _maxStolenCount) + if (ownership.AdoptedCount > _maxAdoptedCount) { // Shard is poisoned - don't assign it continue; @@ -173,7 +173,7 @@ public override async Task> AssignJobShardsAsync(DateTimeOffset } ownership.OwnerSiloAddress = SiloAddress.ToString(); - stolenShards.Add(ownership.Shard); + adoptedShards.Add(ownership.Shard); } } } @@ -183,13 +183,13 @@ public override async Task> AssignJobShardsAsync(DateTimeOffset _asyncLock.Release(); } - foreach (var shard in stolenShards) + foreach (var shard in adoptedShards) { - // Mark stolen shards as complete + // Mark adopted shards as complete await shard.MarkAsCompleteAsync(CancellationToken.None); } - return [.. alreadyOwnedShards, .. stolenShards]; + return [.. alreadyOwnedShards, .. adoptedShards]; } public override async Task CreateShardAsync(DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary metadata, CancellationToken cancellationToken) @@ -232,8 +232,8 @@ public override async Task UnregisterShardAsync(IJobShard shard, CancellationTok { // Mark as unowned so another silo can pick it up ownership.OwnerSiloAddress = null; - // Reset stolen count since we're gracefully releasing (not crashing) - ownership.StolenCount = 0; + // Reset adopted count since we're gracefully releasing (not crashing) + ownership.AdoptedCount = 0; } } } @@ -247,6 +247,6 @@ private sealed class ShardOwnership { public required IJobShard Shard { get; init; } public string? OwnerSiloAddress { get; set; } - public int StolenCount { get; set; } + public int AdoptedCount { get; set; } } } diff --git a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs index bcb2e8d6d40..a3a86d59ee3 100644 --- a/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs +++ b/test/NonSilo.Tests/DurableJobs/InMemoryJobShardManagerTests.cs @@ -52,7 +52,7 @@ public async Task AssignJobShardsAsync_ReturnsOwnedShards() } [Fact] - public async Task AssignJobShardsAsync_OrphanedShard_IsAssignedWithoutIncrementingStolenCount() + public async Task AssignJobShardsAsync_OrphanedShard_IsAssignedWithoutIncrementingAdoptedCount() { // Silo1 creates a shard and gracefully releases it var manager1 = new InMemoryJobShardManager(Silo1); @@ -77,11 +77,11 @@ public async Task AssignJobShardsAsync_OrphanedShard_IsAssignedWithoutIncrementi var ownershipInfo = await InMemoryJobShardManager.GetOwnershipInfoAsync(shard.Id); Assert.True(ownershipInfo.HasValue); Assert.Equal(Silo2.ToString(), ownershipInfo.Value.Owner); - Assert.Equal(0, ownershipInfo.Value.StolenCount); + Assert.Equal(0, ownershipInfo.Value.AdoptedCount); } [Fact] - public async Task AssignJobShardsAsync_StolenFromDeadSilo_IncrementsStolenCount() + public async Task AssignJobShardsAsync_AdoptedFromDeadSilo_IncrementsAdoptedCount() { // Setup membership service that reports Silo1 as dead var membershipService = CreateMembershipService(deadSilos: [Silo1]); @@ -93,18 +93,18 @@ public async Task AssignJobShardsAsync_StolenFromDeadSilo_IncrementsStolenCount( var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); - // Silo2 steals the shard from dead Silo1 - var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 3); + // Silo2 adopts the shard from dead Silo1 + var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxAdoptedCount: 3); var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); - // Shard should be assigned (stolen count = 1, under threshold) + // Shard should be assigned (adopted count = 1, under threshold) Assert.Single(assignedShards); Assert.Equal(shard.Id, assignedShards[0].Id); var ownershipInfo = await InMemoryJobShardManager.GetOwnershipInfoAsync(shard.Id); Assert.True(ownershipInfo.HasValue); Assert.Equal(Silo2.ToString(), ownershipInfo.Value.Owner); - Assert.Equal(1, ownershipInfo.Value.StolenCount); + Assert.Equal(1, ownershipInfo.Value.AdoptedCount); } [Fact] @@ -116,24 +116,24 @@ public async Task AssignJobShardsAsync_PoisonedShard_IsNotAssigned() membershipService.CurrentSnapshot.Returns(snapshot); // Silo1 creates a shard - var manager1 = new InMemoryJobShardManager(Silo1, membershipService, maxStolenCount: 2); + var manager1 = new InMemoryJobShardManager(Silo1, membershipService, maxAdoptedCount: 2); var minDueTime = DateTimeOffset.UtcNow; var maxDueTime = minDueTime.AddHours(1); await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); - // Silo2 steals from dead Silo1 (stolen count = 1) - var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 2); + // Silo2 adopts from dead Silo1 (adopted count = 1) + var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxAdoptedCount: 2); var shards2 = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); Assert.Single(shards2); - // Silo3 steals from dead Silo2 (stolen count = 2) - var manager3 = new InMemoryJobShardManager(Silo3, membershipService, maxStolenCount: 2); + // Silo3 adopts from dead Silo2 (adopted count = 2) + var manager3 = new InMemoryJobShardManager(Silo3, membershipService, maxAdoptedCount: 2); var shards3 = await manager3.AssignJobShardsAsync(maxDueTime, CancellationToken.None); Assert.Single(shards3); - // Silo4 tries to steal from dead Silo3 (stolen count would be 3, exceeds max of 2) - var manager4 = new InMemoryJobShardManager(Silo4, membershipService, maxStolenCount: 2); + // Silo4 tries to adopt from dead Silo3 (adopted count would be 3, exceeds max of 2) + var manager4 = new InMemoryJobShardManager(Silo4, membershipService, maxAdoptedCount: 2); var shards4 = await manager4.AssignJobShardsAsync(maxDueTime, CancellationToken.None); // Shard is poisoned and should not be assigned @@ -141,34 +141,34 @@ public async Task AssignJobShardsAsync_PoisonedShard_IsNotAssigned() } [Fact] - public async Task AssignJobShardsAsync_MaxStolenCountOfZero_NeverAssignsStolenShards() + public async Task AssignJobShardsAsync_MaxAdoptedCountOfZero_NeverAssignsAdoptedShards() { // Setup membership service that reports Silo1 as dead var membershipService = CreateMembershipService(deadSilos: [Silo1]); // Silo1 creates a shard - var manager1 = new InMemoryJobShardManager(Silo1, membershipService, maxStolenCount: 0); + var manager1 = new InMemoryJobShardManager(Silo1, membershipService, maxAdoptedCount: 0); var minDueTime = DateTimeOffset.UtcNow; var maxDueTime = minDueTime.AddHours(1); await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); - // Silo2 tries to steal from dead Silo1 with maxStolenCount=0 - var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 0); + // Silo2 tries to adopt from dead Silo1 with maxAdoptedCount=0 + var manager2 = new InMemoryJobShardManager(Silo2, membershipService, maxAdoptedCount: 0); var assignedShards = await manager2.AssignJobShardsAsync(maxDueTime, CancellationToken.None); - // Shard should not be assigned (stolen count would be 1, exceeds max of 0) + // Shard should not be assigned (adopted count would be 1, exceeds max of 0) Assert.Empty(assignedShards); } [Fact] - public async Task UseInMemoryDurableJobs_ConfiguredMaxStolenCount_IsApplied() + public async Task UseInMemoryDurableJobs_ConfiguredMaxAdoptedCount_IsApplied() { var membershipService = CreateMembershipService(deadSilos: [Silo2]); var minDueTime = DateTimeOffset.UtcNow; var maxDueTime = minDueTime.AddHours(1); - var ownerManager = new InMemoryJobShardManager(Silo2, membershipService, maxStolenCount: 3); + var ownerManager = new InMemoryJobShardManager(Silo2, membershipService, maxAdoptedCount: 3); await ownerManager.CreateShardAsync(minDueTime, maxDueTime, new Dictionary(), CancellationToken.None); var localSiloDetails = Substitute.For(); @@ -177,7 +177,7 @@ public async Task UseInMemoryDurableJobs_ConfiguredMaxStolenCount_IsApplied() var services = new ServiceCollection(); services.AddSingleton(localSiloDetails); services.AddSingleton(membershipService); - services.Configure(options => options.MaxStolenCount = 0); + services.Configure(options => options.MaxAdoptedCount = 0); services.UseInMemoryDurableJobs(); using var serviceProvider = services.BuildServiceProvider(); diff --git a/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs b/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs index f21a7376fc8..205f1c260b9 100644 --- a/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs +++ b/test/Tester/DurableJobs/InMemoryJobShardManagerTestFixture.cs @@ -10,18 +10,18 @@ namespace Tester.DurableJobs; /// internal sealed class InMemoryJobShardManagerTestFixture : IJobShardManagerTestFixture { - private readonly int _maxStolenCount; + private readonly int _maxAdoptedCount; - public InMemoryJobShardManagerTestFixture(int maxStolenCount = 3) + public InMemoryJobShardManagerTestFixture(int maxAdoptedCount = 3) { - _maxStolenCount = maxStolenCount; + _maxAdoptedCount = maxAdoptedCount; // Clear any state from previous tests InMemoryJobShardManager.ClearAllShardsAsync().GetAwaiter().GetResult(); } public JobShardManager CreateManager(ILocalSiloDetails localSiloDetails, IClusterMembershipService membershipService) { - return new InMemoryJobShardManager(localSiloDetails.SiloAddress, membershipService, _maxStolenCount); + return new InMemoryJobShardManager(localSiloDetails.SiloAddress, membershipService, _maxAdoptedCount); } public async ValueTask DisposeAsync()