Skip to content

Commit

Permalink
grpclb: update picker synchronously on config update
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Sep 12, 2024
1 parent 3eb0145 commit 52ef72e
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
25 changes: 23 additions & 2 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ const (

var (
	// errServerTerminatedConnection is the sentinel error whose message
	// reports that receiving the server list failed because the server
	// terminated the connection.
	errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")

	// logger is the grpclog component logger registered under the name
	// "grpclb".
	logger = grpclog.Component("grpclb")
)
// Test hooks. Both default to no-ops in production code; tests may replace
// them to observe exactly when the corresponding event happens.

// newPickerUpdated is invoked from UpdateClientConnState immediately before
// the new picker is pushed to the parent ClientConn.
var newPickerUpdated = func() {}

// clientConnUpdateHook is invoked from UpdateClientConnState once the update
// has been fully processed, just before the method returns.
var clientConnUpdateHook = func() {}

func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil {
Expand Down Expand Up @@ -195,6 +201,12 @@ type lbBalancer struct {
fallbackTimeout time.Duration
doneCh chan struct{}

// Set during UpdateClientConnState when pushing updates to child policies.
// Prevents state updates from child policies causing new pickers to be sent
// up the channel. Cleared after all child policies have processed the
// updates sent to them, after which a new picker is sent up the channel.
inhibitPickerUpdates bool

// manualResolver is used in the remote LB ClientConn inside grpclb. When
// resolved address updates are received by grpclb, filtered updates will be
// sent to remote LB ClientConn through this resolver.
Expand Down Expand Up @@ -395,7 +407,9 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop
cc = lb.cc.ClientConn
}

cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
if !lb.inhibitPickerUpdates {
cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
Expand Down Expand Up @@ -468,7 +482,6 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

backendAddrs := ccs.ResolverState.Addresses

var remoteBalancerAddrs []resolver.Address
Expand All @@ -484,6 +497,10 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
return balancer.ErrBadResolverState
}

lb.mu.Lock()
lb.inhibitPickerUpdates = true
lb.mu.Unlock()

if len(remoteBalancerAddrs) == 0 {
if lb.ccRemoteLB != nil {
lb.ccRemoteLB.close()
Expand Down Expand Up @@ -515,7 +532,11 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
// list of backends being used to the new fallback backends.
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.inhibitPickerUpdates = false
newPickerUpdated()
lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
lb.mu.Unlock()
clientConnUpdateHook()
return nil
}

Expand Down
78 changes: 78 additions & 0 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,3 +1651,81 @@ func (s) TestGRPCLBStatsQuashEmpty(t *testing.T) {
t.Fatal(err)
}
}

// TestPickerUpdatedSynchronouslyOnConfigUpdate verifies that grpclb pushes a
// new picker synchronously while it handles a configuration update, i.e. that
// the newPickerUpdated hook fires before the clientConnUpdateHook hook.
func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
	const (
		pickerUpdatedEvent    = "picker updated"
		clientConnUpdateEvent = "client conn update done"
	)

	// Both hooks report into a single FIFO channel so that the order in which
	// they fired is preserved. Selecting over two separate channels cannot
	// establish ordering: when both are ready, select picks a case at random.
	events := make(chan string, 2)
	record := func(ev string) {
		// The hooks run on the balancer's update path, so they must never
		// block; events beyond the first two are irrelevant and are dropped.
		select {
		case events <- ev:
		default:
		}
	}

	// Override newPickerUpdated to learn when the picker is pushed.
	origNewPickerUpdated := newPickerUpdated
	newPickerUpdated = func() {
		t.Logf("Invoking the picker update.")
		record(pickerUpdatedEvent)
	}
	defer func() { newPickerUpdated = origNewPickerUpdated }()

	// Override clientConnUpdateHook to learn when the update has completed.
	origClientConnUpdateHook := clientConnUpdateHook
	clientConnUpdateHook = func() {
		t.Logf("Client conn update done.")
		record(clientConnUpdateEvent)
	}
	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 3, "", nil)
	if err != nil {
		t.Fatalf("failed to create new load balancer: %v", err)
	}
	defer cleanup()

	// Connect to the test backends.
	r := manual.NewBuilderWithScheme("whatever")
	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
	}
	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()

	// Push a service config with grpclb as the load balancing policy and
	// configure pick_first as its child policy.
	rs := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)}

	// Push a resolver update with the remote balancer address specified via
	// attributes.
	r.UpdateState(grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}}))

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// The first recorded event must be the picker update.
	select {
	case ev := <-events:
		if ev != pickerUpdatedEvent {
			t.Fatal("Client conn update was completed before picker update.")
		}
		t.Log("Picker was updated synchronously on receipt of configuration update.")
	case <-ctx.Done():
		t.Fatal("Timed out waiting for picker update upon receiving a configuration update")
	}

	// Once the picker was updated, wait for the client conn update to
	// complete.
	select {
	case ev := <-events:
		if ev != clientConnUpdateEvent {
			t.Fatalf("Got event %q after picker update, want %q", ev, clientConnUpdateEvent)
		}
	case <-ctx.Done():
		t.Fatal("Timed out waiting for client conn update upon receiving a configuration update")
	}
}

0 comments on commit 52ef72e

Please sign in to comment.