diff --git a/internal/xds/balancer/priority/balancer_child.go b/internal/xds/balancer/priority/balancer_child.go index 7e8ccbd335e9..8c09b1011710 100644 --- a/internal/xds/balancer/priority/balancer_child.go +++ b/internal/xds/balancer/priority/balancer_child.go @@ -28,6 +28,8 @@ import ( "google.golang.org/grpc/serviceconfig" ) +var timeAfterFunc = time.AfterFunc + type childBalancer struct { name string parent *priorityBalancer @@ -148,7 +150,7 @@ func (cb *childBalancer) startInitTimer() { // to check the stopped boolean. timerW := &timerWrapper{} cb.initTimer = timerW - timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() { + timerW.timer = timeAfterFunc(DefaultPriorityInitTimeout, func() { cb.parent.mu.Lock() defer cb.parent.mu.Unlock() if timerW.stopped { diff --git a/internal/xds/balancer/priority/balancer_priority.go b/internal/xds/balancer/priority/balancer_priority.go index 0be807c134a1..a57df7d142db 100644 --- a/internal/xds/balancer/priority/balancer_priority.go +++ b/internal/xds/balancer/priority/balancer_priority.go @@ -174,7 +174,7 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) { // handleChildStateUpdate start/close priorities based on the connectivity // state. -func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) { +func (b *priorityBalancer) handleChildStateUpdate(childName string, newState balancer.State) { // Update state in child. The updated picker will be sent to parent later if // necessary. child, ok := b.children[childName] @@ -183,15 +183,16 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S return } if !child.started { - b.logger.Warningf("Ignoring update from child policy %q which is not in started state: %+v", childName, s) + b.logger.Warningf("Ignoring update from child policy %q which is not in started state: %+v", childName, newState) return } - child.state = s + oldState := child.state + child.state = newState // We start/stop the init timer of this child based on the new connectivity // state. syncPriority() later will need the init timer (to check if it's // nil or not) to decide which child to switch to. - switch s.ConnectivityState { + switch newState.ConnectivityState { case connectivity.Ready, connectivity.Idle: child.reportedTF = false child.stopInitTimer() @@ -199,7 +200,19 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S child.reportedTF = true child.stopInitTimer() case connectivity.Connecting: - if !child.reportedTF { + // The init timer is created when the child is created and is reset when + // it reports Ready or Idle. Most child policies start off in + // Connecting, but ring_hash starts off in Idle and moves to Connecting + // when a request comes in. To support such cases, we restart the init + // timer when we see Connecting, but only if the child has not reported + // TransientFailure more recently than it reported Ready or Idle. See + // gRFC A42 for details on why ring_hash is special and what provisions + // are required to make it work as a child of the priority LB policy. + // + // We don't want to restart the timer if the child was already in + // Connecting, because we want failover to happen once the timer elapses + // even when the child is still in Connecting. + if !child.reportedTF && oldState.ConnectivityState != connectivity.Connecting { child.startInitTimer() } default: diff --git a/internal/xds/balancer/priority/balancer_test.go b/internal/xds/balancer/priority/balancer_test.go index dbded5317af9..070e95c849f1 100644 --- a/internal/xds/balancer/priority/balancer_test.go +++ b/internal/xds/balancer/priority/balancer_test.go @@ -2098,3 +2098,106 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) { t.Fatal(err.Error()) } } + +// Tests that the priority balancer's init timer is not restarted when its child +// reports a state transition from CONNECTING to CONNECTING. +func (s) TestPriority_InitTimerNotRestarted_OnConnectingToConnecting(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + initTimerStarted := make(chan struct{}, 1) + origTimeAfterFunc := timeAfterFunc + timeAfterFunc = func(d time.Duration, f func()) *time.Timer { + select { + case initTimerStarted <- struct{}{}: + case <-ctx.Done(): + t.Errorf("Timeout waiting to send init timer started signal") + } + return time.AfterFunc(d, f) + } + defer func() { timeAfterFunc = origTimeAfterFunc }() + + cc := testutils.NewBalancerClientConn(t) + bb := balancer.Get(Name) + pb := bb.Build(cc, balancer.BuildOptions{}) + defer pb.Close() + + // One child, with two backends. + ccs := balancer.ClientConnState{ + ResolverState: resolver.State{ + Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-0"}), + }, + }, + BalancerConfig: &LBConfig{ + Children: map[string]*Child{ + "child-0": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}}, + }, + Priorities: []string{"child-0"}, + }, + } + if err := pb.UpdateClientConnState(ccs); err != nil { + t.Fatalf("UpdateClientConnState(%+v) failed: %v", ccs, err) + } + + // Wait for child-0 to be started and two subchannels to be created. + var sc0, sc1 *testutils.TestSubConn + for i := range 2 { + var addrs []resolver.Address + select { + case addrs = <-cc.NewSubConnAddrsCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for subconn %d to be created", i) + } + switch got := addrs[0].Addr; got { + case testBackendAddrStrs[0]: + sc0 = <-cc.NewSubConnCh + case testBackendAddrStrs[1]: + sc1 = <-cc.NewSubConnCh + default: + t.Fatalf("Got unexpected new subconn addr: %q, want %q or %q", got, testBackendAddrStrs[0], testBackendAddrStrs[1]) + } + } + + // Move both subchannels to CONNECTING. + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + + // Ensure that the init timer is started only once. + select { + case <-initTimerStarted: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for init timer to start") + } + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-initTimerStarted: + t.Fatalf("Init timer started when second subchannel moved to CONNECTING") + case <-sCtx.Done(): + } + + // Simulate the connection succeeding (subchannel becoming Ready), and then + // failing (subchannel moving to Idle). RR will immediately start connecting + // on the failed subchannel, and will therefore reporting an overall state + // of Connecting. This should trigger a restart of the init timer. + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + select { + case <-initTimerStarted: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for init timer to start") + } + + // Move the subchannel to CONNECTING again, and ensure that the init timer + // is not restarted. + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-initTimerStarted: + t.Fatalf("Init timer restarted when subchannel moved from Ready to Idle") + case <-sCtx.Done(): + } +}