From 9f106b592a5ec0b995c732664f8b99e59924b725 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:17:58 +0000 Subject: [PATCH 1/2] Flow CancellationToken through durable jobs tests with 2-minute timeout Add CancellationToken parameter to all DurableJobTestsRunner and JobShardManagerTestsRunner public methods. Replace CancellationToken.None with the passed token in JobShardManagerTestsRunner. Remove local CancellationTokenSource instances where they are no longer needed. Create CancellationTokenSource with 2-minute timeout in each test method and pass the token to the runner methods. This applies to: - InMemoryDurableJobsTests - InMemoryJobShardManagerTests - AzureStorageBlobDurableJobsTests - AzureStorageJobShardManagerTests - AzureStorageJobShardBatchingTests Co-authored-by: benjaminpetit <20427417+benjaminpetit@users.noreply.github.com> --- .../InMemoryDurableJobsTests.cs | 93 ++++++--- .../AzureStorageBlobDurableJobsTests.cs | 92 +++++--- .../AzureStorageJobShardBatchingTests.cs | 74 +++---- .../AzureStorageJobShardManagerTests.cs | 77 +++++-- .../DurableJobs/DurableJobTestsRunner.cs | 27 +-- .../InMemoryJobShardManagerTests.cs | 68 +++++- .../DurableJobs/JobShardManagerTestsRunner.cs | 196 +++++++++--------- 7 files changed, 393 insertions(+), 234 deletions(-) diff --git a/test/DefaultCluster.Tests/InMemoryDurableJobsTests.cs b/test/DefaultCluster.Tests/InMemoryDurableJobsTests.cs index d4c036f414c..15d6036c119 100644 --- a/test/DefaultCluster.Tests/InMemoryDurableJobsTests.cs +++ b/test/DefaultCluster.Tests/InMemoryDurableJobsTests.cs @@ -1,3 +1,5 @@ +using System; +using System.Threading; using System.Threading.Tasks; using Tester.DurableJobs; using TestExtensions; @@ -15,54 +17,93 @@ public InMemoryDurableJobsTests(DefaultClusterFixture fixture) : base(fixture) } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task DurableJobGrain() - => _runner.DurableJobGrain(); + public async Task DurableJobGrain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DurableJobGrain(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task JobExecutionOrder() - => _runner.JobExecutionOrder(); + public async Task JobExecutionOrder() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobExecutionOrder(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task PastDueTime() - => _runner.PastDueTime(); + public async Task PastDueTime() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.PastDueTime(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task JobWithMetadata() - => _runner.JobWithMetadata(); + public async Task JobWithMetadata() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobWithMetadata(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task MultipleGrains() - => _runner.MultipleGrains(); + public async Task MultipleGrains() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.MultipleGrains(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task DuplicateJobNames() - => _runner.DuplicateJobNames(); + public async Task DuplicateJobNames() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DuplicateJobNames(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task CancelNonExistentJob() - => _runner.CancelNonExistentJob(); + public async Task CancelNonExistentJob() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.CancelNonExistentJob(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task CancelAlreadyExecutedJob() - => _runner.CancelAlreadyExecutedJob(); + public async Task CancelAlreadyExecutedJob() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.CancelAlreadyExecutedJob(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task ConcurrentScheduling() - => _runner.ConcurrentScheduling(); + public async Task ConcurrentScheduling() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ConcurrentScheduling(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task JobPropertiesVerification() - => _runner.JobPropertiesVerification(); + public async Task JobPropertiesVerification() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobPropertiesVerification(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task DequeueCount() - => _runner.DequeueCount(); + public async Task DequeueCount() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DequeueCount(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task ScheduleJobOnAnotherGrain() - => _runner.ScheduleJobOnAnotherGrain(); + public async Task ScheduleJobOnAnotherGrain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ScheduleJobOnAnotherGrain(cts.Token); + } [Fact, TestCategory("BVT"), TestCategory("DurableJobs")] - public Task JobRetry() - => _runner.JobRetry(); + public async Task JobRetry() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobRetry(cts.Token); + } } diff --git a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageBlobDurableJobsTests.cs b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageBlobDurableJobsTests.cs index e371ac1901f..4dd964c8f0d 100644 --- a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageBlobDurableJobsTests.cs +++ b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageBlobDurableJobsTests.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Orleans.Configuration; @@ -38,54 +39,93 @@ public void Configure(ISiloBuilder hostBuilder) } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task DurableJobGrain() - => _runner.DurableJobGrain(); + public async Task DurableJobGrain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DurableJobGrain(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task JobExecutionOrder() - => _runner.JobExecutionOrder(); + public async Task JobExecutionOrder() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobExecutionOrder(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task PastDueTime() - => _runner.PastDueTime(); + public async Task PastDueTime() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.PastDueTime(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task JobWithMetadata() - => _runner.JobWithMetadata(); + public async Task JobWithMetadata() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobWithMetadata(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task MultipleGrains() - => _runner.MultipleGrains(); + public async Task MultipleGrains() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.MultipleGrains(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task DuplicateJobNames() - => _runner.DuplicateJobNames(); + public async Task DuplicateJobNames() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DuplicateJobNames(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task CancelNonExistentJob() - => _runner.CancelNonExistentJob(); + public async Task CancelNonExistentJob() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.CancelNonExistentJob(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task CancelAlreadyExecutedJob() - => _runner.CancelAlreadyExecutedJob(); + public async Task CancelAlreadyExecutedJob() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.CancelAlreadyExecutedJob(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task ConcurrentScheduling() - => _runner.ConcurrentScheduling(); + public async Task ConcurrentScheduling() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ConcurrentScheduling(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task JobPropertiesVerification() - => _runner.JobPropertiesVerification(); + public async Task JobPropertiesVerification() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobPropertiesVerification(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task DequeueCount() - => _runner.DequeueCount(); + public async Task DequeueCount() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DequeueCount(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task ScheduleJobOnAnotherGrain() - => _runner.ScheduleJobOnAnotherGrain(); + public async Task ScheduleJobOnAnotherGrain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ScheduleJobOnAnotherGrain(cts.Token); + } [SkippableFact, TestCategory("Azure"), TestCategory("DurableJobs")] - public Task JobRetry() - => _runner.JobRetry(); + public async Task JobRetry() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobRetry(cts.Token); + } } diff --git a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs index b3e2846efe7..54341233b7b 100644 --- a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs +++ b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardBatchingTests.cs @@ -81,6 +81,8 @@ internal void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] public async Task AzureStorageJobShard_MultipleOperationsBatched() { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + var cancellationToken = cts.Token; // Configure batching options to batch multiple operations StorageOptions.Value.MinBatchSize = 5; StorageOptions.Value.MaxBatchSize = 50; @@ -91,19 +93,19 @@ public async Task AzureStorageJobShard_MultipleOperationsBatched() var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, CancellationToken.None); + var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, cancellationToken); // Schedule 10 jobs rapidly to trigger batching var tasks = new List(); for (int i = 0; i < 10; i++) { - tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i*10), null, CancellationToken.None)); + tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i*10), null, cancellationToken)); } await Task.WhenAll(tasks); // Wait for batches to flush - await Task.Delay(TimeSpan.FromMilliseconds(300)); + await Task.Delay(TimeSpan.FromMilliseconds(300), cancellationToken); // Verify batching occurred - should have fewer committed blocks than individual operations var azureShard = (AzureStorageJobShard)shard; @@ -115,24 +117,25 @@ public async Task AzureStorageJobShard_MultipleOperationsBatched() SetSiloStatus(newSiloAddress, SiloStatus.Active); var newManager = CreateManager(newSiloAddress); - var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); - await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job.Name); - await shards[0].RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shards[0].RemoveJobAsync(jobCtx.Job.Id, cancellationToken); } Assert.Equal(10, consumedJobs.Count); - await newManager.UnregisterShardAsync(shards[0], CancellationToken.None); + await newManager.UnregisterShardAsync(shards[0], cancellationToken); } [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] public async Task AzureStorageJobShard_PartialBatchFlushesOnTimeout() { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + var cancellationToken = cts.Token; // Configure batching to require 10 operations but with a short timeout StorageOptions.Value.MinBatchSize = 10; StorageOptions.Value.MaxBatchSize = 100; @@ -143,13 +146,13 @@ public async Task AzureStorageJobShard_PartialBatchFlushesOnTimeout() var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, CancellationToken.None); + var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, cancellationToken); // Schedule only 3 jobs (less than MinBatchSize of 10) var tasks = new Task[3]; - tasks[0] = shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", date.AddSeconds(1), null, CancellationToken.None); - tasks[1] = shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", date.AddSeconds(2), null, CancellationToken.None); - tasks[2] = shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", date.AddSeconds(3), null, CancellationToken.None); + tasks[0] = shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", date.AddSeconds(1), null, cancellationToken); + tasks[1] = shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", date.AddSeconds(2), null, cancellationToken); + tasks[2] = shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", date.AddSeconds(3), null, cancellationToken); await Task.WhenAll(tasks); @@ -163,24 +166,25 @@ public async Task AzureStorageJobShard_PartialBatchFlushesOnTimeout() SetSiloStatus(newSiloAddress, SiloStatus.Active); var newManager = CreateManager(newSiloAddress); - var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); - await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job.Name); - await shards[0].RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shards[0].RemoveJobAsync(jobCtx.Job.Id, cancellationToken); } Assert.Equal(3, consumedJobs.Count); - await newManager.UnregisterShardAsync(shards[0], CancellationToken.None); + await newManager.UnregisterShardAsync(shards[0], cancellationToken); } [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] public async Task AzureStorageJobShard_MaxBatchSizeEnforced() { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + var cancellationToken = cts.Token; // Configure batching with a small max batch size StorageOptions.Value.MinBatchSize = 1; StorageOptions.Value.MaxBatchSize = 20; @@ -191,19 +195,19 @@ public async Task AzureStorageJobShard_MaxBatchSizeEnforced() var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, CancellationToken.None); + var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, cancellationToken); // Schedule 50 jobs rapidly (exceeds MaxBatchSize of 20) var tasks = new List(); for (int i = 0; i < 50; i++) { - tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i), null, CancellationToken.None)); + tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i), null, cancellationToken)); } await Task.WhenAll(tasks); // Wait for all batches to flush - await Task.Delay(TimeSpan.FromMilliseconds(500)); + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); // Verify multiple batches were created due to MaxBatchSize limit // With 50 jobs and MaxBatchSize=20, expect at least 3 blocks (50/20 = 2.5, rounded up) @@ -216,24 +220,25 @@ public async Task AzureStorageJobShard_MaxBatchSizeEnforced() SetSiloStatus(newSiloAddress, SiloStatus.Active); var newManager = CreateManager(newSiloAddress); - var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job.Name); - await shards[0].RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shards[0].RemoveJobAsync(jobCtx.Job.Id, cancellationToken); } Assert.Equal(50, consumedJobs.Count); - await newManager.UnregisterShardAsync(shards[0], CancellationToken.None); + await newManager.UnregisterShardAsync(shards[0], cancellationToken); } [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] public async Task AzureStorageJobShard_MetadataOperationsBreakBatches() { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + var cancellationToken = cts.Token; // Configure batching to require large batch StorageOptions.Value.MinBatchSize = 10; StorageOptions.Value.MaxBatchSize = 100; @@ -244,17 +249,17 @@ public async Task AzureStorageJobShard_MetadataOperationsBreakBatches() var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, CancellationToken.None); + var shard = await manager.CreateShardAsync(date, date.AddHours(1), _metadata, cancellationToken); // Schedule 5 jobs (less than MinBatchSize) var tasks = new List(); for (int i = 0; i < 5; i++) { - tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i), null, CancellationToken.None)); + tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i), null, cancellationToken)); } // Give operations time to queue - await Task.Delay(50); + await Task.Delay(50, cancellationToken); // Verify no blocks committed yet (batch still pending) var azureShard = (AzureStorageJobShard)shard; @@ -262,13 +267,13 @@ public async Task AzureStorageJobShard_MetadataOperationsBreakBatches() // Update metadata (should flush pending batch and process immediately) var newMetadata = new Dictionary(shard.Metadata) { ["Updated"] = "true" }; - await azureShard.UpdateBlobMetadata(newMetadata, CancellationToken.None); + await azureShard.UpdateBlobMetadata(newMetadata, cancellationToken); Assert.All(tasks, t => Assert.True(t.IsCompletedSuccessfully, "Expected all job scheduling tasks to complete successfully")); Assert.True(azureShard.CommitedBlockCount > blockCountBefore, "Expected metadata update to flush pending batch"); // Verify metadata was updated - var props = await azureShard.BlobClient.GetPropertiesAsync(); + var props = await azureShard.BlobClient.GetPropertiesAsync(cancellationToken: cancellationToken); Assert.True(props.Value.Metadata.ContainsKey("Updated")); Assert.Equal("true", props.Value.Metadata["Updated"]); @@ -283,19 +288,18 @@ public async Task AzureStorageJobShard_MetadataOperationsBreakBatches() StorageOptions.Value.BatchFlushInterval = TimeSpan.FromMilliseconds(100); var newManager = CreateManager(newSiloAddress); - var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); - await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job.Name); - await shards[0].RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shards[0].RemoveJobAsync(jobCtx.Job.Id, cancellationToken); } Assert.Equal(5, consumedJobs.Count); - await newManager.UnregisterShardAsync(shards[0], CancellationToken.None); + await newManager.UnregisterShardAsync(shards[0], cancellationToken); } public class InMemoryClusterMembershipService : IClusterMembershipService diff --git a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTests.cs b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTests.cs index 3d32a6ec460..135103074c3 100644 --- a/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTests.cs +++ b/test/Extensions/TesterAzureUtils/DurableJobs/AzureStorageJobShardManagerTests.cs @@ -62,48 +62,66 @@ public async ValueTask DisposeAsync() /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_Creation_Assignation() - => _runner.ShardCreationAndAssignment(); + public async Task AzureStorageJobShardManager_Creation_Assignation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardCreationAndAssignment(cts.Token); + } /// /// Tests reading and consuming jobs from a frozen shard after ownership transfer. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_ReadFrozenShard() - => _runner.ReadFrozenShard(); + public async Task AzureStorageJobShardManager_ReadFrozenShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ReadFrozenShard(cts.Token); + } /// /// Tests consuming jobs from a live shard. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_LiveShard() - => _runner.LiveShard(); + public async Task AzureStorageJobShardManager_LiveShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.LiveShard(cts.Token); + } /// /// Tests job metadata persistence across ownership transfers. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_JobMetadata() - => _runner.JobMetadata(); + public async Task AzureStorageJobShardManager_JobMetadata() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobMetadata(cts.Token); + } /// /// Tests concurrent shard assignment to verify ownership conflict resolution. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_ConcurrentShardAssignment_OwnershipConflicts() - => _runner.ConcurrentShardAssignment_OwnershipConflicts(); + public async Task AzureStorageJobShardManager_ConcurrentShardAssignment_OwnershipConflicts() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ConcurrentShardAssignment_OwnershipConflicts(cts.Token); + } /// /// Tests shard metadata preservation across ownership transfers. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_ShardMetadataMerge() - => _runner.ShardMetadataMerge(); + public async Task AzureStorageJobShardManager_ShardMetadataMerge() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardMetadataMerge(cts.Token); + } #endregion @@ -112,38 +130,53 @@ public Task AzureStorageJobShardManager_ShardMetadataMerge() /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_StopProcessingShard() - => _runner.StopProcessingShard(); + public async Task AzureStorageJobShardManager_StopProcessingShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.StopProcessingShard(cts.Token); + } /// /// Tests retrying a job with a new due time. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_RetryJobLater() - => _runner.RetryJobLater(); + public async Task AzureStorageJobShardManager_RetryJobLater() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.RetryJobLater(cts.Token); + } /// /// Tests job cancellation before and during processing. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_JobCancellation() - => _runner.JobCancellation(); + public async Task AzureStorageJobShardManager_JobCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobCancellation(cts.Token); + } /// /// Tests that multiple shard registrations with the same time range produce unique IDs. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_ShardRegistrationRetry_IdCollisions() - => _runner.ShardRegistrationRetry_IdCollisions(); + public async Task AzureStorageJobShardManager_ShardRegistrationRetry_IdCollisions() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardRegistrationRetry_IdCollisions(cts.Token); + } /// /// Tests that unregistering a shard with remaining jobs preserves the shard for reassignment. /// This test is delegated to the runner for reuse across providers. /// [SkippableFact, TestCategory("Azure"), TestCategory("Functional")] - public Task AzureStorageJobShardManager_UnregisterShard_WithJobsRemaining() - => _runner.UnregisterShard_WithJobsRemaining(); + public async Task AzureStorageJobShardManager_UnregisterShard_WithJobsRemaining() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.UnregisterShard_WithJobsRemaining(cts.Token); + } } diff --git a/test/Tester/DurableJobs/DurableJobTestsRunner.cs b/test/Tester/DurableJobs/DurableJobTestsRunner.cs index bbaf6f660b6..f94a8ac1ae0 100644 --- a/test/Tester/DurableJobs/DurableJobTestsRunner.cs +++ b/test/Tester/DurableJobs/DurableJobTestsRunner.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Orleans; using Orleans.Internal; @@ -23,7 +24,7 @@ public DurableJobTestsRunner(IGrainFactory grainFactory) _grainFactory = grainFactory; } - public async Task DurableJobGrain() + public async Task DurableJobGrain(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-job-grain"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(5); @@ -53,7 +54,7 @@ public async Task DurableJobGrain() Assert.False(await grain.HasJobRan(canceledJob.Id)); } - public async Task JobExecutionOrder() + public async Task JobExecutionOrder(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-execution-order"); var baseTime = DateTimeOffset.UtcNow.AddSeconds(2); @@ -74,7 +75,7 @@ public async Task JobExecutionOrder() Assert.True(time2 < time3, $"Job2 executed at {time2}, Job3 at {time3}"); } - public async Task PastDueTime() + public async Task PastDueTime(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-past-due"); var pastTime = DateTimeOffset.UtcNow.AddSeconds(-5); @@ -86,7 +87,7 @@ public async Task PastDueTime() Assert.True(await grain.HasJobRan(job.Id)); } - public async Task JobWithMetadata() + public async Task JobWithMetadata(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-metadata"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); @@ -113,7 +114,7 @@ public async Task JobWithMetadata() Assert.Equal("user123", context.Job.Metadata["UserId"]); } - public async Task MultipleGrains() + public async Task MultipleGrains(CancellationToken cancellationToken) { var grain1 = _grainFactory.GetGrain("test-grain-1"); var grain2 = _grainFactory.GetGrain("test-grain-2"); @@ -137,7 +138,7 @@ public async Task MultipleGrains() Assert.False(await grain3.HasJobRan(job1.Id)); } - public async Task DuplicateJobNames() + public async Task DuplicateJobNames(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-duplicate-names"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); @@ -163,7 +164,7 @@ public async Task DuplicateJobNames() Assert.True(await grain.HasJobRan(job3.Id)); } - public async Task CancelNonExistentJob() + public async Task CancelNonExistentJob(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-cancel-nonexistent"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(10); @@ -186,7 +187,7 @@ public async Task CancelNonExistentJob() Assert.False(await grain.HasJobRan(fakeJob.Id)); } - public async Task CancelAlreadyExecutedJob() + public async Task CancelAlreadyExecutedJob(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-cancel-executed"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(2); @@ -200,7 +201,7 @@ public async Task CancelAlreadyExecutedJob() Assert.False(cancelResult); } - public async Task ConcurrentScheduling() + public async Task ConcurrentScheduling(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-concurrent"); var baseTime = DateTimeOffset.UtcNow.AddSeconds(5); @@ -226,7 +227,7 @@ public async Task ConcurrentScheduling() } } - public async Task JobPropertiesVerification() + public async Task JobPropertiesVerification(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-properties"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); @@ -253,7 +254,7 @@ public async Task JobPropertiesVerification() Assert.NotEmpty(context.RunId); } - public async Task DequeueCount() + public async Task DequeueCount(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-dequeue-count"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); @@ -267,7 +268,7 @@ public async Task DequeueCount() Assert.Equal(1, context.DequeueCount); } - public async Task ScheduleJobOnAnotherGrain() + public async Task ScheduleJobOnAnotherGrain(CancellationToken cancellationToken) { var schedulerGrain = _grainFactory.GetGrain("scheduler-grain"); var targetGrain = _grainFactory.GetGrain("target-grain"); @@ -289,7 +290,7 @@ public async Task ScheduleJobOnAnotherGrain() Assert.Equal("CrossGrainJob", context.Job.Name); } - public async Task JobRetry() + public async Task JobRetry(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("retry-test-grain"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(2); diff --git a/test/Tester/DurableJobs/InMemoryJobShardManagerTests.cs b/test/Tester/DurableJobs/InMemoryJobShardManagerTests.cs index de8a3f10227..e5dbd5b3713 100644 --- a/test/Tester/DurableJobs/InMemoryJobShardManagerTests.cs +++ b/test/Tester/DurableJobs/InMemoryJobShardManagerTests.cs @@ -1,3 +1,5 @@ +using System; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -24,35 +26,79 @@ public InMemoryJobShardManagerTests() public Task DisposeAsync() => _fixture.DisposeAsync().AsTask(); [SkippableFact] - public Task InMemoryJobShardManager_ShardCreationAndAssignment() => _runner.ShardCreationAndAssignment(); + public async Task InMemoryJobShardManager_ShardCreationAndAssignment() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardCreationAndAssignment(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_ReadFrozenShard() => _runner.ReadFrozenShard(); + public async Task InMemoryJobShardManager_ReadFrozenShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ReadFrozenShard(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_LiveShard() => _runner.LiveShard(); + public async Task InMemoryJobShardManager_LiveShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.LiveShard(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_JobMetadata() => _runner.JobMetadata(); + public async Task InMemoryJobShardManager_JobMetadata() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobMetadata(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_ConcurrentShardAssignment_OwnershipConflicts() => _runner.ConcurrentShardAssignment_OwnershipConflicts(); + public async Task InMemoryJobShardManager_ConcurrentShardAssignment_OwnershipConflicts() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ConcurrentShardAssignment_OwnershipConflicts(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_ShardMetadataMerge() => _runner.ShardMetadataMerge(); + public async Task InMemoryJobShardManager_ShardMetadataMerge() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardMetadataMerge(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_StopProcessingShard() => _runner.StopProcessingShard(); + public async Task InMemoryJobShardManager_StopProcessingShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.StopProcessingShard(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_RetryJobLater() => _runner.RetryJobLater(); + public async Task InMemoryJobShardManager_RetryJobLater() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.RetryJobLater(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_JobCancellation() => _runner.JobCancellation(); + public async Task InMemoryJobShardManager_JobCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobCancellation(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_ShardRegistrationRetry_IdCollisions() => _runner.ShardRegistrationRetry_IdCollisions(); + public async Task InMemoryJobShardManager_ShardRegistrationRetry_IdCollisions() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardRegistrationRetry_IdCollisions(cts.Token); + } [SkippableFact] - public Task InMemoryJobShardManager_UnregisterShard_WithJobsRemaining() => _runner.UnregisterShard_WithJobsRemaining(); + public async Task InMemoryJobShardManager_UnregisterShard_WithJobsRemaining() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.UnregisterShard_WithJobsRemaining(cts.Token); + } } diff --git a/test/Tester/DurableJobs/JobShardManagerTestsRunner.cs b/test/Tester/DurableJobs/JobShardManagerTestsRunner.cs index 9ad0ff31534..97748dc95cc 100644 --- a/test/Tester/DurableJobs/JobShardManagerTestsRunner.cs +++ b/test/Tester/DurableJobs/JobShardManagerTestsRunner.cs @@ -54,7 +54,7 @@ private JobShardManager CreateManager(SiloAddress siloAddress) /// Tests basic shard creation and assignment workflow. /// Verifies that shards are created with unique IDs and correctly assigned to their creator silo. /// - public async Task ShardCreationAndAssignment() + public async Task ShardCreationAndAssignment(CancellationToken cancellationToken) { var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); var silo2Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5001), 0); @@ -69,26 +69,26 @@ public async Task ShardCreationAndAssignment() // Register multiple shards and ensure they are distinct // two of them have the same time range - var shard1 = await silo1Manager.CreateShardAsync(date, maxDate, _testMetadata, CancellationToken.None); - var shard2 = await silo1Manager.CreateShardAsync(date, maxDate, _testMetadata, CancellationToken.None); - var shard3 = await silo1Manager.CreateShardAsync(date.AddHours(2), maxDate, _testMetadata, CancellationToken.None); + var shard1 = await silo1Manager.CreateShardAsync(date, maxDate, _testMetadata, cancellationToken); + var shard2 = await silo1Manager.CreateShardAsync(date, maxDate, _testMetadata, cancellationToken); + var shard3 = await silo1Manager.CreateShardAsync(date.AddHours(2), maxDate, _testMetadata, cancellationToken); Assert.Distinct([shard1.Id, shard2.Id, shard3.Id]); // All shards are now assigned to the creator silo - var assignedShards = await silo1Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), CancellationToken.None); + var assignedShards = await silo1Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), cancellationToken); Assert.Equal(3, assignedShards.Count); Assert.Contains(shard1.Id, assignedShards.Select(s => s.Id)); Assert.Contains(shard2.Id, assignedShards.Select(s => s.Id)); Assert.Contains(shard3.Id, assignedShards.Select(s => s.Id)); - var emptyShards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), CancellationToken.None); + var emptyShards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), cancellationToken); Assert.Empty(emptyShards); // Mark the local silo as dead SetSiloStatus(silo1Address, SiloStatus.Dead); // Now we can take over all three shards - var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), CancellationToken.None); + var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), cancellationToken); Assert.Equal(3, shards.Count); Assert.Contains(shard1.Id, shards.Select(s => s.Id)); Assert.Contains(shard2.Id, shards.Select(s => s.Id)); @@ -100,14 +100,14 @@ public async Task ShardCreationAndAssignment() var silo3Manager = CreateManager(silo3Address); // No unassigned shards - Assert.Empty(await silo3Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None)); + Assert.Empty(await silo3Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken)); } /// /// Tests reading and consuming jobs from a shard after ownership transfer. /// Verifies that jobs are preserved during failover and can be consumed by the new owner. /// - public async Task ReadFrozenShard() + public async Task ReadFrozenShard(CancellationToken cancellationToken) { var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); var silo2Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5001), 0); @@ -117,42 +117,41 @@ public async Task ReadFrozenShard() var silo2Manager = CreateManager(silo2Address); var date = DateTime.UtcNow; - var shard1 = await silo1Manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, CancellationToken.None); + var shard1 = await silo1Manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, cancellationToken); // Schedule some jobs - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", date.AddSeconds(1), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job3", date.AddSeconds(3), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", date.AddSeconds(2), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job4", date.AddSeconds(4), null, CancellationToken.None); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", date.AddSeconds(1), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job3", date.AddSeconds(3), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", date.AddSeconds(2), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job4", date.AddSeconds(4), null, cancellationToken); // Mark the silo1 as dead, and create a new incarnation SetSiloStatus(silo1Address, SiloStatus.Dead); // Take over the shard - var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); shard1 = shards[0]; var counter = 1; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); - await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { Assert.Equal($"job{counter}", jobCtx.Job.Name); - await shard1.RemoveJobAsync(jobCtx.Job.Id, cts.Token); + await shard1.RemoveJobAsync(jobCtx.Job.Id, cancellationToken); counter++; } Assert.Equal(5, counter); - await silo2Manager.UnregisterShardAsync(shard1, CancellationToken.None); + await silo2Manager.UnregisterShardAsync(shard1, cancellationToken); // No unassigned shards - Assert.Empty(await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None)); + Assert.Empty(await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken)); } /// /// Tests consuming jobs from a live shard (one that continues to accept new jobs). /// Verifies job scheduling, consumption, and cancellation during processing. /// - public async Task LiveShard() + public async Task LiveShard(CancellationToken cancellationToken) { var startTime = DateTime.UtcNow; var localAddress = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); @@ -160,37 +159,36 @@ public async Task LiveShard() var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard1 = await manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, CancellationToken.None); + var shard1 = await manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, cancellationToken); // Schedule some jobs - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job0", startTime.AddSeconds(1), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job2", startTime.AddSeconds(3), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job1", startTime.AddSeconds(2), null, CancellationToken.None); - var lastJob = await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job3", startTime.AddSeconds(4), null, CancellationToken.None); - var jobToCancel = await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job4", startTime.AddSeconds(5), null, CancellationToken.None); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job0", startTime.AddSeconds(1), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job2", startTime.AddSeconds(3), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job1", startTime.AddSeconds(2), null, cancellationToken); + var lastJob = await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job3", startTime.AddSeconds(4), null, cancellationToken); + var jobToCancel = await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job4", startTime.AddSeconds(5), null, cancellationToken); var counter = 0; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await shard1.MarkAsCompleteAsync(CancellationToken.None); - await shard1.RemoveJobAsync(jobToCancel.Id, CancellationToken.None); - await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await shard1.MarkAsCompleteAsync(cancellationToken); + await shard1.RemoveJobAsync(jobToCancel.Id, cancellationToken); + await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { Assert.Equal($"job{counter}", jobCtx.Job.Name); - await shard1.RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shard1.RemoveJobAsync(jobCtx.Job.Id, cancellationToken); counter++; } Assert.Equal(4, counter); Assert.True(lastJob.DueTime <= DateTimeOffset.UtcNow); - await manager.UnregisterShardAsync(shard1, CancellationToken.None); + await manager.UnregisterShardAsync(shard1, cancellationToken); // No unassigned shards - Assert.Empty(await manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None)); + Assert.Empty(await manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken)); } /// /// Tests job metadata persistence and retrieval across shard ownership transfer. /// - public async Task JobMetadata() + public async Task JobMetadata(CancellationToken cancellationToken) { // Initialize 2 silos with two managers var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); @@ -202,7 +200,7 @@ public async Task JobMetadata() var silo2Manager = CreateManager(silo2Address); var date = DateTime.UtcNow; - var shard = await silo1Manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, CancellationToken.None); + var shard = await silo1Manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, cancellationToken); // Schedule jobs with different metadata on a single shard var jobMetadata1 = new Dictionary @@ -217,9 +215,9 @@ public async Task JobMetadata() { "Category", "Notification" } }; - var job1 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(1), jobMetadata1, CancellationToken.None); - var job2 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddSeconds(2), jobMetadata2, CancellationToken.None); - var job3 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", DateTime.UtcNow.AddSeconds(3), null, CancellationToken.None); + var job1 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(1), jobMetadata1, cancellationToken); + var job2 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddSeconds(2), jobMetadata2, cancellationToken); + var job3 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", DateTime.UtcNow.AddSeconds(3), null, cancellationToken); // Verify metadata is set on the durable jobs Assert.Equal(jobMetadata1, job1.Metadata); @@ -230,17 +228,16 @@ public async Task JobMetadata() SetSiloStatus(silo1Address, SiloStatus.Dead); // Take over the shard with the other silo - var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); shard = shards[0]; // Consume jobs and verify metadata is preserved var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await foreach (var jobCtx in shard.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shard.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job); - await shard.RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shard.RemoveJobAsync(jobCtx.Job.Id, cancellationToken); } Assert.Equal(3, consumedJobs.Count); @@ -253,13 +250,13 @@ public async Task JobMetadata() Assert.Equal(jobMetadata2, consumedJob2.Metadata); Assert.Null(consumedJob3.Metadata); - await silo2Manager.UnregisterShardAsync(shard, CancellationToken.None); + await silo2Manager.UnregisterShardAsync(shard, cancellationToken); } /// /// Tests concurrent shard assignment to verify that only one silo can claim ownership of an orphaned shard. /// - public async Task ConcurrentShardAssignment_OwnershipConflicts() + public async Task ConcurrentShardAssignment_OwnershipConflicts(CancellationToken cancellationToken) { // Initialize 3 silos with 3 managers var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); @@ -276,15 +273,15 @@ public async Task ConcurrentShardAssignment_OwnershipConflicts() var date = DateTime.UtcNow; // Create two shards on the first silo - var shard1 = await silo1Manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, CancellationToken.None); - var shard2 = await silo1Manager.CreateShardAsync(date, date.AddHours(2), _testMetadata, CancellationToken.None); + var shard1 = await silo1Manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, cancellationToken); + var shard2 = await silo1Manager.CreateShardAsync(date, date.AddHours(2), _testMetadata, cancellationToken); // Mark the first silo as dead SetSiloStatus(silo1Address, SiloStatus.Dead); // Concurrently try to assign shards from silo2 and silo3 - var assignTask2 = silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), CancellationToken.None); - var assignTask3 = silo3Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), CancellationToken.None); + var assignTask2 = silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), cancellationToken); + var assignTask3 = silo3Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(3), cancellationToken); await Task.WhenAll(assignTask2, assignTask3); @@ -304,7 +301,7 @@ public async Task ConcurrentShardAssignment_OwnershipConflicts() /// /// Tests that shard metadata is correctly preserved and merged during ownership transfers. /// - public async Task ShardMetadataMerge() + public async Task ShardMetadataMerge(CancellationToken cancellationToken) { // Initialize 2 silos with 2 managers var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); @@ -324,7 +321,7 @@ public async Task ShardMetadataMerge() { "TenantId", "tenant-123" } }; - var shard = await silo1Manager.CreateShardAsync(date, date.AddHours(1), customMetadata, CancellationToken.None); + var shard = await silo1Manager.CreateShardAsync(date, date.AddHours(1), customMetadata, cancellationToken); Assert.NotNull(shard.Metadata); Assert.All(customMetadata, kvp => { @@ -333,12 +330,12 @@ public async Task ShardMetadataMerge() }); // Schedule a job to ensure shard persistence - await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(5), null, CancellationToken.None); + await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(5), null, cancellationToken); SetSiloStatus(silo1Address, SiloStatus.Dead); // Take over the shard from silo2 and verify the metadata is preserved - var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); shard = shards[0]; @@ -353,35 +350,34 @@ public async Task ShardMetadataMerge() /// /// Tests stopping shard processing and verifying jobs remain for reassignment. /// - public async Task StopProcessingShard() + public async Task StopProcessingShard(CancellationToken cancellationToken) { var localAddress = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); SetSiloStatus(localAddress, SiloStatus.Active); var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard1 = await manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, CancellationToken.None); + var shard1 = await manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, cancellationToken); // Schedule some jobs - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(5), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job3", DateTime.UtcNow.AddSeconds(10), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddSeconds(6), null, CancellationToken.None); - await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job4", DateTime.UtcNow.AddSeconds(15), null, CancellationToken.None); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(5), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job3", DateTime.UtcNow.AddSeconds(10), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddSeconds(6), null, cancellationToken); + await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job4", DateTime.UtcNow.AddSeconds(15), null, cancellationToken); var counter = 1; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(40)); - await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { Assert.Equal($"job{counter}", jobCtx.Job.Name); if (counter == 2) break; - await shard1.RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shard1.RemoveJobAsync(jobCtx.Job.Id, cancellationToken); counter++; } Assert.Equal(2, counter); - await manager.UnregisterShardAsync(shard1, CancellationToken.None); + await manager.UnregisterShardAsync(shard1, cancellationToken); - var shards = await manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); Assert.Equal(shard1.Id, shards[0].Id); } @@ -389,41 +385,40 @@ public async Task StopProcessingShard() /// /// Tests retrying a job with a new due time. /// - public async Task RetryJobLater() + public async Task RetryJobLater(CancellationToken cancellationToken) { var localAddress = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); SetSiloStatus(localAddress, SiloStatus.Active); var manager = CreateManager(localAddress); var date = DateTime.UtcNow; - var shard1 = await manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, CancellationToken.None); + var shard1 = await manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, cancellationToken); // Schedule a job - var job = await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(1), null, CancellationToken.None); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(40)); - await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + var job = await shard1.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(1), null, cancellationToken); + await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { Assert.Equal("job1", jobCtx.Job.Name); var newDueTime = DateTimeOffset.UtcNow.AddSeconds(1); - await shard1.RetryJobLaterAsync(jobCtx, newDueTime, CancellationToken.None); + await shard1.RetryJobLaterAsync(jobCtx, newDueTime, cancellationToken); break; } // Consume again - await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shard1.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { Assert.Equal("job1", jobCtx.Job.Name); Assert.NotEqual(job.DueTime, jobCtx.Job.DueTime); - await shard1.RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shard1.RemoveJobAsync(jobCtx.Job.Id, cancellationToken); break; } - await manager.UnregisterShardAsync(shard1, CancellationToken.None); + await manager.UnregisterShardAsync(shard1, cancellationToken); } /// /// Tests job cancellation before and during processing. /// - public async Task JobCancellation() + public async Task JobCancellation(CancellationToken cancellationToken) { // Initialize 2 silos with two managers var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); @@ -435,32 +430,31 @@ public async Task JobCancellation() var silo2Manager = CreateManager(silo2Address); var date = DateTime.UtcNow; - var shard = await silo1Manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, CancellationToken.None); + var shard = await silo1Manager.CreateShardAsync(date, date.AddYears(1), _testMetadata, cancellationToken); // Schedule multiple jobs in a single shard - var job1 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddMilliseconds(500), null, CancellationToken.None); - var job2 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddMilliseconds(1000), null, CancellationToken.None); - var job3 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", DateTime.UtcNow.AddMilliseconds(1500), null, CancellationToken.None); - var job4 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target4"), "job4", DateTime.UtcNow.AddMilliseconds(2000), null, CancellationToken.None); + var job1 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddMilliseconds(500), null, cancellationToken); + var job2 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddMilliseconds(1000), null, cancellationToken); + var job3 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", DateTime.UtcNow.AddMilliseconds(1500), null, cancellationToken); + var job4 = await shard.TryScheduleJobAsync(GrainId.Create("type", "target4"), "job4", DateTime.UtcNow.AddMilliseconds(2000), null, cancellationToken); // Cancel job2 before processing starts - await shard.RemoveJobAsync(job2.Id, CancellationToken.None); + await shard.RemoveJobAsync(job2.Id, cancellationToken); // Start consuming jobs var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await foreach (var jobCtx in shard.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shard.ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job.Name); // Cancel job4 during processing (after job1 is consumed) if (jobCtx.Job.Name == "job1") { - await shard.RemoveJobAsync(job4.Id, CancellationToken.None); + await shard.RemoveJobAsync(job4.Id, cancellationToken); } - await shard.RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shard.RemoveJobAsync(jobCtx.Job.Id, cancellationToken); if (consumedJobs.Count >= 2) { @@ -478,12 +472,13 @@ public async Task JobCancellation() // Mark the shard owner silo as dead and reassign to verify cancelled jobs are not in storage SetSiloStatus(silo1Address, SiloStatus.Dead); - var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); shard = shards[0]; var hasJobs = false; - cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(TimeSpan.FromSeconds(5)); await foreach (var jobCtx in shard.ConsumeDurableJobsAsync().WithCancellation(cts.Token)) { hasJobs = true; @@ -491,13 +486,13 @@ public async Task JobCancellation() } Assert.False(hasJobs); - await silo2Manager.UnregisterShardAsync(shard, CancellationToken.None); + await silo2Manager.UnregisterShardAsync(shard, cancellationToken); } /// /// Tests that multiple shard registrations with the same time range produce unique IDs. /// - public async Task ShardRegistrationRetry_IdCollisions() + public async Task ShardRegistrationRetry_IdCollisions(CancellationToken cancellationToken) { var localAddress = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); SetSiloStatus(localAddress, SiloStatus.Active); @@ -506,9 +501,9 @@ public async Task ShardRegistrationRetry_IdCollisions() var date = DateTime.UtcNow; - var shard1 = await manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, CancellationToken.None); - var shard2 = await manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, CancellationToken.None); - var shard3 = await manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, CancellationToken.None); + var shard1 = await manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, cancellationToken); + var shard2 = await manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, cancellationToken); + var shard3 = await manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, cancellationToken); Assert.Distinct([shard1.Id, shard2.Id, shard3.Id]); } @@ -516,7 +511,7 @@ public async Task ShardRegistrationRetry_IdCollisions() /// /// Tests that unregistering a shard with remaining jobs preserves the shard for reassignment. /// - public async Task UnregisterShard_WithJobsRemaining() + public async Task UnregisterShard_WithJobsRemaining(CancellationToken cancellationToken) { // Initialize 2 silos with 2 managers var silo1Address = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 5000), 0); @@ -528,34 +523,33 @@ public async Task UnregisterShard_WithJobsRemaining() var silo2Manager = CreateManager(silo2Address); var date = DateTime.UtcNow; - var shard = await silo1Manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, CancellationToken.None); + var shard = await silo1Manager.CreateShardAsync(date, date.AddHours(1), _testMetadata, cancellationToken); // Create a shard on silo1, schedule some jobs, then unregister the shard - await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(1), null, CancellationToken.None); - await shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddSeconds(2), null, CancellationToken.None); + await shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", DateTime.UtcNow.AddSeconds(1), null, cancellationToken); + await shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", DateTime.UtcNow.AddSeconds(2), null, cancellationToken); - await silo1Manager.UnregisterShardAsync(shard, CancellationToken.None); + await silo1Manager.UnregisterShardAsync(shard, cancellationToken); // The shard should NOT have been deleted since there were jobs remaining SetSiloStatus(silo1Address, SiloStatus.Dead); // Take over the shard from silo2 and consume the jobs - var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), CancellationToken.None); + var shards = await silo2Manager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken); Assert.Single(shards); Assert.Equal(shard.Id, shards[0].Id); var consumedJobs = new List(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); - await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cts.Token)) + await foreach (var jobCtx in shards[0].ConsumeDurableJobsAsync().WithCancellation(cancellationToken)) { consumedJobs.Add(jobCtx.Job.Name); - await shards[0].RemoveJobAsync(jobCtx.Job.Id, CancellationToken.None); + await shards[0].RemoveJobAsync(jobCtx.Job.Id, cancellationToken); } Assert.Equal(2, consumedJobs.Count); Assert.Contains("job1", consumedJobs); Assert.Contains("job2", consumedJobs); - await silo2Manager.UnregisterShardAsync(shards[0], CancellationToken.None); + await silo2Manager.UnregisterShardAsync(shards[0], cancellationToken); } /// From 436f30ff2686b4f73d6f8e36d660dffff8d4ea12 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:25:39 +0000 Subject: [PATCH 2/2] Use .WaitAsync(cancellationToken) in DurableJobTestsRunner to observe cancellation token Replace .WithTimeout() calls with .WaitAsync(cancellationToken) on all grain interface calls so the cancellation token is actively observed. This ensures the 2-minute test timeout properly cancels hung operations. Co-authored-by: benjaminpetit <20427417+benjaminpetit@users.noreply.github.com> --- .../DurableJobs/DurableJobTestsRunner.cs | 147 +++++++++--------- 1 file changed, 73 insertions(+), 74 deletions(-) diff --git a/test/Tester/DurableJobs/DurableJobTestsRunner.cs b/test/Tester/DurableJobs/DurableJobTestsRunner.cs index f94a8ac1ae0..9df6f4a3cba 100644 --- a/test/Tester/DurableJobs/DurableJobTestsRunner.cs +++ b/test/Tester/DurableJobs/DurableJobTestsRunner.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using Orleans; -using Orleans.Internal; using Orleans.DurableJobs; using Xunit; using UnitTests.GrainInterfaces; @@ -28,22 +27,22 @@ public async Task DurableJobGrain(CancellationToken cancellationToken) { var grain = _grainFactory.GetGrain("test-job-grain"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(5); - var job1 = await grain.ScheduleJobAsync("TestJob", dueTime); + var job1 = await grain.ScheduleJobAsync("TestJob", dueTime).WaitAsync(cancellationToken); Assert.NotNull(job1); Assert.Equal("TestJob", job1.Name); Assert.Equal(dueTime, job1.DueTime); - var job2 = await grain.ScheduleJobAsync("TestJob2", dueTime); - var job3 = await grain.ScheduleJobAsync("TestJob3", dueTime.AddSeconds(4)); - var job4 = await grain.ScheduleJobAsync("TestJob4", dueTime); - var job5 = await grain.ScheduleJobAsync("TestJob5", dueTime.AddSeconds(1)); - var canceledJob = await grain.ScheduleJobAsync("CanceledJob", dueTime.AddSeconds(2)); - Assert.True(await grain.TryCancelJobAsync(canceledJob)); + var job2 = await grain.ScheduleJobAsync("TestJob2", dueTime).WaitAsync(cancellationToken); + var job3 = await grain.ScheduleJobAsync("TestJob3", dueTime.AddSeconds(4)).WaitAsync(cancellationToken); + var job4 = await grain.ScheduleJobAsync("TestJob4", dueTime).WaitAsync(cancellationToken); + var job5 = await grain.ScheduleJobAsync("TestJob5", dueTime.AddSeconds(1)).WaitAsync(cancellationToken); + var canceledJob = await grain.ScheduleJobAsync("CanceledJob", dueTime.AddSeconds(2)).WaitAsync(cancellationToken); + Assert.True(await grain.TryCancelJobAsync(canceledJob).WaitAsync(cancellationToken)); // Wait for the job to run foreach (var job in new[] { job1, job2, job3, job4, job5 }) { try { - await grain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); } catch (TimeoutException) { @@ -51,7 +50,7 @@ public async Task DurableJobGrain(CancellationToken cancellationToken) } } // Verify the canceled job did not run - Assert.False(await grain.HasJobRan(canceledJob.Id)); + Assert.False(await grain.HasJobRan(canceledJob.Id).WaitAsync(cancellationToken)); } public async Task JobExecutionOrder(CancellationToken cancellationToken) @@ -59,17 +58,17 @@ public async Task JobExecutionOrder(CancellationToken cancellationToken) var grain = _grainFactory.GetGrain("test-execution-order"); var baseTime = DateTimeOffset.UtcNow.AddSeconds(2); - var job1 = await grain.ScheduleJobAsync("FirstJob", baseTime); - var job2 = await grain.ScheduleJobAsync("SecondJob", baseTime.AddSeconds(2)); - var job3 = await grain.ScheduleJobAsync("ThirdJob", baseTime.AddSeconds(4)); + var job1 = await grain.ScheduleJobAsync("FirstJob", baseTime).WaitAsync(cancellationToken); + var job2 = await grain.ScheduleJobAsync("SecondJob", baseTime.AddSeconds(2)).WaitAsync(cancellationToken); + var job3 = await grain.ScheduleJobAsync("ThirdJob", baseTime.AddSeconds(4)).WaitAsync(cancellationToken); - await grain.WaitForJobToRun(job1.Id).WithTimeout(TimeSpan.FromSeconds(10)); - await grain.WaitForJobToRun(job2.Id).WithTimeout(TimeSpan.FromSeconds(10)); - await grain.WaitForJobToRun(job3.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain.WaitForJobToRun(job1.Id).WaitAsync(cancellationToken); + await grain.WaitForJobToRun(job2.Id).WaitAsync(cancellationToken); + await grain.WaitForJobToRun(job3.Id).WaitAsync(cancellationToken); - var time1 = await grain.GetJobExecutionTime(job1.Id); - var time2 = await grain.GetJobExecutionTime(job2.Id); - var time3 = await grain.GetJobExecutionTime(job3.Id); + var time1 = await grain.GetJobExecutionTime(job1.Id).WaitAsync(cancellationToken); + var time2 = await grain.GetJobExecutionTime(job2.Id).WaitAsync(cancellationToken); + var time3 = await grain.GetJobExecutionTime(job3.Id).WaitAsync(cancellationToken); Assert.True(time1 < time2, $"Job1 executed at {time1}, Job2 at {time2}"); Assert.True(time2 < time3, $"Job2 executed at {time2}, Job3 at {time3}"); @@ -80,11 +79,11 @@ public async Task PastDueTime(CancellationToken cancellationToken) var grain = _grainFactory.GetGrain("test-past-due"); var pastTime = DateTimeOffset.UtcNow.AddSeconds(-5); - var job = await grain.ScheduleJobAsync("PastDueJob", pastTime); + var job = await grain.ScheduleJobAsync("PastDueJob", pastTime).WaitAsync(cancellationToken); Assert.NotNull(job); - await grain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(5)); - Assert.True(await grain.HasJobRan(job.Id)); + await grain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); + Assert.True(await grain.HasJobRan(job.Id).WaitAsync(cancellationToken)); } public async Task JobWithMetadata(CancellationToken cancellationToken) @@ -98,7 +97,7 @@ public async Task JobWithMetadata(CancellationToken cancellationToken) ["Priority"] = "High" }; - var job = await grain.ScheduleJobAsync("MetadataJob", dueTime, metadata); + var job = await grain.ScheduleJobAsync("MetadataJob", dueTime, metadata).WaitAsync(cancellationToken); Assert.NotNull(job); Assert.NotNull(job.Metadata); Assert.Equal(3, job.Metadata.Count); @@ -106,9 +105,9 @@ public async Task JobWithMetadata(CancellationToken cancellationToken) Assert.Equal("SendEmail", job.Metadata["Action"]); Assert.Equal("High", job.Metadata["Priority"]); - await grain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); - var context = await grain.GetJobRun(job.Id); + var context = await grain.GetJobRun(job.Id).WaitAsync(cancellationToken); Assert.NotNull(context); Assert.NotNull(context.Job.Metadata); Assert.Equal("user123", context.Job.Metadata["UserId"]); @@ -121,21 +120,21 @@ public async Task MultipleGrains(CancellationToken cancellationToken) var grain3 = _grainFactory.GetGrain("test-grain-3"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); - var job1 = await grain1.ScheduleJobAsync("Job1", dueTime); - var job2 = await grain2.ScheduleJobAsync("Job2", dueTime); - var job3 = await grain3.ScheduleJobAsync("Job3", dueTime); + var job1 = await grain1.ScheduleJobAsync("Job1", dueTime).WaitAsync(cancellationToken); + var job2 = await grain2.ScheduleJobAsync("Job2", dueTime).WaitAsync(cancellationToken); + var job3 = await grain3.ScheduleJobAsync("Job3", dueTime).WaitAsync(cancellationToken); - await grain1.WaitForJobToRun(job1.Id).WithTimeout(TimeSpan.FromSeconds(10)); - await grain2.WaitForJobToRun(job2.Id).WithTimeout(TimeSpan.FromSeconds(10)); - await grain3.WaitForJobToRun(job3.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain1.WaitForJobToRun(job1.Id).WaitAsync(cancellationToken); + await grain2.WaitForJobToRun(job2.Id).WaitAsync(cancellationToken); + await grain3.WaitForJobToRun(job3.Id).WaitAsync(cancellationToken); - Assert.True(await grain1.HasJobRan(job1.Id)); - Assert.True(await grain2.HasJobRan(job2.Id)); - Assert.True(await grain3.HasJobRan(job3.Id)); + Assert.True(await grain1.HasJobRan(job1.Id).WaitAsync(cancellationToken)); + Assert.True(await grain2.HasJobRan(job2.Id).WaitAsync(cancellationToken)); + Assert.True(await grain3.HasJobRan(job3.Id).WaitAsync(cancellationToken)); - Assert.False(await grain1.HasJobRan(job2.Id)); - Assert.False(await grain2.HasJobRan(job3.Id)); - Assert.False(await grain3.HasJobRan(job1.Id)); + Assert.False(await grain1.HasJobRan(job2.Id).WaitAsync(cancellationToken)); + Assert.False(await grain2.HasJobRan(job3.Id).WaitAsync(cancellationToken)); + Assert.False(await grain3.HasJobRan(job1.Id).WaitAsync(cancellationToken)); } public async Task DuplicateJobNames(CancellationToken cancellationToken) @@ -143,9 +142,9 @@ public async Task DuplicateJobNames(CancellationToken cancellationToken) var grain = _grainFactory.GetGrain("test-duplicate-names"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); - var job1 = await grain.ScheduleJobAsync("SameName", dueTime); - var job2 = await grain.ScheduleJobAsync("SameName", dueTime.AddSeconds(1)); - var job3 = await grain.ScheduleJobAsync("SameName", dueTime.AddSeconds(2)); + var job1 = await grain.ScheduleJobAsync("SameName", dueTime).WaitAsync(cancellationToken); + var job2 = await grain.ScheduleJobAsync("SameName", dueTime.AddSeconds(1)).WaitAsync(cancellationToken); + var job3 = await grain.ScheduleJobAsync("SameName", dueTime.AddSeconds(2)).WaitAsync(cancellationToken); Assert.NotEqual(job1.Id, job2.Id); Assert.NotEqual(job2.Id, job3.Id); @@ -155,13 +154,13 @@ public async Task DuplicateJobNames(CancellationToken cancellationToken) Assert.Equal("SameName", job2.Name); Assert.Equal("SameName", job3.Name); - await grain.WaitForJobToRun(job1.Id).WithTimeout(TimeSpan.FromSeconds(10)); - await grain.WaitForJobToRun(job2.Id).WithTimeout(TimeSpan.FromSeconds(10)); - await grain.WaitForJobToRun(job3.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain.WaitForJobToRun(job1.Id).WaitAsync(cancellationToken); + await grain.WaitForJobToRun(job2.Id).WaitAsync(cancellationToken); + await grain.WaitForJobToRun(job3.Id).WaitAsync(cancellationToken); - Assert.True(await grain.HasJobRan(job1.Id)); - Assert.True(await grain.HasJobRan(job2.Id)); - Assert.True(await grain.HasJobRan(job3.Id)); + Assert.True(await grain.HasJobRan(job1.Id).WaitAsync(cancellationToken)); + Assert.True(await grain.HasJobRan(job2.Id).WaitAsync(cancellationToken)); + Assert.True(await grain.HasJobRan(job3.Id).WaitAsync(cancellationToken)); } public async Task CancelNonExistentJob(CancellationToken cancellationToken) @@ -169,7 +168,7 @@ public async Task CancelNonExistentJob(CancellationToken cancellationToken) var grain = _grainFactory.GetGrain("test-cancel-nonexistent"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(10); - var job = await grain.ScheduleJobAsync("RealJob", dueTime); + var job = await grain.ScheduleJobAsync("RealJob", dueTime).WaitAsync(cancellationToken); var fakeJob = new DurableJob { @@ -180,11 +179,11 @@ public async Task CancelNonExistentJob(CancellationToken cancellationToken) TargetGrainId = job.TargetGrainId }; - var cancelResult = await grain.TryCancelJobAsync(fakeJob); + var cancelResult = await grain.TryCancelJobAsync(fakeJob).WaitAsync(cancellationToken); Assert.False(cancelResult); - await Task.Delay(100); - Assert.False(await grain.HasJobRan(fakeJob.Id)); + await Task.Delay(100, cancellationToken); + Assert.False(await grain.HasJobRan(fakeJob.Id).WaitAsync(cancellationToken)); } public async Task CancelAlreadyExecutedJob(CancellationToken cancellationToken) @@ -192,12 +191,12 @@ public async Task CancelAlreadyExecutedJob(CancellationToken cancellationToken) var grain = _grainFactory.GetGrain("test-cancel-executed"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(2); - var job = await grain.ScheduleJobAsync("QuickJob", dueTime); + var job = await grain.ScheduleJobAsync("QuickJob", dueTime).WaitAsync(cancellationToken); - await grain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(10)); - Assert.True(await grain.HasJobRan(job.Id)); + await grain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); + Assert.True(await grain.HasJobRan(job.Id).WaitAsync(cancellationToken)); - var cancelResult = await grain.TryCancelJobAsync(job); + var cancelResult = await grain.TryCancelJobAsync(job).WaitAsync(cancellationToken); Assert.False(cancelResult); } @@ -213,17 +212,17 @@ public async Task ConcurrentScheduling(CancellationToken cancellationToken) scheduleTasks.Add(grain.ScheduleJobAsync($"ConcurrentJob{i}", baseTime.AddMilliseconds(i * 100))); } - var jobs = await Task.WhenAll(scheduleTasks); + var jobs = await Task.WhenAll(scheduleTasks).WaitAsync(cancellationToken); Assert.Equal(jobCount, jobs.Length); Assert.Equal(jobCount, jobs.Select(j => j.Id).Distinct().Count()); - var waitTasks = jobs.Select(j => grain.WaitForJobToRun(j.Id).WithTimeout(TimeSpan.FromSeconds(15))); - await Task.WhenAll(waitTasks); + var waitTasks = jobs.Select(j => grain.WaitForJobToRun(j.Id)); + await Task.WhenAll(waitTasks).WaitAsync(cancellationToken); foreach (var job in jobs) { - Assert.True(await grain.HasJobRan(job.Id), $"Job {job.Name} did not run"); + Assert.True(await grain.HasJobRan(job.Id).WaitAsync(cancellationToken), $"Job {job.Name} did not run"); } } @@ -233,7 +232,7 @@ public async Task JobPropertiesVerification(CancellationToken cancellationToken) var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); var metadata = new Dictionary { ["Key"] = "Value" }; - var job = await grain.ScheduleJobAsync("PropertyTestJob", dueTime, metadata); + var job = await grain.ScheduleJobAsync("PropertyTestJob", dueTime, metadata).WaitAsync(cancellationToken); Assert.NotNull(job.Id); Assert.NotEmpty(job.Id); @@ -244,9 +243,9 @@ public async Task JobPropertiesVerification(CancellationToken cancellationToken) Assert.NotNull(job.Metadata); Assert.Single(job.Metadata); - await grain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); - var context = await grain.GetJobRun(job.Id); + var context = await grain.GetJobRun(job.Id).WaitAsync(cancellationToken); Assert.NotNull(context); Assert.Equal(job.Id, context.Job.Id); Assert.Equal(job.Name, context.Job.Name); @@ -259,11 +258,11 @@ public async Task DequeueCount(CancellationToken cancellationToken) var grain = _grainFactory.GetGrain("test-dequeue-count"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); - var job = await grain.ScheduleJobAsync("DequeueTestJob", dueTime); + var job = await grain.ScheduleJobAsync("DequeueTestJob", dueTime).WaitAsync(cancellationToken); - await grain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await grain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); - var context = await grain.GetJobRun(job.Id); + var context = await grain.GetJobRun(job.Id).WaitAsync(cancellationToken); Assert.NotNull(context); Assert.Equal(1, context.DequeueCount); } @@ -274,17 +273,17 @@ public async Task ScheduleJobOnAnotherGrain(CancellationToken cancellationToken) var targetGrain = _grainFactory.GetGrain("target-grain"); var dueTime = DateTimeOffset.UtcNow.AddSeconds(3); - var job = await schedulerGrain.ScheduleJobOnAnotherGrainAsync("target-grain", "CrossGrainJob", dueTime); + var job = await schedulerGrain.ScheduleJobOnAnotherGrainAsync("target-grain", "CrossGrainJob", dueTime).WaitAsync(cancellationToken); Assert.NotNull(job); Assert.Equal("CrossGrainJob", job.Name); Assert.Equal(dueTime, job.DueTime); - await targetGrain.WaitForJobToRun(job.Id).WithTimeout(TimeSpan.FromSeconds(10)); + await targetGrain.WaitForJobToRun(job.Id).WaitAsync(cancellationToken); - Assert.True(await targetGrain.HasJobRan(job.Id)); + Assert.True(await targetGrain.HasJobRan(job.Id).WaitAsync(cancellationToken)); - var context = await targetGrain.GetJobRun(job.Id); + var context = await targetGrain.GetJobRun(job.Id).WaitAsync(cancellationToken); Assert.NotNull(context); Assert.Equal(job.Id, context.Job.Id); Assert.Equal("CrossGrainJob", context.Job.Name); @@ -299,7 +298,7 @@ public async Task JobRetry(CancellationToken cancellationToken) ["FailUntilAttempt"] = "3" }; - var job = await grain.ScheduleJobAsync("RetryJob", dueTime, metadata); + var job = await grain.ScheduleJobAsync("RetryJob", dueTime, metadata).WaitAsync(cancellationToken); Assert.NotNull(job); Assert.Equal("RetryJob", job.Name); @@ -310,20 +309,20 @@ public async Task JobRetry(CancellationToken cancellationToken) // Default retry policy: retry up to 5 times with exponential backoff (1s, 2s, 4s, 8s, 16s) // We expect 3 attempts: fail at DequeueCount=1, fail at DequeueCount=2, succeed at DequeueCount=3 // Total time: ~2s (initial) + 1s (first retry delay) + 2s (second retry delay) = ~5s - await grain.WaitForJobToSucceed(job.Id).WithTimeout(TimeSpan.FromSeconds(15)); + await grain.WaitForJobToSucceed(job.Id).WaitAsync(cancellationToken); - Assert.True(await grain.HasJobSucceeded(job.Id)); + Assert.True(await grain.HasJobSucceeded(job.Id).WaitAsync(cancellationToken)); - var attemptCount = await grain.GetJobExecutionAttemptCount(job.Id); + var attemptCount = await grain.GetJobExecutionAttemptCount(job.Id).WaitAsync(cancellationToken); Assert.Equal(3, attemptCount); - var dequeueCountHistory = await grain.GetJobDequeueCountHistory(job.Id); + var dequeueCountHistory = await grain.GetJobDequeueCountHistory(job.Id).WaitAsync(cancellationToken); Assert.Equal(3, dequeueCountHistory.Count); Assert.Equal(1, dequeueCountHistory[0]); Assert.Equal(2, dequeueCountHistory[1]); Assert.Equal(3, dequeueCountHistory[2]); - var finalContext = await grain.GetFinalJobRun(job.Id); + var finalContext = await grain.GetFinalJobRun(job.Id).WaitAsync(cancellationToken); Assert.NotNull(finalContext); Assert.Equal(3, finalContext.DequeueCount); Assert.Equal(job.Id, finalContext.Job.Id);