From 51b14a290839bb9aab3a470594227352d8d2fca1 Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Thu, 24 Dec 2020 15:18:32 +0530 Subject: [PATCH] fix(ludicrous): Fix data race in executor (#7203) The operations on inDeg field of mutation struct should be done atomically because it is being used to check if a mutation in dependent on any other mutation. And the mutations are run concurrently, so this update/read on inDeg should be protected. --- worker/executor.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/worker/executor.go b/worker/executor.go index 108454a1931..5c879c74f3a 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -130,9 +130,9 @@ func generateConflictKeys(ctx context.Context, p *subMutation) map[uint64]struct } type mutation struct { + inDeg int64 sm *subMutation conflictKeys map[uint64]struct{} - inDeg int dependentMutations map[uint64]*mutation graph *graph } @@ -185,8 +185,7 @@ func (e *executor) worker(mut *mutation) { // Decrease inDeg of dependents. If this mutation unblocks them, queue them. for _, dependent := range mut.dependentMutations { - dependent.inDeg -= 1 - if dependent.inDeg == 0 { + if atomic.AddInt64(&dependent.inDeg, -1) == 0 { x.Check(e.throttle.Do()) go e.worker(dependent) } @@ -224,7 +223,6 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) conflictKeys: conflicts, dependentMutations: make(map[uint64]*mutation), graph: g, - inDeg: 0, } g.Lock() @@ -238,7 +236,7 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) for _, dependent := range l { _, ok := dependent.dependentMutations[m.sm.startTs] if !ok { - m.inDeg += 1 + atomic.AddInt64(&m.inDeg, 1) dependent.dependentMutations[m.sm.startTs] = m } } @@ -248,7 +246,7 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) } g.Unlock() - if m.inDeg == 0 { + if atomic.LoadInt64(&m.inDeg) == 0 { x.Check(e.throttle.Do()) go e.worker(m) }