From 9af34e66858902a11473560a403aaef1b07126b3 Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi Date: Tue, 8 Jul 2025 11:21:49 -0400 Subject: [PATCH 01/12] xds: client outlier detection metrics --- .../xds/balancer/outlierdetection/balancer.go | 93 ++++++++++++++----- .../outlierdetection/balancer_test.go | 30 +++++- 2 files changed, 99 insertions(+), 24 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 1b0149e309c6..9230d2512846 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/channelz" @@ -52,6 +53,26 @@ var ( // Name is the name of the outlier detection balancer. const Name = "outlier_detection_experimental" +var ( + ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.outlier_detection.ejections_enforced", + Description: "EXPERIMENTAL. Enforced outlier ejections by detection method", + Unit: "ejection", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, + OptionalLabels: []string{"grpc.lb.backend_service"}, + Default: false, + }) + + ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.outlier_detection.ejections_unenforced", + Description: "EXPERIMENTAL. 
Unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", + Unit: "ejection", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, + OptionalLabels: []string{"grpc.lb.backend_service"}, + Default: false, + }) +) + func init() { balancer.Register(bb{}) } @@ -60,14 +81,17 @@ type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &outlierDetectionBalancer{ - ClientConn: cc, - closed: grpcsync.NewEvent(), - done: grpcsync.NewEvent(), - addrs: make(map[string]*endpointInfo), - scUpdateCh: buffer.NewUnbounded(), - pickerUpdateCh: buffer.NewUnbounded(), - channelzParent: bOpts.ChannelzParent, - endpoints: resolver.NewEndpointMap[*endpointInfo](), + ClientConn: cc, + closed: grpcsync.NewEvent(), + done: grpcsync.NewEvent(), + addrs: make(map[string]*endpointInfo), + scUpdateCh: buffer.NewUnbounded(), + pickerUpdateCh: buffer.NewUnbounded(), + channelzParent: bOpts.ChannelzParent, + endpoints: resolver.NewEndpointMap[*endpointInfo](), + metricsRecorder: cc.MetricsRecorder(), + target: bOpts.Target.String(), + backendService: "", // Will be set in UpdateClientConnState } b.logger = prefixLogger(b) b.logger.Infof("Created") @@ -134,6 +158,15 @@ func (bb) Name() string { return Name } +// extractBackendService extracts the backend service from resolver state attributes. +// This is a placeholder implementation - the actual extraction logic should be +// implemented based on the specific resolver attributes available. +func extractBackendService(state resolver.State) string { + // TODO: Implement backend service extraction from resolver attributes per A89 and A75 + // For now, return empty string as this is optional + return "" +} + // scUpdate wraps a subConn update to be sent to the child balancer. 
type scUpdate struct { scw *subConnWrapper @@ -169,10 +202,13 @@ type outlierDetectionBalancer struct { // to suppress redundant picker updates. recentPickerNoop bool - closed *grpcsync.Event - done *grpcsync.Event - logger *grpclog.PrefixLogger - channelzParent channelz.Identifier + closed *grpcsync.Event + done *grpcsync.Event + logger *grpclog.PrefixLogger + channelzParent channelz.Identifier + metricsRecorder estats.MetricsRecorder + target string + backendService string child synchronizingBalancerWrapper @@ -294,6 +330,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt b.inhibitPickerUpdates = true b.updateUnconditionally = false b.cfg = lbCfg + b.backendService = extractBackendService(s.ResolverState) newEndpoints := resolver.NewEndpointMap[bool]() for _, ep := range s.ResolverState.Endpoints { @@ -791,15 +828,21 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket ejectionCfg := b.cfg.SuccessRateEjection - if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - return - } successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures) requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000) if successRate < requiredSuccessRate { channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. 
Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", epInfo, successRate, mean, stddev, requiredSuccessRate) + // Check if max ejection percentage would prevent ejection + if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { + // Record unenforced ejection due to max ejection percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow", b.backendService) + continue + } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { - b.ejectEndpoint(epInfo) + b.ejectEndpoint(epInfo, "success_rate") + } else { + // Record unenforced ejection due to enforcement percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage", b.backendService) } } } @@ -819,21 +862,27 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket ejectionCfg := b.cfg.FailurePercentageEjection - if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - return - } failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100 if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage) + // Check if max ejection percentage would prevent ejection + if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { + // Record unenforced ejection due to max ejection percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow", b.backendService) + continue + } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { - b.ejectEndpoint(epInfo) + 
b.ejectEndpoint(epInfo, "failure_percentage") + } else { + // Record unenforced ejection due to enforcement percentage + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage", b.backendService) } } } } // Caller must hold b.mu. -func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) { +func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detectionMethod string) { b.numEndpointsEjected++ epInfo.latestEjectionTimestamp = b.timerStartTime epInfo.ejectionTimeMultiplier++ @@ -842,6 +891,8 @@ func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo) { channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw) } + // Record the enforced ejection metric + ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod, b.backendService) } // Caller must hold b.mu. diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index be3f03cdc0eb..34fc39875c3a 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -46,6 +46,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/internal/xds/balancer/clusterimpl" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -761,7 +762,7 @@ func (s) TestUpdateAddresses(t *testing.T) { defer cancel() // Transition SubConns to READY so that they can register a health listener. - for range 2 { + for i := 0; i < 2; i++ { select { case <-ctx.Done(): t.Fatalf("Timed out waiting for creation of new SubConn.") @@ -1113,7 +1114,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { // Transition the SubConns to READY so that they can register health // listeners. 
- for range 3 { + for i := 0; i < 3; i++ { select { case <-ctx.Done(): t.Fatalf("Timed out waiting for creation of new SubConn.") @@ -1159,6 +1160,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { } } + // Create test metrics recorder + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr od.intervalTimerAlgorithm() // verify no StateListener() call on the child, as no addresses got @@ -1168,6 +1172,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + } // Since no addresses are ejected, a SubConn update should forward down // to the child. @@ -1234,6 +1241,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) + } // Now that an address is ejected, SubConn updates for SubConns using // that address should not be forwarded downward. These SubConn updates @@ -1370,7 +1380,7 @@ func (s) TestEjectFailureRate(t *testing.T) { // Transition the SubConns to READY so that they can register health // listeners. 
- for range 3 { + for i := 0; i < 3; i++ { select { case <-ctx.Done(): t.Fatal("Timed out waiting for creation of new SubConn.") @@ -1414,6 +1424,8 @@ func (s) TestEjectFailureRate(t *testing.T) { pi.Done(balancer.DoneInfo{}) } } + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr od.intervalTimerAlgorithm() sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) @@ -1421,6 +1433,12 @@ func (s) TestEjectFailureRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // Set two upstream addresses to have five successes each, and one // upstream address to have five failures. This should cause the address @@ -1465,6 +1483,12 @@ func (s) TestEjectFailureRate(t *testing.T) { if _, err := scsCh.Receive(sCtx); err == nil { t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // upon the Outlier Detection balancer being reconfigured with a noop // configuration, every ejected SubConn should be unejected. 
From a6d81a3ce88a3cc22ab974e53a2ad2343d5d274b Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi Date: Tue, 8 Jul 2025 11:29:01 -0400 Subject: [PATCH 02/12] revert range changes in test --- internal/xds/balancer/outlierdetection/balancer_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index 34fc39875c3a..bf4f6d6b9d3a 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -762,7 +762,7 @@ func (s) TestUpdateAddresses(t *testing.T) { defer cancel() // Transition SubConns to READY so that they can register a health listener. - for i := 0; i < 2; i++ { + for range 2 { select { case <-ctx.Done(): t.Fatalf("Timed out waiting for creation of new SubConn.") @@ -1114,7 +1114,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { // Transition the SubConns to READY so that they can register health // listeners. - for i := 0; i < 3; i++ { + for range 3 { select { case <-ctx.Done(): t.Fatalf("Timed out waiting for creation of new SubConn.") @@ -1380,7 +1380,7 @@ func (s) TestEjectFailureRate(t *testing.T) { // Transition the SubConns to READY so that they can register health // listeners. 
- for i := 0; i < 3; i++ { + for range 3 { select { case <-ctx.Done(): t.Fatal("Timed out waiting for creation of new SubConn.") From 69fdb33f563175369c7f016d35d106bb683d1f14 Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi Date: Tue, 8 Jul 2025 11:30:02 -0400 Subject: [PATCH 03/12] add checks for ejections_unenforced in test --- internal/xds/balancer/outlierdetection/balancer_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index bf4f6d6b9d3a..63a53755af5b 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -1175,6 +1175,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // Since no addresses are ejected, a SubConn update should forward down // to the child. @@ -1244,6 +1247,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } // Now that an address is ejected, SubConn updates for SubConns using // that address should not be forwarded downward. 
These SubConn updates From b7765f816206bf62ed23f0d12abcc2bb156683b3 Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi <46410151+PardhuKonakanchi@users.noreply.github.com> Date: Mon, 14 Jul 2025 21:35:47 -0400 Subject: [PATCH 04/12] Update xds/internal/balancer/outlierdetection/balancer.go Co-authored-by: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> --- internal/xds/balancer/outlierdetection/balancer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 9230d2512846..d9e95b9378a6 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -56,7 +56,7 @@ const Name = "outlier_detection_experimental" var ( ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.outlier_detection.ejections_enforced", - Description: "EXPERIMENTAL. Enforced outlier ejections by detection method", + Description: "EXPERIMENTAL. 
Number of outlier ejections enforced by detection method", Unit: "ejection", Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, OptionalLabels: []string{"grpc.lb.backend_service"}, From ff7bb777bb682167b62f2ed0ff1549ede0f8bc2a Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi <46410151+PardhuKonakanchi@users.noreply.github.com> Date: Mon, 14 Jul 2025 21:40:25 -0400 Subject: [PATCH 05/12] Update xds/internal/balancer/outlierdetection/balancer.go Co-authored-by: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> --- internal/xds/balancer/outlierdetection/balancer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index d9e95b9378a6..3550d6bf92fc 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -65,7 +65,7 @@ var ( ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.outlier_detection.ejections_unenforced", - Description: "EXPERIMENTAL. Unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", + Description: "EXPERIMENTAL. 
Number of unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", Unit: "ejection", Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, OptionalLabels: []string{"grpc.lb.backend_service"}, From 3f2d32b44f2acc2c27670c747c03a61ed4c13fce Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi <46410151+PardhuKonakanchi@users.noreply.github.com> Date: Mon, 14 Jul 2025 21:40:32 -0400 Subject: [PATCH 06/12] Update xds/internal/balancer/outlierdetection/balancer.go Co-authored-by: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> --- internal/xds/balancer/outlierdetection/balancer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 3550d6bf92fc..2d76e5d7041f 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -161,7 +161,7 @@ func (bb) Name() string { // extractBackendService extracts the backend service from resolver state attributes. // This is a placeholder implementation - the actual extraction logic should be // implemented based on the specific resolver attributes available. 
-func extractBackendService(state resolver.State) string { +func extractBackendService(resolver.State) string { // TODO: Implement backend service extraction from resolver attributes per A89 and A75 // For now, return empty string as this is optional return "" From b046aa2106dccc7fd0d8a4218498b748c0b90790 Mon Sep 17 00:00:00 2001 From: Pardhu Konakanchi Date: Mon, 14 Jul 2025 21:49:46 -0400 Subject: [PATCH 07/12] remove unnecessary state param name --- internal/xds/balancer/outlierdetection/balancer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 2d76e5d7041f..66ca672fea52 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -91,7 +91,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba endpoints: resolver.NewEndpointMap[*endpointInfo](), metricsRecorder: cc.MetricsRecorder(), target: bOpts.Target.String(), - backendService: "", // Will be set in UpdateClientConnState } b.logger = prefixLogger(b) b.logger.Infof("Created") From ea06c83bd0f59938ef4fffe39e58929003adb1e4 Mon Sep 17 00:00:00 2001 From: sotiris Date: Mon, 13 Oct 2025 14:14:11 -0400 Subject: [PATCH 08/12] * addresses PR comments by removing the optional label that is set to `""` * adds tests to verify the unenforced ejection metrics are recorded correctly when outliers are detected but not enforced. 
Signed-off-by: sotiris --- .../xds/balancer/outlierdetection/balancer.go | 43 +- .../outlierdetection/balancer_test.go | 548 ++++++++++-------- 2 files changed, 328 insertions(+), 263 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 66ca672fea52..ba8b5c64ff6f 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -55,21 +55,19 @@ const Name = "outlier_detection_experimental" var ( ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ - Name: "grpc.lb.outlier_detection.ejections_enforced", - Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method", - Unit: "ejection", - Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, - OptionalLabels: []string{"grpc.lb.backend_service"}, - Default: false, + Name: "grpc.lb.outlier_detection.ejections_enforced", + Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method", + Unit: "ejection", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, + Default: false, }) ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ - Name: "grpc.lb.outlier_detection.ejections_unenforced", - Description: "EXPERIMENTAL. Number of unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", - Unit: "ejection", - Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, - OptionalLabels: []string{"grpc.lb.backend_service"}, - Default: false, + Name: "grpc.lb.outlier_detection.ejections_unenforced", + Description: "EXPERIMENTAL. 
Number of unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", + Unit: "ejection", + Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, + Default: false, }) ) @@ -157,15 +155,6 @@ func (bb) Name() string { return Name } -// extractBackendService extracts the backend service from resolver state attributes. -// This is a placeholder implementation - the actual extraction logic should be -// implemented based on the specific resolver attributes available. -func extractBackendService(resolver.State) string { - // TODO: Implement backend service extraction from resolver attributes per A89 and A75 - // For now, return empty string as this is optional - return "" -} - // scUpdate wraps a subConn update to be sent to the child balancer. type scUpdate struct { scw *subConnWrapper @@ -207,7 +196,6 @@ type outlierDetectionBalancer struct { channelzParent channelz.Identifier metricsRecorder estats.MetricsRecorder target string - backendService string child synchronizingBalancerWrapper @@ -329,7 +317,6 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt b.inhibitPickerUpdates = true b.updateUnconditionally = false b.cfg = lbCfg - b.backendService = extractBackendService(s.ResolverState) newEndpoints := resolver.NewEndpointMap[bool]() for _, ep := range s.ResolverState.Endpoints { @@ -834,14 +821,14 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { // Check if max ejection percentage would prevent ejection if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { // Record unenforced ejection due to max ejection percentage - ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow", b.backendService) + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow") continue } if 
uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { b.ejectEndpoint(epInfo, "success_rate") } else { // Record unenforced ejection due to enforcement percentage - ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage", b.backendService) + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage") } } } @@ -867,14 +854,14 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { // Check if max ejection percentage would prevent ejection if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { // Record unenforced ejection due to max ejection percentage - ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow", b.backendService) + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow") continue } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { b.ejectEndpoint(epInfo, "failure_percentage") } else { // Record unenforced ejection due to enforcement percentage - ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage", b.backendService) + ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage") } } } @@ -891,7 +878,7 @@ func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detection } // Record the enforced ejection metric - ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod, b.backendService) + ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod) } // Caller must hold b.mu. 
diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index 63a53755af5b..45909e44234d 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -1028,268 +1028,346 @@ func (s) TestDurationOfInterval(t *testing.T) { // TestEjectUnejectSuccessRate tests the functionality of the interval timer // algorithm when configured with SuccessRateEjection. The Outlier Detection -// Balancer will be set up with 3 SubConns, each with a different address. +// Balancer will be set up with N SubConns, each with a different address. // It tests the following scenarios, in a step by step fashion: -// 1. The three addresses each have 5 successes. The interval timer algorithm should +// 1. The N addresses each have 5 successes. The interval timer algorithm should // not eject any of the addresses. -// 2. Two of the addresses have 5 successes, the third has five failures. The -// interval timer algorithm should eject the third address with five failures. +// 2. N - `wantFailures` of the addresses have 5 successes and the remaining addresses have 5 failures. The interval timer algorithm +// should attempt to eject the failing addresses based on the outlier detection lb config. Only `wantEjections` addresses should be ejected. // 3. The interval timer algorithm is run at a later time past max ejection -// time. The interval timer algorithm should uneject the third address. +// time. The interval timer algorithm should uneject all ejected addresses.
func (s) TestEjectUnejectSuccessRate(t *testing.T) { - scsCh := testutils.NewChannel() - var scw1, scw2, scw3 balancer.SubConn - var err error - connectivityCh := make(chan struct{}) - stub.Register(t.Name(), stub.BalancerFuncs{ - UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { - scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{ - StateListener: func(balancer.SubConnState) {}, - }) - if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) - } - scw1.Connect() - scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{ - StateListener: func(balancer.SubConnState) {}, - }) - if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) - } - scw2.Connect() - scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{ - StateListener: func(state balancer.SubConnState) { - if state.ConnectivityState == connectivity.Ready { - close(connectivityCh) - } + tests := []struct { + name string + lbConfig LBConfig + numberOfConns int + wantFailures int + wantEjections int + }{ + { + name: "three_upstreams_one_failure", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. 
+ BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, }, - }) - if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) - } - scw3.Connect() - bd.ClientConn.UpdateState(balancer.State{ - ConnectivityState: connectivity.Ready, - Picker: &rrPicker{ - scs: []balancer.SubConn{scw1, scw2, scw3}, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_one_failure", + Config: emptyChildConfig{}, }, - }) - return nil + }, + numberOfConns: 3, + wantFailures: 1, + wantEjections: 1, }, - }) - - od, tcc, cleanup := setup(t) - defer func() { - cleanup() - }() - - od.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Endpoints: []resolver.Endpoint{ - {Addresses: []resolver.Address{{Addr: "address1"}}}, - {Addresses: []resolver.Address{{Addr: "address2"}}}, - {Addresses: []resolver.Address{{Addr: "address3"}}}, + { + name: "three_upstreams_no_failure", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_no_failure", + Config: emptyChildConfig{}, + }, }, + numberOfConns: 3, + wantFailures: 0, + wantEjections: 0, }, - BalancerConfig: &LBConfig{ - Interval: math.MaxInt64, // so the interval will never run unless called manually in test. 
- BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), - MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), - MaxEjectionPercent: 10, - FailurePercentageEjection: &FailurePercentageEjection{ - Threshold: 50, - EnforcementPercentage: 100, - MinimumHosts: 3, - RequestVolume: 3, + { + name: "three_upstreams_one_failure_no_ejection_enforcement_perc", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 0, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_one_failure_no_ejection_enforcement_perc", + Config: emptyChildConfig{}, + }, }, - ChildPolicy: &iserviceconfig.BalancerConfig{ - Name: t.Name(), - Config: emptyChildConfig{}, + numberOfConns: 3, + wantFailures: 1, + wantEjections: 0, + }, + { + name: "three_upstreams_one_failure_no_ejection_max_ejection_perc", + lbConfig: LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: iserviceconfig.Duration(30 * time.Second), + MaxEjectionTime: iserviceconfig.Duration(300 * time.Second), + MaxEjectionPercent: 0, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &iserviceconfig.BalancerConfig{ + Name: "three_upstreams_one_failure_no_ejection_max_ejection_perc", + Config: emptyChildConfig{}, + }, }, + numberOfConns: 3, + wantFailures: 1, + wantEjections: 0, }, - }) - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Transition the SubConns to READY so that they can register health - // listeners. 
- for range 3 { - select { - case <-ctx.Done(): - t.Fatalf("Timed out waiting for creation of new SubConn.") - case sc := <-tcc.NewSubConnCh: - sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) - } } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + scsCh := testutils.NewChannel() + connectivityCh := make(chan struct{}) + var allSubConns = make([]balancer.SubConn, test.numberOfConns) + stub.Register(test.name, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + for i := range test.numberOfConns { + scw, err := bd.ClientConn.NewSubConn([]resolver.Address{{Addr: fmt.Sprintf("address%d", i+1)}}, balancer.NewSubConnOptions{ + StateListener: func(state balancer.SubConnState) { + if state.ConnectivityState == connectivity.Ready { + connectivityCh <- struct{}{} + } + }, + }) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + scw.Connect() + allSubConns[i] = scw + } - // Register health listeners after all the connectivity updates are - // processed to avoid data races while accessing the health listener within - // the TestClientConn. 
- select { - case <-ctx.Done(): - t.Fatal("Context timed out waiting for all SubConns to become READY.") - case <-connectivityCh: - } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: allSubConns, + }, + }) + return nil + }, + }) - scw1.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: scw1, state: healthState}) - }) - scw2.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: scw2, state: healthState}) - }) - scw3.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: scw3, state: healthState}) - }) + od, tcc, cleanup := setup(t) + defer cleanup() + endpoints := make([]resolver.Endpoint, len(allSubConns)) + for i := range allSubConns { + endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{{Addr: fmt.Sprintf("address%d", i+1)}}} + } + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Endpoints: endpoints, + }, + BalancerConfig: &test.lbConfig, + }) - select { - case <-ctx.Done(): - t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") - case picker := <-tcc.NewPickerCh: - // Set each of the three upstream addresses to have five successes each. - // This should cause none of the addresses to be ejected as none of them - // are outliers according to the success rate algorithm. - for i := 0; i < 3; i++ { - pi, err := picker.Pick(balancer.PickInfo{}) - if err != nil { - t.Fatalf("picker.Pick failed with error: %v", err) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Transition the SubConns to READY so that they can register health + // listeners. 
+ for range test.numberOfConns { + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for creation of new SubConn.") + case sc := <-tcc.NewSubConnCh: + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + } } - for c := 0; c < 5; c++ { - pi.Done(balancer.DoneInfo{}) + + // Register health listeners after all the connectivity updates are + // processed to avoid data races while accessing the health listener within + // the TestClientConn. + connectionsReady := 0 + OuterLoop: + for { + select { + case <-ctx.Done(): + t.Fatal("Context timed out waiting for all SubConns to become READY.") + case <-connectivityCh: + connectionsReady++ + if connectionsReady == len(allSubConns) { + break OuterLoop + } + } } - } - // Create test metrics recorder - tmr := stats.NewTestMetricsRecorder() - od.metricsRecorder = tmr - od.intervalTimerAlgorithm() + for _, sc := range allSubConns { + sc.RegisterHealthListener(func(healthState balancer.SubConnState) { + scsCh.Send(subConnWithState{sc: sc, state: healthState}) + }) + } - // verify no StateListener() call on the child, as no addresses got - // ejected (ejected address will cause an StateListener call). 
- sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") - } - if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) - } - if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) - } + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + // Set each of the three upstream addresses to have five successes each. + // This should cause none of the addresses to be ejected as none of them + // are outliers according to the success rate algorithm. + for i := 0; i < len(allSubConns); i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{}) + } + } - // Since no addresses are ejected, a SubConn update should forward down - // to the child. - od.scUpdateCh.Put(&scHealthUpdate{ - scw: scw1.(*subConnWrapper), - state: balancer.SubConnState{ - ConnectivityState: connectivity.Connecting, - }}, - ) + // Create test metrics recorder + tmr := stats.NewTestMetricsRecorder() + od.metricsRecorder = tmr + od.intervalTimerAlgorithm() + + // verify no StateListener() call on the child, as no addresses got + // ejected (ejected address will cause an StateListener call). 
+ sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + } - gotSCWS, err := scsCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for Sub Conn update: %v", err) - } - if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ - sc: scw1, - state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, - }); err != nil { - t.Fatalf("Error in Sub Conn update: %v", err) - } + // Since no addresses are ejected, a SubConn update should forward down + // to the child. + newState := balancer.SubConnState{ConnectivityState: connectivity.Connecting} + wantSC := allSubConns[0] + od.scUpdateCh.Put(&scHealthUpdate{ + scw: wantSC.(*subConnWrapper), + state: newState, + }) + + gotSCWS, err := scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: wantSC, + state: newState, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } - // Set two of the upstream addresses to have five successes each, and - // one of the upstream addresses to have five failures. This should - // cause the address which has five failures to be ejected according to - // the SuccessRateAlgorithm. 
- for i := 0; i < 2; i++ { - pi, err := picker.Pick(balancer.PickInfo{}) - if err != nil { - t.Fatalf("picker.Pick failed with error: %v", err) - } - for c := 0; c < 5; c++ { - pi.Done(balancer.DoneInfo{}) - } - } - pi, err := picker.Pick(balancer.PickInfo{}) - if err != nil { - t.Fatalf("picker.Pick failed with error: %v", err) - } - if got, want := pi.SubConn, scw3.(*subConnWrapper).SubConn; got != want { - t.Fatalf("Unexpected SubConn chosen by picker: got %v, want %v", got, want) - } - for c := 0; c < 5; c++ { - pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) - } + // Set all the upstream but the failing one to have five successes + for i := 0; i < len(allSubConns); i++ { + pickerDone := balancer.DoneInfo{} + if i >= (len(allSubConns) - test.wantFailures) { + pickerDone = balancer.DoneInfo{Err: errors.New("some error")} + } + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(pickerDone) + } - // should eject address that always errored. - od.intervalTimerAlgorithm() - // Due to the address being ejected, the SubConn with that address - // should be ejected, meaning a TRANSIENT_FAILURE connectivity state - // gets reported to the child. - gotSCWS, err = scsCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for Sub Conn update: %v", err) - } - if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ - sc: scw3, - state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, - }); err != nil { - t.Fatalf("Error in Sub Conn update: %v", err) - } - // Only one address should be ejected. 
- sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") - } - if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) - } - if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) - } + } - // Now that an address is ejected, SubConn updates for SubConns using - // that address should not be forwarded downward. These SubConn updates - // will be cached to update the child sometime in the future when the - // address gets unejected. - od.scUpdateCh.Put(&scHealthUpdate{ - scw: scw3.(*subConnWrapper), - state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, - }) - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)") - } + // should eject address that always errored. + od.intervalTimerAlgorithm() + + // Due to the address being ejected, the SubConn with that address + // should be ejected, meaning a TRANSIENT_FAILURE connectivity state + // gets reported to the child. 
+ for i := 0; i < test.wantEjections; i++ { + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + got, err := scsCh.Receive(sCtx) + if err != nil { + t.Fatalf("Error waiting for SubConn to be ejected: %v", err) + } + if err = scwsEqual(got.(subConnWithState), subConnWithState{ + sc: allSubConns[len(allSubConns)-1-i], + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, + }); err != nil { + t.Fatalf("Unexpected subconnection with state: %v", err) + } + } - // Override now to cause the interval timer algorithm to always uneject - // the ejected address. This will always uneject the ejected address - // because this time is set way past the max ejection time set in the - // configuration, which will make the next interval timer algorithm run - // uneject any ejected addresses. - defer func(n func() time.Time) { - now = n - }(now) - now = func() time.Time { - return time.Now().Add(time.Second * 1000) - } - od.intervalTimerAlgorithm() + sCtx, cancel2 := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel2() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + } - // unejected SubConn should report latest persisted state - which is - // connecting from earlier. 
- gotSCWS, err = scsCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for Sub Conn update: %v", err) - } - if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ - sc: scw3, - state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, - }); err != nil { - t.Fatalf("Error in Sub Conn update: %v", err) - } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != float64(test.wantEjections) { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want %v", got, float64(test.wantEjections)) + } + if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != float64(test.wantFailures-test.wantEjections) { + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want %v", got, float64(test.wantFailures-test.wantEjections)) + } + + for i := 0; i < test.wantEjections; i++ { + // Now that an address is ejected, SubConn updates for SubConns using + // that address should not be forwarded downward. These SubConn updates + // will be cached to update the child sometime in the future when the + // address gets unejected. + scw := allSubConns[len(allSubConns)-1-i] + od.scUpdateCh.Put(&scHealthUpdate{ + scw: scw.(*subConnWrapper), + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }) + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)") + } + } + + // Override now to cause the interval timer algorithm to always uneject + // the ejected address. This will always uneject the ejected address + // because this time is set way past the max ejection time set in the + // configuration, which will make the next interval timer algorithm run + // uneject any ejected addresses. 
+ defer func(n func() time.Time) { + now = n + }(now) + now = func() time.Time { + return time.Now().Add(time.Second * 1000) + } + od.intervalTimerAlgorithm() + for i := 0; i < test.wantEjections; i++ { + scw := allSubConns[len(allSubConns)-1-i] + // unejected SubConn should report latest persisted state - which is + // connecting from earlier. + gotSCWS, err = scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw, + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + } + } + }) } } From bc761a0abccac5a659c29ecad8d6ab9e5a1d2e7c Mon Sep 17 00:00:00 2001 From: sotiris Date: Mon, 13 Oct 2025 16:35:53 -0400 Subject: [PATCH 09/12] pr review Signed-off-by: sotiris --- internal/xds/balancer/outlierdetection/balancer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index ba8b5c64ff6f..1e45335c5ae9 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -64,7 +64,7 @@ var ( ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.outlier_detection.ejections_unenforced", - Description: "EXPERIMENTAL. Number of unenforced outlier ejections due to either max ejection percentage or enforcement_percentage", + Description: "EXPERIMENTAL. 
Number of unenforced outlier ejections due to either `max_ejection_percentage` or `enforcement_percentage`", Unit: "ejection", Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, Default: false, @@ -87,7 +87,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba pickerUpdateCh: buffer.NewUnbounded(), channelzParent: bOpts.ChannelzParent, endpoints: resolver.NewEndpointMap[*endpointInfo](), - metricsRecorder: cc.MetricsRecorder(), + metricsRecorder: cc.MetricsRecorder(), // we use an explicit field instead of using cc.MetricsRecorder() so we can override the metric recorder in tests. target: bOpts.Target.String(), } b.logger = prefixLogger(b) @@ -811,9 +811,9 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { return } mean, stddev := b.meanAndStdDev(endpointsToConsider) + ejectionCfg := b.cfg.SuccessRateEjection for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket - ejectionCfg := b.cfg.SuccessRateEjection successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures) requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000) if successRate < requiredSuccessRate { @@ -845,9 +845,9 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { return } + ejectionCfg := b.cfg.FailurePercentageEjection for _, epInfo := range endpointsToConsider { bucket := epInfo.callCounter.inactiveBucket - ejectionCfg := b.cfg.FailurePercentageEjection failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100 if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage) From a7f947ac1f24b1cc40349acbf1eb7368382c0aa4 Mon Sep 17 00:00:00 2001 From: sotiris Date: Thu, 16 Oct 2025 
08:49:17 -0400 Subject: [PATCH 10/12] fix styling Signed-off-by: sotiris --- internal/xds/balancer/outlierdetection/balancer.go | 14 +++++++------- .../xds/balancer/outlierdetection/balancer_test.go | 10 ++++++---- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 1e45335c5ae9..225a3a9ba193 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -818,16 +818,16 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000) if successRate < requiredSuccessRate { channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", epInfo, successRate, mean, stddev, requiredSuccessRate) - // Check if max ejection percentage would prevent ejection + // Check if max ejection percentage would prevent ejection. if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - // Record unenforced ejection due to max ejection percentage + // Record unenforced ejection due to max ejection percentage. ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "max_ejection_overflow") continue } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { b.ejectEndpoint(epInfo, "success_rate") } else { - // Record unenforced ejection due to enforcement percentage + // Record unenforced ejection due to enforcement percentage. 
ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "success_rate", "enforcement_percentage") } } @@ -851,16 +851,16 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100 if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", epInfo, failurePercentage) - // Check if max ejection percentage would prevent ejection + // Check if max ejection percentage would prevent ejection. if float64(b.numEndpointsEjected)/float64(b.endpoints.Len())*100 >= float64(b.cfg.MaxEjectionPercent) { - // Record unenforced ejection due to max ejection percentage + // Record unenforced ejection due to max ejection percentage. ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "max_ejection_overflow") continue } if uint32(rand.Int32N(100)) < ejectionCfg.EnforcementPercentage { b.ejectEndpoint(epInfo, "failure_percentage") } else { - // Record unenforced ejection due to enforcement percentage + // Record unenforced ejection due to enforcement percentage. ejectionsUnenforcedMetric.Record(b.metricsRecorder, 1, b.target, "failure_percentage", "enforcement_percentage") } } @@ -877,7 +877,7 @@ func (b *outlierDetectionBalancer) ejectEndpoint(epInfo *endpointInfo, detection channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw) } - // Record the enforced ejection metric + // Record the enforced ejection metric. 
ejectionsEnforcedMetric.Record(b.metricsRecorder, 1, b.target, detectionMethod) } diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index 45909e44234d..e7d97a6a4706 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -1030,10 +1030,12 @@ func (s) TestDurationOfInterval(t *testing.T) { // algorithm when configured with SuccessRateEjection. The Outlier Detection // Balancer will be set up with N SubConns, each with a different address. // It tests the following scenarios, in a step by step fashion: -// 1. The N addresses each have 5 successes. The interval timer algorithm should -// not eject any of the addresses. -// 2. N - `wantFailures` of the addresses have 5 successes but the remaining address has 5 failures. The internal algorithm -// should attempt to eject the address based on the outlier detection lb config. Only `wantEjections` addresses should be ejected. +// 1. The N addresses each have 5 successes. The interval timer algorithm +// should not eject any of the addresses. +// 2. N - `wantFailures` of the addresses have 5 successes and the remaining +// `wantFailures` addresses have 5 failures each. The interval timer +// algorithm should attempt to eject the failing addresses based on the +// outlier detection lb config; only `wantEjections` of them get ejected. // 3. The interval timer algorithm is run at a later time past max ejection // time. The interval timer algorithm should uneject all.
func (s) TestEjectUnejectSuccessRate(t *testing.T) { From 32d9eb3ea51d51cdcf5c8ae1477f6c435ed878bd Mon Sep 17 00:00:00 2001 From: sotiris Date: Fri, 17 Oct 2025 12:22:28 -0400 Subject: [PATCH 11/12] fix tests Signed-off-by: sotiris --- .../outlierdetection/balancer_test.go | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index e7d97a6a4706..e96599d895ed 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -1141,9 +1141,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { connectivityCh := make(chan struct{}) var allSubConns = make([]balancer.SubConn, test.numberOfConns) stub.Register(test.name, stub.BalancerFuncs{ - UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { for i := range test.numberOfConns { - scw, err := bd.ClientConn.NewSubConn([]resolver.Address{{Addr: fmt.Sprintf("address%d", i+1)}}, balancer.NewSubConnOptions{ + scw, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Endpoints[i].Addresses, balancer.NewSubConnOptions{ StateListener: func(state balancer.SubConnState) { if state.ConnectivityState == connectivity.Ready { connectivityCh <- struct{}{} @@ -1151,7 +1151,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { }, }) if err != nil { - t.Errorf("error in od.NewSubConn call: %v", err) + t.Errorf("NewSubConn(%v) failed: %v", ccs.ResolverState.Endpoints[i].Addresses, err) } scw.Connect() allSubConns[i] = scw @@ -1169,8 +1169,8 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { od, tcc, cleanup := setup(t) defer cleanup() - endpoints := make([]resolver.Endpoint, len(allSubConns)) - for i := range allSubConns { + endpoints := make([]resolver.Endpoint, test.numberOfConns) + for i := 
range test.numberOfConns { endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{{Addr: fmt.Sprintf("address%d", i+1)}}} } od.UpdateClientConnState(balancer.ClientConnState{ @@ -1199,22 +1199,18 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { // processed to avoid data races while accessing the health listener within // the TestClientConn. connectionsReady := 0 - OuterLoop: - for { + for connectionsReady < test.numberOfConns { select { case <-ctx.Done(): t.Fatal("Context timed out waiting for all SubConns to become READY.") case <-connectivityCh: connectionsReady++ - if connectionsReady == len(allSubConns) { - break OuterLoop - } } } - for _, sc := range allSubConns { - sc.RegisterHealthListener(func(healthState balancer.SubConnState) { - scsCh.Send(subConnWithState{sc: sc, state: healthState}) + for i := range test.numberOfConns { + allSubConns[i].RegisterHealthListener(func(healthState balancer.SubConnState) { + scsCh.Send(subConnWithState{sc: allSubConns[i], state: healthState}) }) } @@ -1225,7 +1221,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { // Set each of the three upstream addresses to have five successes each. // This should cause none of the addresses to be ejected as none of them // are outliers according to the success rate algorithm. 
- for i := 0; i < len(allSubConns); i++ { + for i := 0; i < test.numberOfConns; i++ { pi, err := picker.Pick(balancer.PickInfo{}) if err != nil { t.Fatalf("picker.Pick failed with error: %v", err) @@ -1275,9 +1271,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { } // Set all the upstream but the failing one to have five successes - for i := 0; i < len(allSubConns); i++ { + for i := 0; i < test.numberOfConns; i++ { pickerDone := balancer.DoneInfo{} - if i >= (len(allSubConns) - test.wantFailures) { + if i >= (test.numberOfConns - test.wantFailures) { pickerDone = balancer.DoneInfo{Err: errors.New("some error")} } pi, err := picker.Pick(balancer.PickInfo{}) @@ -1517,7 +1513,7 @@ func (s) TestEjectFailureRate(t *testing.T) { sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer cancel() if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + t.Fatalf("Received unexpected subchannel state change when expecting none") } if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) @@ -1567,7 +1563,7 @@ func (s) TestEjectFailureRate(t *testing.T) { sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer cancel() if _, err := scsCh.Receive(sCtx); err == nil { - t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + t.Fatalf("Received unexpected subchannel state change when expecting none") } if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 1 { t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 1", got) From 43354625f538b2623211d2f3428f1389d3f8011d Mon Sep 17 00:00:00 2001 From: sotiris Date: Tue, 21 Oct 2025 12:31:40 -0400 Subject: [PATCH 12/12] pr feedback Signed-off-by: sotiris --- .../xds/balancer/outlierdetection/balancer.go | 
4 ++-- .../balancer/outlierdetection/balancer_test.go | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/xds/balancer/outlierdetection/balancer.go b/internal/xds/balancer/outlierdetection/balancer.go index 225a3a9ba193..d6c8c6de8eaf 100644 --- a/internal/xds/balancer/outlierdetection/balancer.go +++ b/internal/xds/balancer/outlierdetection/balancer.go @@ -57,7 +57,7 @@ var ( ejectionsEnforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.outlier_detection.ejections_enforced", Description: "EXPERIMENTAL. Number of outlier ejections enforced by detection method", - Unit: "ejection", + Unit: "{ejection}", Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method"}, Default: false, }) @@ -65,7 +65,7 @@ var ( ejectionsUnenforcedMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.outlier_detection.ejections_unenforced", Description: "EXPERIMENTAL. Number of unenforced outlier ejections due to either `max_ejection_percentage` or `enforcement_percentage`", - Unit: "ejection", + Unit: "{ejection}", Labels: []string{"grpc.target", "grpc.lb.outlier_detection.detection_method", "grpc.lb.outlier_detection.unenforced_reason"}, Default: false, }) diff --git a/internal/xds/balancer/outlierdetection/balancer_test.go b/internal/xds/balancer/outlierdetection/balancer_test.go index e96599d895ed..3cb1dfc1004b 100644 --- a/internal/xds/balancer/outlierdetection/balancer_test.go +++ b/internal/xds/balancer/outlierdetection/balancer_test.go @@ -1244,10 +1244,10 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") } if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != 0 { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want 0", got) + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %f, want 0", got) } if got, _ := 
tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != 0 { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want 0", got) + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %f, want 0", got) } // Since no addresses are ejected, a SubConn update should forward down @@ -1270,10 +1270,12 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { t.Fatalf("Error in Sub Conn update: %v", err) } - // Set all the upstream but the failing one to have five successes + // Set all the upstream before the offset to have five successes, but + // the remaining addresses to have failures. + offset := test.numberOfConns - test.wantFailures for i := 0; i < test.numberOfConns; i++ { pickerDone := balancer.DoneInfo{} - if i >= (test.numberOfConns - test.wantFailures) { + if i >= offset { pickerDone = balancer.DoneInfo{Err: errors.New("some error")} } pi, err := picker.Pick(balancer.PickInfo{}) @@ -1293,9 +1295,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { // should be ejected, meaning a TRANSIENT_FAILURE connectivity state // gets reported to the child. 
for i := 0; i < test.wantEjections; i++ { - sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - got, err := scsCh.Receive(sCtx) + got, err := scsCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for SubConn to be ejected: %v", err) } @@ -1314,10 +1314,10 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) { } if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_enforced"); got != float64(test.wantEjections) { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %v, want %v", got, float64(test.wantEjections)) + t.Errorf("Metric grpc.lb.outlier_detection.ejections_enforced: got %f, want %f", got, float64(test.wantEjections)) } if got, _ := tmr.Metric("grpc.lb.outlier_detection.ejections_unenforced"); got != float64(test.wantFailures-test.wantEjections) { - t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %v, want %v", got, float64(test.wantFailures-test.wantEjections)) + t.Errorf("Metric grpc.lb.outlier_detection.ejections_unenforced: got %f, want %f", got, float64(test.wantFailures-test.wantEjections)) } for i := 0; i < test.wantEjections; i++ {