Skip to content

Commit

Permalink
Add pick first metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Nov 14, 2024
1 parent 0553bc3 commit e214b5f
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 9 deletions.
311 changes: 311 additions & 0 deletions balancer/pickfirst/pickfirstleaf/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package pickfirstleaf_test

import (
"context"
"fmt"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/internal/envconfig"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/stats"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats/opentelemetry"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

var pfConfig string

func init() {
name := pickfirst.Name
if !envconfig.NewPickFirstEnabled {
name = pickfirstleaf.Name
}
pfConfig = fmt.Sprintf(`{
"loadBalancingConfig": [
{
%q: {
}
}
]
}`, name)
}

// TestPickFirstMetrics tests pick first metrics. It configures a pick first
// balancer, causes it to connect and then disconnect, and expects the
// subsequent metrics to emit from that.
func (s) TestPickFirstMetrics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
ss.StartServer()
defer ss.Stop()

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)

r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: ss.Address}}},
)

grpcTarget := r.Scheme() + ":///"
tmr := stats.NewTestMetricsRecorder()
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 0)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

ss.Stop()
if err = pollForDisconnectedMetrics(ctx, tmr); err != nil {
t.Fatal(err)
}
}

func pollForDisconnectedMetrics(ctx context.Context, tmr *stats.TestMetricsRecorder) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got == 1 {
return nil
}
}
return fmt.Errorf("error waiting for grpc.lb.pick_first.disconnections metric: %v", ctx.Err())
}

// TestPickFirstMetricsFailure tests the connection attempts failed metric. It
// configures a channel and scenario that causes a pick first connection attempt
// to fail, and then expects that metric to emit.
func (s) TestPickFirstMetricsFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)

r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: "bad address"}}},
)
grpcTarget := r.Scheme() + ":///"
tmr := stats.NewTestMetricsRecorder()
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatalf("EmptyCall() passed when expected to fail")
}

if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 0)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}
}

// TestPickFirstMetricsE2E tests the pick first metrics end to end. It
// configures a channel with an OpenTelemetry plugin, induces all 3 pick first
// metrics to emit, and makes sure the correct OpenTelemetry metrics atoms emit.
func (s) TestPickFirstMetricsE2E(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
ss.StartServer()
defer ss.Stop()

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: "bad address"}}},
) // Will trigger connection failed.

grpcTarget := r.Scheme() + ":///"
reader := metric.NewManualReader()
provider := metric.NewMeterProvider(metric.WithReader(reader))
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.pick_first.disconnections", "grpc.lb.pick_first.connection_attempts_succeeded", "grpc.lb.pick_first.connection_attempts_failed"),
}

cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
}
defer cc.Close()

tsc := testgrpc.NewTestServiceClient(cc)
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatalf("EmptyCall() passed when expected to fail")
}

r.UpdateState(resolver.State{
ServiceConfig: sc,
Addresses: []resolver.Address{{Addr: ss.Address}}}) // Will trigger successful connection metric.
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

// Stop the server, that should send signal to disconnect, which will
// eventually emit disconnection metric.
ss.Stop()
wantMetrics := []metricdata.Metrics{
{
Name: "grpc.lb.pick_first.connection_attempts_succeeded",
Description: "EXPERIMENTAL. Number of successful connection attempts.",
Unit: "attempt",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
{
Name: "grpc.lb.pick_first.connection_attempts_failed",
Description: "EXPERIMENTAL. Number of failed connection attempts.",
Unit: "attempt",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
}

gotMetrics := metricsDataFromReader(ctx, reader)
for _, metric := range wantMetrics {
val, ok := gotMetrics[metric.Name]
if !ok {
t.Fatalf("Metric %v not present in recorded metrics", metric.Name)
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
t.Fatalf("Metrics data type not equal for metric: %v", metric.Name)
}
}
// Disconnections metric will show up eventually, as asynchronous from
// server stopping.
wantMetrics = []metricdata.Metrics{
{
Name: "grpc.lb.pick_first.disconnections",
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
Unit: "disconnection",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
Value: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
}
if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil {
t.Fatal(err)
}
}

func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map[string]metricdata.Metrics {
rm := &metricdata.ResourceMetrics{}
reader.Collect(ctx, rm)
gotMetrics := map[string]metricdata.Metrics{}
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
gotMetrics[m.Name] = m
}
}
return gotMetrics
}

func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.ManualReader, wantMetrics []metricdata.Metrics) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
gotMetrics := metricsDataFromReader(ctx, reader)
for _, metric := range wantMetrics {
val, ok := gotMetrics[metric.Name]
if !ok {
break
}
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
return fmt.Errorf("metrics data type not equal for metric: %v", metric.Name)
}
return nil
}
time.Sleep(5 * time.Millisecond)
}

return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err())
}
43 changes: 38 additions & 5 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
Expand All @@ -57,7 +58,28 @@ var (
// Name is the name of the pick_first_leaf balancer.
// It is changed to "pick_first" in init() if this balancer is to be
// registered as the default pickfirst.
Name = "pick_first_leaf"
Name = "pick_first_leaf"
pickFirstDisconnectionsMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.pick_first.disconnections",
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
Unit: "disconnection",
Labels: []string{"grpc.target"},
Default: false,
})
pickFirstConnectionAttemptsSucceeded = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.pick_first.connection_attempts_succeeded",
Description: "EXPERIMENTAL. Number of successful connection attempts.",
Unit: "attempt",
Labels: []string{"grpc.target"},
Default: false,
})
pickFirstConnectionAttemptsFailed = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.pick_first.connection_attempts_failed",
Description: "EXPERIMENTAL. Number of failed connection attempts.",
Unit: "attempt",
Labels: []string{"grpc.target"},
Default: false,
})
)

const (
Expand All @@ -80,9 +102,12 @@ const (

type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
cc: cc,
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder,

addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
Expand Down Expand Up @@ -147,8 +172,10 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
type pickfirstBalancer struct {
// The following fields are initialized at build time and read-only after
// that and therefore do not need to be guarded by a mutex.
logger *internalgrpclog.PrefixLogger
cc balancer.ClientConn
logger *internalgrpclog.PrefixLogger
cc balancer.ClientConn
target string
metricsRecorder estats.MetricsRecorder

// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
Expand Down Expand Up @@ -548,7 +575,12 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
return
}

if oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.TransientFailure {
pickFirstConnectionAttemptsFailed.Record(b.metricsRecorder, 1, b.target)
}

if newState.ConnectivityState == connectivity.Ready {
pickFirstConnectionAttemptsSucceeded.Record(b.metricsRecorder, 1, b.target)
b.shutdownRemainingLocked(sd)
if !b.addressList.seekTo(sd.addr) {
// This should not fail as we should have only one SubConn after
Expand All @@ -575,6 +607,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
pickFirstDisconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
b.addressList.reset()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Expand Down
Loading

0 comments on commit e214b5f

Please sign in to comment.