diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 6ae047e98419..7f65e82860b4 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -15,9 +15,11 @@ package clientv3 import ( + "fmt" "net/url" "strings" "sync" + "time" "golang.org/x/net/context" "google.golang.org/grpc" @@ -32,8 +34,10 @@ var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address avai // simpleBalancer does the bare minimum to expose multiple eps // to the grpc reconnection code path type simpleBalancer struct { - // addrs are the client's endpoints for grpc - addrs []grpc.Address + keepAlive bool + // addrs are the client's endpoints for grpc, + // mapped to connection activity status + addrs map[grpc.Address]addrConn // notifyCh notifies grpc of the set of addresses for connecting notifyCh chan []grpc.Address @@ -73,9 +77,9 @@ type simpleBalancer struct { func newSimpleBalancer(eps []string) *simpleBalancer { notifyCh := make(chan []grpc.Address, 1) - addrs := make([]grpc.Address, len(eps)) + addrs := make(map[grpc.Address]addrConn, len(eps)) for i := range eps { - addrs[i].Addr = getHost(eps[i]) + addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()} } sb := &simpleBalancer{ addrs: addrs, @@ -136,9 +140,9 @@ func (b *simpleBalancer) updateAddrs(eps []string) { b.host2ep = np - addrs := make([]grpc.Address, 0, len(eps)) + addrs := make(map[grpc.Address]addrConn, len(eps)) for i := range eps { - addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])}) + addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()} } b.addrs = addrs @@ -156,8 +160,8 @@ func (b *simpleBalancer) updateAddrs(eps []string) { } } -func hasAddr(addrs []grpc.Address, targetAddr string) bool { - for _, addr := range addrs { +func hasAddr(addrs map[grpc.Address]addrConn, targetAddr string) bool { + for addr := range addrs { if targetAddr == addr.Addr { return true } @@ -165,6 +169,30 @@ func hasAddr(addrs []grpc.Address, targetAddr string) bool { return false } +type addrConn struct { + active bool + last time.Time +} + +func setActive(addrs map[grpc.Address]addrConn, targetAddr string, active bool) (down bool) { + for addr, v := range addrs { + if targetAddr == addr.Addr { + // TODO: configure interval + if !v.active && time.Since(v.last) < time.Minute { + fmt.Println("addr timed out", v.last) + return false + } + ac := addrConn{active: active} + if active { + ac.last = time.Now() + } + addrs[addr] = ac + return true + } + } + return false +} + func (b *simpleBalancer) updateNotifyLoop() { defer close(b.donec) @@ -221,7 +249,14 @@ func (b *simpleBalancer) updateNotifyLoop() { func (b *simpleBalancer) notifyAddrs() { b.mu.RLock() - addrs := b.addrs + multi := len(b.addrs) > 1 // if single, retry the only endpoint + addrs := make([]grpc.Address, 0, len(b.addrs)) + for addr, ac := range b.addrs { + if b.keepAlive && multi && !ac.active { + continue + } + addrs = append(addrs, addr) + } b.mu.RUnlock() select { case b.notifyCh <- addrs: @@ -229,6 +264,9 @@ func (b *simpleBalancer) notifyAddrs() { } } +// Up is called by gRPC client after address connection state +// becomes connectivity.Ready. This is after HTTP/2 client +// establishes the transport. func (b *simpleBalancer) Up(addr grpc.Address) func(error) { b.mu.Lock() defer b.mu.Unlock() @@ -244,6 +282,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { if !hasAddr(b.addrs, addr.Addr) { return func(err error) {} } + if !setActive(b.addrs, addr.Addr, true) { // mark connectivity state as active + // it is possible that Up is called before gRPC receives Notify() + // and tears down keepalive timed-out endpoints + return func(err error) {} + } if b.pinAddr != "" { return func(err error) {} } @@ -255,8 +298,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { b.readyOnce.Do(func() { close(b.readyc) }) return func(err error) { b.mu.Lock() + if b.keepAlive && + (err.Error() == "grpc: failed with network I/O error" || + err.Error() == "grpc: the connection is drained") { + // set as connectivity.TransientFailure until next Up + // TODO: undo this when connection is up + setActive(b.addrs, addr.Addr, false) + } b.upc = make(chan struct{}) - close(b.downc) + close(b.downc) // trigger notifyAddrs b.pinAddr = "" b.mu.Unlock() } diff --git a/clientv3/client.go b/clientv3/client.go index 1f8c83f5750c..66a3af540161 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -376,6 +376,10 @@ func newClient(cfg *Config) (*Client, error) { } client.balancer = newSimpleBalancer(cfg.Endpoints) + client.balancer.mu.Lock() + client.balancer.keepAlive = cfg.DialKeepAliveTime > 0 + client.balancer.mu.Unlock() + // use Endpoints[0] so that for https:// without any tls config given, then // grpc will assume the ServerName is in the endpoint. conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))