From ce1158a32574e6ba50f165518a36bd074447639c Mon Sep 17 00:00:00 2001 From: Zhao vistart Date: Fri, 14 Jun 2024 16:09:51 +0800 Subject: [PATCH] [standard]fix: data race of task result sent to result channel while result is being assigned when context done --- workflow/standard/cache/interface.go | 30 ++++++++---------------- workflow/standard/worker/pool.go | 34 ++++++++++++++-------------- 2 files changed, 27 insertions(+), 37 deletions(-) diff --git a/workflow/standard/cache/interface.go b/workflow/standard/cache/interface.go index 36f2ae1..bce7a1b 100644 --- a/workflow/standard/cache/interface.go +++ b/workflow/standard/cache/interface.go @@ -17,47 +17,37 @@ type Interface interface { // Get retrieves the value of the specified key from the cache. // // Parameter - // - // - key: an object implementing the KeyGetter interface to retrieve the key to get. + // - key: an object implementing the KeyGetter interface to retrieve the key to get. // // Returns - // - // - any: the retrieved value, you may need to check if it is nil. - // - // - error: if an error occurs, it returns the corresponding error message. + // - any: the retrieved value, you may need to check if it is nil. + // - error: if an error occurs, it returns the corresponding error message. Get(key KeyGetter) (any, error) // Set sets the value of the specified key in the cache. // // Parameter - // - // - key: an object implementing the KeyGetter interface to retrieve the key to set. - // - // - value: the value to set, passed as an empty interface. - // - // - options: optional options to set cache items. + // - key: an object implementing the KeyGetter interface to retrieve the key to set. + // - value: the value to set, passed as an empty interface. + // - options: optional options to set cache items. // // Returns - // - // - error: if an error occurs, it returns the corresponding error message. + // - error: if an error occurs, it returns the corresponding error message. Set(key KeyGetter, value any, options ...ItemOption) error // Delete deletes the value of the specified key from the cache. // // Parameter - // - // - key: an object implementing the KeyGetter interface to retrieve the key to delete. + // - key: an object implementing the KeyGetter interface to retrieve the key to delete. // // Returns - // - // - error: if an error occurs, it returns the corresponding error message. + // - error: if an error occurs, it returns the corresponding error message. Delete(key KeyGetter) error // Clear clears the cache, deleting all cache items. // // Returns - // - // - error: if an error occurs, it returns the corresponding error message. + // - error: if an error occurs, it returns the corresponding error message. Clear() error } diff --git a/workflow/standard/worker/pool.go b/workflow/standard/worker/pool.go index 9336987..8a0de5c 100644 --- a/workflow/standard/worker/pool.go +++ b/workflow/standard/worker/pool.go @@ -182,7 +182,7 @@ func (p *pool) startWorker() string { // Execute the task asynchronously done := make(chan struct{}) - go func(done chan struct{}, channels *workerChannels) { + go func() { for { select { case <-done: @@ -197,12 +197,12 @@ func (p *pool) startWorker() string { return } } - }(done, channels) - go func(done chan struct{}) { + }() + go func() { defer close(done) result.Result, err = taskWithArgs.task.Execute(taskWithArgs.ctx, taskWithArgs.args...) result.Err = err - }(done) + }() // Wait for the task to complete or the context to be cancelled delta := metricsUpdate{deltaWorking: -1} @@ -213,6 +213,8 @@ func (p *pool) startWorker() string { // Task was cancelled delta.deltaCanceled = 1 waiting = false + // Send the empty result back to the submitter + taskWithArgs.resultChan <- TaskResult{} case <-done: // Task completed if err != nil { @@ -225,12 +227,11 @@ func (p *pool) startWorker() string { delta.deltaSuccessful = 1 } waiting = false + // Send the task result back to the submitter + taskWithArgs.resultChan <- result } } p.metricsChan <- delta - - // Send the result back to the submitter - taskWithArgs.resultChan <- result close(taskWithArgs.resultChan) // Close the resultChan after sending the result case <-channels.cancel: continue @@ -317,9 +318,9 @@ func (p *pool) getWorkerID() string { // the order in which tasks finish. // // Parameters: -// - ctx: The context used to derive a new context passed to the task's Execute method. -// - task: The task to be executed. -// - args: Additional arguments to be passed to the task. +// - ctx: The context used to derive a new context passed to the task's Execute method. +// - task: The task to be executed. +// - args: Additional arguments to be passed to the task. // // Returns a channel that will receive the TaskResult upon completion. func (p *pool) Submit(ctx context.Context, task Task, args ...interface{}) <-chan TaskResult { @@ -355,8 +356,8 @@ func (p *pool) Submit(ctx context.Context, task Task, args ...interface{}) <-cha // notification will be sent. // // Parameters: -// - id: The ID of the worker to be exited. -// - stopWorker: A boolean flag indicating whether to cancel the task's context. +// - id: The ID of the worker to be exited. +// - stopWorker: A boolean flag indicating whether to cancel the task's context. // // If the specified ID exists, the worker will be exited immediately. Subsequent calls to this method // with the same ID will return an error indicating that the worker ID does not exist, even if the @@ -404,8 +405,8 @@ func (p *pool) StopWorkerByID(id string) error { // // This method changes the ID of a worker to a new ID. // -// oldID: The current ID of the worker. -// newID: The new ID to assign to the worker. +// - oldID: The current ID of the worker. +// - newID: The new ID to assign to the worker. // // Returns an error if the old ID is not found or the new ID already exists. func (p *pool) RenameWorker(oldID, newID string) error { @@ -475,12 +476,11 @@ func (p *pool) handleMetricsUpdates() { continue } - p.mutex.Lock() metrics, ok := p.metrics.(*metrics) if !ok { - p.mutex.Unlock() continue } + metrics.Lock() metrics.data.CurrentCapacity += update.deltaCapacity metrics.data.WorkingWorkers += update.deltaWorking metrics.data.IdleWorkers = metrics.data.CurrentCapacity - metrics.data.WorkingWorkers @@ -490,6 +490,6 @@ func (p *pool) handleMetricsUpdates() { metrics.data.FailedTasks += update.deltaFailed metrics.data.CanceledTasks += update.deltaCanceled metrics.data.TotalCompletedTasks += update.deltaSuccessful + update.deltaFailed + update.deltaCanceled - p.mutex.Unlock() + metrics.Unlock() } }