Skip to content

Commit

Permalink
Fix panic for sending on a closed channel (#5479)
Browse files Browse the repository at this point in the history
  • Loading branch information
parasssh authored May 20, 2020
1 parent b16b611 commit 1e2f0e7
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions worker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,14 @@ func (e *executor) shutdown() {
}
}

func (e *executor) getChannel(pred string) (ch chan *subMutation) {
e.RLock()
// getChannelUnderLock obtains the channel for the given pred. It must be called under e.Lock().
func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) {
ch, ok := e.predChan[pred]
e.RUnlock()
if ok {
return ch
}

// Create a new channel for `pred`.
e.Lock()
defer e.Unlock()
ch, ok = e.predChan[pred]
if ok {
return ch
Expand All @@ -123,7 +120,17 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
payload.edges = append(payload.edges, edge)
}

for attr, payload := range payloadMap {
e.getChannel(attr) <- payload
// Lock() in case the channel gets closed from underneath us.
e.Lock()
defer e.Unlock()
select {
case <-e.closer.HasBeenClosed():
return
default:
// Closer is not closed. And we have the Lock, so sending on channel should be safe.
for attr, payload := range payloadMap {
e.getChannelUnderLock(attr) <- payload
}
}

}

0 comments on commit 1e2f0e7

Please sign in to comment.