Skip to content

Commit

Permalink
feat: remove exclusion between anchor workers and deployments (#57)
Browse files: browse the repository at this point in the history
  • Loading branch information
smrz2001 authored Mar 6, 2024
1 parent 1a5a822 commit c7a913d
Showing 1 changed file with 12 additions and 47 deletions.
59 changes: 12 additions & 47 deletions cd/manager/jobmanager/jobManager.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -147,60 +147,28 @@ func (m *JobManager) processJobs() {
if !m.paused {
// Advance each freshly discovered "queued" job to the "dequeued" stage
m.advanceJobs(m.db.QueuedJobs())
// Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a
// configured minimum number of jobs to run.
processAnchorJobs := true
// Jobs in the "dequeued" stage are in the cache but haven't been "started" yet and can thus begin processing
dequeuedJobs := m.db.OrderedJobs(job.JobStage_Dequeued)
if len(dequeuedJobs) > 0 {
// Try to start multiple jobs and collapse similar ones:
// - one deploy at a time
// - any number of anchor workers (compatible with non-deploy jobs)
// - one smoke test at a time (compatible with non-deploy jobs)
// - one E2E test at a time (compatible with non-deploy jobs)
// - one workflow at a time (compatible with non-deploy jobs)
// - any number of anchor workers (independent of any other type of job)
//
// Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs
// to complete.
log.Printf("processJobs: dequeued %d jobs...", len(dequeuedJobs))
// Check for any force deploy jobs, and only look at the remaining jobs if no deployments were kicked off.
if m.processForceDeployJobs(dequeuedJobs) {
processAnchorJobs = false
} else
// Decide how to proceed based on the first job from the list
if dequeuedJobs[0].Type == job.JobType_Deploy {
m.processDeployJobs(dequeuedJobs)
// There are two scenarios for anchor jobs on encountering a deployment job at the head of the queue:
// - Anchor jobs are started if no deployment was *started*, even if this deployment job was ahead of
// anchor jobs in the queue.
// - Anchor jobs are not started since a deployment job was *dequeued* ahead of them. (This would be the
// normal behavior for a job queue, i.e. jobs get processed in the order they were scheduled.)
//
// The first scenario only applies to the QA environment that is used for running the E2E tests. E2E
// tests need anchor jobs to run, but if all jobs are processed sequentially, anchor jobs required
// for processing test streams can get blocked by deploy jobs, which are in turn blocked by the E2E
// tests themselves. Letting anchor jobs "skip the queue" prevents this "deadlock".
//
// Testing for this scenario can be simplified by checking whether E2E tests were in progress. So,
// anchor jobs will only be able to "skip the queue" if E2E tests were running but fallback to
// sequential processing otherwise. Since E2E tests only run in QA, all other environments (and QA
// for all other scenarios besides active E2E tests) will have the default (sequential) behavior.
e2eTestJobs := m.cache.JobsByMatcher(func(js job.JobState) bool {
return job.IsActiveJob(js) && (js.Type == job.JobType_TestE2E)
})
processAnchorJobs = len(e2eTestJobs) > 0
} else {
// Check for any deployment jobs - first for forcible deployments, then for regular deployments. Only look
// at the remaining jobs if no deployments were kicked off.
if !m.processForceDeployJobs(dequeuedJobs) && !m.processDeployJobs(dequeuedJobs) {
m.processTestJobs(dequeuedJobs)
m.processWorkflowJobs(dequeuedJobs)
}
}
// If no deploy jobs were launched, process pending anchor jobs. We don't want to hold on to anchor jobs queued
// behind deploys because tests need anchors to run, and deploys can't run till tests complete.
//
// Test and anchor jobs can run in parallel, so process pending anchor jobs even if tests were started above.
if processAnchorJobs {
m.processAnchorJobs(dequeuedJobs)
}
// Anchor jobs can be run independently of deployments and do not need any exclusion rules
m.processAnchorJobs(dequeuedJobs)
}
// Wait for all of this iteration's job advancement goroutines to finish before we iterate again. The ticker will
// automatically drop ticks then pick back up later if a round of processing takes longer than 1 tick.
Expand Down Expand Up @@ -292,8 +260,11 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
}

func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
// Check if there are any jobs in progress
if len(m.cache.JobsByMatcher(job.IsActiveJob)) == 0 {
// Check if there are any (non-anchor) jobs in progress
activeNonAnchorJobs := m.cache.JobsByMatcher(func(js job.JobState) bool {
return job.IsActiveJob(js) && (js.Type != job.JobType_Anchor)
})
if len(activeNonAnchorJobs) == 0 {
// We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs
// as possible for that component, then run the final job.
deployJob := dequeuedJobs[0]
Expand Down Expand Up @@ -323,13 +294,7 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
}

// processAnchorJobs tries to start anchor jobs from dequeuedJobs. It calls processVxAnchorJobs with
// processV5Jobs=true first and, only if that call returns false, again with processV5Jobs=false (the `||`
// short-circuits). It returns true if either call returned true.
//
// NOTE(review): this text comes from a rendered commit diff without +/- markers, so it appears to interleave the
// removed body (the getActiveDeploys guard, the log line, and `return false`) with the single added return at the
// end. The commit title ("remove exclusion between anchor workers and deployments") suggests only the final return
// is the intended post-change body - confirm against the repository before relying on either reading.
func (m *JobManager) processAnchorJobs(dequeuedJobs []job.JobState) bool {
// Check if there are any deploy jobs in progress
// (presumably the pre-change guard: anchor jobs were only processed when no deployment was active)
if len(m.getActiveDeploys()) == 0 {
return m.processVxAnchorJobs(dequeuedJobs, true) || m.processVxAnchorJobs(dequeuedJobs, false)
} else {
log.Printf("processAnchorJobs: deployment in progress")
}
return false
// As written, this statement is unreachable because it follows the unconditional `return false` above.
// NOTE(review): it looks like the replacement body introduced by the commit - TODO confirm.
return m.processVxAnchorJobs(dequeuedJobs, true) || m.processVxAnchorJobs(dequeuedJobs, false)
}

func (m *JobManager) processVxAnchorJobs(dequeuedJobs []job.JobState, processV5Jobs bool) bool {
Expand Down

0 comments on commit c7a913d

Please sign in to comment.