diff --git a/balancer/pickfirst/metrics_test.go b/balancer/pickfirst/metrics_test.go
index 0deb12715831..3257cbe5d454 100644
--- a/balancer/pickfirst/metrics_test.go
+++ b/balancer/pickfirst/metrics_test.go
@@ -102,11 +102,31 @@ func (s) TestPickFirstMetrics(t *testing.T) {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
 	}
 
+	// Check the subchannel metrics as well.
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
+	}
+	if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
+	}
+	if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1)
+	}
+
 	ss.Stop()
 	testutils.AwaitState(ctx, t, cc, connectivity.Idle)
 	if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 1 {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 1)
 	}
+	if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != -1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, -1)
+	}
 }
 
 // TestPickFirstMetricsFailure tests the connection attempts failed metric. It
diff --git a/balancer/pickfirst/pickfirst_ext_test.go b/balancer/pickfirst/pickfirst_ext_test.go
index 3591a6e36baa..7296aa9636b9 100644
--- a/balancer/pickfirst/pickfirst_ext_test.go
+++ b/balancer/pickfirst/pickfirst_ext_test.go
@@ -1935,6 +1935,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
 	holds[1].Fail(fmt.Errorf("test error"))
 	tmr.WaitForInt64CountIncr(ctx, 1)
 	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
+	waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_failed")
 
 	// Only connection attempt fails in this test.
 	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 0 {
@@ -1946,6 +1947,12 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
 	if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
 	}
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 0 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 0)
+	}
 }
 
 // Test verifies that pickfirst attempts to connect to the second backend once
@@ -1995,7 +2002,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
 	// that the channel becomes READY.
 	holds[1].Resume()
 	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
-
+	waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_succeeded")
 	// Only connection attempt successes in this test.
 	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
@@ -2006,6 +2013,27 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
 	if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
 	}
+
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
+	}
+}
+
+// waitForMetric blocks until the given metric has been recorded by the test
+// metrics recorder, and fails the test if the context expires first.
+func waitForMetric(ctx context.Context, t *testing.T, tmr *stats.TestMetricsRecorder, metricName string) {
+	for {
+		if _, ok := tmr.Metric(metricName); ok {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			t.Fatalf("Timeout waiting for metric emission: %s", metricName)
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+}
 
 // Test tests the pickfirst balancer by causing a SubConn to fail and then
@@ -2057,6 +2085,9 @@
 	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
 	}
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
+	}
 	if holds[2].IsStarted() != false {
 		t.Fatalf("Server %d with address %q contacted unexpectedly", 2, addrs[2])
 	}
@@ -2073,7 +2104,10 @@
 	// that the channel becomes READY.
 	holds[1].Resume()
 	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
-
+	waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_succeeded")
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
+	}
 	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
 		t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
 	}
@@ -2082,6 +2116,84 @@
 	}
 }
 
+// Test verifies that when a subchannel is shut down by the LB (because another
+// subchannel won) while its dial is still in-flight, it records exactly one
+// successful attempt.
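+// The cancelled dial surfaces as a context.Canceled error inside
+// resetTransportAndUnlock, which deliberately skips recording a failed
+// connection attempt in that case.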
+func (s) TestPickFirstLeaf_HappyEyeballs_Ignore_Inflight_Cancellations(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	originalTimer := pfinternal.TimeAfterFunc
+	defer func() {
+		pfinternal.TimeAfterFunc = originalTimer
+	}()
+	triggerTimer, timeAfter := mockTimer()
+	pfinternal.TimeAfterFunc = timeAfter
+
+	tmr := stats.NewTestMetricsRecorder()
+	dialer := testutils.NewBlockingDialer()
+	opts := []grpc.DialOption{
+		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pfbalancer.Name)),
+		grpc.WithContextDialer(dialer.DialContext),
+		grpc.WithStatsHandler(tmr),
+	}
+
+	// Set up two backend addresses.
+	cc, rb, bm := setupPickFirstLeaf(t, 2, opts...)
+	addrs := bm.resolverAddrs()
+	holds := bm.holds(dialer)
+	rb.UpdateState(resolver.State{Addresses: addrs})
+	cc.Connect()
+
+	// Make sure we connect to the second subconn.
+	testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
+	if holds[0].Wait(ctx) != true {
+		t.Fatalf("Timeout waiting for server %d to be contacted", 0)
+	}
+	triggerTimer()
+	if holds[1].Wait(ctx) != true {
+		t.Fatalf("Timeout waiting for server %d to be contacted", 1)
+	}
+	holds[1].Resume()
+
+	// Wait for the channel to become READY.
+	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
+
+	// Unblock the first SubConn. Since the LB has already shut down this
+	// subchannel, the context passed to Dial is canceled, and the in-flight
+	// attempt is cancelled with it. No success or failure metric should be
+	// recorded for it.
+	holds[0].Resume()
+
+	// --- Assertions ---
+
+	// Wait for the success metric to ensure the recording logic has run.
+	waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_succeeded")
+
+	// Verify success: exactly 1 (the winner).
+	if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: 1", "grpc.subchannel.connection_attempts_succeeded", got)
+	}
+
+	// Verify failure: exactly 0 (the loser was ignored).
+	// We poll briefly to ensure no delayed failure metric appears.
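+	// (Metrics are recorded asynchronously after holds[0].Resume() returns,
+	// so a single read could pass before a spurious failure lands.)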
+	sCtx, sCancel := context.WithTimeout(ctx, 50*time.Millisecond)
+	defer sCancel()
+	for ; sCtx.Err() == nil; <-time.After(time.Millisecond) {
+		if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
+			t.Fatalf("Unexpected failure recorded for shutdown subchannel, got: %v, want: 0", got)
+		}
+	}
+
+	// LB metrics check.
+	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: 1", "grpc.lb.pick_first.connection_attempts_succeeded", got)
+	}
+	if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 0 {
+		t.Errorf("Unexpected data for metric %v, got: %v, want: 0", "grpc.lb.pick_first.connection_attempts_failed", got)
+	}
+}
+
 func (s) TestPickFirstLeaf_InterleavingIPV4Preferred(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
diff --git a/clientconn.go b/clientconn.go
index c0c2c9a76abf..aec48cc007e0 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -35,6 +35,8 @@ import (
 	"google.golang.org/grpc/balancer/pickfirst"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/credentials"
+	expstats "google.golang.org/grpc/experimental/stats"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcsync"
@@ -98,6 +100,41 @@ var (
 	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
 )
 
+var (
+	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
+		Name:           "grpc.subchannel.disconnections",
+		Description:    "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
+		Unit:           "{disconnection}",
+		Labels:         []string{"grpc.target"},
+		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"},
+		Default:        false,
+	})
+	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
+		Name:           "grpc.subchannel.connection_attempts_succeeded",
+		Description:    "EXPERIMENTAL. Number of successful connection attempts.",
+		Unit:           "{attempt}",
+		Labels:         []string{"grpc.target"},
+		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
+		Default:        false,
+	})
+	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
+		Name:           "grpc.subchannel.connection_attempts_failed",
+		Description:    "EXPERIMENTAL. Number of failed connection attempts.",
+		Unit:           "{attempt}",
+		Labels:         []string{"grpc.target"},
+		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
+		Default:        false,
+	})
+	openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{
+		Name:           "grpc.subchannel.open_connections",
+		Description:    "EXPERIMENTAL. Number of open connections.",
+		Unit:           "{connection}",
+		Labels:         []string{"grpc.target"},
+		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"},
+		Default:        false,
+	})
+)
+
 const (
 	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
 	defaultClientMaxSendMessageSize    = math.MaxInt32
@@ -861,6 +898,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
 		channelz:     channelz.RegisterSubChannel(cc.channelz, ""),
 		resetBackoff: make(chan struct{}),
 	}
+	ac.updateTelemetryLabelsLocked()
 	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
 	// Start with our address set to the first address; this may be updated if
 	// we connect to different addresses.
@@ -977,7 +1015,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
 	}
 
 	ac.addrs = addrs
-
+	ac.updateTelemetryLabelsLocked()
 	if ac.state == connectivity.Shutdown ||
 		ac.state == connectivity.TransientFailure ||
 		ac.state == connectivity.Idle {
@@ -1216,6 +1254,9 @@ type addrConn struct {
 	resetBackoff chan struct{}
 
 	channelz *channelz.SubChannel
+
+	localityLabel       string
+	backendServiceLabel string
 }
 
 // Note: this requires a lock on ac.mu.
@@ -1223,6 +1264,18 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
 	if ac.state == s {
 		return
 	}
+
+	// If we are transitioning out of READY, there was a disconnection. A
+	// SubConn can also transition from CONNECTING directly to IDLE when a
+	// transport is successfully created, but the connection fails before
+	// the SubConn can send the notification for READY. We treat this as a
+	// successful connection followed by a disconnection into IDLE.
+	// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
+	// part of the if condition below once the issue is fixed.
+	if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
+		disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
+		openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
+	}
 	ac.state = s
 	ac.channelz.ChannelMetrics.State.Store(&s)
 	if lastErr == nil {
@@ -1280,6 +1333,15 @@ func (ac *addrConn) resetTransportAndUnlock() {
 	ac.mu.Unlock()
 
 	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
+		if !errors.Is(err, context.Canceled) {
+			connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
+		} else if logger.V(2) {
+			// Log cancelled connection attempts for now; this may later be
+			// replaced by a dedicated metric.
+			logger.Infof("Context cancellation detected; not recording this as a failed connection attempt.")
+		}
 		// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
 		// to ensure one resolution request per pass instead of per subconn failure.
 		ac.cc.resolveNow(resolver.ResolveNowOptions{})
@@ -1319,10 +1381,50 @@ func (ac *addrConn) resetTransportAndUnlock() {
 	}
 	// Success; reset backoff.
 	ac.mu.Lock()
+	connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
+	openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
 	ac.backoffIdx = 0
 	ac.mu.Unlock()
 }
 
+// updateTelemetryLabelsLocked calculates and caches the telemetry labels based
+// on the first address in the addrConn.
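+// It is called from newAddrConnLocked and updateAddrs. When the xDS-provided
+// AddressToTelemetryLabels hook is unset (i.e. on non-xDS channels), both
+// labels are reset to the empty string.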
+func (ac *addrConn) updateTelemetryLabelsLocked() {
+	labelsFunc, ok := internal.AddressToTelemetryLabels.(func(resolver.Address) map[string]string)
+	if !ok || len(ac.addrs) == 0 {
+		// Reset to the defaults.
+		ac.localityLabel = ""
+		ac.backendServiceLabel = ""
+		return
+	}
+	labels := labelsFunc(ac.addrs[0])
+	ac.localityLabel = labels["grpc.lb.locality"]
+	ac.backendServiceLabel = labels["grpc.lb.backend_service"]
+}
+
+type securityLevelKey struct{}
+
+func (ac *addrConn) securityLevelLocked() string {
+	var secLevel string
+	// During disconnection, ac.transport is nil. Fall back to the security
+	// level stored in the current address during connection.
+	if ac.transport == nil {
+		secLevel, _ = ac.curAddr.Attributes.Value(securityLevelKey{}).(string)
+		return secLevel
+	}
+	authInfo := ac.transport.Peer().AuthInfo
+	if ci, ok := authInfo.(interface {
+		GetCommonAuthInfo() credentials.CommonAuthInfo
+	}); ok {
+		secLevel = ci.GetCommonAuthInfo().SecurityLevel.String()
+		// Store the security level in the current address' attributes so
+		// that it remains available for disconnection metrics after the
+		// transport is closed.
+		ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, secLevel)
+	}
+	return secLevel
+}
+
 // tryAllAddrs tries to create a connection to the addresses, and stop when at
 // the first successful one. It returns an error if no address was successfully
 // connected, or updates ac appropriately with the new transport.
diff --git a/internal/internal.go b/internal/internal.go
index 2699223a27f1..27bef83d97bc 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -244,6 +244,10 @@ var (
 	// When set, the function will be called before the stream enters
 	// the blocking state.
 	NewStreamWaitingForResolver = func() {}
+
+	// AddressToTelemetryLabels is an xDS-provided function to extract telemetry
+	// labels from a resolver.Address. Callers must assert its type before calling.
+	AddressToTelemetryLabels any // func(addr resolver.Address) map[string]string
 )
 
 // HealthChecker defines the signature of the client-side LB channel health
diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go
index 65b4ab2439e2..f8c2f5f54652 100644
--- a/internal/transport/http2_client.go
+++ b/internal/transport/http2_client.go
@@ -370,7 +370,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
 	})
 	t.logger = prefixLoggerForClientTransport(t)
 	// Add peer information to the http2client context.
-	t.ctx = peer.NewContext(t.ctx, t.getPeer())
+	t.ctx = peer.NewContext(t.ctx, t.Peer())
 
 	if md, ok := addr.Metadata.(*metadata.MD); ok {
 		t.md = *md
@@ -510,7 +510,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
 	return s
 }
 
-func (t *http2Client) getPeer() *peer.Peer {
+func (t *http2Client) Peer() *peer.Peer {
 	return &peer.Peer{
 		Addr:     t.remoteAddr,
 		AuthInfo: t.authInfo, // Can be nil
@@ -742,7 +742,7 @@ func (e NewStreamError) Error() string {
 // NewStream creates a stream and registers it into the transport as "active"
 // streams. All non-nil errors returned will be *NewStreamError.
 func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
-	ctx = peer.NewContext(ctx, t.getPeer())
+	ctx = peer.NewContext(ctx, t.Peer())
 
 	// ServerName field of the resolver returned address takes precedence over
 	// Host field of CallHdr to determine the :authority header. This is because,
@@ -1807,8 +1807,6 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
 	}
 }
 
-func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
-
 func (t *http2Client) incrMsgSent() {
 	if channelz.IsOn() {
 		t.channelz.SocketMetrics.MessagesSent.Add(1)
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index 5ff83a7d7d74..844a4c1675aa 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -608,8 +608,9 @@ type ClientTransport interface {
 	// with a human readable string with debug info.
 	GetGoAwayReason() (GoAwayReason, string)
 
-	// RemoteAddr returns the remote network address.
-	RemoteAddr() net.Addr
+	// Peer returns information about the peer associated with the transport,
+	// including authentication and network address details.
+	Peer() *peer.Peer
 }
 
 // ServerTransport is the common interface for all gRPC server-side transport
diff --git a/internal/xds/xds.go b/internal/xds/xds.go
index 4d5a85ef35a8..e4fcbea0209c 100644
--- a/internal/xds/xds.go
+++ b/internal/xds/xds.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 
 	"google.golang.org/grpc/attributes"
+	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/xds/clients"
 	"google.golang.org/grpc/resolver"
 )
@@ -46,6 +47,17 @@ func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) {
 	return name, ok
 }
 
+// addressToTelemetryLabels prepares a telemetry label map from resolver
+// address attributes.
+func addressToTelemetryLabels(addr resolver.Address) map[string]string {
+	cluster, _ := GetXDSHandshakeClusterName(addr.Attributes)
+	locality := LocalityString(GetLocalityID(addr))
+	return map[string]string{
+		"grpc.lb.locality":        locality,
+		"grpc.lb.backend_service": cluster,
+	}
+}
+
 // LocalityString generates a string representation of clients.Locality in the
 // format specified in gRFC A76.
 func LocalityString(l clients.Locality) string {
@@ -99,3 +111,7 @@ var UnknownCSMLabels = map[string]string{
 	"csm.service_name":           "unknown",
 	"csm.service_namespace_name": "unknown",
 }
+
+func init() {
+	internal.AddressToTelemetryLabels = addressToTelemetryLabels
+}
diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go
index 2d575bfe06c5..cba89ec5f5eb 100644
--- a/stats/opentelemetry/e2e_test.go
+++ b/stats/opentelemetry/e2e_test.go
@@ -1789,3 +1789,161 @@ func (s) TestStreamingRPC_TraceSequenceNumbers(t *testing.T) {
 	}
 	validateTraces(t, spans, wantSpanInfos)
 }
+
+// TestSubChannelMetrics tests the subchannel metrics emitted during connection
+// lifecycle events (connect, disconnect, failure).
+func (s) TestSubChannelMetrics(t *testing.T) {
+	// Start a single backend server.
+	backend := stubserver.StartTestService(t, &stubserver.StubServer{
+		EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+			return &testpb.Empty{}, nil
+		},
+	})
+	port := itestutils.ParsePort(t, backend.Address)
+	defer backend.Stop()
+
+	const serviceName = "my-service-client-side-xds"
+
+	// Configure xDS for that single backend.
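+	// The Listener -> RouteConfiguration -> Cluster -> ClusterLoadAssignment
+	// chain configured here supplies the grpc.lb.backend_service (the cluster
+	// name) and grpc.lb.locality labels that the assertions below expect.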
+	managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)
+	routeConfigName := "route-" + serviceName
+	clusterName := "cluster-" + serviceName
+	endpointsName := "endpoints-" + serviceName
+
+	resources := e2e.UpdateOptions{
+		NodeID:    nodeID,
+		Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)},
+		Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)},
+		Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)},
+		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+			ClusterName: endpointsName,
+			Host:        "localhost",
+			Localities: []e2e.LocalityOptions{
+				{
+					Backends: []e2e.BackendOptions{{Ports: []uint32{port}}},
+					Weight:   1,
+				},
+			},
+		})},
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := managementServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Set up telemetry.
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(metric.WithReader(reader))
+	mo := opentelemetry.MetricsOptions{
+		MeterProvider: provider,
+		Metrics: opentelemetry.DefaultMetrics().Add(
+			"grpc.subchannel.connection_attempts_succeeded",
+			"grpc.subchannel.open_connections",
+			"grpc.subchannel.disconnections",
+			"grpc.subchannel.connection_attempts_failed",
+		),
+		OptionalLabels: []string{
+			"grpc.lb.locality",
+			"grpc.lb.backend_service",
+			"grpc.security_level",
+			"grpc.disconnect_error",
+		},
+	}
+
+	target := fmt.Sprintf("xds:///%s", serviceName)
+	cc, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver), opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}))
+	if err != nil {
+		t.Fatalf("Failed to create client: %v", err)
+	}
+	defer cc.Close()
+	client := testgrpc.NewTestServiceClient(cc)
+
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Fatalf("rpc failed: %v", err)
+	}
+
+	targetAttr := attribute.String("grpc.target", target)
+	localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`)
+	backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName)
+	disconnectionReasonAttr := attribute.String("grpc.disconnect_error", "unknown")
+	securityLevelAttr := attribute.String("grpc.security_level", "NoSecurity")
+
+	// Verify the connect metrics.
+	wantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.subchannel.connection_attempts_succeeded",
+			Description: "EXPERIMENTAL. Number of successful connection attempts.",
+			Unit:        "{attempt}",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(targetAttr, backendServiceAttr, localityAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.subchannel.open_connections",
+			Description: "EXPERIMENTAL. Number of open connections.",
+			Unit:        "{connection}",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(targetAttr, backendServiceAttr, securityLevelAttr, localityAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: false,
+			},
+		},
+	}
+	if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil {
+		t.Fatal(err)
+	}
+
+	// Stop the backend to trigger the disconnect metrics.
+	backend.Stop()
+
+	disconnectionWantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.subchannel.disconnections",
+			Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
+			Unit:        "{disconnection}",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(targetAttr, backendServiceAttr, localityAttr, disconnectionReasonAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.subchannel.connection_attempts_failed",
+			Description: "EXPERIMENTAL. Number of failed connection attempts.",
+			Unit:        "{attempt}",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(targetAttr, backendServiceAttr, localityAttr),
+						Value:      1, // The client tries to reconnect at least once.
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+	}
+
+	if err := pollForWantMetrics(ctx, t, reader, disconnectionWantMetrics); err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/stream.go b/stream.go
index ca87ff9776ef..5dc57ec4629a 100644
--- a/stream.go
+++ b/stream.go
@@ -484,7 +484,7 @@ func (a *csAttempt) getTransport() error {
 		return err
 	}
 	if a.trInfo != nil {
-		a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
+		a.trInfo.firstLine.SetRemoteAddr(a.transport.Peer().Addr)
 	}
 	if pick.blocked && a.statsHandler != nil {
 		a.statsHandler.HandleRPC(a.ctx, &stats.DelayedPickComplete{})