Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Commit 2531692

Browse files
authored
Parallelize some timer work
1 parent 957f1fb commit 2531692

File tree

3 files changed

+37
-28
lines changed

3 files changed

+37
-28
lines changed

src/ApiService/ApiService/Functions/TimerTasks.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Microsoft.Azure.Functions.Worker;
1+
using System.Threading.Tasks;
2+
using Microsoft.Azure.Functions.Worker;
23
using Microsoft.Extensions.Logging;
34
namespace Microsoft.OneFuzz.Service.Functions;
45

@@ -22,35 +23,34 @@ public TimerTasks(ILogger<TimerTasks> logger, ITaskOperations taskOperations, IJ
2223

2324
[Function("TimerTasks")]
2425
public async Async.Task Run([TimerTrigger("00:00:15")] TimerInfo myTimer) {
25-
var expriredTasks = _taskOperations.SearchExpired();
26-
await foreach (var task in expriredTasks) {
26+
// perform up to 10 updates in parallel for each entity type
27+
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
28+
29+
var expiredTasks = _taskOperations.SearchExpired();
30+
await Parallel.ForEachAsync(expiredTasks, parallelOptions, async (task, _cancel) => {
2731
_logger.LogInformation("stopping expired task. job_id:{JobId} task_id:{TaskId}", task.JobId, task.TaskId);
2832
await _taskOperations.MarkStopping(task, "task is expired");
29-
}
30-
33+
});
3134

3235
var expiredJobs = _jobOperations.SearchExpired();
33-
34-
await foreach (var job in expiredJobs) {
36+
await Parallel.ForEachAsync(expiredJobs, parallelOptions, async (job, _cancel) => {
3537
_logger.LogInformation("stopping expired job. job_id:{JobId}", job.JobId);
3638
_ = await _jobOperations.Stopping(job);
37-
}
39+
});
3840

3941
var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);
40-
41-
await foreach (var job in jobs) {
42+
await Parallel.ForEachAsync(jobs, parallelOptions, async (job, _cancel) => {
4243
_logger.LogInformation("update job: {JobId}", job.JobId);
4344
_ = await _jobOperations.ProcessStateUpdates(job);
44-
}
45+
});
4546

4647
var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWorkStates);
47-
await foreach (var task in tasks) {
48+
await Parallel.ForEachAsync(tasks, parallelOptions, async (task, _cancel) => {
4849
_logger.LogInformation("update task: {TaskId}", task.TaskId);
4950
_ = await _taskOperations.ProcessStateUpdate(task);
50-
}
51+
});
5152

5253
await _scheduler.ScheduleTasks();
53-
5454
await _jobOperations.StopNeverStartedJobs();
5555
}
5656
}

src/ApiService/ApiService/Functions/TimerWorkers.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Microsoft.Azure.Functions.Worker;
1+
using System.Threading.Tasks;
2+
using Microsoft.Azure.Functions.Worker;
23
using Microsoft.Extensions.Logging;
34
namespace Microsoft.OneFuzz.Service.Functions;
45

@@ -43,16 +44,17 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
4344
// (such as shutdown or resize) happen during this iteration `timer_worker`
4445
// rather than the following iteration.
4546

46-
var pools = _poolOps.SearchStates(states: PoolStateHelper.NeedsWork);
47-
await foreach (var pool in pools) {
47+
// we do not expect there to be many pools that need work, process them all in parallel
48+
var pools = await _poolOps.SearchStates(states: PoolStateHelper.NeedsWork).ToListAsync();
49+
await Async.Task.WhenAll(pools.Select(async pool => {
4850
try {
4951
_log.LogInformation("updating pool: {PoolId} ({PoolName}) - state: {PoolState}", pool.PoolId, pool.Name, pool.State);
5052
var newPool = await _poolOps.ProcessStateUpdate(pool);
5153
_log.LogInformation("completed updating pool: {PoolId} ({PoolName}) - now in state {PoolState}", pool.PoolId, pool.Name, newPool.State);
5254
} catch (Exception ex) {
5355
_log.LogError(ex, "failed to process pool");
5456
}
55-
}
57+
}));
5658

