Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Commit 54a0680

Browse files
authored
Undo some parallelism
1 parent 2531692 commit 54a0680

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

src/ApiService/ApiService/Functions/TimerTasks.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,33 @@ public async Async.Task Run([TimerTrigger("00:00:15")] TimerInfo myTimer) {
2727
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
2828

2929
var expiredTasks = _taskOperations.SearchExpired();
30-
await Parallel.ForEachAsync(expiredTasks, parallelOptions, async (task, _cancel) => {
30+
// marking one task stopping can also mark other tasks in the same job as failed
31+
// this could make parallel updates stomp on each other, so don't do it in parallel
32+
await foreach (var task in expiredTasks) {
3133
_logger.LogInformation("stopping expired task. job_id:{JobId} task_id:{TaskId}", task.JobId, task.TaskId);
3234
await _taskOperations.MarkStopping(task, "task is expired");
33-
});
35+
}
3436

3537
var expiredJobs = _jobOperations.SearchExpired();
38+
// job updates are all distinct and only update tasks owned by that job, can be performed in parallel
3639
await Parallel.ForEachAsync(expiredJobs, parallelOptions, async (job, _cancel) => {
3740
_logger.LogInformation("stopping expired job. job_id:{JobId}", job.JobId);
3841
_ = await _jobOperations.Stopping(job);
3942
});
4043

4144
var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);
45+
// job updates are okay to do in parallel
4246
await Parallel.ForEachAsync(jobs, parallelOptions, async (job, _cancel) => {
4347
_logger.LogInformation("update job: {JobId}", job.JobId);
4448
_ = await _jobOperations.ProcessStateUpdates(job);
4549
});
4650

4751
var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWorkStates);
48-
await Parallel.ForEachAsync(tasks, parallelOptions, async (task, _cancel) => {
52+
// task state transitions might affect the job, so parallel updates could stomp on each other
53+
await foreach (var task in tasks) {
4954
_logger.LogInformation("update task: {TaskId}", task.TaskId);
5055
_ = await _taskOperations.ProcessStateUpdate(task);
51-
});
56+
}
5257

5358
await _scheduler.ScheduleTasks();
5459
await _jobOperations.StopNeverStartedJobs();

src/ApiService/ApiService/onefuzzlib/JobOperations.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public async Async.Task<Job> Init(Job job) {
105105

106106
public async Async.Task<Job> Stopping(Job job) {
107107
job = job with { State = JobState.Stopping };
108-
var tasks = await _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString())).ToListAsync();
108+
var tasks = await _context.TaskOperations.GetByJobId(job.JobId).ToListAsync();
109109
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
110110

111111
var notStopped = taskNotStopped[true];

0 commit comments

Comments
 (0)