Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions src/Azure/Orleans.DurableJobs.AzureStorage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ public class EmailGrain : Grain, IEmailGrain, IDurableJobHandler
};

_durableEmailJob = await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"SendEmail",
sendTime,
metadata);
new ScheduleJobRequest
{
Target = this.GetGrainId(),
JobName = "SendEmail",
DueTime = sendTime,
Metadata = metadata
},
CancellationToken.None);

_logger.LogInformation(
"Scheduled email to {EmailAddress} for {SendTime} (JobId: {JobId})",
Expand Down Expand Up @@ -253,25 +257,33 @@ public class OrderGrain : Grain, IOrderGrain, IDurableJobHandler
// Schedule payment reminder after 1 hour
var paymentReminderTime = DateTimeOffset.UtcNow.AddHours(1);
await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"PaymentReminder",
paymentReminderTime,
new Dictionary<string, string>
new ScheduleJobRequest
{
["Step"] = "PaymentReminder",
["CustomerEmail"] = order.CustomerEmail
});
Target = this.GetGrainId(),
JobName = "PaymentReminder",
DueTime = paymentReminderTime,
Metadata = new Dictionary<string, string>
{
["Step"] = "PaymentReminder",
["CustomerEmail"] = order.CustomerEmail
}
},
CancellationToken.None);

// Schedule order expiration after 24 hours
var expirationTime = DateTimeOffset.UtcNow.AddHours(24);
await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"OrderExpiration",
expirationTime,
new Dictionary<string, string>
new ScheduleJobRequest
{
["Step"] = "OrderExpiration"
});
Target = this.GetGrainId(),
JobName = "OrderExpiration",
DueTime = expirationTime,
Metadata = new Dictionary<string, string>
{
["Step"] = "OrderExpiration"
}
},
CancellationToken.None);

