Skip to content

Commit 6bede04

Browse files
committed
opentelemetry: Add optional grpc.lb.locality to per-call metrics
The optional label API was added in 4c78a97 and xds_cluster_impl was plumbed in 077dcbf.

From gRFC A78:

> ### Optional xDS Locality Label
>
> When xDS is used, it is desirable for some metrics to include an optional
> label indicating which xDS locality the metrics are associated with.
> We want to provide this optional label for the metrics in both the
> existing per-call metrics defined in [A66] and in the new metrics for
> the WRR LB policy, described below.
>
> If locality information is available, the value of this label will be of
> the form `{region="${REGION}", zone="${ZONE}", sub_zone="${SUB_ZONE}"}`,
> where `${REGION}`, `${ZONE}`, and `${SUB_ZONE}` are replaced with the
> actual values. If no locality information is available, the label will
> be set to the empty string.
>
> #### Per-Call Metrics
>
> To support the locality label in the per-call metrics, we will provide
> a mechanism for LB picker to add optional labels to the call attempt
> tracer. We will then use this mechanism in the `xds_cluster_impl`
> policy's picker to set the locality label.

...

> This label will be available on the following per-call metrics:
> - `grpc.client.attempt.duration`
> - `grpc.client.attempt.sent_total_compressed_message_size`
> - `grpc.client.attempt.rcvd_total_compressed_message_size`
1 parent 6ec744f commit 6bede04

File tree

4 files changed

+183
-19
lines changed

4 files changed

