Shutdown extra connections (hypermodeinc#3280)
There is a race condition that allows multiple goroutines to create connections to other servers via `conn.Connect`. Only one of those connections is stored; the rest just get dropped on the floor, but they keep ticking and keep pinging the other servers. In a recent incident, we saw thousands of `MonitorHealth` goroutines, which is obviously a huge drain on resources.

This PR shuts down any connection that doesn't make it into the map, releasing its resources correctly. Tested on a live cluster.
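
For context, here is a minimal sketch of the pattern the patch follows: double-check the map while holding the write lock, and shut down the losing connection instead of leaking it. The names below (`pool`, `pools`, `connect`) are hypothetical stand-ins, not the actual `conn/pool.go` code.

```go
package main

import "sync"

// pool and pools are hypothetical stand-ins for the real types in conn/pool.go.
type pool struct{ addr string }

// shutdown releases the connection's resources (in the real code: stop the
// health-monitor goroutine via the closer, then close the gRPC connection).
func (p *pool) shutdown() {}

type pools struct {
	sync.RWMutex
	all map[string]*pool
}

// connect mirrors the shape of Pools.Connect after this patch.
func (ps *pools) connect(addr string) *pool {
	ps.RLock()
	if existing, ok := ps.all[addr]; ok {
		ps.RUnlock()
		return existing
	}
	ps.RUnlock()

	// Expensive work done outside the lock: dial gRPC, start MonitorHealth.
	p := &pool{addr: addr}

	ps.Lock()
	defer ps.Unlock()
	if existing, ok := ps.all[addr]; ok {
		// Lost the race: another goroutine already registered a pool for this
		// address. Shut ours down instead of leaking its goroutines.
		go p.shutdown()
		return existing
	}
	ps.all[addr] = p
	return p
}

func main() {
	ps := &pools{all: make(map[string]*pool)}
	_ = ps.connect("localhost:7080")
}
```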
manishrjain authored and dna2github committed Jul 19, 2019
1 parent c321aa1 commit 49b1469
Showing 1 changed file with 34 additions and 18 deletions.
conn/pool.go (34 additions, 18 deletions)
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/dgraph-io/badger/y"
 	"github.com/dgraph-io/dgo/protos/api"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
@@ -47,7 +48,7 @@ type Pool struct {
 
 	lastEcho time.Time
 	Addr     string
-	ticker   *time.Ticker
+	closer   *y.Closer
 }
 
 type Pools struct {
@@ -124,7 +125,7 @@ func (p *Pools) Connect(addr string) *Pool {
 		return existingPool
 	}
 
-	pool, err := NewPool(addr)
+	pool, err := newPool(addr)
 	if err != nil {
 		glog.Errorf("Unable to connect to host: %s", addr)
 		return nil
@@ -134,15 +135,16 @@ func (p *Pools) Connect(addr string) *Pool {
 	defer p.Unlock()
 	existingPool, has = p.all[addr]
 	if has {
+		go pool.shutdown() // Not being used, so release the resources.
 		return existingPool
 	}
 	glog.Infof("CONNECTED to %v\n", addr)
 	p.all[addr] = pool
 	return pool
 }
 
-// NewPool creates a new "pool" with one gRPC connection, refcount 0.
-func NewPool(addr string) (*Pool, error) {
+// newPool creates a new "pool" with one gRPC connection, refcount 0.
+func newPool(addr string) (*Pool, error) {
 	conn, err := grpc.Dial(addr,
 		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
 		grpc.WithDefaultCallOptions(
@@ -153,10 +155,7 @@ func NewPool(addr string) (*Pool, error) {
 	if err != nil {
 		return nil, err
 	}
-	pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now()}
-
-	// Initialize ticker before running monitor health.
-	pl.ticker = time.NewTicker(echoDuration)
+	pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
 	go pl.MonitorHealth()
 	return pl, nil
 }
@@ -169,7 +168,8 @@ func (p *Pool) Get() *grpc.ClientConn {
 }
 
 func (p *Pool) shutdown() {
-	p.ticker.Stop()
+	glog.Warningf("Shutting down extra connection to %s", p.Addr)
+	p.closer.SignalAndWait()
 	p.conn.Close()
 }
 
@@ -191,6 +191,15 @@ func (p *Pool) listenToHeartbeat() error {
 		return err
 	}
 
+	go func() {
+		select {
+		case <-ctx.Done():
+		case <-p.closer.HasBeenClosed():
+			cancel()
+		}
+	}()
+
 	// This loop can block indefinitely as long as it keeps on receiving pings back.
 	for {
 		_, err := stream.Recv()
 		if err != nil {
@@ -205,17 +214,24 @@ func (p *Pool) listenToHeartbeat() error {
 
 // MonitorHealth monitors the health of the connection via Echo. This function blocks forever.
 func (p *Pool) MonitorHealth() {
+	defer p.closer.Done()
+
 	var lastErr error
-	for range p.ticker.C {
-		err := p.listenToHeartbeat()
-		if lastErr != nil && err == nil {
-			glog.Infof("Connection established with %v\n", p.Addr)
-		} else if err != nil && lastErr == nil {
-			glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
+	for {
+		select {
+		case <-p.closer.HasBeenClosed():
+			return
+		default:
+			err := p.listenToHeartbeat()
+			if lastErr != nil && err == nil {
+				glog.Infof("Connection established with %v\n", p.Addr)
+			} else if err != nil && lastErr == nil {
+				glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
+			}
+			lastErr = err
+			// Sleep for a bit before retrying.
+			time.Sleep(echoDuration)
 		}
-		lastErr = err
-		// Sleep for a bit before retrying.
-		time.Sleep(echoDuration)
 	}
 }
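
For reference, a rough illustration of the `y.Closer` handshake the patch relies on, using only the calls visible in the diff (`NewCloser`, `Done`, `HasBeenClosed`, `SignalAndWait`). The worker stands in for `MonitorHealth` and the final call stands in for `Pool.shutdown`; this is an assumption-laden sketch, not dgraph code.

```go
package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/badger/y"
)

// worker plays the role of MonitorHealth: it loops until the closer is signalled.
func worker(closer *y.Closer) {
	defer closer.Done() // lets SignalAndWait return once this goroutine exits

	for {
		select {
		case <-closer.HasBeenClosed():
			return // shutdown was requested
		default:
			time.Sleep(100 * time.Millisecond) // stands in for one heartbeat cycle
		}
	}
}

func main() {
	closer := y.NewCloser(1) // one goroutine will call Done
	go worker(closer)

	time.Sleep(300 * time.Millisecond)
	closer.SignalAndWait() // like Pool.shutdown: request stop, wait for the worker
	fmt.Println("worker stopped; safe to close the gRPC connection")
}
```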
