Skip to content

Commit

Permalink
Reconnect via a redial in case of disconnection. (#7918) (#7921)
Browse files Browse the repository at this point in the history
In case the heartbeats in a connection pool stop, try to re-establish the connection via a redial. That's a more robust way than waiting for current connection to become usable again and allows much faster recovery from network partitions.

(cherry picked from commit 947a62b)

Co-authored-by: Manish R Jain <[email protected]>
  • Loading branch information
danielmai and manishrjain authored Jun 22, 2021
1 parent b057949 commit 6e103fb
Showing 1 changed file with 86 additions and 27 deletions.
113 changes: 86 additions & 27 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package conn
import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -54,6 +55,7 @@ type Pool struct {
Addr string
closer *z.Closer
healthInfo pb.HealthInfo
dialOpts []grpc.DialOption
}

// Pools manages a concurrency-safe set of Pool.
Expand Down Expand Up @@ -127,7 +129,7 @@ func (p *Pools) remove(addr string) {
if !ok {
return
}
glog.Warningf("DISCONNECTING from %s\n", addr)
glog.Warningf("CONN: Disconnecting from %s\n", addr)
delete(p.all, addr)
pool.shutdown()
}
Expand All @@ -148,7 +150,7 @@ func (p *Pools) Connect(addr string, tlsClientConf *tls.Config) *Pool {

pool, err := newPool(addr, tlsClientConf)
if err != nil {
glog.Errorf("Unable to connect to host: %s", addr)
glog.Errorf("CONN: Unable to connect to host: %s", addr)
return nil
}

Expand All @@ -159,10 +161,9 @@ func (p *Pools) Connect(addr string, tlsClientConf *tls.Config) *Pool {
go pool.shutdown() // Not being used, so release the resources.
return existingPool
}
glog.Infof("CONNECTING to %s\n", addr)
glog.Infof("CONN: Connecting to %s\n", addr)
p.all[addr] = pool
return pool

}

// newPool creates a new "pool" with one gRPC connection, refcount 0.
Expand All @@ -188,7 +189,13 @@ func newPool(addr string, tlsClientConf *tls.Config) (*Pool, error) {
return nil, err
}

pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)}
pl := &Pool{
conn: conn,
Addr: addr,
lastEcho: time.Now(),
dialOpts: conOpts,
closer: z.NewCloser(1),
}
go pl.MonitorHealth()
return pl, nil
}
Expand All @@ -201,7 +208,7 @@ func (p *Pool) Get() *grpc.ClientConn {
}

func (p *Pool) shutdown() {
glog.Warningf("Shutting down extra connection to %s", p.Addr)
glog.Warningf("CONN: Shutting down extra connection to %s", p.Addr)
p.closer.SignalAndWait()
if err := p.conn.Close(); err != nil {
glog.Warningf("Could not close pool connection with error: %s", err)
Expand All @@ -219,7 +226,7 @@ func (p *Pool) listenToHeartbeat() error {
conn := p.Get()
c := pb.NewRaftClient(conn)

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(p.closer.Ctx())
defer cancel()

s, err := c.Heartbeat(ctx, &api.Payload{})
Expand All @@ -228,45 +235,97 @@ func (p *Pool) listenToHeartbeat() error {
}

go func() {
select {
case <-ctx.Done():
case <-p.closer.HasBeenClosed():
cancel()
for {
res, err := s.Recv()
if err != nil || res == nil {
cancel()
return
}

// We do this periodic stream receive based approach to defend against network partitions.
p.Lock()
p.lastEcho = time.Now()
p.healthInfo = *res
p.Unlock()
}
}()

// This loop can block indefinitely as long as it keeps on receiving pings back.
threshold := time.Now().Add(10 * time.Second)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
res, err := s.Recv()
if err != nil || res == nil {
return err
}
select {
case <-ticker.C:
// Don't check before at least 10s since start.
if time.Now().Before(threshold) {
continue
}
p.RLock()
lastEcho := p.lastEcho
p.RUnlock()
if dur := time.Since(lastEcho); dur > 30*time.Second {
glog.Warningf("CONN: No echo to %s for %s. Cancelling connection heartbeats.\n",
p.Addr, dur.Round(time.Second))
cancel()
return fmt.Errorf("too long since last echo")
}

// We do this periodic stream receive based approach to defend against network partitions.
p.Lock()
p.lastEcho = time.Now()
p.healthInfo = *res
p.Unlock()
case <-s.Context().Done():
return s.Context().Err()
case <-ctx.Done():
return ctx.Err()
case <-p.closer.HasBeenClosed():
cancel()
return p.closer.Ctx().Err()
}
}
}

// MonitorHealth monitors the health of the connection via Echo. This function blocks forever.
func (p *Pool) MonitorHealth() {
defer p.closer.Done()

var lastErr error
// We might have lost connection to the destination. In that case, re-dial
// the connection.
reconnect := func() {
for {
time.Sleep(time.Second)
if err := p.closer.Ctx().Err(); err != nil {
return
}
ctx, cancel := context.WithTimeout(p.closer.Ctx(), 10*time.Second)
conn, err := grpc.DialContext(ctx, p.Addr, p.dialOpts...)
if err == nil {
// Make a dummy request to test out the connection.
client := pb.NewRaftClient(conn)
_, err = client.IsPeer(ctx, &pb.RaftContext{})
}
cancel()
if err == nil {
p.Lock()
p.conn.Close()
p.conn = conn
p.Unlock()
return
}
glog.Errorf("CONN: Unable to connect with %s : %s\n", p.Addr, err)
if conn != nil {
conn.Close()
}
}
}

for {
select {
case <-p.closer.HasBeenClosed():
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
return
default:
err := p.listenToHeartbeat()
if lastErr != nil && err == nil {
glog.Infof("Connection re-established with %v\n", p.Addr)
} else if err != nil && lastErr == nil {
glog.Warningf("Connection lost with %v. Error: %v\n", p.Addr, err)
if err != nil {
reconnect()
glog.Infof("CONN: Re-established connection with %s.\n", p.Addr)
}
lastErr = err
// Sleep for a bit before retrying.
time.Sleep(echoDuration)
}
Expand Down

0 comments on commit 6e103fb

Please sign in to comment.