Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown extra connections #3280

Merged
merged 3 commits into from
Apr 11, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -47,7 +48,7 @@ type Pool struct {

lastEcho time.Time
Addr string
ticker *time.Ticker
closer *y.Closer
}

type Pools struct {
Expand Down Expand Up @@ -124,7 +125,7 @@ func (p *Pools) Connect(addr string) *Pool {
return existingPool
}

pool, err := NewPool(addr)
pool, err := newPool(addr)
if err != nil {
glog.Errorf("Unable to connect to host: %s", addr)
return nil
Expand All @@ -134,15 +135,16 @@ func (p *Pools) Connect(addr string) *Pool {
defer p.Unlock()
existingPool, has = p.all[addr]
if has {
go pool.shutdown() // Not being used, so release the resources.
return existingPool
}
glog.Infof("CONNECTED to %v\n", addr)
p.all[addr] = pool
return pool
}

// NewPool creates a new "pool" with one gRPC connection, refcount 0.
func NewPool(addr string) (*Pool, error) {
// newPool creates a new "pool" with one gRPC connection, refcount 0.
func newPool(addr string) (*Pool, error) {
conn, err := grpc.Dial(addr,
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithDefaultCallOptions(
Expand All @@ -153,10 +155,7 @@ func NewPool(addr string) (*Pool, error) {
if err != nil {
return nil, err
}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now()}

// Initialize ticker before running monitor health.
pl.ticker = time.NewTicker(echoDuration)
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
go pl.MonitorHealth()
return pl, nil
}
Expand All @@ -169,7 +168,8 @@ func (p *Pool) Get() *grpc.ClientConn {
}

func (p *Pool) shutdown() {
p.ticker.Stop()
glog.Warningf("Shutting down extra connection to %s", p.Addr)
p.closer.SignalAndWait()
p.conn.Close()
}

Expand All @@ -191,6 +191,15 @@ func (p *Pool) listenToHeartbeat() error {
return err
}

go func() {
select {
case <-ctx.Done():
case <-p.closer.HasBeenClosed():
cancel()
}
}()

// This loop can block indefinitely as long as it keeps on receiving pings back.
for {
_, err := stream.Recv()
if err != nil {
Expand All @@ -205,17 +214,24 @@ func (p *Pool) listenToHeartbeat() error {

// MonitorHealth monitors the health of the connection via Echo. This function blocks forever.
func (p *Pool) MonitorHealth() {
defer p.closer.Done()

var lastErr error
for range p.ticker.C {
err := p.listenToHeartbeat()
if lastErr != nil && err == nil {
glog.Infof("Connection established with %v\n", p.Addr)
} else if err != nil && lastErr == nil {
glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
for {
select {
case <-p.closer.HasBeenClosed():
return
default:
err := p.listenToHeartbeat()
if lastErr != nil && err == nil {
glog.Infof("Connection established with %v\n", p.Addr)
} else if err != nil && lastErr == nil {
glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
}
lastErr = err
// Sleep for a bit before retrying.
time.Sleep(echoDuration)
}
lastErr = err
// Sleep for a bit before retrying.
time.Sleep(echoDuration)
}
}

Expand Down