Skip to content

Commit

Permalink
clientv3: notify active addresses to balancer with keepalive
Browse files Browse the repository at this point in the history
Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Aug 19, 2017
1 parent 7690916 commit e5d8187
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
52 changes: 43 additions & 9 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address avai
// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
// addrs are the client's endpoints for grpc
addrs []grpc.Address
keepAlive bool
// addrs are the client's endpoints for grpc,
// mapped to connection activity status
addrs map[grpc.Address]uint32
// notifyCh notifies grpc of the set of addresses for connecting
notifyCh chan []grpc.Address

Expand Down Expand Up @@ -73,9 +75,9 @@ type simpleBalancer struct {

func newSimpleBalancer(eps []string) *simpleBalancer {
notifyCh := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
addrs := make(map[grpc.Address]uint32, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
addrs[grpc.Address{Addr: getHost(eps[i])}] = 1
}
sb := &simpleBalancer{
addrs: addrs,
Expand Down Expand Up @@ -136,9 +138,9 @@ func (b *simpleBalancer) updateAddrs(eps []string) {

b.host2ep = np

addrs := make([]grpc.Address, 0, len(eps))
addrs := make(map[grpc.Address]uint32, len(eps))
for i := range eps {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
addrs[grpc.Address{Addr: getHost(eps[i])}] = 1
}
b.addrs = addrs

Expand All @@ -156,15 +158,24 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
}
}

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
func hasAddr(addrs map[grpc.Address]uint32, targetAddr string) bool {
for addr := range addrs {
if targetAddr == addr.Addr {
return true
}
}
return false
}

func setActivity(addrs map[grpc.Address]uint32, targetAddr string, activity uint32) {
for addr := range addrs {
if targetAddr == addr.Addr {
addrs[addr] = activity
return
}
}
}

func (b *simpleBalancer) updateNotifyLoop() {
defer close(b.donec)

Expand Down Expand Up @@ -221,14 +232,24 @@ func (b *simpleBalancer) updateNotifyLoop() {

func (b *simpleBalancer) notifyAddrs() {
b.mu.RLock()
addrs := b.addrs
multi := len(b.addrs) > 1 // if single, retry the only endpoint
addrs := make([]grpc.Address, 0, len(b.addrs))
for addr, activity := range b.addrs {
if b.keepAlive && multi && activity == 0 {
continue
}
addrs = append(addrs, addr)
}
b.mu.RUnlock()
select {
case b.notifyCh <- addrs:
case <-b.stopc:
}
}

// Up is called by gRPC client after address connection state
// becomes connectivity.Ready. This is after HTTP/2 client
// establishes the transport.
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
Expand All @@ -244,6 +265,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
}
setActivity(b.addrs, addr.Addr, 1) // mark connectivity state as active
if b.pinAddr != "" {
return func(err error) {}
}
Expand All @@ -254,11 +276,23 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
needNotify := b.keepAlive && err.Error() == "grpc: failed with network I/O error"
b.mu.Lock()
if needNotify {
// set as connectivity.TransientFailure until next Up
setActivity(b.addrs, addr.Addr, 0)
}
b.upc = make(chan struct{})
close(b.downc)
b.pinAddr = ""
b.mu.Unlock()
if needNotify {
select {
case <-b.stopc:
default:
b.notifyAddrs()
}
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ func newClient(cfg *Config) (*Client, error) {
}

client.balancer = newSimpleBalancer(cfg.Endpoints)
client.balancer.mu.Lock()
client.balancer.keepAlive = cfg.DialKeepAliveTime > 0
client.balancer.mu.Unlock()

// use Endpoints[0] so that for https:// without any tls config given, then
// grpc will assume the ServerName is in the endpoint.
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
Expand Down

0 comments on commit e5d8187

Please sign in to comment.