Durable Jobs (aka Reminders v2) #9717
Merged
Changes from 76 commits
89 commits
451fad0 Scheduled jobs wip (benjaminpetit)
4b0672f Address comments (benjaminpetit)
e75606e Add README (benjaminpetit)
0039ab1 Remove ShardId from interface (benjaminpetit)
ba4dc2c Split extensions (benjaminpetit)
1df4b02 Add metadata (benjaminpetit)
cb4edde Decouple extension (benjaminpetit)
6c5a85e More robust test (benjaminpetit)
e8871b0 Move context (benjaminpetit)
cb39739 Retry wip (benjaminpetit)
b7f36dd Listen to cluster changes (benjaminpetit)
2ee5990 wip (benjaminpetit)
5b28fce Add metadata (benjaminpetit)
7dfcfd9 Add metadata test (benjaminpetit)
5cb8609 Add cancellation test (benjaminpetit)
5329033 Add comments (benjaminpetit)
45df0eb Mark test as skippable (benjaminpetit)
281f800 Add logging to AzureStorageJobShardManager (benjaminpetit)
9be56be Plumb job cancellation (benjaminpetit)
7194243 More consistent naming (benjaminpetit)
0643e8c Fix retry (benjaminpetit)
4545d02 Add comments to InMemoryJobQueue and some unit tests (benjaminpetit)
3a8807c Add tests and fix AzureStorageJobShardManager (benjaminpetit)
4248b9e Enable basic parrallelism limit on shard processing (benjaminpetit)
72b9275 keep track of running shards (benjaminpetit)
79d6ac5 Add logs and comments to LocalScheduledJobManager (benjaminpetit)
b576c61 Merge remote-tracking branch 'dotnet/main' into wip/scheduled-jobs (benjaminpetit)
2008fcc Add READMEs (benjaminpetit)
2964e3a Add READMEs (benjaminpetit)
de69620 Update src/Orleans.ScheduledJobs/Hosting/ScheduledJobsOptions.cs (ReubenBond)
026caaa Update src/Orleans.ScheduledJobs/InMemoryJobQueue.cs (ReubenBond)
e90f6b8 Apply suggestions from code review (benjaminpetit)
fc10282 Add Cancellation tokens to LocalScheduledJobManager (benjaminpetit)
8b8a416 Remove default values in JobShard (benjaminpetit)
d09874c Switch from line-based format to netstring format for blob operations (benjaminpetit)
7ed5369 Refactor JobShard (benjaminpetit)
e3a021b Add membership version in AzureStorageJobShard (benjaminpetit)
98fa1ca Refactor storage operations to use Channel for single-threaded writes… (benjaminpetit)
2091124 Refactor job execution to run concurrently (benjaminpetit)
f829bd9 Split LocalScheduledJobManager into separate files for better readabi… (benjaminpetit)
616e488 Move ILocalScheduledJobManager interface to its own file (benjaminpetit)
c5d41da Merge branch 'wip/scheduled-jobs' of https://github.com/benjaminpetit… (benjaminpetit)
cee9969 Refactor job scheduling methods to use TryScheduleJobAsync for improv… (benjaminpetit)
e412923 Very simplistic concurrency control on shard creation (benjaminpetit)
bedf374 Enhance IScheduledJobReceiverExtension with improved logging and docu… (benjaminpetit)
73cba74 Update ScheduledJobTests and InMemoryJobQueueTests to use UtcNow for … (benjaminpetit)
4ee03f3 Refactor shard management to simplify task handling and ensure proper… (benjaminpetit)
ede6b13 Refactor AssignJobShardsAsync to improve owner/creator status checks (benjaminpetit)
329f75d Clarify shard start-time naming and logging; update defaults and proj… (benjaminpetit)
2bb5172 Fix netstring encoding/delimiter and format MembershipVersion invaria… (benjaminpetit)
9ca1cb4 Scale default MaxConcurrentJobsPerSilo by CPU count (benjaminpetit)
c440fae Rename GetJobCount to GetJobCountAsync and IsComplete to IsAddingComp… (benjaminpetit)
c2f4c70 Return removal result from CancelJob and update tests (benjaminpetit)
4f68cc0 Return removal result from RemoveJobAsync and update cancel flow & tests (benjaminpetit)
d4c5cd8 Consolidate scheduling API: merge ScheduleJobWithMetadataAsync into S… (benjaminpetit)
043fffd Add cross-grain scheduling test and SchedulerGrain; make ScheduleJobA… (benjaminpetit)
db44fad Add job retry test and RetryTestGrain (benjaminpetit)
c5498d6 Extract scheduled job test logic into ScheduledJobTestsRunner and add… (benjaminpetit)
069fe77 Add Azure Blob Storage hosting integration for scheduled jobs (benjaminpetit)
c6e8cc8 Add AzureStorageScheduledJobTests to run scheduled job tests against … (benjaminpetit)
6a05384 Remove stuff (benjaminpetit)
f72d4c9 Remove stuff (benjaminpetit)
156659a Add CancellationToken to IClusterMembershipService.Refresh and propag… (benjaminpetit)
560aae9 Add blob-prefix support to AzureStorageJobShardManager and update tests (benjaminpetit)
29a0dec Replace TaskCanceledException with OperationCanceledException (benjaminpetit)
e23e1ac Cancel pending storage operations on shutdown (benjaminpetit)
5b00a75 Use MembershipVersion.Value for blob metadata; rename/add InMemorySch… (benjaminpetit)
173921a Rename LocalScheduledJobManager system target grain type to "job-mana… (benjaminpetit)
1413e15 Replace IScheduledJob interface with ScheduledJob concrete type (benjaminpetit)
10e0565 Fix shard bucketing, correct shard assignment window, and add options… (benjaminpetit)
7a5a526 Gracefully stop shard storage processor; fix caching and metadata par… (benjaminpetit)
7446b8d Dispose newly-created shard on ownership conflict; remove unused blob… (benjaminpetit)
5199685 Remove unused Microsoft.CodeAnalysis using; clarify shard-too-new com… (benjaminpetit)
12d694e Introduce NetstringEncoder utility and tests; use it in AzureStorageJ… (benjaminpetit)
0655f6a Fix bad rename (benjaminpetit)
fc15a75 Replace NetstringEncoder with NetstringJsonSerializer; update usages … (benjaminpetit)
4ac0c41 Make NetstringJsonSerializer stream-based with pooled buffers; update… (benjaminpetit)
fc6ba4d Batch Azure Storage append operations; add batching options and tests (benjaminpetit)
5b9ba88 Rename RegisterShard -> CreateShardAsync and simplify ownership behavior (benjaminpetit)
fa3eb54 Make InMemoryJobQueue synchronization and enumeration safer (benjaminpetit)
bc26774 Add periodic shard checker and shard activation buffer option (benjaminpetit)
46fe7d0 Centralize shard lifecycle in manager; add periodic shard checker and… (benjaminpetit)
8ba69f4 Unify StorageOperation completion handling and make shutdown cancellable (benjaminpetit)
e20e7a8 Remove Creator metadata and simplify ownership/ID handling; add blob-… (benjaminpetit)
9e42a9b Add structured logging to AzureStorageJobShard and propagate ILoggerF… (benjaminpetit)
00a772e Make InMemoryJobQueue internal, add InternalsVisibleTo (benjaminpetit)
e07538f Drop explicit '= false' initializer for IsAddingCompleted property in… (benjaminpetit)
5c781dc wip (benjaminpetit)
c0f937b Split test into a runner for reusability; make them pass faster; make (benjaminpetit)
270 changes: 270 additions & 0 deletions
src/Azure/Orleans.ScheduledJobs.AzureStorage/AzureStorageJobShard.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,270 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Channels; | ||
| using System.Threading.Tasks; | ||
| using Azure; | ||
| using Azure.Storage.Blobs; | ||
| using Azure.Storage.Blobs.Models; | ||
| using Azure.Storage.Blobs.Specialized; | ||
| using Orleans.Runtime; | ||
| | ||
| namespace Orleans.ScheduledJobs.AzureStorage; | ||
| | ||
| internal sealed class AzureStorageJobShard : JobShard | ||
| { | ||
| private readonly Channel<StorageOperation> _storageOperationChannel; | ||
| private readonly Task _storageProcessorTask; | ||
| private readonly CancellationTokenSource _shutdownCts = new(); | ||
| | ||
| internal AppendBlobClient BlobClient { get; init; } | ||
| internal ETag? ETag { get; private set; } | ||
| | ||
| public AzureStorageJobShard(string id, DateTimeOffset startTime, DateTimeOffset endTime, AppendBlobClient blobClient, IDictionary<string, string>? metadata, ETag? eTag) | ||
| : base(id, startTime, endTime) | ||
| { | ||
| BlobClient = blobClient; | ||
| ETag = eTag; | ||
| Metadata = metadata; | ||
| | ||
| // Create unbounded channel for storage operations | ||
| // In the future, we could add batching here | ||
| _storageOperationChannel = Channel.CreateUnbounded<StorageOperation>(new UnboundedChannelOptions | ||
| { | ||
| SingleReader = true, | ||
| SingleWriter = false | ||
| }); | ||
| | ||
| // Start the background task that processes storage operations | ||
| _storageProcessorTask = ProcessStorageOperationsAsync(); | ||
| } | ||
| | ||
| protected override async Task PersistAddJobAsync(string jobId, string jobName, DateTimeOffset dueTime, GrainId target, IReadOnlyDictionary<string, string>? metadata, CancellationToken cancellationToken) | ||
| { | ||
| var operation = JobOperation.CreateAddOperation(jobId, jobName, dueTime, target, metadata); | ||
| await EnqueueStorageOperationAsync(StorageOperation.CreateAppendOperation(operation), cancellationToken); | ||
| } | ||
| | ||
| protected override async Task PersistRemoveJobAsync(string jobId, CancellationToken cancellationToken) | ||
| { | ||
| var operation = JobOperation.CreateRemoveOperation(jobId); | ||
| await EnqueueStorageOperationAsync(StorageOperation.CreateAppendOperation(operation), cancellationToken); | ||
| } | ||
| | ||
| protected override async Task PersistRetryJobAsync(string jobId, DateTimeOffset newDueTime, CancellationToken cancellationToken) | ||
| { | ||
| var operation = JobOperation.CreateRetryOperation(jobId, newDueTime); | ||
| await EnqueueStorageOperationAsync(StorageOperation.CreateAppendOperation(operation), cancellationToken); | ||
| } | ||
| | ||
| public async Task UpdateBlobMetadata(IDictionary<string, string> metadata, CancellationToken cancellationToken) | ||
| { | ||
| await EnqueueStorageOperationAsync(StorageOperation.CreateMetadataOperation(metadata), cancellationToken); | ||
| } | ||
| | ||
| public async ValueTask InitializeAsync(CancellationToken cancellationToken) | ||
| { | ||
| // Load existing blob | ||
| var response = await BlobClient.DownloadAsync(cancellationToken: cancellationToken); | ||
| using var stream = response.Value.Content; | ||
| | ||
| // Rebuild state by replaying operations | ||
| var addedJobs = new Dictionary<string, JobOperation>(); | ||
| var deletedJobs = new HashSet<string>(); | ||
| var jobRetryCounters = new Dictionary<string, (int dequeueCount, DateTimeOffset? newDueTime)>(); | ||
| | ||
| await foreach (var operation in NetstringJsonSerializer.DecodeAsync(stream, JobOperationJsonContext.Default.JobOperation)) | ||
| { | ||
| switch (operation.Type) | ||
| { | ||
| case JobOperation.OperationType.Add: | ||
| if (!deletedJobs.Contains(operation.Id)) | ||
| { | ||
| addedJobs[operation.Id] = operation; | ||
| } | ||
| break; | ||
| case JobOperation.OperationType.Remove: | ||
| deletedJobs.Add(operation.Id); | ||
| addedJobs.Remove(operation.Id); | ||
| jobRetryCounters.Remove(operation.Id); | ||
| break; | ||
| case JobOperation.OperationType.Retry: | ||
| if (!deletedJobs.Contains(operation.Id)) | ||
| { | ||
| if (!jobRetryCounters.ContainsKey(operation.Id)) | ||
| { | ||
| jobRetryCounters[operation.Id] = (1, operation.DueTime); | ||
| } | ||
| else | ||
| { | ||
| var entry = jobRetryCounters[operation.Id]; | ||
| jobRetryCounters[operation.Id] = (entry.dequeueCount + 1, operation.DueTime); | ||
| } | ||
| } | ||
| break; | ||
| } | ||
| } | ||
| | ||
| // Rebuild the priority queue | ||
| foreach (var op in addedJobs.Values) | ||
| { | ||
| var retryCounter = 0; | ||
| var dueTime = op.DueTime!.Value; | ||
| if (jobRetryCounters.TryGetValue(op.Id, out var retryEntries)) | ||
| { | ||
| retryCounter = retryEntries.dequeueCount; | ||
| dueTime = retryEntries.newDueTime ?? dueTime; | ||
| } | ||
| | ||
| EnqueueJob(new ScheduledJob | ||
| { | ||
| Id = op.Id, | ||
| Name = op.Name!, | ||
| DueTime = dueTime, | ||
| TargetGrainId = op.TargetGrainId!.Value, | ||
| ShardId = Id, | ||
| Metadata = op.Metadata, | ||
| }, | ||
| retryCounter); | ||
| } | ||
| | ||
| ETag = response.Value.Details.ETag; | ||
| } | ||
| | ||
| private async Task EnqueueStorageOperationAsync(StorageOperation operation, CancellationToken cancellationToken) | ||
| { | ||
| var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
| operation.CompletionSource = tcs; | ||
| | ||
| await _storageOperationChannel.Writer.WriteAsync(operation, cancellationToken); | ||
| await tcs.Task; | ||
| } | ||
| | ||
| private async Task ProcessStorageOperationsAsync() | ||
| { | ||
| await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.ForceYielding); | ||
| | ||
| var cancellationToken = _shutdownCts.Token; | ||
| // TODO: AppendBlob has a limit of 50,000 blocks. Implement blob rotation when this limit is approached. | ||
| try | ||
| { | ||
| await foreach (var operation in _storageOperationChannel.Reader.ReadAllAsync(cancellationToken)) | ||
| { | ||
| try | ||
| { | ||
| switch (operation.Type) | ||
| { | ||
| case StorageOperationType.AppendJobOperation: | ||
| await AppendJobOperationAsync(operation.JobOperation!.Value, cancellationToken); | ||
| break; | ||
| case StorageOperationType.UpdateMetadata: | ||
| await UpdateMetadataAsync(operation.Metadata!, cancellationToken); | ||
| break; | ||
| } | ||
| | ||
| operation.CompletionSource?.TrySetResult(true); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| operation.CompletionSource?.TrySetException(ex); | ||
| } | ||
| } | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Expected during shutdown - cancel all pending operations | ||
| while (_storageOperationChannel.Reader.TryRead(out var operation)) | ||
| { | ||
| operation.CompletionSource?.TrySetCanceled(cancellationToken); | ||
| } | ||
| } | ||
| } | ||
| | ||
| private async Task AppendJobOperationAsync(JobOperation operation, CancellationToken cancellationToken) | ||
| { | ||
| var content = NetstringJsonSerializer.Encode(operation, JobOperationJsonContext.Default.JobOperation); | ||
| using var stream = new MemoryStream(content); | ||
| var result = await BlobClient.AppendBlockAsync( | ||
| stream, | ||
| new AppendBlobAppendBlockOptions { Conditions = new AppendBlobRequestConditions { IfMatch = ETag } }, | ||
| cancellationToken); | ||
| ETag = result.Value.ETag; | ||
| } | ||
| | ||
| private async Task UpdateMetadataAsync(IDictionary<string, string> metadata, CancellationToken cancellationToken) | ||
| { | ||
| var result = await BlobClient.SetMetadataAsync( | ||
| metadata, | ||
| new BlobRequestConditions { IfMatch = ETag }, | ||
| cancellationToken); | ||
| ETag = result.Value.ETag; | ||
| Metadata = metadata; | ||
| } | ||
| | ||
| | ||
| | ||
| /// <summary> | ||
| /// Stops the background storage processor and waits for it to exit; operations that have not finished are canceled rather than flushed. | ||
| /// After calling this method, no new storage operations can be enqueued. | ||
| /// This method is idempotent and can be called multiple times safely. | ||
| /// </summary> | ||
| internal async Task StopProcessorAsync() | ||
| { | ||
| // Complete the channel to stop accepting new operations (idempotent operation) | ||
| if (_storageOperationChannel.Writer.TryComplete()) | ||
| { | ||
| _shutdownCts.Cancel(); | ||
| } | ||
| | ||
| // Wait for the background processor to exit (the shutdown token has already been canceled) | ||
| try | ||
| { | ||
| await _storageProcessorTask; | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Expected during normal shutdown | ||
| } | ||
| } | ||
| | ||
| public override async ValueTask DisposeAsync() | ||
| { | ||
| await StopProcessorAsync(); | ||
| _shutdownCts.Dispose(); | ||
| await base.DisposeAsync(); | ||
| } | ||
| } | ||
| | ||
| internal enum StorageOperationType | ||
| { | ||
| AppendJobOperation, | ||
| UpdateMetadata | ||
| } | ||
| | ||
| internal sealed class StorageOperation | ||
| { | ||
| public required StorageOperationType Type { get; init; } | ||
| public JobOperation? JobOperation { get; init; } | ||
| public IDictionary<string, string>? Metadata { get; init; } | ||
| public TaskCompletionSource<bool>? CompletionSource { get; set; } | ||
| | ||
| public static StorageOperation CreateAppendOperation(JobOperation jobOperation) | ||
| { | ||
| return new StorageOperation | ||
| { | ||
| Type = StorageOperationType.AppendJobOperation, | ||
| JobOperation = jobOperation | ||
| }; | ||
| } | ||
| | ||
| public static StorageOperation CreateMetadataOperation(IDictionary<string, string> metadata) | ||
| { | ||
| return new StorageOperation | ||
| { | ||
| Type = StorageOperationType.UpdateMetadata, | ||
| Metadata = metadata | ||
| }; | ||
| } | ||
| } | ||
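
The replay in `InitializeAsync` may be easier to review in isolation. The following is a minimal sketch of the same fold over an append-only log, assuming simplified stand-in types: `Op`, `OpType`, `ReplayedJob`, and `ShardReplay` are hypothetical names for illustration and are not part of this PR. It leaves out the blob download, netstring decoding, and `EnqueueJob`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-ins for the PR's JobOperation and ScheduledJob types.
public enum OpType { Add, Remove, Retry }

public sealed record Op(OpType Type, string Id, DateTimeOffset? DueTime = null);

public sealed record ReplayedJob(string Id, DateTimeOffset DueTime, int RetryCount);

public static class ShardReplay
{
    // Folds an append-only operation log into the set of jobs that are still live.
    public static List<ReplayedJob> Replay(IEnumerable<Op> log)
    {
        var added = new Dictionary<string, Op>();
        var deleted = new HashSet<string>();
        var retries = new Dictionary<string, (int Count, DateTimeOffset? DueTime)>();

        foreach (var op in log)
        {
            switch (op.Type)
            {
                case OpType.Add:
                    // An Add for a job that was already removed is ignored.
                    if (!deleted.Contains(op.Id)) added[op.Id] = op;
                    break;
                case OpType.Remove:
                    deleted.Add(op.Id);
                    added.Remove(op.Id);
                    retries.Remove(op.Id);
                    break;
                case OpType.Retry:
                    if (!deleted.Contains(op.Id))
                    {
                        // A missing entry leaves the default (0, null) in 'entry'.
                        retries.TryGetValue(op.Id, out var entry);
                        retries[op.Id] = (entry.Count + 1, op.DueTime);
                    }
                    break;
            }
        }

        var result = new List<ReplayedJob>();
        foreach (var op in added.Values)
        {
            var due = op.DueTime ?? throw new InvalidOperationException("Add operation without a due time");
            var count = 0;
            if (retries.TryGetValue(op.Id, out var retry))
            {
                // The most recent retry supplies both the counter and the new due time.
                count = retry.Count;
                due = retry.DueTime ?? due;
            }
            result.Add(new ReplayedJob(op.Id, due, count));
        }
        return result;
    }
}
```

For a log of `Add a`, `Retry a`, `Add b`, `Remove b`, the sketch yields a single live job `a` with a retry count of 1 and the due time carried by the retry entry. As in the diff, a removed ID stays removed even if a later `Add` reuses it.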