diff --git a/balancer/pickfirst/pickfirstleaf/metrics_test.go b/balancer/pickfirst/pickfirstleaf/metrics_test.go
new file mode 100644
index 000000000000..9cbec8e2b2ae
--- /dev/null
+++ b/balancer/pickfirst/pickfirstleaf/metrics_test.go
@@ -0,0 +1,335 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package pickfirstleaf_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer/pickfirst"
+	"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/internal/envconfig"
+	"google.golang.org/grpc/internal/stubserver"
+	"google.golang.org/grpc/internal/testutils/stats"
+	testgrpc "google.golang.org/grpc/interop/grpc_testing"
+	testpb "google.golang.org/grpc/interop/grpc_testing"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
+	"google.golang.org/grpc/serviceconfig"
+	"google.golang.org/grpc/stats/opentelemetry"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdatatest"
+)
+
+// pfConfig is the service config JSON that selects the pick first balancer
+// used by the tests in this file. It is populated by init.
+var pfConfig string
+
+func init() {
+	// Address the leaf balancer by its own name when the new pick first
+	// implementation is not enabled through envconfig.
+	name := pickfirst.Name
+	if !envconfig.NewPickFirstEnabled {
+		name = pickfirstleaf.Name
+	}
+	pfConfig = fmt.Sprintf(`{
+		  "loadBalancingConfig": [
+		    {
+		      %q: {
+		      }
+		    }
+		  ]
+		}`, name)
+}
+
+// TestPickFirstMetrics tests pick first metrics. It configures a pick first
+// balancer, causes it to connect and then disconnect, and expects the
+// subsequent metrics to emit from that.
+func (s) TestPickFirstMetrics(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	ss := &stubserver.StubServer{
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			return &testpb.Empty{}, nil
+		},
+	}
+	if err := ss.StartServer(); err != nil {
+		t.Fatalf("StartServer() failed with error: %v", err)
+	}
+	defer ss.Stop()
+
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
+
+	r := manual.NewBuilderWithScheme("whatever")
+	r.InitialState(resolver.State{
+		ServiceConfig: sc,
+		Addresses:     []resolver.Address{{Addr: ss.Address}}},
+	)
+
+	grpcTarget := r.Scheme() + ":///"
+	tmr := stats.NewTestMetricsRecorder()
+	cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+	if err != nil {
+		t.Fatalf("NewClient() failed with error: %v", err)
+	}
+	defer cc.Close()
+
+	tsc := testgrpc.NewTestServiceClient(cc)
+	if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Fatalf("EmptyCall() failed: %v", err)
+	}
+
+	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 0 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 0)
+	}
+	if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
+	}
+
+	ss.Stop()
+	if err := pollForDisconnectedMetrics(ctx, tmr); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// pollForDisconnectedMetrics polls tmr until it reports a value of 1 for the
+// grpc.lb.pick_first.disconnections metric. It returns an error if ctx
+// expires first.
+func pollForDisconnectedMetrics(ctx context.Context, tmr *stats.TestMetricsRecorder) error {
+	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
+		if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got == 1 {
+			return nil
+		}
+	}
+	return fmt.Errorf("error waiting for grpc.lb.pick_first.disconnections metric: %v", ctx.Err())
+}
+
+// TestPickFirstMetricsFailure tests the connection attempts failed metric. It
+// configures a channel and scenario that causes a pick first connection attempt
+// to fail, and then expects that metric to emit.
+func (s) TestPickFirstMetricsFailure(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
+
+	r := manual.NewBuilderWithScheme("whatever")
+	r.InitialState(resolver.State{
+		ServiceConfig: sc,
+		Addresses:     []resolver.Address{{Addr: "bad address"}}},
+	)
+	grpcTarget := r.Scheme() + ":///"
+	tmr := stats.NewTestMetricsRecorder()
+	cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+	if err != nil {
+		t.Fatalf("NewClient() failed with error: %v", err)
+	}
+	defer cc.Close()
+
+	tsc := testgrpc.NewTestServiceClient(cc)
+	if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
+		t.Fatalf("EmptyCall() passed when expected to fail")
+	}
+
+	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 0 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 0)
+	}
+	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
+	}
+}
+
+// TestPickFirstMetricsE2E tests the pick first metrics end to end. It
+// configures a channel with an OpenTelemetry plugin, induces all 3 pick first
+// metrics to emit, and makes sure the correct OpenTelemetry metrics atoms emit.
+func (s) TestPickFirstMetricsE2E(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	ss := &stubserver.StubServer{
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			return &testpb.Empty{}, nil
+		},
+	}
+	if err := ss.StartServer(); err != nil {
+		t.Fatalf("StartServer() failed with error: %v", err)
+	}
+	defer ss.Stop()
+
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
+	r := manual.NewBuilderWithScheme("whatever")
+	r.InitialState(resolver.State{
+		ServiceConfig: sc,
+		Addresses:     []resolver.Address{{Addr: "bad address"}}},
+	) // Will trigger connection failed.
+
+	grpcTarget := r.Scheme() + ":///"
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(metric.WithReader(reader))
+	mo := opentelemetry.MetricsOptions{
+		MeterProvider: provider,
+		Metrics:       opentelemetry.DefaultMetrics().Add("grpc.lb.pick_first.disconnections", "grpc.lb.pick_first.connection_attempts_succeeded", "grpc.lb.pick_first.connection_attempts_failed"),
+	}
+
+	cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+	if err != nil {
+		t.Fatalf("NewClient() failed with error: %v", err)
+	}
+	defer cc.Close()
+
+	tsc := testgrpc.NewTestServiceClient(cc)
+	if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
+		t.Fatalf("EmptyCall() passed when expected to fail")
+	}
+
+	r.UpdateState(resolver.State{
+		ServiceConfig: sc,
+		Addresses:     []resolver.Address{{Addr: ss.Address}}}) // Will trigger successful connection metric.
+	if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+		t.Fatalf("EmptyCall() failed: %v", err)
+	}
+
+	// Stop the server, that should send signal to disconnect, which will
+	// eventually emit disconnection metric.
+	ss.Stop()
+	wantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.lb.pick_first.connection_attempts_succeeded",
+			Description: "EXPERIMENTAL. Number of successful connection attempts.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.lb.pick_first.connection_attempts_failed",
+			Description: "EXPERIMENTAL. Number of failed connection attempts.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+	}
+
+	gotMetrics := metricsDataFromReader(ctx, reader)
+	for _, want := range wantMetrics {
+		got, ok := gotMetrics[want.Name]
+		if !ok {
+			t.Fatalf("Metric %v not present in recorded metrics", want.Name)
+		}
+		if !metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			t.Fatalf("Metrics data type not equal for metric: %v", want.Name)
+		}
+	}
+	// Disconnections metric will show up eventually, as asynchronous from
+	// server stopping.
+	wantMetrics = []metricdata.Metrics{
+		{
+			Name:        "grpc.lb.pick_first.disconnections",
+			Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
+			Unit:        "disconnection",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+	}
+	if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// metricsDataFromReader collects the metrics currently available from reader
+// and returns them keyed by metric name.
+func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map[string]metricdata.Metrics {
+	rm := &metricdata.ResourceMetrics{}
+	// The error from Collect is not checked here: callers in this file treat
+	// a metric missing from the returned map as a failure or as a reason to
+	// poll again.
+	reader.Collect(ctx, rm)
+	gotMetrics := map[string]metricdata.Metrics{}
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			gotMetrics[m.Name] = m
+		}
+	}
+	return gotMetrics
+}
+
+// pollForWantMetrics polls reader until every metric in wantMetrics has been
+// recorded and matches the wanted metric (values, timestamps and exemplars are
+// ignored in the comparison). It returns an error if a recorded metric does
+// not match, or if ctx expires before all wanted metrics show up.
+func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.ManualReader, wantMetrics []metricdata.Metrics) error {
+	for ; ctx.Err() == nil; <-time.After(5 * time.Millisecond) {
+		gotMetrics := metricsDataFromReader(ctx, reader)
+		allPresent := true
+		for _, want := range wantMetrics {
+			got, ok := gotMetrics[want.Name]
+			if !ok {
+				// Not recorded yet; collect again after a short wait.
+				allPresent = false
+				break
+			}
+			if !metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+				return fmt.Errorf("metrics data type not equal for metric: %v", want.Name)
+			}
+		}
+		if allPresent {
+			return nil
+		}
+	}
+
+	return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err())
+}
diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
index aaec87497fd4..07f0841e38a8 100644
--- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
+++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
@@ -36,6 +36,7 @@ import (
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/pickfirst/internal"
 	"google.golang.org/grpc/connectivity"
+	estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/envconfig" internalgrpclog "google.golang.org/grpc/internal/grpclog" @@ -57,7 +58,28 @@ var ( // Name is the name of the pick_first_leaf balancer. // It is changed to "pick_first" in init() if this balancer is to be // registered as the default pickfirst. - Name = "pick_first_leaf" + Name = "pick_first_leaf" + pickFirstDisconnectionsMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.pick_first.disconnections", + Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.", + Unit: "disconnection", + Labels: []string{"grpc.target"}, + Default: false, + }) + pickFirstConnectionAttemptsSucceeded = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.pick_first.connection_attempts_succeeded", + Description: "EXPERIMENTAL. Number of successful connection attempts.", + Unit: "attempt", + Labels: []string{"grpc.target"}, + Default: false, + }) + pickFirstConnectionAttemptsFailed = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.pick_first.connection_attempts_failed", + Description: "EXPERIMENTAL. 
Number of failed connection attempts.", + Unit: "attempt", + Labels: []string{"grpc.target"}, + Default: false, + }) ) const ( @@ -80,9 +102,12 @@ const ( type pickfirstBuilder struct{} -func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { +func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer { b := &pickfirstBalancer{ - cc: cc, + cc: cc, + target: bo.Target.String(), + metricsRecorder: bo.MetricsRecorder, + addressList: addressList{}, subConns: resolver.NewAddressMap(), state: connectivity.Connecting, @@ -147,8 +172,10 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { type pickfirstBalancer struct { // The following fields are initialized at build time and read-only after // that and therefore do not need to be guarded by a mutex. - logger *internalgrpclog.PrefixLogger - cc balancer.ClientConn + logger *internalgrpclog.PrefixLogger + cc balancer.ClientConn + target string + metricsRecorder estats.MetricsRecorder // The mutex is used to ensure synchronization of updates triggered // from the idle picker and the already serialized resolver, @@ -548,7 +575,12 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub return } + if oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.TransientFailure { + pickFirstConnectionAttemptsFailed.Record(b.metricsRecorder, 1, b.target) + } + if newState.ConnectivityState == connectivity.Ready { + pickFirstConnectionAttemptsSucceeded.Record(b.metricsRecorder, 1, b.target) b.shutdownRemainingLocked(sd) if !b.addressList.seekTo(sd.addr) { // This should not fail as we should have only one SubConn after @@ -575,6 +607,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub // the first address when the picker is used. 
b.shutdownRemainingLocked(sd) b.state = connectivity.Idle + pickFirstDisconnectionsMetric.Record(b.metricsRecorder, 1, b.target) b.addressList.reset() b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.Idle, diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index bf957f98b119..6baa68593eb5 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -39,6 +39,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/pickfirst" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/status" @@ -1036,7 +1037,7 @@ func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc := testutils.NewBalancerClientConn(t) - bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{}) + bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}}) defer bal.Close() ccState := balancer.ClientConnState{ ResolverState: resolver.State{ @@ -1082,7 +1083,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc := testutils.NewBalancerClientConn(t) - bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{}) + bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}}) defer bal.Close() ccState := balancer.ClientConnState{ ResolverState: resolver.State{ @@ -1126,7 +1127,7 @@ func (s) TestPickFirstLeaf_InterleavingUnknownPreffered(t *testing.T) { ctx, cancel := 
context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc := testutils.NewBalancerClientConn(t) - bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{}) + bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}}) defer bal.Close() ccState := balancer.ClientConnState{ ResolverState: resolver.State{ diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go index 71984a238cd5..f269a71a7a97 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/resolver" ) @@ -195,7 +196,7 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc := testutils.NewBalancerClientConn(t) - bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{}) + bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}}) defer bal.Close() ccState := balancer.ClientConnState{ ResolverState: resolver.State{ diff --git a/gcp/observability/go.sum b/gcp/observability/go.sum index 30e984fb4343..472ac41d57ad 100644 --- a/gcp/observability/go.sum +++ b/gcp/observability/go.sum @@ -1107,6 +1107,7 @@ go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozR go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 
h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index 72a20c1cbf44..2d6b7b9a0f53 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -63,6 +63,8 @@ func NewTestMetricsRecorder() *TestMetricsRecorder { // Metric returns the most recent data for a metric, and whether this recorder // has received data for a metric. func (r *TestMetricsRecorder) Metric(name string) (float64, bool) { + r.mu.Lock() + defer r.mu.Unlock() data, ok := r.data[estats.Metric(name)] return data, ok }