diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 1b0149e309c6..d6c8c6de8eaf 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/channelz" @@ -52,6 +53,24 @@ var ( // Name is the name of the outlier detection balancer. const Name = "outlier_detection_experimental" +var ( + ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.outlier_detection.ejections_enforced", + Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method", + Unit: "{ejection}", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, + Default: false, + }) + + ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.outlier_detection.ejections_unenforced", + Description: "EXPERIMENTAL. Number of unenforced outlier ejections due to either `max_ejection_percentage` or `enforcement_percentage`", + Unit: "{ejection}", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, + Default: false, + }) +) + func init() { balancer.Register(bb{}) } @@ -60,14 +79,16 @@ type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &outlierDetectionBalancer{ - ClientConn: cc, - closed: grpcsync.NewEvent(), - done: grpcsync.NewEvent(), - addrs: make(map[string]*endpointInfo), - scUpdateCh: buffer.NewUnbounded(), - pickerUpdateCh: buffer.NewUnbounded(), - channelzParent: bOpts.ChannelzParent, - endpoints: resolver.NewEndpointMap[*endpointInfo](), + ClientConn: cc, + closed: grpcsync.NewEvent(), + done: grpcsync.NewEvent(), + addrs: make(map[string]*endpointInfo), + scUpdateCh: buffer.NewUnbounded(), + pickerUpdateCh: buffer.NewUnbounded(), + channelzParent: bOpts.ChannelzParent, + endpoints: resolver.NewEndpointMap[*endpointInfo](), + metricsRecorder: cc.MetricsRecorder(), // we use an explicit field instead of using cc.MetricsRecorder() so we can override the metric recorder in tests. + target: bOpts.Target.String(), } b.logger = prefixLogger(b) b.logger.Infof("Created") @@ -169,10 +190,12 @@ type outlierDetectionBalancer struct { // to suppress redundant picker updates. recentPickerNoop bool - closed *grpcsync.Event - done *grpcsync.Event - logger *grpclog.PrefixLogger - channelzParent channelz.Identifier + closed *grpcsync.Event + done *grpcsync.Event + logger *grpclog.PrefixLogger + channelzParent channelz.Identifier + metricsRecorder estats.MetricsRecorder + target string child synchronizingBalancerWrapper @@ -788,18 +811,24 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { return } mean, stddev := b.meanAndStdDev(endpointsToConsider) + ejectionCfg := b.cfg.SuccessRateEjection for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket - ejectionCfg := b.cfg.SuccessRateEjection - if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - return - } successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures) requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000) if successRate < requiredSuccessRate { channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", epInfo, successRate, mean, stddev, requiredSuccessRate) + // Check if max ejection percentage would prevent ejection. + if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { + // Record unenforced ejection due to max ejection percentage. + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow") + continue + } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { - b.ejectEndpoint(epInfo) + b.ejectEndpoint(epInfo, "success_rate") + } else { + // Record unenforced ejection due to enforcement percentage. + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage") } } } @@ -816,24 +845,30 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { return } + ejectionCfg := b.cfg.FailurePercentageEjection for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket - ejectionCfg := b.cfg.FailurePercentageEjection - if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - return - } failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100 if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage) + // Check if max ejection percentage would prevent ejection. + if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { + // Record unenforced ejection due to max ejection percentage. + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow") + continue + } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { - b.ejectEndpoint(epInfo) + b.ejectEndpoint(epInfo, "failure_percentage") + } else { + // Record unenforced ejection due to enforcement percentage. + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage") } } } } // Caller must hold b.mu. -func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) { +func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detectionMethod string) { b.numEndpointsEjected++ epInfo.latestEjectionTimestamp = b.timerStartTime epInfo.ejectionTimeMultiplier++ @@ -842,6 +877,8 @@ func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) { channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw) } + // Record the enforced ejection metric. + ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod) } // Caller must hold b.mu. diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index be3f03cdc0eb..3cb1dfc1004b 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -46,6 +46,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/internal/xds/balancer/clusterimpl" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -1027,253 +1028,344 @@ func (s) TestDurationOfInterval(t *testing.T) { // TestEjectUnejectSuccessRate tests the functionality of the interval timer // algorithm when configured with SuccessRateEjection. The Outlier Detection -// Balancer will be set up with 3 SubConns, each with a different address. +// Balancer will be set up with N SubConns, each with a different address. // It tests the following scenarios, in a step by step fashion: -// 1. The three addresses each have 5 successes. The interval timer algorithm should -// not eject any of the addresses. -// 2. Two of the addresses have 5 successes, the third has five failures. The -// interval timer algorithm should eject the third address with five failures. +// 1. The N addresses each have 5 successes. The interval timer algorithm +// should not eject any of the addresses. +// 2. N - `wantFailures` of the addresses have 5 successes but the remaining +// address has 5 failures. The internal algorithm +// should attempt to eject the address based on the outlier detection lb +// config. Only `wantEjections` addresses should be ejected. // 3. The interval timer algorithm is run at a later time past max ejection -// time. The interval timer algorithm should uneject the third address. +// time. The interval timer algorithm should uneject all. func (s) TestEjectUnejectSuccessRate(t *testing.T) { - scsCh := testutils.NewChannel() - var scw1, scw2, scw3 balancer.SubConn - var err error - connectivityCh := make(chan struct{}) - stub.Register(t.Name(), stub.BalancerFuncs{ - UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { - scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{ - StateListener: func(balancer.SubConnState) {}, - }) - if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) - } - scw1.Connect() - scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{ - StateListener: func(balancer.SubConnState) {}, - }) - if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) - } - scw2.Connect() - scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{ - StateListener: func(state balancer.SubConnState) { - if state.ConnectivityState == connectivity.Ready { - close(connectivityCh) - } + tests := []struct { + name string + lbConfig LBConfig + numberOfConns int + wantFailures int + wantEjections int + }{ + { + name: "three_upstreams_one_failure", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, }, - }) - if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) - } - scw3.Connect() - bd.ClientConn.UpdateState(balancer.State{ - ConnectivityState: connectivity.Ready, - Picker: &rrPicker{ - scs: []balancer.SubConn{scw1, scw2, scw3}, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_one_failure", + Config: emptyChildConfig{}, }, - }) - return nil + }, + numberOfConns: 3, + wantFailures: 1, + wantEjections: 1, }, - }) - - od, tcc, cleanup := setup(t) - defer func() { - cleanup() - }() - - od.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Endpoints: []resolver.Endpoint{ - {Addresses: []resolver.Address{{Addr: "address1"}}}, - {Addresses: []resolver.Address{{Addr: "address2"}}}, - {Addresses: []resolver.Address{{Addr: "address3"}}}, + { + name: "three_upstreams_no_failure", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_no_failure", + Config: emptyChildConfig{}, + }, }, + numberOfConns: 3, + wantFailures: 0, + wantEjections: 0, }, - BalancerConfig: &LBConfig{ - Interval: math.MaxInt64, // so the interval will never run unless called manually in test. - BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), - MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), - MaxEjectionPercent: 10, - FailurePercentageEjection: &FailurePercentageEjection{ - Threshold: 50, - EnforcementPercentage: 100, - MinimumHosts: 3, - RequestVolume: 3, + { + name: "three_upstreams_one_failure_no_ejection_enforcement_perc", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 0, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_one_failure_no_ejection_enforcement_perc", + Config: emptyChildConfig{}, + }, }, - ChildPolicy: &iserviceconfig.BalancerConfig{ - Name: t.Name(), - Config: emptyChildConfig{}, + numberOfConns: 3, + wantFailures: 1, + wantEjections: 0, + }, + { + name: "three_upstreams_one_failure_no_ejection_max_ejection_perc", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 0, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_one_failure_no_ejection_max_ejection_perc", + Config: emptyChildConfig{}, + }, }, + numberOfConns: 3, + wantFailures: 1, + wantEjections: 0, }, - }) - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Transition the SubConns to READY so that they can register health - // listeners. - for range 3 { - select { - case <-ctx.Done(): - t.Fatalf("Timed out waiting for creation of new SubConn.") - case sc := <-tcc.NewSubConnCh: - sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) - } } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + scsCh := testutils.NewChannel() + connectivityCh := make(chan struct{}) + var allSubConns = make([]balancer.SubConn, test.numberOfConns) + stub.Register(test.name, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + for i := range test.numberOfConns { + scw, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Endpoints[i].Addresses, balancer.NewSubConnOptions{ + StateListener: func(state balancer.SubConnState) { + if state.ConnectivityState == connectivity.Ready { + connectivityCh <- struct{}{} + } + }, + }) + if err != nil { + t.Errorf("NewSubConn(%v) failed: %v", ccs.ResolverState.Endpoints[i].Addresses, err) + } + scw.Connect() + allSubConns[i] = scw + } - // Register health listeners after all the connectivity updates are - // processed to avoid data races while accessing the health listener within - // the TestClientConn. - select { - case <-ctx.Done(): - t.Fatal("Context timed out waiting for all SubConns to become READY.") - case <-connectivityCh: - } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: allSubConns, + }, + }) + return nil + }, + }) - scw1.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: scw1, state: healthState}) - }) - scw2.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: scw2, state: healthState}) - }) - scw3.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: scw3, state: healthState}) - }) + od, tcc, cleanup := setup(t) + defer cleanup() + endpoints := make([]resolver.Endpoint, test.numberOfConns) + for i := range test.numberOfConns { + endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{{Addr: fmt.Sprintf("address%d", i+1)}}} + } + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Endpoints: endpoints, + }, + BalancerConfig: &test.lbConfig, + }) - select { - case <-ctx.Done(): - t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") - case picker := <-tcc.NewPickerCh: - // Set each of the three upstream addresses to have five successes each. - // This should cause none of the addresses to be ejected as none of them - // are outliers according to the success rate algorithm. - for i := 0; i < 3; i++ { - pi, err := picker.Pick(balancer.PickInfo{}) - if err != nil { - t.Fatalf("picker.Pick failed with error: %v", err) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Transition the SubConns to READY so that they can register health + // listeners. + for range test.numberOfConns { + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for creation of new SubConn.") + case sc := <-tcc.NewSubConnCh: + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + } } - for c := 0; c < 5; c++ { - pi.Done(balancer.DoneInfo{}) + + // Register health listeners after all the connectivity updates are + // processed to avoid data races while accessing the health listener within + // the TestClientConn. + connectionsReady := 0 + for connectionsReady < test.numberOfConns { + select { + case <-ctx.Done(): + t.Fatal("Context timed out waiting for all SubConns to become READY.") + case <-connectivityCh: + connectionsReady++ + } } - } - od.intervalTimerAlgorithm() + for i := range test.numberOfConns { + allSubConns[i].RegisterHealthListener(func(healthState balancer.SubConnState) { + scsCh.Send(subConnWithState{sc: allSubConns[i], state: healthState}) + }) + } - // verify no StateListener() call on the child, as no addresses got - // ejected (ejected address will cause an StateListener call). - sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") - } + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + // Set each of the three upstream addresses to have five successes each. + // This should cause none of the addresses to be ejected as none of them + // are outliers according to the success rate algorithm. + for i := 0; i < test.numberOfConns; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{}) + } + } - // Since no addresses are ejected, a SubConn update should forward down - // to the child. - od.scUpdateCh.Put(&scHealthUpdate{ - scw: scw1.(*subConnWrapper), - state: balancer.SubConnState{ - ConnectivityState: connectivity.Connecting, - }}, - ) + // Create test metrics recorder + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr + od.intervalTimerAlgorithm() + + // verify no StateListener() call on the child, as no addresses got + // ejected (ejected address will cause an StateListener call). + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %f, want 0", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %f, want 0", got) + } - gotSCWS, err := scsCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for Sub Conn update: %v", err) - } - if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ - sc: scw1, - state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, - }); err != nil { - t.Fatalf("Error in Sub Conn update: %v", err) - } + // Since no addresses are ejected, a SubConn update should forward down + // to the child. + newState := balancer.SubConnState{ConnectivityState: connectivity.Connecting} + wantSC := allSubConns[0] + od.scUpdateCh.Put(&scHealthUpdate{ + scw: wantSC.(*subConnWrapper), + state: newState, + }) + + gotSCWS, err := scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: wantSC, + state: newState, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } - // Set two of the upstream addresses to have five successes each, and - // one of the upstream addresses to have five failures. This should - // cause the address which has five failures to be ejected according to - // the SuccessRateAlgorithm. - for i := 0; i < 2; i++ { - pi, err := picker.Pick(balancer.PickInfo{}) - if err != nil { - t.Fatalf("picker.Pick failed with error: %v", err) - } - for c := 0; c < 5; c++ { - pi.Done(balancer.DoneInfo{}) - } - } - pi, err := picker.Pick(balancer.PickInfo{}) - if err != nil { - t.Fatalf("picker.Pick failed with error: %v", err) - } - if got, want := pi.SubConn, scw3.(*subConnWrapper).SubConn; got != want { - t.Fatalf("Unexpected SubConn chosen by picker: got %v, want %v", got, want) - } - for c := 0; c < 5; c++ { - pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) - } + // Set all the upstream before the offset to have five successes, but + // the remaining addresses to have failures. + offset := test.numberOfConns - test.wantFailures + for i := 0; i < test.numberOfConns; i++ { + pickerDone := balancer.DoneInfo{} + if i >= offset { + pickerDone = balancer.DoneInfo{Err: errors.New("some error")} + } + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(pickerDone) + } - // should eject address that always errored. - od.intervalTimerAlgorithm() - // Due to the address being ejected, the SubConn with that address - // should be ejected, meaning a TRANSIENT_FAILURE connectivity state - // gets reported to the child. - gotSCWS, err = scsCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for Sub Conn update: %v", err) - } - if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ - sc: scw3, - state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, - }); err != nil { - t.Fatalf("Error in Sub Conn update: %v", err) - } - // Only one address should be ejected. - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") - } + } - // Now that an address is ejected, SubConn updates for SubConns using - // that address should not be forwarded downward. These SubConn updates - // will be cached to update the child sometime in the future when the - // address gets unejected. - od.scUpdateCh.Put(&scHealthUpdate{ - scw: scw3.(*subConnWrapper), - state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, - }) - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)") - } + // should eject address that always errored. + od.intervalTimerAlgorithm() - // Override now to cause the interval timer algorithm to always uneject - // the ejected address. This will always uneject the ejected address - // because this time is set way past the max ejection time set in the - // configuration, which will make the next interval timer algorithm run - // uneject any ejected addresses. - defer func(n func() time.Time) { - now = n - }(now) - now = func() time.Time { - return time.Now().Add(time.Second * 1000) - } - od.intervalTimerAlgorithm() + // Due to the address being ejected, the SubConn with that address + // should be ejected, meaning a TRANSIENT_FAILURE connectivity state + // gets reported to the child. + for i := 0; i < test.wantEjections; i++ { + got, err := scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for SubConn to be ejected: %v", err) + } + if err = scwsEqual(got.(subConnWithState), subConnWithState{ + sc: allSubConns[len(allSubConns)-1-i], + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, + }); err != nil { + t.Fatalf("Unexpected subconnection with state: %v", err) + } + } - // unejected SubConn should report latest persisted state - which is - // connecting from earlier. - gotSCWS, err = scsCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for Sub Conn update: %v", err) - } - if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ - sc: scw3, - state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, - }); err != nil { - t.Fatalf("Error in Sub Conn update: %v", err) - } + sCtx, cancel2 := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel2() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + } + + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != float64(test.wantEjections) { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %f, want %f", got, float64(test.wantEjections)) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != float64(test.wantFailures-test.wantEjections) { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %f, want %f", got, float64(test.wantFailures-test.wantEjections)) + } + + for i := 0; i < test.wantEjections; i++ { + // Now that an address is ejected, SubConn updates for SubConns using + // that address should not be forwarded downward. These SubConn updates + // will be cached to update the child sometime in the future when the + // address gets unejected. + scw := allSubConns[len(allSubConns)-1-i] + od.scUpdateCh.Put(&scHealthUpdate{ + scw: scw.(*subConnWrapper), + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }) + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)") + } + } + + // Override now to cause the interval timer algorithm to always uneject + // the ejected address. This will always uneject the ejected address + // because this time is set way past the max ejection time set in the + // configuration, which will make the next interval timer algorithm run + // uneject any ejected addresses. + defer func(n func() time.Time) { + now = n + }(now) + now = func() time.Time { + return time.Now().Add(time.Second * 1000) + } + od.intervalTimerAlgorithm() + for i := 0; i < test.wantEjections; i++ { + scw := allSubConns[len(allSubConns)-1-i] + // unejected SubConn should report latest persisted state - which is + // connecting from earlier. + gotSCWS, err = scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw, + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + } + } + }) } } @@ -1414,12 +1506,20 @@ func (s) TestEjectFailureRate(t *testing.T) { pi.Done(balancer.DoneInfo{}) } } + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr od.intervalTimerAlgorithm() sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer cancel() if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + t.Fatalf("Received unexpected subchannel state change when expecting none") + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) } // Set two upstream addresses to have five successes each, and one @@ -1463,7 +1563,13 @@ func (s) TestEjectFailureRate(t *testing.T) { sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer cancel() if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + t.Fatalf("Received unexpected subchannel state change when expecting none") + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) } // upon the Outlier Detection balancer being reconfigured with a noop