diff --git a/balancer/balancer.go b/balancer/balancer.go index c9b343c71564..b1264017db1f 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -360,6 +360,10 @@ type Balancer interface { // call SubConn.Shutdown for its existing SubConns; however, this will be // required in a future release, so it is recommended. Close() + // ExitIdle instructs the LB policy to reconnect to backends / exit the + // IDLE state, if appropriate and possible. Note that SubConns that enter + // the IDLE state will not reconnect until SubConn.Connect is called. + ExitIdle() } // ExitIdler is an optional interface for balancers to implement. If @@ -367,8 +371,8 @@ type Balancer interface { // the ClientConn is idle. If unimplemented, ClientConn.Connect will cause // all SubConns to connect. // -// Notice: it will be required for all balancers to implement this in a future -// release. +// Deprecated: All balancers must implement this interface. This interface will +// be removed in a future release. type ExitIdler interface { // ExitIdle instructs the LB policy to reconnect to backends / exit the // IDLE state, if appropriate and possible. Note that SubConns that enter diff --git a/balancer/endpointsharding/endpointsharding.go b/balancer/endpointsharding/endpointsharding.go index cc606f4dae4e..0ad6bb1f2203 100644 --- a/balancer/endpointsharding/endpointsharding.go +++ b/balancer/endpointsharding/endpointsharding.go @@ -45,7 +45,15 @@ type ChildState struct { // Balancer exposes only the ExitIdler interface of the child LB policy. // Other methods of the child policy are called only by endpointsharding. - Balancer balancer.ExitIdler + Balancer ExitIdler +} + +// ExitIdler provides access to only the ExitIdle method of the child balancer. +type ExitIdler interface { + // ExitIdle instructs the LB policy to reconnect to backends / exit the + // IDLE state, if appropriate and possible. Note that SubConns that enter + // the IDLE state will not reconnect until SubConn.Connect is called. + ExitIdle() } // Options are the options to configure the behaviour of the @@ -205,6 +213,16 @@ func (es *endpointSharding) Close() { } } +func (es *endpointSharding) ExitIdle() { + es.childMu.Lock() + defer es.childMu.Unlock() + for _, bw := range es.children.Load().Values() { + if !bw.isClosed { + bw.child.ExitIdle() + } + } +} + // updateState updates this component's state. It sends the aggregated state, // and a picker with round robin behavior with all the child states present if // needed. @@ -326,15 +344,13 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) { // ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to // avoid deadlocks due to synchronous balancer state updates. func (bw *balancerWrapper) ExitIdle() { - if ei, ok := bw.child.(balancer.ExitIdler); ok { - go func() { - bw.es.childMu.Lock() - if !bw.isClosed { - ei.ExitIdle() - } - bw.es.childMu.Unlock() - }() - } + go func() { + bw.es.childMu.Lock() + if !bw.isClosed { + bw.child.ExitIdle() + } + bw.es.childMu.Unlock() + }() } // updateClientConnStateLocked delivers the ClientConnState to the child diff --git a/balancer/endpointsharding/endpointsharding_test.go b/balancer/endpointsharding/endpointsharding_test.go index 5567b4c9038b..0b9955211b74 100644 --- a/balancer/endpointsharding/endpointsharding_test.go +++ b/balancer/endpointsharding/endpointsharding_test.go @@ -285,3 +285,69 @@ func (s) TestEndpointShardingReconnectDisabled(t *testing.T) { } } } + +// Tests that endpointsharding doesn't automatically re-connect IDLE children +// until cc.Connect() is called. The test creates an endpoint with a single +// address. The client is connected and the active server is closed to make the +// child pickfirst enter IDLE state. The test verifies that the child pickfirst +// doesn't re-connect automatically. The test calls cc.Connect() and verified +// that the balancer connects causing the channel to enter TransientFailure. +func (s) TestEndpointShardingExitIdle(t *testing.T) { + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + + mr := manual.NewBuilderWithScheme("e2e-test") + defer mr.Close() + + name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "") + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + epOpts := endpointsharding.Options{DisableAutoReconnect: true} + bd.Data = endpointsharding.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build, epOpts) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + ExitIdle: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).ExitIdle() + }, + } + stub.Register(name, bf) + + json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, name) + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json) + mr.InitialState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: backend.Address}}}, + }, + ServiceConfig: sc, + }) + + cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create new client: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Errorf("client.EmptyCall() returned unexpected error: %v", err) + } + + // On closing the first server, the first child balancer should enter + // IDLE. Since endpointsharding is configured not to auto-reconnect, it will + // remain IDLE and will not try to re-connect + backend.Stop() + testutils.AwaitState(ctx, t, cc, connectivity.Idle) + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle) + + // The balancer should try to re-connect and fail. + cc.Connect() + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) +} diff --git a/balancer/lazy/lazy.go b/balancer/lazy/lazy.go index 7368d8f4bf9c..314ccd1b0aa1 100644 --- a/balancer/lazy/lazy.go +++ b/balancer/lazy/lazy.go @@ -125,9 +125,7 @@ func (lb *lazyBalancer) ExitIdle() { lb.mu.Lock() defer lb.mu.Unlock() if lb.delegate != nil { - if d, ok := lb.delegate.(balancer.ExitIdler); ok { - d.ExitIdle() - } + lb.delegate.ExitIdle() return } lb.delegate = lb.childBuilder(lb.cc, lb.buildOptions) diff --git a/balancer/lazy/lazy_ext_test.go b/balancer/lazy/lazy_ext_test.go index c57ad28e1bb7..cfd4ca998463 100644 --- a/balancer/lazy/lazy_ext_test.go +++ b/balancer/lazy/lazy_ext_test.go @@ -82,7 +82,7 @@ func (s) TestExitIdle(t *testing.T) { bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build) }, ExitIdle: func(bd *stub.BalancerData) { - bd.Data.(balancer.ExitIdler).ExitIdle() + bd.Data.(balancer.Balancer).ExitIdle() }, ResolverError: func(bd *stub.BalancerData, err error) { bd.Data.(balancer.Balancer).ResolverError(err) @@ -410,7 +410,7 @@ func (s) TestExitIdlePassthrough(t *testing.T) { bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build) }, ExitIdle: func(bd *stub.BalancerData) { - bd.Data.(balancer.ExitIdler).ExitIdle() + bd.Data.(balancer.Balancer).ExitIdle() }, ResolverError: func(bd *stub.BalancerData, err error) { bd.Data.(balancer.Balancer).ResolverError(err) diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index f758f954e9f1..f9cf7ccfc1ef 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -125,9 +125,7 @@ func (lrb *leastRequestBalancer) ResolverError(err error) { } func (lrb *leastRequestBalancer) ExitIdle() { - if ei, ok := lrb.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding. - ei.ExitIdle() - } + lrb.child.ExitIdle() } func (lrb *leastRequestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index 160783fc27d3..25bb29444d8a 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -1592,12 +1592,6 @@ func (b *stateStoringBalancer) Close() { b.Balancer.Close() } -func (b *stateStoringBalancer) ExitIdle() { - if ib, ok := b.Balancer.(balancer.ExitIdler); ok { - ib.ExitIdle() - } -} - type stateStoringBalancerBuilder struct { balancer chan *stateStoringBalancer } diff --git a/balancer/ringhash/ringhash.go b/balancer/ringhash/ringhash.go index 8f20410ecf07..2defa5ff1301 100644 --- a/balancer/ringhash/ringhash.go +++ b/balancer/ringhash/ringhash.go @@ -263,7 +263,7 @@ func (b *ringhashBalancer) updatePickerLocked() { sort.Slice(endpointStates, func(i, j int) bool { return endpointStates[i].hashKey < endpointStates[j].hashKey }) - var idleBalancer balancer.ExitIdler + var idleBalancer endpointsharding.ExitIdler for _, es := range endpointStates { connState := es.state.ConnectivityState if connState == connectivity.Connecting { @@ -394,7 +394,7 @@ type endpointState struct { // overridden, for example based on EDS endpoint metadata. hashKey string weight uint32 - balancer balancer.ExitIdler + balancer endpointsharding.ExitIdler // state is updated by the balancer while receiving resolver updates from // the channel and picker updates from its children. Access to it is guarded diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index 35da5d1ec9d9..03477217df60 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -72,8 +72,5 @@ func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { } func (b *rrBalancer) ExitIdle() { - // Should always be ok, as child is endpoint sharding. - if ei, ok := b.Balancer.(balancer.ExitIdler); ok { - ei.ExitIdle() - } + b.Balancer.ExitIdle() } diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index e6fe5a37acd7..fe8ebff1f5b5 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -404,9 +404,7 @@ func (b *wrrBalancer) Close() { } func (b *wrrBalancer) ExitIdle() { - if ei, ok := b.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding. - ei.ExitIdle() - } + b.child.ExitIdle() } // picker is the WRR policy's picker. It uses live-updating backend weights to diff --git a/examples/features/orca/client/main.go b/examples/features/orca/client/main.go index 60a91f4c0dc6..e21f30965f02 100644 --- a/examples/features/orca/client/main.go +++ b/examples/features/orca/client/main.go @@ -95,6 +95,7 @@ func (orcaLBBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) bala // designed to run within. type orcaLB struct { cc balancer.ClientConn + sc balancer.SubConn } func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error { @@ -112,6 +113,7 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error { return fmt.Errorf("orcaLB: error creating SubConn: %v", err) } sc.Connect() + o.sc = sc // Register a simple ORCA OOB listener on the SubConn. We request a 1 // second report interval, but in this example the server indicated the @@ -124,6 +126,12 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error { func (o *orcaLB) ResolverError(error) {} +func (o *orcaLB) ExitIdle() { + if o.sc != nil { + o.sc.Connect() + } +} + // TODO: unused; remove when no longer required. func (o *orcaLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {} diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index fbc1ca356ab9..ba25b8988718 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -223,15 +223,7 @@ func (gsb *Balancer) ExitIdle() { // There is no need to protect this read with a mutex, as the write to the // Balancer field happens in SwitchTo, which completes before this can be // called. - if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok { - ei.ExitIdle() - return - } - gsb.mu.Lock() - defer gsb.mu.Unlock() - for sc := range balToUpdate.subconns { - sc.Connect() - } + balToUpdate.ExitIdle() } // updateSubConnState forwards the update to the appropriate child. diff --git a/internal/balancer/gracefulswitch/gracefulswitch_test.go b/internal/balancer/gracefulswitch/gracefulswitch_test.go index 5e6c53c74534..9f536e8586f0 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch_test.go +++ b/internal/balancer/gracefulswitch/gracefulswitch_test.go @@ -795,9 +795,7 @@ func (s) TestInlineCallbackInBuild(t *testing.T) { } } -// TestExitIdle tests the ExitIdle operation on the Graceful Switch Balancer for -// both possible codepaths, one where the child implements ExitIdler interface -// and one where the child doesn't implement ExitIdler interface. +// TestExitIdle tests the ExitIdle operation on the Graceful Switch Balancer. func (s) TestExitIdle(t *testing.T) { _, gsb := setup(t) // switch to a balancer that implements ExitIdle{} (will populate current). @@ -811,22 +809,6 @@ func (s) TestExitIdle(t *testing.T) { if err := currBal.waitForExitIdle(ctx); err != nil { t.Fatal(err) } - - // switch to a balancer that doesn't implement ExitIdle{} (will populate - // pending). - gsb.SwitchTo(verifyBalancerBuilder{}) - // call exitIdle concurrently with newSubConn to make sure there is not a - // data race. - done := make(chan struct{}) - go func() { - gsb.ExitIdle() - close(done) - }() - pendBal := gsb.balancerPending.Balancer.(*verifyBalancer) - for i := 0; i < 10; i++ { - pendBal.newSubConn([]resolver.Address{}, balancer.NewSubConnOptions{}) - } - <-done } const balancerName1 = "mock_balancer_1" @@ -1010,6 +992,8 @@ func (vb *verifyBalancer) UpdateClientConnState(balancer.ClientConnState) error return nil } +func (vb *verifyBalancer) ExitIdle() {} + func (vb *verifyBalancer) ResolverError(error) {} func (vb *verifyBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { @@ -1068,6 +1052,8 @@ func (bcb *buildCallbackBal) UpdateClientConnState(balancer.ClientConnState) err func (bcb *buildCallbackBal) ResolverError(error) {} +func (bcb *buildCallbackBal) ExitIdle() {} + func (bcb *buildCallbackBal) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { panic(fmt.Sprintf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)) } diff --git a/internal/balancer/nop/nop.go b/internal/balancer/nop/nop.go index 0c96f1b81186..8c5bf96f2d27 100644 --- a/internal/balancer/nop/nop.go +++ b/internal/balancer/nop/nop.go @@ -60,3 +60,6 @@ func (b *bal) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {} // Close is a no-op. func (b *bal) Close() {} + +// ExitIdle is a no-op. +func (b *bal) ExitIdle() {} diff --git a/interop/orcalb.go b/interop/orcalb.go index 572a7dfcd5cb..b19defb63242 100644 --- a/interop/orcalb.go +++ b/interop/orcalb.go @@ -53,6 +53,12 @@ type orcab struct { report *v3orcapb.OrcaLoadReport } +func (o *orcab) ExitIdle() { + if o.sc != nil { + o.sc.Connect() + } +} + func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error { if o.sc != nil { o.sc.UpdateAddresses(s.ResolverState.Addresses) diff --git a/test/balancer_test.go b/test/balancer_test.go index 8c26ea1b92cf..049aeb7f5c23 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -1034,7 +1034,7 @@ func (s) TestSubConn_RegisterHealthListener(t *testing.T) { return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) }, ExitIdle: func(bd *stub.BalancerData) { - bd.Data.(balancer.ExitIdler).ExitIdle() + bd.Data.(balancer.Balancer).ExitIdle() }, } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index fa34748c8822..3a0e3f9e6be7 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -424,9 +424,7 @@ func (b *cdsBalancer) ExitIdle() { // ExitIdle (but still checks for the interface's existence to // avoid a panic if not). If the child does not, no subconns // will be connected. - if ei, ok := b.childLB.(balancer.ExitIdler); ok { - ei.ExitIdle() - } + b.childLB.ExitIdle() }) } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 87888c734d7a..2cebfe52db52 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -138,8 +138,7 @@ func registerWrappedClusterResolverPolicy(t *testing.T) (chan serviceconfig.Load bal.ResolverError(err) }, ExitIdle: func(bd *stub.BalancerData) { - bal := bd.Data.(balancer.Balancer) - bal.(balancer.ExitIdler).ExitIdle() + bd.Data.(balancer.Balancer).ExitIdle() close(exitIdleCh) }, Close: func(bd *stub.BalancerData) { @@ -1110,7 +1109,7 @@ func (s) TestExitIdle(t *testing.T) { case <-ctx.Done(): t.Fatal("Timeout when waiting for cds LB policy to be created") } - cdsBal.(balancer.ExitIdler).ExitIdle() + cdsBal.ExitIdle() // Wait for ExitIdle to be called on the child policy. select { diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index fe5d93dbfbdf..f9ce57293393 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -335,13 +335,7 @@ func (b *clusterResolverBalancer) run() { } break } - // This implementation assumes the child balancer supports - // ExitIdle (but still checks for the interface's existence to - // avoid a panic if not). If the child does not, no subconns - // will be connected. - if ei, ok := b.child.(balancer.ExitIdler); ok { - ei.ExitIdle() - } + b.child.ExitIdle() } case u := <-b.resourceWatcher.updateChannel: b.handleResourceUpdate(u) diff --git a/xds/internal/balancer/wrrlocality/balancer.go b/xds/internal/balancer/wrrlocality/balancer.go index 065de200f40c..2d03a9c75e7b 100644 --- a/xds/internal/balancer/wrrlocality/balancer.go +++ b/xds/internal/balancer/wrrlocality/balancer.go @@ -154,6 +154,10 @@ type wrrLocalityBalancer struct { logger *grpclog.PrefixLogger } +func (b *wrrLocalityBalancer) ExitIdle() { + b.child.ExitIdle() +} + func (b *wrrLocalityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { lbCfg, ok := s.BalancerConfig.(*LBConfig) if !ok {