From 76f929a83e3737f3a971613bb8fa0b6376db2e81 Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Mon, 11 Jan 2021 12:32:35 +0530 Subject: [PATCH] Fix logical race in executor --- worker/executor.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/worker/executor.go b/worker/executor.go index 5c879c74f3a..bfa1546e772 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -225,6 +225,7 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) graph: g, } + isDependent := false g.Lock() for c := range conflicts { l, ok := g.conflicts[c] @@ -236,6 +237,7 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) for _, dependent := range l { _, ok := dependent.dependentMutations[m.sm.startTs] if !ok { + isDependent = true atomic.AddInt64(&m.inDeg, 1) dependent.dependentMutations[m.sm.startTs] = m } @@ -246,7 +248,10 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) } g.Unlock() - if atomic.LoadInt64(&m.inDeg) == 0 { + // If this mutation doesn't depend on any other mutation then process it right now. + // Otherwise, don't process it. It will be called for processing when the last mutation on + // which it depends is completed. + if !isDependent { x.Check(e.throttle.Do()) go e.worker(m) }