From 1e2f0e72d7c68d5b703faccb32a4a3a84d56b8e2 Mon Sep 17 00:00:00 2001 From: parasssh Date: Wed, 20 May 2020 10:41:50 -0700 Subject: [PATCH] Fix panic for sending on a closed channel (#5479) --- worker/executor.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/worker/executor.go b/worker/executor.go index 6e0d27175c6..0c3bc11c236 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -86,17 +86,14 @@ func (e *executor) shutdown() { } } -func (e *executor) getChannel(pred string) (ch chan *subMutation) { - e.RLock() +// getChannelUnderLock obtains the channel for the given pred. It must be called under e.Lock(). +func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) { ch, ok := e.predChan[pred] - e.RUnlock() if ok { return ch } // Create a new channel for `pred`. - e.Lock() - defer e.Unlock() ch, ok = e.predChan[pred] if ok { return ch @@ -123,7 +120,17 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir payload.edges = append(payload.edges, edge) } - for attr, payload := range payloadMap { - e.getChannel(attr) <- payload + // Lock() in case the channel gets closed from underneath us. + e.Lock() + defer e.Unlock() + select { + case <-e.closer.HasBeenClosed(): + return + default: + // Closer is not closed. And we have the Lock, so sending on channel should be safe. + for attr, payload := range payloadMap { + e.getChannelUnderLock(attr) <- payload + } } + }