From 309e2d8f1f476a485f30769c9e6cbb73852141fc Mon Sep 17 00:00:00 2001 From: nghialv Date: Thu, 6 Jul 2017 17:04:13 +0900 Subject: [PATCH] fix deadlock of roundrobin balancer --- balancer.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/balancer.go b/balancer.go index 2acc8827ba66..0dfe05633405 100644 --- a/balancer.go +++ b/balancer.go @@ -144,14 +144,15 @@ type addrInfo struct { } type roundRobin struct { - r naming.Resolver - w naming.Watcher - addrs []*addrInfo // all the addresses the client should potentially connect - mu sync.Mutex - addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. - next int // index of the next address to return for Get() - waitCh chan struct{} // the channel to block when there is no connected address available - done bool // The Balancer is closed. + r naming.Resolver + w naming.Watcher + addrs []*addrInfo // all the addresses the client should potentially connect + mu sync.Mutex + addrChMu sync.Mutex // protects addrCh + addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. + next int // index of the next address to return for Get() + waitCh chan struct{} // the channel to block when there is no connected address available + done bool // The Balancer is closed. } func (rr *roundRobin) watchAddrUpdates() error { @@ -160,8 +161,10 @@ func (rr *roundRobin) watchAddrUpdates() error { grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err) return err } + rr.addrChMu.Lock() + defer rr.addrChMu.Unlock() rr.mu.Lock() - defer rr.mu.Unlock() + for _, update := range updates { addr := Address{ Addr: update.Addr, @@ -199,9 +202,15 @@ func (rr *roundRobin) watchAddrUpdates() error { open[i] = v.addr } if rr.done { + rr.mu.Unlock() return ErrClientConnClosing } - rr.addrCh <- open + done := rr.done + rr.mu.Unlock() + + if !done { + rr.addrCh <- open + } return nil } @@ -373,8 +382,11 @@ func (rr *roundRobin) Notify() <-chan []Address { } func (rr *roundRobin) Close() error { + rr.addrChMu.Lock() + defer rr.addrChMu.Unlock() rr.mu.Lock() defer rr.mu.Unlock() + if rr.done { return errBalancerClosed }