From c7a913db8d0c2a605db13b7b4c2976912da09ef3 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 5 Mar 2024 20:54:34 -0500 Subject: [PATCH] feat: remove exclusion between anchor workers and deployments (#57) --- cd/manager/jobmanager/jobManager.go | 59 ++++++----------------------- 1 file changed, 12 insertions(+), 47 deletions(-) diff --git a/cd/manager/jobmanager/jobManager.go b/cd/manager/jobmanager/jobManager.go index 3cbddf9..d310dd0 100644 --- a/cd/manager/jobmanager/jobManager.go +++ b/cd/manager/jobmanager/jobManager.go @@ -147,60 +147,28 @@ func (m *JobManager) processJobs() { if !m.paused { // Advance each freshly discovered "queued" job to the "dequeued" stage m.advanceJobs(m.db.QueuedJobs()) - // Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a - // configured minimum number of jobs to run. - processAnchorJobs := true // Jobs in the "dequeued" stage are in the cache but haven't been "started" yet and can thus begin processing dequeuedJobs := m.db.OrderedJobs(job.JobStage_Dequeued) if len(dequeuedJobs) > 0 { // Try to start multiple jobs and collapse similar ones: // - one deploy at a time - // - any number of anchor workers (compatible with non-deploy jobs) // - one smoke test at a time (compatible with non-deploy jobs) // - one E2E test at a time (compatible with non-deploy jobs) // - one workflow at a time (compatible with non-deploy jobs) + // - any number of anchor workers (independent of any other type of job) // // Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs // to complete. log.Printf("processJobs: dequeued %d jobs...", len(dequeuedJobs)) - // Check for any force deploy jobs, and only look at the remaining jobs if no deployments were kicked off. - if m.processForceDeployJobs(dequeuedJobs) { - processAnchorJobs = false - } else - // Decide how to proceed based on the first job from the list - if dequeuedJobs[0].Type == job.JobType_Deploy { - m.processDeployJobs(dequeuedJobs) - // There are two scenarios for anchor jobs on encountering a deployment job at the head of the queue: - // - Anchor jobs are started if no deployment was *started*, even if this deployment job was ahead of - // anchor jobs in the queue. - // - Anchor jobs are not started since a deployment job was *dequeued* ahead of them. (This would be the - // normal behavior for a job queue, i.e. jobs get processed in the order they were scheduled.) - // - // The first scenario only applies to the QA environment that is used for running the E2E tests. E2E - // tests need anchor jobs to run, but if all jobs are processed sequentially, anchor jobs required - // for processing test streams can get blocked by deploy jobs, which are in turn blocked by the E2E - // tests themselves. Letting anchor jobs "skip the queue" prevents this "deadlock". - // - // Testing for this scenario can be simplified by checking whether E2E tests were in progress. So, - // anchor jobs will only be able to "skip the queue" if E2E tests were running but fallback to - // sequential processing otherwise. Since E2E tests only run in QA, all other environments (and QA - // for all other scenarios besides active E2E tests) will have the default (sequential) behavior. - e2eTestJobs := m.cache.JobsByMatcher(func(js job.JobState) bool { - return job.IsActiveJob(js) && (js.Type == job.JobType_TestE2E) - }) - processAnchorJobs = len(e2eTestJobs) > 0 - } else { + // Check for any deployment jobs - first for forcible deployments, then for regular deployments. Only look + // at the remaining jobs if no deployments were kicked off. + if !m.processForceDeployJobs(dequeuedJobs) && !m.processDeployJobs(dequeuedJobs) { m.processTestJobs(dequeuedJobs) m.processWorkflowJobs(dequeuedJobs) } } - // If no deploy jobs were launched, process pending anchor jobs. We don't want to hold on to anchor jobs queued - // behind deploys because tests need anchors to run, and deploys can't run till tests complete. - // - // Test and anchor jobs can run in parallel, so process pending anchor jobs even if tests were started above. - if processAnchorJobs { - m.processAnchorJobs(dequeuedJobs) - } + // Anchor jobs can be run independently of deployments and do not need any exclusion rules + m.processAnchorJobs(dequeuedJobs) } // Wait for all of this iteration's job advancement goroutines to finish before we iterate again. The ticker will // automatically drop ticks then pick back up later if a round of processing takes longer than 1 tick. @@ -292,8 +260,11 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool { } func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool { - // Check if there are any jobs in progress - if len(m.cache.JobsByMatcher(job.IsActiveJob)) == 0 { + // Check if there are any (non-anchor) jobs in progress + activeNonAnchorJobs := m.cache.JobsByMatcher(func(js job.JobState) bool { + return job.IsActiveJob(js) && (js.Type != job.JobType_Anchor) + }) + if len(activeNonAnchorJobs) == 0 { // We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs // as possible for that component, then run the final job. deployJob := dequeuedJobs[0] @@ -323,13 +294,7 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool { } func (m *JobManager) processAnchorJobs(dequeuedJobs []job.JobState) bool { - // Check if there are any deploy jobs in progress - if len(m.getActiveDeploys()) == 0 { - return m.processVxAnchorJobs(dequeuedJobs, true) || m.processVxAnchorJobs(dequeuedJobs, false) - } else { - log.Printf("processAnchorJobs: deployment in progress") - } - return false + return m.processVxAnchorJobs(dequeuedJobs, true) || m.processVxAnchorJobs(dequeuedJobs, false) } func (m *JobManager) processVxAnchorJobs(dequeuedJobs []job.JobState, processV5Jobs bool) bool {