Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions services/actions/job_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,28 @@ func findBlockedRunByConcurrency(ctx context.Context, repoID int64, concurrencyG
func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) {
checkedConcurrencyGroup := make(container.Set[string])

// check run (workflow-level) concurrency
if run.ConcurrencyGroup != "" {
concurrentRun, err := findBlockedRunByConcurrency(ctx, run.RepoID, run.ConcurrencyGroup)
collect := func(concurrencyGroup string) error {
concurrentRun, err := findBlockedRunByConcurrency(ctx, run.RepoID, concurrencyGroup)
if err != nil {
return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err)
return fmt.Errorf("find blocked run by concurrency: %w", err)
}
if concurrentRun != nil && !concurrentRun.NeedApproval {
js, ujs, err := checkJobsOfRun(ctx, concurrentRun)
if err != nil {
return nil, nil, err
return err
}
jobs = append(jobs, js...)
updatedJobs = append(updatedJobs, ujs...)
}
checkedConcurrencyGroup.Add(run.ConcurrencyGroup)
checkedConcurrencyGroup.Add(concurrencyGroup)
return nil
}

// check run (workflow-level) concurrency
if run.ConcurrencyGroup != "" {
if err := collect(run.ConcurrencyGroup); err != nil {
return nil, nil, err
}
}

// check job concurrency
Expand All @@ -177,22 +184,12 @@ func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (job
if !job.Status.IsDone() {
continue
}
if job.ConcurrencyGroup == "" && checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) {
if job.ConcurrencyGroup == "" || checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) {
continue
}
concurrentRun, err := findBlockedRunByConcurrency(ctx, job.RepoID, job.ConcurrencyGroup)
if err != nil {
return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err)
}
if concurrentRun != nil && !concurrentRun.NeedApproval {
js, ujs, err := checkJobsOfRun(ctx, concurrentRun)
if err != nil {
return nil, nil, err
}
jobs = append(jobs, js...)
updatedJobs = append(updatedJobs, ujs...)
if err := collect(job.ConcurrencyGroup); err != nil {
return nil, nil, err
}
checkedConcurrencyGroup.Add(job.ConcurrencyGroup)
}
return jobs, updatedJobs, nil
}
Expand Down
67 changes: 67 additions & 0 deletions services/actions/job_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -134,3 +136,68 @@ jobs:
})
}
}

// Test_checkRunConcurrency_NoDuplicateConcurrencyGroupCheck verifies that when a run's
// ConcurrencyGroup has already been checked at the run level, the same group is not
// re-checked for individual jobs.
func Test_checkRunConcurrency_NoDuplicateConcurrencyGroupCheck(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := t.Context()

// Run A: the triggering run with a concurrency group.
runA := &actions_model.ActionRun{
RepoID: 4,
OwnerID: 1,
TriggerUserID: 1,
WorkflowID: "test.yml",
Index: 9901,
Ref: "refs/heads/main",
Status: actions_model.StatusRunning,
ConcurrencyGroup: "test-cg",
}
assert.NoError(t, db.Insert(ctx, runA))

// A done job for run A with the same ConcurrencyGroup.
// This triggers the job-level concurrency check in checkRunConcurrency.
jobADone := &actions_model.ActionRunJob{
RunID: runA.ID,
RepoID: 4,
OwnerID: 1,
JobID: "job1",
Name: "job1",
Status: actions_model.StatusSuccess,
ConcurrencyGroup: "test-cg",
}
assert.NoError(t, db.Insert(ctx, jobADone))

// Blocked run B competing for the same concurrency group.
runB := &actions_model.ActionRun{
RepoID: 4,
OwnerID: 1,
TriggerUserID: 1,
WorkflowID: "test.yml",
Index: 9902,
Ref: "refs/heads/main",
Status: actions_model.StatusBlocked,
ConcurrencyGroup: "test-cg",
}
assert.NoError(t, db.Insert(ctx, runB))

// A blocked job belonging to run B (no job-level concurrency group).
jobBBlocked := &actions_model.ActionRunJob{
RunID: runB.ID,
RepoID: 4,
OwnerID: 1,
JobID: "job1",
Name: "job1",
Status: actions_model.StatusBlocked,
}
assert.NoError(t, db.Insert(ctx, jobBBlocked))

jobs, _, err := checkRunConcurrency(ctx, runA)
assert.NoError(t, err)

if assert.Len(t, jobs, 1) {
assert.Equal(t, jobBBlocked.ID, jobs[0].ID)
}
}