_logger.LogInformation(
"Scheduled payment reminder for {ReminderTime} and expiration for {ExpirationTime}",
Expand Down
7 changes: 2 additions & 5 deletions src/Orleans.DurableJobs/ILocalDurableJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ public interface ILocalDurableJobManager
/// <summary>
/// Schedules a job to be executed at a specific time on the target grain.
/// </summary>
/// <param name="target">The grain identifier of the target grain that will receive the durable job.</param>
/// <param name="jobName">The name of the job for identification purposes.</param>
/// <param name="dueTime">The date and time when the job should be executed.</param>
/// <param name="metadata">Optional metadata associated with the job.</param>
/// <param name="request">The request containing the job scheduling parameters.</param>
/// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation that returns the durable job.</returns>
Task<DurableJob> ScheduleJobAsync(GrainId target, string jobName, DateTimeOffset dueTime, IReadOnlyDictionary<string, string>? metadata, CancellationToken cancellationToken);
Task<DurableJob> ScheduleJobAsync(ScheduleJobRequest request, CancellationToken cancellationToken);

/// <summary>
/// Attempts to cancel a previously scheduled durable job.
Expand Down
23 changes: 10 additions & 13 deletions src/Orleans.DurableJobs/JobShard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,11 @@ public interface IJobShard : IAsyncDisposable
/// <summary>
/// Attempts to schedule a new job on this shard.
/// </summary>
/// <param name="target">The grain identifier of the target grain that will execute the job.</param>
/// <param name="jobName">The name of the job to schedule.</param>
/// <param name="dueTime">The time when the job should be executed.</param>
/// <param name="metadata">Optional metadata to associate with the job.</param>
/// <param name="request">The request containing the job scheduling parameters.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains the durable job if successful, or null if the job could not be scheduled (e.g., the shard was marked as complete).</returns>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the due time is outside the shard's time range.</exception>
Task<DurableJob?> TryScheduleJobAsync(GrainId target, string jobName, DateTimeOffset dueTime, IReadOnlyDictionary<string, string>? metadata, CancellationToken cancellationToken);
Task<DurableJob?> TryScheduleJobAsync(ScheduleJobRequest request, CancellationToken cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -142,30 +139,30 @@ public IAsyncEnumerable<IJobRunContext> ConsumeDurableJobsAsync()
}

/// <inheritdoc/>
public async Task<DurableJob?> TryScheduleJobAsync(GrainId target, string jobName, DateTimeOffset dueTime, IReadOnlyDictionary<string, string>? metadata, CancellationToken cancellationToken)
public async Task<DurableJob?> TryScheduleJobAsync(ScheduleJobRequest request, CancellationToken cancellationToken)
{
if (IsAddingCompleted)
{
return null;
}

if (dueTime < StartTime || dueTime > EndTime)
if (request.DueTime < StartTime || request.DueTime > EndTime)
{
throw new ArgumentOutOfRangeException(nameof(dueTime), "Scheduled time is out of shard bounds.");
throw new ArgumentOutOfRangeException(nameof(request), "Scheduled time is out of shard bounds.");
}

var jobId = Guid.NewGuid().ToString();
var job = new DurableJob
{
Id = jobId,
TargetGrainId = target,
Name = jobName,
DueTime = dueTime,
TargetGrainId = request.Target,
Name = request.JobName,
DueTime = request.DueTime,
ShardId = Id,
Metadata = metadata
Metadata = request.Metadata
};

await PersistAddJobAsync(jobId, jobName, dueTime, target, metadata, cancellationToken);
await PersistAddJobAsync(jobId, request.JobName, request.DueTime, request.Target, request.Metadata, cancellationToken);
_jobQueue.Enqueue(job, 0);
return job;
}
Expand Down
10 changes: 5 additions & 5 deletions src/Orleans.DurableJobs/LocalDurableJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,21 @@ public LocalDurableJobManager(
}

/// <inheritdoc/>
public async Task<DurableJob> ScheduleJobAsync(GrainId target, string jobName, DateTimeOffset dueTime, IReadOnlyDictionary<string, string>? metadata, CancellationToken cancellationToken)
public async Task<DurableJob> ScheduleJobAsync(ScheduleJobRequest request, CancellationToken cancellationToken)
{
LogSchedulingJob(_logger, jobName, target, dueTime);
LogSchedulingJob(_logger, request.JobName, request.Target, request.DueTime);

var shardKey = GetShardKey(dueTime);
var shardKey = GetShardKey(request.DueTime);

while (true)
{
// Fast path: shard already exists
if (_writeableShards.TryGetValue(shardKey, out var existingShard))
{
var job = await existingShard.TryScheduleJobAsync(target, jobName, dueTime, metadata, cancellationToken);
var job = await existingShard.TryScheduleJobAsync(request, cancellationToken);
if (job is not null)
{
LogJobScheduled(_logger, jobName, job.Id, existingShard.Id, target);
LogJobScheduled(_logger, request.JobName, job.Id, existingShard.Id, request.Target);
return job;
}

Expand Down
59 changes: 38 additions & 21 deletions src/Orleans.DurableJobs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ public class NotificationGrain : Grain, INotificationGrain, IDurableJobHandler
};

_durableJob = await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"SendNotification",
sendTime,
metadata);
new ScheduleJobRequest
{
Target = this.GetGrainId(),
JobName = "SendNotification",
DueTime = sendTime,
Metadata = metadata
},
CancellationToken.None);

_logger.LogInformation(
"Scheduled notification for user {UserId} at {SendTime} (JobId: {JobId})",
Expand Down Expand Up @@ -225,26 +229,34 @@ public class OrderGrain : Grain, IOrderGrain, IDurableJobHandler
// Schedule delivery reminder for 24 hours before delivery
var reminderTime = details.DeliveryDate.AddHours(-24);
await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"DeliveryReminder",
reminderTime,
new Dictionary<string, string>
new ScheduleJobRequest
{
["Step"] = "DeliveryReminder",
["CustomerId"] = details.CustomerId,
["OrderNumber"] = details.OrderNumber
});
Target = this.GetGrainId(),
JobName = "DeliveryReminder",
DueTime = reminderTime,
Metadata = new Dictionary<string, string>
{
["Step"] = "DeliveryReminder",
["CustomerId"] = details.CustomerId,
["OrderNumber"] = details.OrderNumber
}
},
CancellationToken.None);

