Skip to content

Commit

Permalink
rc/job: use mutex for adding listeners thread safety
Browse files Browse the repository at this point in the history
Fixes an extreme case in which a listener added by calling OnFinish() while the job is executing finish() would never be executed.

This change should not cause compatibility issues, as consumers should not make assumptions about whether listeners will be run in a new goroutine.
  • Loading branch information
hayden-pan authored and ncw committed Dec 15, 2024
1 parent 19f4580 commit caac95f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
12 changes: 4 additions & 8 deletions fs/rc/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ func (job *Job) finish(out rc.Params, err error) {
running.kickExpire() // make sure this job gets expired
}

// addListener appends fn to job.listeners while holding job.mu.
//
// NOTE(review): the hunk header (12 lines before, 6 after) suggests this
// helper is one of the lines deleted by the commit - confirm against the
// commit itself.
func (job *Job) addListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
job.listeners = append(job.listeners, fn)
}

func (job *Job) removeListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
Expand All @@ -94,10 +88,12 @@ func (job *Job) removeListener(fn *func()) {
// OnFinish adds listener to job that will be triggered when job is finished.
// It returns a function to cancel listening.
//
// NOTE(review): this hunk is shown without +/- markers, so removed and added
// lines appear interleaved; read literally, a finished job would call fn
// twice. Presumably `fn()` and `job.addListener(&fn)` are the removed lines
// and the locking, `go fn()` and the direct append are the added ones -
// confirm against the commit.
func (job *Job) OnFinish(fn func()) func() {
job.mu.Lock()
defer job.mu.Unlock()
if job.Finished {
fn()
// Runs the listener in a new goroutine instead of inline.
go fn()
} else {
job.addListener(&fn)
job.listeners = append(job.listeners, &fn)
}
// The returned closure unregisters this listener through removeListener.
return func() { job.removeListener(&fn) }
}
Expand Down
49 changes: 49 additions & 0 deletions fs/rc/jobs/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,52 @@ func TestOnFinishAlreadyFinished(t *testing.T) {
t.Fatal("Timeout waiting for OnFinish to fire")
}
}

// TestOnFinishDataRace keeps calling OnFinish from a background goroutine
// while the job is stopped, then checks that every callback passed to
// OnFinish is eventually invoked.
func TestOnFinishDataRace(t *testing.T) {
	jobID.Store(0)
	job, _, err := NewJob(context.Background(), ctxFn, rc.Params{"_async": true})
	assert.NoError(t, err)

	var (
		registered uint64 // OnFinish calls made by the producer goroutine
		fired      uint64 // callback notifications received so far
	)
	notify := make(chan struct{})
	quit := make(chan struct{})
	done := make(chan struct{})

	// Producer: call OnFinish in a tight loop until told to quit.
	go func() {
		defer close(done)
		for {
			select {
			case <-quit:
				return
			default:
			}
			_, err := OnFinish(job.ID, func() {
				notify <- struct{}{}
			})
			assert.NoError(t, err)
			registered++
		}
	}()

	time.Sleep(10 * time.Millisecond)
	job.Stop()

	// Wait for the first OnFinish to fire
	<-notify
	fired++

	// Stop the OnFinish producer; once done is closed, registered is final.
	close(quit)
	<-done

	// Collect the remaining notifications, with one overall deadline.
	deadline := time.After(5 * time.Second)
	for fired != registered {
		select {
		case <-notify:
			fired++
		case <-deadline:
			t.Fatal("Timeout waiting for all OnFinish calls to fire")
		}
	}
}

0 comments on commit caac95f

Please sign in to comment.