5759
// NOTE: Nodes, and Scalesets should be processed in a consistent order such
5860
// during 'pool scale down' operations. This means that pools that are
@@ -63,26 +65,29 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
6365
await _nodeOps.MarkOutdatedNodes();
6466
await _nodeOps.CleanupBusyNodesWithoutWork();
6567

68+
// process up to 10 nodes in parallel
6669
var nodes = _nodeOps.SearchStates(states: NodeStateHelper.NeedsWorkStates);
67-
await foreach (var node in nodes) {
70+
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
71+
await Parallel.ForEachAsync(nodes, parallelOptions, async (node, _cancel) => {
6872
try {
6973
_log.LogInformation("updating node: {MachineId} - state: {NodeState}", node.MachineId, node.State);
7074
var newNode = await _nodeOps.ProcessStateUpdate(node);
7175
_log.LogInformation("completed updating node: {MachineId} - now in state {NodeState}", node.MachineId, newNode.State);
7276
} catch (Exception ex) {
7377
_log.LogError(ex, "failed to process node");
7478
}
75-
}
79+
});
7680

77-
var scalesets = _scaleSetOps.SearchAll();
78-
await foreach (var scaleset in scalesets) {
81+
// we do not expect there to be many scalesets, process them all in parallel
82+
var scalesets = await _scaleSetOps.SearchAll().ToListAsync();
83+
await Async.Task.WhenAll(scalesets.Select(async scaleset => {
7984
try {
8085
_log.LogInformation("updating scaleset: {ScalesetId} - state: {ScalesetState}", scaleset.ScalesetId, scaleset.State);
8186
var newScaleset = await ProcessScalesets(scaleset);
8287
_log.LogInformation("completed updating scaleset: {ScalesetId} - now in state {ScalesetState}", scaleset.ScalesetId, newScaleset.State);
8388
} catch (Exception ex) {
8489
_log.LogError(ex, "failed to process scaleset");
8590
}
86-
}
91+
}));
8792
}
8893
}

src/ApiService/ApiService/onefuzzlib/NodeOperations.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,15 +287,17 @@ public async Async.Task MarkOutdatedNodes() {
287287
}
288288

289289
var outdated = SearchOutdated(excludeUpdateScheduled: true);
290-
await foreach (var node in outdated) {
290+
// update up to 10 nodes in parallel
291+
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
292+
await Parallel.ForEachAsync(outdated, parallelOptions, async (node, _cancel) => {
291293
_logTracer.LogInformation("node is outdated: {MachineId} - {NodeVersion}", node.MachineId, node.Version);
292294

293295
if (node.Version == "1.0.0") {
294296
_ = await ToReimage(node, done: true);
295297
} else {
296298
_ = await ToReimage(node);
297299
}
298-
}
300+
});
299301
}
300302

301303

@@ -352,7 +354,7 @@ IAsyncEnumerable<Node> SearchOutdated(
352354
if (numResults is null) {
353355
return QueryAsync(query);
354356
} else {
355-
return QueryAsync(query).Take(numResults.Value!);
357+
return QueryAsync(query).Take(numResults.Value);
356358
}
357359
}
358360

@@ -362,9 +364,11 @@ public async Async.Task CleanupBusyNodesWithoutWork() {
362364
//# that hit this race condition will get cleaned up.
363365
var nodes = _context.NodeOperations.SearchStates(states: NodeStateHelper.BusyStates);
364366

365-
await foreach (var node in nodes) {
367+
// update up to 10 nodes in parallel
368+
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
369+
await Parallel.ForEachAsync(nodes, async (node, _cancel) => {
366370
_ = await StopIfComplete(node, true);
367-
}
371+
});
368372
}
369373

370374
public async Async.Task<Node> ToReimage(Node node, bool done = false) {

0 commit comments

Comments
 (0)