@mbissa mbissa commented Jan 7, 2026

This PR leverages the async gauge framework to implement the xDS client metrics defined in A78, along with their required labels. The new metrics report the number of xDS resources and whether the xDS client currently has a working ADS stream to the xDS server.

RELEASE NOTES:

  • xds/internal/client: Add async gauge metrics for "grpc.xds_client.connected" and "grpc.xds_client.resources" as part of A78.

@mbissa mbissa added this to the 1.79 Release milestone Jan 7, 2026
@mbissa mbissa requested a review from easwars January 7, 2026 01:18
@mbissa mbissa self-assigned this Jan 7, 2026
@mbissa mbissa added the Type: Feature and Area: xDS labels Jan 7, 2026
codecov bot commented Jan 7, 2026

Codecov Report

❌ Patch coverage is 74.22680% with 25 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.23%. Comparing base (c6d5e5e) to head (ec83a4e).
⚠️ Report is 1 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/xds/xdsclient/clientimpl.go | 25.92% | 19 Missing and 1 partial ⚠️ |
| internal/xds/clients/xdsclient/xdsclient.go | 92.42% | 4 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8807      +/-   ##
==========================================
- Coverage   83.26%   83.23%   -0.03%     
==========================================
  Files         418      418              
  Lines       33004    33101      +97     
==========================================
+ Hits        27480    27551      +71     
- Misses       4109     4135      +26     
  Partials     1415     1415              
| Files with missing lines | Coverage Δ |
|---|---|
| internal/xds/clients/xdsclient/ads_stream.go | 85.06% <100.00%> (+0.21%) ⬆️ |
| internal/xds/clients/xdsclient/xdsclient.go | 86.61% <92.42%> (+2.03%) ⬆️ |
| internal/xds/xdsclient/clientimpl.go | 73.72% <25.92%> (-11.74%) ⬇️ |

... and 29 files with indirect coverage changes

// Value is 1 if connected, 0 otherwise.
type XDSClientConnected struct {
	ServerURI string
	Value     int64

Can this not be of type bool?
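
One way to read this question is sketched below, purely as an illustration: the struct carries a bool and the conversion to the 0/1 gauge value happens only at record time. The names `xdsClientConnected`, `Connected`, and `gaugeValue` are hypothetical and are not taken from the PR.

```go
package main

import "fmt"

// xdsClientConnected is a hypothetical variant of the metric struct that
// carries a bool instead of an int64.
type xdsClientConnected struct {
	ServerURI string
	Connected bool
}

// gaugeValue converts the bool into the 0/1 value an int64 gauge records.
func gaugeValue(connected bool) int64 {
	if connected {
		return 1
	}
	return 0
}

func main() {
	m := xdsClientConnected{ServerURI: "xds.example.com:443", Connected: true}
	fmt.Println(m.ServerURI, gaugeValue(m.Connected))
}
```

With this shape, callers cannot accidentally report a value other than 0 or 1, at the cost of one conversion where the gauge is recorded.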

Comment on lines +53 to +56
Authority string
ResourceType string
CacheState string
Count int64

I understand that the previously defined metric structs did not document the fields. But I feel we should do one of the following:

  • trailing line comments for all of the fields
  • a package level comment that talks about every single field
  • a package level comment pointing to the section in A78 that contains a description of the labels, which is what these fields correspond to

ServerURI string
}

// XDSClientConnected reports the connectivity state of the xDS stream.

Nit: s/xDS stream/ADS stream/

FYI: The xDS client establishes two streams, ADS and LRS.

})
xdsClientConnectedMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{
Name: "grpc.xds_client.connected",
Description: "Experimental. 1 if the xDS stream is connected, 0 otherwise.",

Why are only the newly added metrics "experimental"? Have the other three already been stabilized?

xdsClientResourcesMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{
Name: "grpc.xds_client.resources",
Description: "Experimental. Number of xDS resources currently cached.",
Unit: "{resource}",

What does it mean to have a {resource} unit?

easwars commented Feb 6, 2026

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces asynchronous gauge metrics for tracking xDS client connectivity and resource counts, which is a valuable addition for observability. The implementation looks mostly solid, but I've found a critical issue in the connectivity metric logic that causes it to report incorrect status. I've also included a couple of medium-severity suggestions to improve test coverage and code clarity. Please see the detailed comments below.

Comment on lines +470 to +472
if cs.channel.ads != nil && cs.channel.ads.fc != nil && !cs.channel.ads.fc.isStopped() {
	val = 1
}

critical

The logic to determine if the xDS client is connected is incorrect. The condition !cs.channel.ads.fc.isStopped() checks if the adsStreamImpl has been stopped, which only happens when the xdsChannel is closed (e.g., when XDSClient.Close() is called).

An xdsChannel can be active while the underlying ADS stream is down (e.g., during connection backoff after a stream failure). In such cases, this logic will incorrectly report Value: 1 (connected), even though there is no active stream to the management server.

The metric's description is "1 if the xDS stream is connected, 0 otherwise." The implementation should reflect the actual state of the underlying stream.

To fix this, adsStreamImpl would need to maintain a state indicating whether a stream is currently established, and reportConnectedState should query that state. This would likely involve adding an atomic boolean to adsStreamImpl that is updated as the stream is created and terminated.
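
The suggested fix could look roughly like the following minimal sketch. It assumes a simplified stream loop; `adsStream`, `runOnce`, `connectedValue`, and `errNoConn` are hypothetical names, and the real adsStreamImpl has more state and a different control flow.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errNoConn is a hypothetical error for a failed stream creation.
var errNoConn = errors.New("connection unavailable")

// adsStream is a hypothetical stand-in for adsStreamImpl.
type adsStream struct {
	// connected is true only while an ADS stream is established.
	connected atomic.Bool
}

// runOnce models one iteration of the stream loop. newStream stands in for
// stream creation, and recv for the receive loop, which returns once the
// stream breaks.
func (s *adsStream) runOnce(newStream func() error, recv func()) error {
	if err := newStream(); err != nil {
		return err // Creation failed, so connected stays false.
	}
	s.connected.Store(true)
	defer s.connected.Store(false)
	recv()
	return nil
}

// connectedValue is what an async gauge callback would report.
func (s *adsStream) connectedValue() int64 {
	if s.connected.Load() {
		return 1
	}
	return 0
}

func main() {
	s := &adsStream{}
	_ = s.runOnce(
		func() error { return nil },
		func() { fmt.Println("during stream:", s.connectedValue()) },
	)
	fmt.Println("after stream:", s.connectedValue())
	err := s.runOnce(func() error { return errNoConn }, func() {})
	fmt.Println("after failed creation:", s.connectedValue(), err)
}
```

Because the flag is flipped around the lifetime of each stream rather than the lifetime of the channel, the gauge reads 0 during backoff between stream attempts.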

Contributor

This is correct.

Here is where the ADS stream is created:

stream, err := s.transport.NewStream(ctx, "/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources")

And here is where we know the previously created stream is no longer active:

Also, note that if the underlying connection is broken, new stream creation will fail:

s.logger.Warningf("Failed to create a new ADS streaming RPC: %v", err)

Comment on lines +386 to +388
if err := tmr.waitForSpecificMetric(ctx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err != nil {
	t.Fatal(err.Error())
}

medium

The test verifies that the XDSClientConnected metric reports a value of 1 when connected. However, after the management server is stopped and a ServerFailure metric is received, the test does not verify that the XDSClientConnected metric subsequently reports 0 for the disconnected state.

Adding this check would make the test more comprehensive and would have helped identify the issue in reportConnectedState where the disconnected state is not being reported correctly.

Suggested change
- if err := tmr.waitForSpecificMetric(ctx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err != nil {
- 	t.Fatal(err.Error())
- }
+ if err := tmr.waitForSpecificMetric(ctx, &metrics.ServerFailure{ServerURI: mgmtServer.Address}); err != nil {
+ 	t.Fatal(err.Error())
+ }
+ tmr.triggerAsyncMetrics()
+ if err := tmr.waitForSpecificMetric(ctx, &metrics.XDSClientConnected{ServerURI: mgmtServer.Address, Value: 0}); err != nil {
+ 	t.Fatal(err.Error())
+ }

Contributor

I haven't reviewed the tests yet and don't plan to review them until the non-test code looks good. So, please check if this comment is valid and act on it if required.

switch m := metric.(type) {
case *metrics.XDSClientConnected:
// Record: grpc.xds_client.connected
// Labels: grpc.target, grpc.xds.authority

medium

The comment here states that one of the labels is grpc.xds.authority. However, the xdsClientConnectedMetric is defined with the label grpc.xds.server, and the code correctly passes m.ServerURI for it. To avoid confusion, the comment should be updated to match the metric definition.

Suggested change
- // Labels: grpc.target, grpc.xds.authority
+ // Labels: grpc.target, grpc.xds.server

Contributor

+1

ReportMetric(metric any)

// RegisterAsyncReporter registers a reporter to produce metric values for
// only the listed descriptors. The returned function must be called when

Where are these descriptors listed? Can that be specified in this docstring?

RegisterAsyncReporter(reporter AsyncReporter) func()
}

// AsyncReporter is an interface for types that record metrics asynchronously.

Nit: Can this line be replaced with:
// AsyncReporter allows implementations to record metrics asynchronously

The fact that it is an interface is clear by the definition of the type. So, it need not be specified in the docstring.


// RegisterAsyncReporter registers a reporter to produce metric values for
// only the listed descriptors. The returned function must be called when
// the metrics are no longer needed, which will remove the reporter.

which will remove the reporter

Does this guarantee that once the returned cancel func is called, the Report method on the registered AsyncReporter is never called? If so, does it make sense to mention that in the docstring?

FYI: Even though this is an internal package, this xDS client is used by non-grpc folks, and we might fork this off to a separate repo or sub-module at some point in time. So, the requirements for docstrings here should be the same as that of any public APIs in our repo.
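
For reference, the guarantee being asked about can be provided with a single mutex, as in the sketch below. This is an illustration under stated assumptions, not the PR's implementation: `registry`, `register`, `collect`, and `countingReporter` are hypothetical names, and `asyncReporter` only stands in for clients.AsyncReporter.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncReporter is a hypothetical stand-in for clients.AsyncReporter.
type asyncReporter interface{ Report() }

// registry holds the currently registered reporters.
type registry struct {
	mu        sync.Mutex
	next      int
	reporters map[int]asyncReporter
}

func newRegistry() *registry {
	return &registry{reporters: make(map[int]asyncReporter)}
}

// register adds r and returns an idempotent cancel func. Because collect
// holds mu while calling Report, cancel returns only after any in-flight
// Report has finished, and no Report on r starts after cancel returns.
func (g *registry) register(r asyncReporter) func() {
	g.mu.Lock()
	id := g.next
	g.next++
	g.reporters[id] = r
	g.mu.Unlock()

	var once sync.Once
	return func() {
		once.Do(func() {
			g.mu.Lock()
			delete(g.reporters, id)
			g.mu.Unlock()
		})
	}
}

// collect invokes every registered reporter, as a metrics read would.
func (g *registry) collect() {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, r := range g.reporters {
		r.Report()
	}
}

// countingReporter counts how many times Report was called.
type countingReporter struct{ calls int }

func (c *countingReporter) Report() { c.calls++ }

func main() {
	g := newRegistry()
	r := &countingReporter{}
	cancel := g.register(r)
	g.collect()
	cancel()
	g.collect()
	fmt.Println("Report calls:", r.calls)
}
```

The trade-off in this design is that a reporter must not call its own cancel func from inside Report, since that would deadlock on the mutex; a docstring offering the guarantee would need to state that restriction too.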

Comment on lines +128 to +130
// ReportMetric reports a metric. The metric will be one of the predefined
// set of types in the metrics.go file.
ReportMetric(metric any)

What happens if this method is called and passed a metric that is not supposed to be reported asynchronously?
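
One possible answer, sketched here only as an assumption about desirable behavior, is to reject such metrics explicitly rather than drop them silently. The types `serverFailure` and `xdsClientConnected` and the function `checkSyncMetric` are hypothetical stand-ins, not the definitions in metrics.go.

```go
package main

import "fmt"

// Hypothetical stand-ins for the metric types in metrics.go.
type serverFailure struct{ ServerURI string }

type xdsClientConnected struct {
	ServerURI string
	Value     int64
}

// checkSyncMetric reports whether metric may go through the synchronous
// ReportMetric path. Async-only and unknown types yield distinct errors so
// that misuse is visible to the caller.
func checkSyncMetric(metric any) error {
	switch metric.(type) {
	case *serverFailure:
		return nil
	case *xdsClientConnected:
		return fmt.Errorf("metric %T must be reported by an async reporter", metric)
	default:
		return fmt.Errorf("unsupported metric type %T", metric)
	}
}

func main() {
	for _, m := range []any{&serverFailure{}, &xdsClientConnected{}, "bogus"} {
		fmt.Printf("%T: %v\n", m, checkSyncMetric(m))
	}
}
```

Whichever behavior is chosen (error, log, or ignore), stating it in the ReportMetric docstring would answer the question for implementers.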

// RegisterAsyncReporter adapts the generic clients.AsyncReporter to the
// estats.AsyncMetricReporter interface and registers it.
func (mr *metricsReporter) RegisterAsyncReporter(reporter clients.AsyncReporter) func() {
if mr.recorder == nil {

Should we handle the case of reporter being nil here as well?
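
A nil guard of the kind asked about might look like the sketch below. It assumes that returning a no-op cancel func is the desired behavior for both nil cases; `metricsReporter` here is a simplified, hypothetical stand-in that only counts registrations and is not safe for concurrent use.

```go
package main

import "fmt"

// asyncReporter is a hypothetical stand-in for clients.AsyncReporter.
type asyncReporter interface{ Report() }

// metricsReporter is a simplified stand-in that only counts registrations.
type metricsReporter struct {
	recorder   any // nil means no metrics recorder is configured
	registered int
}

// registerAsyncReporter returns a no-op cancel func when either the recorder
// or the reporter is nil, so callers can always invoke the result safely.
// Note that r == nil only catches an untyped nil interface value, not a
// typed nil pointer wrapped in the interface.
func (mr *metricsReporter) registerAsyncReporter(r asyncReporter) func() {
	if mr.recorder == nil || r == nil {
		return func() {}
	}
	mr.registered++
	done := false
	return func() {
		if done {
			return
		}
		done = true
		mr.registered--
	}
}

// noopReporter is a trivial reporter used for the demonstration.
type noopReporter struct{}

func (noopReporter) Report() {}

func main() {
	mr := &metricsReporter{recorder: struct{}{}}
	cancel := mr.registerAsyncReporter(nil)
	cancel()
	fmt.Println("after nil reporter:", mr.registered)

	cancel = mr.registerAsyncReporter(noopReporter{})
	fmt.Println("after real reporter:", mr.registered)
	cancel()
	fmt.Println("after cancel:", mr.registered)
}
```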

// Schedule the operation.
// If the serializer is closed/context canceled, the second func (onFailure) runs.
a.xdsClientSerializer.ScheduleOr(op, func() {

Any reason why the TrySchedule method on the serializer cannot be used here?

Comment on lines +559 to +562
default:
	// Fallback for initialization states
	return "requested"
}

I don't see this being specified in the gRFC. Am I missing something?

switch r.md.Status {
case xdsresource.ServiceStatusRequested:
	return "requested"


Please don't add newlines between case statements. That is not Go style. It makes the whole switch statement unnecessarily long.
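
As an illustration of the compact layout, the sketch below keeps the cases adjacent and returns an explicit "unknown" signal from the default branch instead of mapping it to a label. The `status` type, its constants, and `cacheState` are hypothetical; the label strings other than "requested" are assumptions that should be checked against A78.

```go
package main

import "fmt"

// status is a hypothetical stand-in for the resource metadata status.
type status int

const (
	statusRequested status = iota
	statusAcked
	statusNacked
)

// cacheState maps a status to a cache-state label. The second result is
// false for statuses without a defined label, leaving the decision about
// what to report to the caller.
func cacheState(s status) (string, bool) {
	switch s {
	case statusRequested:
		return "requested", true
	case statusAcked:
		return "acked", true
	case statusNacked:
		return "nacked", true
	default:
		return "", false
	}
}

func main() {
	for _, s := range []status{statusRequested, statusAcked, statusNacked, status(42)} {
		label, ok := cacheState(s)
		fmt.Printf("%d: %q %v\n", s, label, ok)
	}
}
```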

@easwars easwars removed their assignment Feb 7, 2026