Skip to content

Commit

Permalink
xds/internal/balancer/clusterimpl: Switch cluster impl child to grace…
Browse files Browse the repository at this point in the history
…ful switch (#6420)
  • Loading branch information
zasweq authored Jun 30, 2023
1 parent 6b8f427 commit 620a118
Showing 1 changed file with 11 additions and 40 deletions.
51 changes: 11 additions & 40 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -69,6 +70,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
requestCountMax: defaultRequestCountMax,
}
b.logger = prefixLogger(b)
b.child = gracefulswitch.NewBalancer(b, bOpts)
go b.run()
b.logger.Infof("Created")
return b
Expand Down Expand Up @@ -102,7 +104,7 @@ type clusterImplBalancer struct {
xdsClient xdsclient.XDSClient

config *LBConfig
childLB balancer.Balancer
child *gracefulswitch.Balancer
cancelLoadReport func()
edsServiceName string
lrsServer *bootstrap.ServerConfig
Expand Down Expand Up @@ -251,31 +253,19 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
return err
}

// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
if b.childLB != nil {
b.childLB.Close()
if err := b.child.SwitchTo(bb); err != nil {
return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
}
b.childLB = bb.Build(b, b.bOpts)
}
b.config = newConfig

if b.childLB == nil {
// This is not an expected situation, and should be super rare in
// practice.
//
// When this happens, we already applied all the other configurations
// (drop/circuit breaking), but there's no child policy. This balancer
// will be stuck, and we report the error to the parent.
return fmt.Errorf("child policy is nil, this means balancer %q's Build() returned nil", newConfig.ChildPolicy.Name)
}

// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)

// Addresses and sub-balancer config are sent to sub-balancer.
return b.childLB.UpdateClientConnState(balancer.ClientConnState{
return b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
})
Expand All @@ -286,10 +276,7 @@ func (b *clusterImplBalancer) ResolverError(err error) {
b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
return
}

if b.childLB != nil {
b.childLB.ResolverError(err)
}
b.child.ResolverError(err)
}

func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
Expand Down Expand Up @@ -318,39 +305,23 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
}
}
b.scWrappersMu.Unlock()
if b.childLB != nil {
b.childLB.UpdateSubConnState(sc, s)
}
b.child.UpdateSubConnState(sc, s)
}

func (b *clusterImplBalancer) Close() {
b.mu.Lock()
b.closed.Fire()
b.mu.Unlock()

if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
b.childState = balancer.State{}
}
b.child.Close()
b.childState = balancer.State{}
b.pickerUpdateCh.Close()
<-b.done.Done()
b.logger.Infof("Shutdown")
}

func (b *clusterImplBalancer) ExitIdle() {
if b.childLB == nil {
return
}
if ei, ok := b.childLB.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
// Fallback for children that don't support ExitIdle -- connect to all
// SubConns.
for _, sc := range b.scWrappers {
sc.Connect()
}
b.child.ExitIdle()
}

// Override methods to accept updates from the child LB.
Expand Down

0 comments on commit 620a118

Please sign in to comment.