// Schedule order expiration if payment not received
var expirationTime = DateTimeOffset.UtcNow.AddHours(24);
await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"OrderExpiration",
expirationTime,
new Dictionary<string, string>
new ScheduleJobRequest
{
["Step"] = "OrderExpiration"
});
Target = this.GetGrainId(),
JobName = "OrderExpiration",
DueTime = expirationTime,
Metadata = new Dictionary<string, string>
{
["Step"] = "OrderExpiration"
}
},
CancellationToken.None);
}

public async Task CancelOrder()
Expand Down Expand Up @@ -340,9 +352,14 @@ public class WorkflowGrain : Grain, IDurableJobHandler
public async Task<IDurableJob> ScheduleWorkflowStep(string stepName, DateTimeOffset executeAt)
{
var job = await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
stepName,
executeAt);
new ScheduleJobRequest
{
Target = this.GetGrainId(),
JobName = stepName,
DueTime = executeAt,
Metadata = null
},
CancellationToken.None);

_pendingJobs[job.Id] = new TaskCompletionSource();
return job;
Expand Down
31 changes: 31 additions & 0 deletions src/Orleans.DurableJobs/ScheduleJobRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using Orleans.Runtime;

namespace Orleans.DurableJobs;

/// <summary>
/// Represents a request to schedule a durable job.
/// </summary>
public readonly struct ScheduleJobRequest
{
/// <summary>
/// Gets the grain identifier of the target grain that will receive the durable job.
/// </summary>
public required GrainId Target { get; init; }

/// <summary>
/// Gets the name of the job for identification purposes.
/// </summary>
public required string JobName { get; init; }

/// <summary>
/// Gets the date and time when the job should be executed.
/// </summary>
public required DateTimeOffset DueTime { get; init; }

/// <summary>
/// Gets optional metadata associated with the job.
/// </summary>
public IReadOnlyDictionary<string, string>? Metadata { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public async Task AzureStorageJobShard_MultipleOperationsBatched()
var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i*10), null, cancellationToken));
tasks.Add(shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("type", $"target{i}"), JobName = $"job{i}", DueTime = date.AddMilliseconds(i * 10d), Metadata = null }, cancellationToken));
}

await Task.WhenAll(tasks);
Expand Down Expand Up @@ -152,9 +152,9 @@ public async Task AzureStorageJobShard_PartialBatchFlushesOnTimeout()

// Schedule only 3 jobs (less than MinBatchSize of 10)
var tasks = new Task[3];
tasks[0] = shard.TryScheduleJobAsync(GrainId.Create("type", "target1"), "job1", date.AddSeconds(1), null, cancellationToken);
tasks[1] = shard.TryScheduleJobAsync(GrainId.Create("type", "target2"), "job2", date.AddSeconds(2), null, cancellationToken);
tasks[2] = shard.TryScheduleJobAsync(GrainId.Create("type", "target3"), "job3", date.AddSeconds(3), null, cancellationToken);
tasks[0] = shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("type", "target1"), JobName = "job1", DueTime = date.AddSeconds(1), Metadata = null }, cancellationToken);
tasks[1] = shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("type", "target2"), JobName = "job2", DueTime = date.AddSeconds(2), Metadata = null }, cancellationToken);
tasks[2] = shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("type", "target3"), JobName = "job3", DueTime = date.AddSeconds(3), Metadata = null }, cancellationToken);

await Task.WhenAll(tasks);

Expand Down Expand Up @@ -203,7 +203,7 @@ public async Task AzureStorageJobShard_MaxBatchSizeEnforced()
var tasks = new List<Task>();
for (int i = 0; i < 50; i++)
{
tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i), null, cancellationToken));
tasks.Add(shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("type", $"target{i}"), JobName = $"job{i}", DueTime = date.AddMilliseconds(i), Metadata = null }, cancellationToken));
}

await Task.WhenAll(tasks);
Expand Down Expand Up @@ -257,7 +257,7 @@ public async Task AzureStorageJobShard_MetadataOperationsBreakBatches()
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
tasks.Add(shard.TryScheduleJobAsync(GrainId.Create("type", $"target{i}"), $"job{i}", date.AddMilliseconds(i), null, cancellationToken));
tasks.Add(shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("type", $"target{i}"), JobName = $"job{i}", DueTime = date.AddMilliseconds(i), Metadata = null }, cancellationToken));
}

