Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions balancer/pickfirst/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,31 @@ func (s) TestPickFirstMetrics(t *testing.T) {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

// Checking for subchannel metrics as well
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1)
}

ss.Stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != -1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, -1)
}
}

// TestPickFirstMetricsFailure tests the connection attempts failed metric. It
Expand Down
116 changes: 114 additions & 2 deletions balancer/pickfirst/pickfirst_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
holds[1].Fail(fmt.Errorf("test error"))
tmr.WaitForInt64CountIncr(ctx, 1)
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_failed")

// Only connection attempt fails in this test.
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 0 {
Expand All @@ -1946,6 +1947,12 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.subchannel.connection_attempts_succeeded"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.subchannel.connection_attempts_succeeded", got, 0)
}
}

// Test verifies that pickfirst attempts to connect to the second backend once
Expand Down Expand Up @@ -1995,7 +2002,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
// that the channel becomes READY.
holds[1].Resume()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_succeeded")
// Only connection attempt successes in this test.
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
Expand All @@ -2006,6 +2013,27 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
}
}

// waitForMetric polls tmr every 10ms until tmr.Metric(metricName) reports ok
// for the given metric name. If ctx is done before that happens, the test is
// failed via t.Fatalf.
//
// It is marked as a test helper so that a timeout is attributed to the calling
// test's line rather than to this function.
func waitForMetric(ctx context.Context, t *testing.T, tmr *stats.TestMetricsRecorder, metricName string) {
	t.Helper()

	// A single ticker is reused across iterations instead of allocating a new
	// timer with time.After on every poll.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		if _, ok := tmr.Metric(metricName); ok {
			return
		}
		select {
		case <-ctx.Done():
			t.Fatalf("Timeout waiting for metric emission: %s", metricName)
		case <-ticker.C:
		}
	}
}

// Test tests the pickfirst balancer by causing a SubConn to fail and then
Expand Down Expand Up @@ -2057,6 +2085,9 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
}
if holds[2].IsStarted() != false {
t.Fatalf("Server %d with address %q contacted unexpectedly", 2, addrs[2])
}
Expand All @@ -2073,7 +2104,10 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
// that the channel becomes READY.
holds[1].Resume()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_succeeded")
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
}
Expand All @@ -2082,6 +2116,84 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
}
}

// TestPickFirstLeaf_HappyEyeballs_Ignore_Inflight_Cancellations verifies the
// recorded metrics when one connection attempt is still in flight while
// another subchannel wins. Two backends are dialed through a blocking dialer:
// the attempt to the first backend is held, the mocked timer is triggered so
// that the second backend is attempted, and the second attempt is resumed so
// the channel becomes READY. Only then is the first attempt unblocked. The
// test expects exactly one successful attempt and zero failed attempts, both
// for the subchannel metrics and for the pick_first metrics.
func (s) TestPickFirstLeaf_HappyEyeballs_Ignore_Inflight_Cancellations(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Swap the pickfirst timer hook for a mock so that the test controls when
// the timer fires (via triggerTimer). The original hook is restored on exit.
originalTimer := pfinternal.TimeAfterFunc
defer func() {
pfinternal.TimeAfterFunc = originalTimer
}()
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

// The blocking dialer lets the test hold and resume individual connection
// attempts; tmr captures the metrics that are asserted on below.
tmr := stats.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pfbalancer.Name)),
grpc.WithContextDialer(dialer.DialContext),
grpc.WithStatsHandler(tmr),
}

// Set up 2 backend addresses and push them to the channel.
cc, rb, bm := setupPickFirstLeaf(t, 2, opts...)
addrs := bm.resolverAddrs()
holds := bm.holds(dialer)
rb.UpdateState(resolver.State{Addresses: addrs})
cc.Connect()

// Make sure the second SubConn is the one that connects: wait until the
// first backend is being dialed (and held), fire the timer so the second
// backend is dialed too, then let only the second attempt proceed.
testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
if holds[0].Wait(ctx) != true {
t.Fatalf("Timeout waiting for server %d to be contacted", 0)
}
triggerTimer()
if holds[1].Wait(ctx) != true {
t.Fatalf("Timeout waiting for server %d to be contacted", 1)
}
holds[1].Resume()

