Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/xds/balancer/priority/balancer_child.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"google.golang.org/grpc/serviceconfig"
)

// timeAfterFunc is used to create the child init timer. It is a package-level
// variable (rather than calling time.AfterFunc directly) so that tests can
// override it to observe when the init timer is started or restarted.
var timeAfterFunc = time.AfterFunc

type childBalancer struct {
name string
parent *priorityBalancer
Expand Down Expand Up @@ -148,7 +150,7 @@ func (cb *childBalancer) startInitTimer() {
// to check the stopped boolean.
timerW := &timerWrapper{}
cb.initTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
timerW.timer = timeAfterFunc(DefaultPriorityInitTimeout, func() {
cb.parent.mu.Lock()
defer cb.parent.mu.Unlock()
if timerW.stopped {
Expand Down
23 changes: 18 additions & 5 deletions internal/xds/balancer/priority/balancer_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {

// handleChildStateUpdate start/close priorities based on the connectivity
// state.
func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) {
func (b *priorityBalancer) handleChildStateUpdate(childName string, newState balancer.State) {
// Update state in child. The updated picker will be sent to parent later if
// necessary.
child, ok := b.children[childName]
Expand All @@ -183,23 +183,36 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
return
}
if !child.started {
b.logger.Warningf("Ignoring update from child policy %q which is not in started state: %+v", childName, s)
b.logger.Warningf("Ignoring update from child policy %q which is not in started state: %+v", childName, newState)
return
}
child.state = s
oldState := child.state
child.state = newState

// We start/stop the init timer of this child based on the new connectivity
// state. syncPriority() later will need the init timer (to check if it's
// nil or not) to decide which child to switch to.
switch s.ConnectivityState {
switch newState.ConnectivityState {
case connectivity.Ready, connectivity.Idle:
child.reportedTF = false
child.stopInitTimer()
case connectivity.TransientFailure:
child.reportedTF = true
child.stopInitTimer()
case connectivity.Connecting:
if !child.reportedTF {
// The init timer is created when the child is created and is reset when
// it reports Ready or Idle. Most child policies start off in
// Connecting, but ring_hash starts off in Idle and moves to Connecting
// when a request comes in. To support such cases, we restart the init
// timer when we see Connecting, but only if the child has not reported
// TransientFailure more recently than it reported Ready or Idle. See
// gRFC A42 for details on why ring_hash is special and what provisions
// are required to make it work as a child of the priority LB policy.
//
// We don't want to restart the timer if the child was already in
// Connecting, because we want failover to happen once the timer elapses
// even when the child is still in Connecting.
if !child.reportedTF && oldState.ConnectivityState != connectivity.Connecting {
child.startInitTimer()
}
default:
Expand Down
103 changes: 103 additions & 0 deletions internal/xds/balancer/priority/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2098,3 +2098,106 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) {
t.Fatal(err.Error())
}
}

// Tests that the priority balancer's init timer is not restarted when its child
// reports a state transition from CONNECTING to CONNECTING.
func (s) TestPriority_InitTimerNotRestarted_OnConnectingToConnecting(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

initTimerStarted := make(chan struct{}, 1)
origTimeAfterFunc := timeAfterFunc
timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
select {
case initTimerStarted <- struct{}{}:
case <-ctx.Done():
t.Errorf("Timeout waiting to send init timer started signal")
}
return time.AfterFunc(d, f)
}
defer func() { timeAfterFunc = origTimeAfterFunc }()

cc := testutils.NewBalancerClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()

// One child, with two backends.
ccs := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-0"}),
},
},
BalancerConfig: &LBConfig{
Children: map[string]*Child{
"child-0": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}},
},
Priorities: []string{"child-0"},
},
}
if err := pb.UpdateClientConnState(ccs); err != nil {
t.Fatalf("UpdateClientConnState(%+v) failed: %v", ccs, err)
}

// Wait for child-0 to be started and two subchannels to be created.
var sc0, sc1 *testutils.TestSubConn
for i := range 2 {
var addrs []resolver.Address
select {
case addrs = <-cc.NewSubConnAddrsCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for subconn %d to be created", i)
}
switch got := addrs[0].Addr; got {
case testBackendAddrStrs[0]:
sc0 = <-cc.NewSubConnCh
case testBackendAddrStrs[1]:
sc1 = <-cc.NewSubConnCh
default:
t.Fatalf("Got unexpected new subconn addr: %q, want %q or %q", got, testBackendAddrStrs[0], testBackendAddrStrs[1])
}
}

// Move both subchannels to CONNECTING.
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})

// Ensure that the init timer is started only once.
select {
case <-initTimerStarted:
case <-ctx.Done():
t.Fatalf("Timeout waiting for init timer to start")
}
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-initTimerStarted:
t.Fatalf("Init timer started when second subchannel moved to CONNECTING")
case <-sCtx.Done():
}

// Simulate the connection succeeding (subchannel becoming Ready), and then
// failing (subchannel moving to Idle). RR will immediately start connecting
// on the failed subchannel, and will therefore reporting an overall state
// of Connecting. This should trigger a restart of the init timer.
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
select {
case <-initTimerStarted:
case <-ctx.Done():
t.Fatalf("Timeout waiting for init timer to start")
}

// Move the subchannel to CONNECTING again, and ensure that the init timer
// is not restarted.
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-initTimerStarted:
t.Fatalf("Init timer restarted when subchannel moved from Ready to Idle")
case <-sCtx.Done():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If UpdateState is asynchronous, and happens after sCtx.Done, it can give false positives. This race can happen if the test environment is very slow

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateState is not asynchronous. See here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pattern that we use very commonly in our tests to check for things that we expect not to happen. This is a tradeoff between reasonable test execution times and correctness/flakiness. I don't recall having any problems with this approach so far though. But if you have an idea for how we could do this better, I'd be happy to hear. Thanks.

}
}