Skip to content

Commit

Permalink
cluster manager: Add Graceful Switch functionality to Cluster Manager (
Browse files Browse the repository at this point in the history
…#5265)

* cluster manager: Add Graceful Switch functionality to Cluster Manager
  • Loading branch information
zasweq authored Mar 30, 2022
1 parent 42cadc1 commit 99aae34
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
5 changes: 5 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
b.stateAggregator.add(name)
// Then add to the balancer group.
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
} else {
// Already present, check for type change and if so send down a new builder.
if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name {
b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name))
}
}
// TODO: handle error? How to aggregate errors and return?
_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
Expand Down
131 changes: 131 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
Expand All @@ -48,6 +49,11 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)

var (
rtBuilder balancer.Builder
rtParser balancer.ConfigParser
Expand Down Expand Up @@ -102,6 +108,7 @@ func init() {
rtParser = rtBuilder.(balancer.ConfigParser)

balancer.Register(&ignoreAttrsRRBuilder{balancer.Get(roundrobin.Name)})
balancer.Register(wrappedPickFirstBalancerBuilder{})

balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
Expand Down Expand Up @@ -632,3 +639,127 @@ func TestInitialIdle(t *testing.T) {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}

// TestClusterGracefulSwitch tests the graceful switch functionality for a child
// of the cluster manager. At first, the child is configured as a round robin
// load balancer, and thus should behave accordingly. The test then gracefully
// switches this child to a pick first load balancer. Once that balancer updates
// it's state and completes the graceful switch process the new picker should
// reflect this change.
func TestClusterGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})

configJSON1 := `{
"children": {
"csp:cluster":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
}
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

sc1 := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
pi := balancer.PickInfo{
Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
}
testPick(t, p1, pi, sc1, nil)

// Same cluster, different balancer type.
configJSON2 := `{
"children": {
"csp:cluster":{ "childPolicy": [{"wrappedPickFirstBalancer":""}] }
}
}`
config2, err := rtParser.ParseConfig([]byte(configJSON2))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
sc2 := <-cc.NewSubConnCh
// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-cc.NewPickerCh:
t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
case <-ctx.Done():
}

// Update the pick first balancers SubConn as READY. This will cause
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
testPick(t, p2, pi, sc2, nil)
// The Graceful Switch process completing for the child should cause the
// SubConns for the balancer being gracefully switched from to get deleted.
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("error waiting for RemoveSubConn()")
case rsc := <-cc.RemoveSubConnCh:
// The SubConn removed should have been the created SubConn
// from the child before switching.
if rsc != sc1 {
t.Fatalf("RemoveSubConn() got: %v, want %v", rsc, sc1)
}
}
}

type wrappedPickFirstBalancerBuilder struct{}

func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(grpc.PickFirstBalancerName)
wpfb := &wrappedPickFirstBalancer{
ClientConn: cc,
}
pf := builder.Build(wpfb, opts)
wpfb.Balancer = pf
return wpfb
}

func (wrappedPickFirstBalancerBuilder) Name() string {
return "wrappedPickFirstBalancer"
}

type wrappedPickFirstBalancer struct {
balancer.Balancer
balancer.ClientConn
}

func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
// Eat it if IDLE - allows it to switch over only on a READY SubConn.
if state.ConnectivityState == connectivity.Idle {
return
}
wb.ClientConn.UpdateState(state)
}

0 comments on commit 99aae34

Please sign in to comment.