// Wait for Channel to become READY.
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Unblock the first SubConn.
// Since the LB has already closed this subchannel, the context passed to Dial
// is canceled. This causes the in-flight attempt to be canceled.
// No success or failure metric should be recorded for this.
holds[0].Resume()

// --- Assertions ---

// Wait for the SUCCESS metric to ensure recording logic has processed.
waitForMetric(ctx, t, tmr, "grpc.subchannel.connection_attempts_succeeded")

// Verify Success: Exactly 1 (The Winner).
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: 1", "grpc.subchannel.connection_attempts_succeeded", got)
}
Comment on lines +2170 to +2176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this actually ensure that we check the value of the metric after the first connection attempt is completely processed? We do call holds[0].Resume(), but does that guarantee that the subchannel code sees the connection being successful, but drops it since the subchannel has been deleted by the LB policy.

Copy link
Contributor Author

@mbissa mbissa Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are waiting for the metric to be emitted. Connection attempt success will only be emitted if there is a successful connection. In case of cancellation of attempt - it will not be successful and in case of disconnection after establishing connection, it will still be recorded as a disconnection. In both scenarios, the attempts succeeded will always be 1.


// Verify Failure: Exactly 0 (The Loser was ignored).
// A failure metric could be recorded asynchronously after holds[0] is
// resumed, so poll for up to 50ms (once per millisecond) to ensure that no
// delayed failure metric appears.
sCtx, sCancel := context.WithTimeout(ctx, 50*time.Millisecond)
defer sCancel()
for ; sCtx.Err() == nil; <-time.After(time.Millisecond) {
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
t.Fatalf("Unexpected failure recorded for shutdown subchannel, got: %v, want: 0", got)
}
}

// LB metrics check: pick_first should likewise report one success and no
// failures.
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: 1", "grpc.lb.pick_first.connection_attempts_succeeded", got)
}
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: 0", "grpc.lb.pick_first.connection_attempts_failed", got)
}
}

func (s) TestPickFirstLeaf_InterleavingIPV4Preferred(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down
104 changes: 103 additions & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
expstats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -98,6 +100,41 @@ var (
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)

// Per-subchannel metrics. All of them are registered as experimental and are
// not enabled by default (Default: false). The order of Labels followed by
// OptionalLabels must match the order of the label values passed at the
// corresponding Record call sites.
var (
	// disconnectionsMetric is recorded from updateConnectivityState when a
	// subchannel leaves READY (or goes directly from CONNECTING to IDLE).
	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.disconnections",
		Description:    "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
		Unit:           "{disconnection}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"},
		Default:        false,
	})
	// connectionAttemptsSucceededMetric is recorded once a connection attempt
	// completes successfully.
	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.connection_attempts_succeeded",
		Description:    "EXPERIMENTAL. Number of successful connection attempts.",
		Unit:           "{attempt}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
		Default:        false,
	})
	// connectionAttemptsFailedMetric is recorded when trying all addresses
	// fails with an error other than context cancellation.
	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.connection_attempts_failed",
		Description:    "EXPERIMENTAL. Number of failed connection attempts.",
		Unit:           "{attempt}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
		Default:        false,
	})
	// openConnectionsMetric is an up/down counter: it is incremented on a
	// successful connection attempt and decremented on disconnection. Its
	// unit is a connection, not an attempt.
	openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.open_connections",
		Description:    "EXPERIMENTAL. Number of open connections.",
		Unit:           "{connection}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"},
		Default:        false,
	})
)

const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
Expand Down Expand Up @@ -861,6 +898,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
}
ac.updateTelemetryLabelsLocked()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Start with our address set to the first address; this may be updated if
// we connect to different addresses.
Expand Down Expand Up @@ -977,7 +1015,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
}

ac.addrs = addrs

