Skip to content

Commit

Permalink
fix: serialize all workflow jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 16, 2024
1 parent 0a254f4 commit dff7635
Showing 1 changed file with 10 additions and 31 deletions.
41 changes: 10 additions & 31 deletions cd/manager/jobmanager/jobManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
}

func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
// Check if there are any (non-anchor) jobs in progress
// Check if there are any non-anchor jobs in progress. Deployments can run in parallel with anchor jobs but not with
// any other jobs.
activeNonAnchorJobs := m.getActiveNonAnchorJobs()
if len(activeNonAnchorJobs) == 0 {
// We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs
Expand Down Expand Up @@ -355,7 +356,7 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool {
if len(m.getActiveDeploys()) == 0 {
// - Collapse all smoke tests between deployments into a single run
// - Collapse all E2E tests between deployments into a single run
dequeuedTests := make(map[job.JobType]job.JobState, 0)
dequeuedTests := make(map[job.JobType]job.JobState)
for _, dequeuedJob := range dequeuedJobs {
// Break out of the loop as soon as we find a deploy job so that we don't collapse test jobs across deploys.
if dequeuedJob.Type == job.JobType_Deploy {
Expand All @@ -382,39 +383,17 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool {
}

func (m *JobManager) processWorkflowJobs(dequeuedJobs []job.JobState) bool {
// Check if there are any deploy jobs in progress
if len(m.getActiveDeploys()) == 0 {
dequeuedWorkflows := make([]job.JobState, 0)
// Do not collapse back-to-back workflow jobs because they could be pointing to different actual workflows
// Check if there are any non-anchor jobs in progress. Workflows can run in parallel with anchor jobs but not with
// any other jobs.
if len(m.getActiveNonAnchorJobs()) == 0 {
for _, dequeuedJob := range dequeuedJobs {
// Break out of the loop as soon as we find a deploy job so that we don't collapse workflow jobs across
// deploys.
if dequeuedJob.Type == job.JobType_Deploy {
break
} else if dequeuedJob.Type == job.JobType_Workflow {
if workflow, err := job.CreateWorkflowJob(dequeuedJob); err != nil {
if err = m.updateJobStage(dequeuedJob, job.JobStage_Failed, err); err != nil {
log.Printf("processWorkflowJobs: job update failed: %v, %s", err, manager.PrintJob(dequeuedJob))
}
} else if workflow.IsType(job.WorkflowJobLabel_Deploy) {
// If we haven't accumulated any workflow jobs yet, this is the first job in the list. If there were
// no other active jobs, queue it and break from the loop because we can't run any other jobs in
// parallel with deployments.
if (len(dequeuedWorkflows) == 0) && (len(m.getActiveNonAnchorJobs()) == 0) {
dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob)
}
// If there were some accumulated jobs by this point, we know they must be test jobs. We'll still
// break out of the loop here so that we don't run test jobs across deploy jobs.
break
} else {
dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob)
}
if dequeuedJob.Type == job.JobType_Workflow {
m.advanceJob(dequeuedJob)
return true
}
}
m.advanceJobs(dequeuedWorkflows)
return len(dequeuedWorkflows) > 0
} else {
log.Printf("processWorkflowJobs: deployment in progress")
log.Printf("processWorkflowJobs: other jobs in progress")
}
return false
}
Expand Down

0 comments on commit dff7635

Please sign in to comment.