// Give operations time to queue
Expand Down
9 changes: 8 additions & 1 deletion test/Grains/TestGrains/DurableJobGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ public Task ExecuteJobAsync(IJobRunContext ctx, CancellationToken cancellationTo

public async Task<DurableJob> ScheduleJobAsync(string jobName, DateTimeOffset scheduledTime, IReadOnlyDictionary<string, string> metadata = null)
{
var job = await _localDurableJobManager.ScheduleJobAsync(this.GetGrainId(), jobName, scheduledTime, metadata, CancellationToken.None);
var request = new ScheduleJobRequest
{
Target = this.GetGrainId(),
JobName = jobName,
DueTime = scheduledTime,
Metadata = metadata
};
var job = await _localDurableJobManager.ScheduleJobAsync(request, CancellationToken.None);
jobRunStatus[job.Id] = new TaskCompletionSource();
return job;
}
Expand Down
14 changes: 8 additions & 6 deletions test/Grains/TestGrains/RetryTestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ public Task ExecuteJobAsync(IJobRunContext ctx, CancellationToken cancellationTo

public async Task<DurableJob> ScheduleJobAsync(string jobName, DateTimeOffset scheduledTime, IReadOnlyDictionary<string, string> metadata = null)
{
var job = await _localDurableJobManager.ScheduleJobAsync(
this.GetGrainId(),
jobName,
scheduledTime,
metadata,
CancellationToken.None);
var request = new ScheduleJobRequest
{
Target = this.GetGrainId(),
JobName = jobName,
DueTime = scheduledTime,
Metadata = metadata
};
var job = await _localDurableJobManager.ScheduleJobAsync(request, CancellationToken.None);

_jobSuccessStatus[job.Id] = new TaskCompletionSource();

Expand Down
14 changes: 8 additions & 6 deletions test/Grains/TestGrains/SchedulerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ public async Task<DurableJob> ScheduleJobOnAnotherGrainAsync(string targetGrainK
targetGrainKey,
this.GetPrimaryKeyString());

var job = await _localDurableJobManager.ScheduleJobAsync(
targetGrainId,
jobName,
scheduledTime,
null,
CancellationToken.None);
var request = new ScheduleJobRequest
{
Target = targetGrainId,
JobName = jobName,
DueTime = scheduledTime,
Metadata = null
};
var job = await _localDurableJobManager.ScheduleJobAsync(request, CancellationToken.None);

return job;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task AssignJobShardsAsync_OrphanedShard_IsAssignedWithoutIncrementi
var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary<string, string>(), CancellationToken.None);

// Schedule a job so the shard isn't deleted on unregister
await shard.TryScheduleJobAsync(GrainId.Create("test", "grain1"), "TestJob", minDueTime.AddMinutes(30), null, CancellationToken.None);
await shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("test", "grain1"), JobName = "TestJob", DueTime = minDueTime.AddMinutes(30), Metadata = null }, CancellationToken.None);

// Gracefully unregister (sets owner to null)
await manager1.UnregisterShardAsync(shard, CancellationToken.None);
Expand Down Expand Up @@ -234,7 +234,7 @@ public async Task UnregisterShardAsync_WithJobsRemaining_MarksShardAsOrphaned()
var shard = await manager1.CreateShardAsync(minDueTime, maxDueTime, new Dictionary<string, string>(), CancellationToken.None);

// Add a job
await shard.TryScheduleJobAsync(GrainId.Create("test", "grain1"), "TestJob", minDueTime.AddMinutes(30), null, CancellationToken.None);
await shard.TryScheduleJobAsync(new ScheduleJobRequest { Target = GrainId.Create("test", "grain1"), JobName = "TestJob", DueTime = minDueTime.AddMinutes(30), Metadata = null }, CancellationToken.None);

// Unregister with jobs remaining
await manager1.UnregisterShardAsync(shard, CancellationToken.None);
Expand Down
Loading
Loading