ac.updateTelemetryLabelsLocked()
if ac.state == connectivity.Shutdown ||
ac.state == connectivity.TransientFailure ||
ac.state == connectivity.Idle {
Expand Down Expand Up @@ -1216,13 +1254,28 @@ type addrConn struct {
resetBackoff chan struct{}

channelz *channelz.SubChannel

localityLabel string
backendServiceLabel string
}

// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
if ac.state == s {
return
}

// If we are transitioning out of Ready, it means there is a disconnection.
// A SubConn can also transition from CONNECTING directly to IDLE when
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
// part of the if condition below once the issue is fixed.
if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
}
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
Expand Down Expand Up @@ -1280,6 +1333,15 @@ func (ac *addrConn) resetTransportAndUnlock() {
ac.mu.Unlock()

if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
if !errors.Is(err, context.Canceled) {
connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
} else {
if logger.V(2) {
// This records cancelled connection attempts which can be later
// replaced by a metric.
logger.Infof("Context cancellation detected; not recording this as a failed connection attempt.")
}
}
// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
// to ensure one resolution request per pass instead of per subconn failure.
ac.cc.resolveNow(resolver.ResolveNowOptions{})
Expand Down Expand Up @@ -1319,10 +1381,50 @@ func (ac *addrConn) resetTransportAndUnlock() {
}
// Success; reset backoff.
ac.mu.Lock()
connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
ac.backoffIdx = 0
ac.mu.Unlock()
}

// updateTelemetryLabelsLocked refreshes the cached locality and backend
// service labels on ac. The labels are derived from the first address in
// ac.addrs using the function stored in internal.AddressToTelemetryLabels.
// When that hook is not set to a function of the expected type, or when ac has
// no addresses, both labels are cleared to the empty string.
func (ac *addrConn) updateTelemetryLabelsLocked() {
	var locality, backendService string

	toLabels, isFunc := internal.AddressToTelemetryLabels.(func(resolver.Address) map[string]string)
	if isFunc && len(ac.addrs) > 0 {
		// Keys missing from the returned map yield the empty string.
		telemetry := toLabels(ac.addrs[0])
		locality = telemetry["grpc.lb.locality"]
		backendService = telemetry["grpc.lb.backend_service"]
	}

	ac.localityLabel = locality
	ac.backendServiceLabel = backendService
}

// securityLevelKey is the attributes key under which securityLevelLocked
// caches the security level on the current address.
type securityLevelKey struct{}

// securityLevelLocked returns the security level string used as the
// "grpc.security_level" metric label.
//
// With a live transport, the level is read from the peer's AuthInfo (when it
// exposes GetCommonAuthInfo) and cached in ac.curAddr.Attributes. Without a
// transport, which is the case during disconnection, the cached value is
// returned instead. The result is the empty string when no level is available.
//
// NOTE(review): this writes to ac.curAddr.Attributes as a side effect. If
// address comparisons elsewhere take Attributes into account, the extra key
// could influence them — confirm.
func (ac *addrConn) securityLevelLocked() string {
	type commonAuthInfoGetter interface {
		GetCommonAuthInfo() credentials.CommonAuthInfo
	}

	if ac.transport != nil {
		getter, ok := ac.transport.Peer().AuthInfo.(commonAuthInfoGetter)
		if !ok {
			return ""
		}
		level := getter.GetCommonAuthInfo().SecurityLevel.String()
		// Remember the level on the current address so that it is still
		// available for disconnection metrics once the transport is gone.
		ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, level)
		return level
	}

	// No transport: fall back to whatever was cached while connected.
	cached, _ := ac.curAddr.Attributes.Value(securityLevelKey{}).(string)
	return cached
}

// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
Expand Down
4 changes: 4 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ var (
// When set, the function will be called before the stream enters
// the blocking state.
NewStreamWaitingForResolver = func() {}

// AddressToTelemetryLabels is an xDS-provided function to extract telemetry
// labels from a resolver.Address. Callers must assert its type before calling.
AddressToTelemetryLabels any // func(addr resolver.Address) map[string]string
)

// HealthChecker defines the signature of the client-side LB channel health
Expand Down
Loading