diff --git a/worker/executor.go b/worker/executor.go index 5c879c74f3a..bfa1546e772 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -225,6 +225,7 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) graph: g, } + isDependent := false g.Lock() for c := range conflicts { l, ok := g.conflicts[c] @@ -236,6 +237,7 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) for _, dependent := range l { _, ok := dependent.dependentMutations[m.sm.startTs] if !ok { + isDependent = true atomic.AddInt64(&m.inDeg, 1) dependent.dependentMutations[m.sm.startTs] = m } @@ -246,7 +248,10 @@ func (e *executor) processMutationCh(ctx context.Context, ch chan *subMutation) } g.Unlock() - if atomic.LoadInt64(&m.inDeg) == 0 { + // If this mutation doesn't depend on any other mutation then process it right now. + // Otherwise, don't process it. It will be called for processing when the last mutation on + // which it depends is completed. + if !isDependent { x.Check(e.throttle.Do()) go e.worker(m) }