diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index e1a18ae338d3..d316d6a62a91 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" @@ -69,6 +70,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba requestCountMax: defaultRequestCountMax, } b.logger = prefixLogger(b) + b.child = gracefulswitch.NewBalancer(b, bOpts) go b.run() b.logger.Infof("Created") return b @@ -102,7 +104,7 @@ type clusterImplBalancer struct { xdsClient xdsclient.XDSClient config *LBConfig - childLB balancer.Balancer + child *gracefulswitch.Balancer cancelLoadReport func() edsServiceName string lrsServer *bootstrap.ServerConfig @@ -251,31 +253,19 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) return err } - // If child policy is a different type, recreate the sub-balancer. if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { - if b.childLB != nil { - b.childLB.Close() + if err := b.child.SwitchTo(bb); err != nil { + return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err) } - b.childLB = bb.Build(b, b.bOpts) } b.config = newConfig - if b.childLB == nil { - // This is not an expected situation, and should be super rare in - // practice. - // - // When this happens, we already applied all the other configurations - // (drop/circuit breaking), but there's no child policy. This balancer - // will be stuck, and we report the error to the parent. - return fmt.Errorf("child policy is nil, this means balancer %q's Build() returned nil", newConfig.ChildPolicy.Name) - } - // Notify run() of this new config, in case drop and request counter need // update (which means a new picker needs to be generated). b.pickerUpdateCh.Put(newConfig) // Addresses and sub-balancer config are sent to sub-balancer. - return b.childLB.UpdateClientConnState(balancer.ClientConnState{ + return b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: s.ResolverState, BalancerConfig: b.config.ChildPolicy.Config, }) @@ -286,10 +276,7 @@ func (b *clusterImplBalancer) ResolverError(err error) { b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err) return } - - if b.childLB != nil { - b.childLB.ResolverError(err) - } + b.child.ResolverError(err) } func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { @@ -318,9 +305,7 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer } } b.scWrappersMu.Unlock() - if b.childLB != nil { - b.childLB.UpdateSubConnState(sc, s) - } + b.child.UpdateSubConnState(sc, s) } func (b *clusterImplBalancer) Close() { @@ -328,29 +313,15 @@ func (b *clusterImplBalancer) Close() { b.closed.Fire() b.mu.Unlock() - if b.childLB != nil { - b.childLB.Close() - b.childLB = nil - b.childState = balancer.State{} - } + b.child.Close() + b.childState = balancer.State{} b.pickerUpdateCh.Close() <-b.done.Done() b.logger.Infof("Shutdown") } func (b *clusterImplBalancer) ExitIdle() { - if b.childLB == nil { - return - } - if ei, ok := b.childLB.(balancer.ExitIdler); ok { - ei.ExitIdle() - return - } - // Fallback for children that don't support ExitIdle -- connect to all - // SubConns. - for _, sc := range b.scWrappers { - sc.Connect() - } + b.child.ExitIdle() } // Override methods to accept updates from the child LB.