+183
-19
lines changed

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.opentelemetry;
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
2021
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
2122
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
2223
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
@@ -40,6 +41,8 @@
4041
import io.grpc.Status;
4142
import io.grpc.Status.Code;
4243
import io.grpc.StreamTracer;
44+
import io.opentelemetry.api.common.AttributesBuilder;
45+
import java.util.Collection;
4346
import java.util.concurrent.TimeUnit;
4447
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
4548
import java.util.concurrent.atomic.AtomicLong;
@@ -63,6 +66,7 @@
6366
*/
6467
final class OpenTelemetryMetricsModule {
6568
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
69+
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
6670
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
6771
ImmutableSet.of(
6872
"grpc.client.attempt.started",
@@ -81,11 +85,13 @@ final class OpenTelemetryMetricsModule {
8185

8286
private final OpenTelemetryMetricsResource resource;
8387
private final Supplier<Stopwatch> stopwatchSupplier;
88+
private final boolean localityEnabled;
8489

8590
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
86-
OpenTelemetryMetricsResource resource) {
91+
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels) {
8792
this.resource = checkNotNull(resource, "resource");
8893
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
94+
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
8995
}
9096

9197
/**
@@ -140,6 +146,7 @@ private static final class ClientTracer extends ClientStreamTracer {
140146
final String fullMethodName;
141147
volatile long outboundWireSize;
142148
volatile long inboundWireSize;
149+
volatile String locality;
143150
long attemptNanos;
144151
Code statusCode;
145152

@@ -173,6 +180,13 @@ public void inboundWireSize(long bytes) {
173180
}
174181
}
175182

183+
@Override
184+
public void addOptionalLabel(String key, String value) {
185+
if (LOCALITY_LABEL_NAME.equals(key)) {
186+
locality = value;
187+
}
188+
}
189+
176190
@Override
177191
public void streamClosed(Status status) {
178192
stopwatch.stop();
@@ -192,10 +206,18 @@ public void streamClosed(Status status) {
192206
}
193207

194208
void recordFinishedAttempt() {
195-
io.opentelemetry.api.common.Attributes attribute =
196-
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
197-
TARGET_KEY, target,
198-
STATUS_KEY, statusCode.toString());
209+
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
210+
.put(METHOD_KEY, fullMethodName)
211+
.put(TARGET_KEY, target)
212+
.put(STATUS_KEY, statusCode.toString());
213+
if (module.localityEnabled) {
214+
String savedLocality = locality;
215+
if (savedLocality == null) {
216+
savedLocality = "unknown";
217+
}
218+
builder.put(LOCALITY_KEY, savedLocality);
219+
}
220+
io.opentelemetry.api.common.Attributes attribute = builder.build();
199221

200222
if (module.resource.clientAttemptDurationCounter() != null ) {
201223
module.resource.clientAttemptDurationCounter()
@@ -315,7 +337,8 @@ void callEnded(Status status) {
315337

316338
void recordFinishedCall() {
317339
if (attemptsPerCall.get() == 0) {
318-
ClientTracer tracer = new ClientTracer(this, module, null, target, fullMethodName);
340+
ClientTracer tracer =
341+
new ClientTracer(this, module, null, target, fullMethodName);
319342
tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS);
320343
tracer.statusCode = status.getCode();
321344
tracer.recordFinishedAttempt();
@@ -478,8 +501,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
478501
// which is true for all generated methods. Otherwise, programmatically
479502
// created methods result in high cardinality metrics.
480503
final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
481-
OpenTelemetryMetricsModule.this, target, recordMethodName(method.getFullMethodName(),
482-
method.isSampledToLocalTracing()));
504+
OpenTelemetryMetricsModule.this, target,
505+
recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()));
483506
ClientCall<ReqT, RespT> call =
484507
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
485508
return new SimpleForwardingClientCall<ReqT, RespT>(call) {

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ private OpenTelemetryModule(Builder builder) {
8484
this.enableMetrics = ImmutableMap.copyOf(builder.enableMetrics);
8585
this.disableDefault = builder.disableAll;
8686
this.resource = createMetricInstruments(meter, enableMetrics, disableDefault);
87-
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource);
8887
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
88+
this.openTelemetryMetricsModule =
89+
new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource, optionalLabels);
8990
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
9091
}
9192

opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ public final class OpenTelemetryConstants {
2828

2929
public static final AttributeKey<String> TARGET_KEY = AttributeKey.stringKey("grpc.target");
3030

31+
public static final AttributeKey<String> LOCALITY_KEY =
32+
AttributeKey.stringKey("grpc.lb.locality");
33+
3134
private OpenTelemetryConstants() {
3235
}
3336
}

opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java

Lines changed: 147 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.opentelemetry;
1818

1919
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
20+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
2021
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
2122
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
2223
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
@@ -52,6 +53,7 @@
5253
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
5354
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
5455
import java.io.InputStream;
56+
import java.util.Arrays;
5557
import java.util.Map;
5658
import java.util.concurrent.TimeUnit;
5759
import java.util.concurrent.atomic.AtomicReference;
@@ -158,8 +160,7 @@ public void setUp() throws Exception {
158160
public void testClientInterceptors() {
159161
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
160162
enabledMetricsMap, disableDefaultMetrics);
161-
OpenTelemetryMetricsModule module =
162-
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
163+
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
163164
grpcServerRule.getServiceRegistry().addService(
164165
ServerServiceDefinition.builder("package1.service2").addMethod(
165166
method, new ServerCallHandler<String, String>() {
@@ -215,8 +216,7 @@ public void clientBasicMetrics() {
215216
String target = "target:///";
216217
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
217218
enabledMetricsMap, disableDefaultMetrics);
218-
OpenTelemetryMetricsModule module =
219-
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
219+
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
220220
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
221221
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());
222222
Metadata headers = new Metadata();
@@ -243,6 +243,8 @@ public void clientBasicMetrics() {
243243
.hasAttributes(attributes)
244244
.hasValue(1))));
245245

246+
tracer.addOptionalLabel("grpc.lb.locality", "should-be-ignored");
247+
246248
fakeClock.forwardTime(30, TimeUnit.MILLISECONDS);
247249
tracer.outboundHeaders();
248250

@@ -353,8 +355,7 @@ public void recordAttemptMetrics() {
353355
String target = "dns:///example.com";
354356
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
355357
enabledMetricsMap, disableDefaultMetrics);
356-
OpenTelemetryMetricsModule module =
357-
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
358+
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
358359
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
359360
new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target,
360361
method.getFullMethodName());
@@ -779,8 +780,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() {
779780
String target = "dns:///foo.example.com";
780781
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
781782
enabledMetricsMap, disableDefaultMetrics);
782-
OpenTelemetryMetricsModule module =
783-
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
783+
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
784784
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
785785
new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target,
786786
method.getFullMethodName());
@@ -880,11 +880,142 @@ public void clientStreamNeverCreatedStillRecordMetrics() {
880880
}
881881

882882
@Test
883-
public void serverBasicMetrics() {
883+
public void clientLocalityMetrics_present() {
884+
String target = "target:///";
885+
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
886+
enabledMetricsMap, disableDefaultMetrics);
887+
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
888+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"));
889+
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
890+
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());
891+
892+
ClientStreamTracer tracer =
893+
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
894+
tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
895+
tracer.addOptionalLabel("grpc.lb.locality", "should-be-overwritten");
896+
tracer.addOptionalLabel("grpc.lb.locality", "the-moon");
897+
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
898+
tracer.streamClosed(Status.OK);
899+
callAttemptsTracerFactory.callEnded(Status.OK);
900+
901+
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
902+
TARGET_KEY, target,
903+
METHOD_KEY, method.getFullMethodName());
904+
905+
io.opentelemetry.api.common.Attributes clientAttributes
906+
= io.opentelemetry.api.common.Attributes.of(
907+
TARGET_KEY, target,
908+
METHOD_KEY, method.getFullMethodName(),
909+
STATUS_KEY, Status.Code.OK.toString());
910+
911+
io.opentelemetry.api.common.Attributes clientAttributesWithLocality
912+
= clientAttributes.toBuilder()
913+
.put(LOCALITY_KEY, "the-moon")
914+
.build();
915+
916+
assertThat(openTelemetryTesting.getMetrics())
917+
.satisfiesExactlyInAnyOrder(
918+
metric ->
919+
assertThat(metric)
920+
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
921+
.hasLongSumSatisfying(
922+
longSum -> longSum.hasPointsSatisfying(
923+
point -> point.hasAttributes(attributes))),
924+
metric ->
925+
assertThat(metric)
926+
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
927+
.hasHistogramSatisfying(
928+
histogram -> histogram.hasPointsSatisfying(
929+
point -> point.hasAttributes(clientAttributesWithLocality))),
930+
metric ->
931+
assertThat(metric)
932+
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
933+
.hasHistogramSatisfying(
934+
histogram -> histogram.hasPointsSatisfying(
935+
point -> point.hasAttributes(clientAttributesWithLocality))),
936+
metric ->
937+
assertThat(metric)
938+
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
939+
.hasHistogramSatisfying(
940+
histogram -> histogram.hasPointsSatisfying(
941+
point -> point.hasAttributes(clientAttributesWithLocality))),
942+
metric ->
943+
assertThat(metric)
944+
.hasName(CLIENT_CALL_DURATION)
945+
.hasHistogramSatisfying(
946+
histogram -> histogram.hasPointsSatisfying(
947+
point -> point.hasAttributes(clientAttributes))));
948+
}
949+
950+
@Test
951+
public void clientLocalityMetrics_missing() {
952+
String target = "target:///";
884953
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
885954
enabledMetricsMap, disableDefaultMetrics);
886955
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
887-
fakeClock.getStopwatchSupplier(), resource);
956+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"));
957+
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
958+
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());
959+
960+
ClientStreamTracer tracer =
961+
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
962+
tracer.streamClosed(Status.OK);
963+
callAttemptsTracerFactory.callEnded(Status.OK);
964+
965+
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
966+
TARGET_KEY, target,
967+
METHOD_KEY, method.getFullMethodName());
968+
969+
io.opentelemetry.api.common.Attributes clientAttributes
970+
= io.opentelemetry.api.common.Attributes.of(
971+
TARGET_KEY, target,
972+
METHOD_KEY, method.getFullMethodName(),
973+
STATUS_KEY, Status.Code.OK.toString());
974+
975+
io.opentelemetry.api.common.Attributes clientAttributesWithLocality
976+
= clientAttributes.toBuilder()
977+
.put(LOCALITY_KEY, "unknown")
978+
.build();
979+
980+
assertThat(openTelemetryTesting.getMetrics())
981+
.satisfiesExactlyInAnyOrder(
982+
metric ->
983+
assertThat(metric)
984+
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
985+
.hasLongSumSatisfying(
986+
longSum -> longSum.hasPointsSatisfying(
987+
point -> point.hasAttributes(attributes))),
988+
metric ->
989+
assertThat(metric)
990+
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
991+
.hasHistogramSatisfying(
992+
histogram -> histogram.hasPointsSatisfying(
993+
point -> point.hasAttributes(clientAttributesWithLocality))),
994+
metric ->
995+
assertThat(metric)
996+
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
997+
.hasHistogramSatisfying(
998+
histogram -> histogram.hasPointsSatisfying(
999+
point -> point.hasAttributes(clientAttributesWithLocality))),
1000+
metric ->
1001+
assertThat(metric)
1002+
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
1003+
.hasHistogramSatisfying(
1004+
histogram -> histogram.hasPointsSatisfying(
1005+
point -> point.hasAttributes(clientAttributesWithLocality))),
1006+
metric ->
1007+
assertThat(metric)
1008+
.hasName(CLIENT_CALL_DURATION)
1009+
.hasHistogramSatisfying(
1010+
histogram -> histogram.hasPointsSatisfying(
1011+
point -> point.hasAttributes(clientAttributes))));
1012+
}
1013+
1014+
@Test
1015+
public void serverBasicMetrics() {
1016+
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
1017+
enabledMetricsMap, disableDefaultMetrics);
1018+
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
8881019
ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory();
8891020
ServerStreamTracer tracer =
8901021
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
@@ -994,6 +1125,12 @@ public void serverBasicMetrics() {
9941125

9951126
}
9961127

1128+
private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule(
1129+
OpenTelemetryMetricsResource resource) {
1130+
return new OpenTelemetryMetricsModule(
1131+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList());
1132+
}
1133+
9971134
static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
9981135
private final MethodDescriptor<ReqT, RespT> methodDescriptor;
9991136
private final Attributes attributes;

0 commit comments

Comments
 (0)