Skip to content

Commit 4b1ed80

Browse files
author
Naireen
committed
address comments
1 parent 0899da0 commit 4b1ed80

File tree

7 files changed

+29
-14
lines changed

7 files changed

+29
-14
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,9 @@ private void deleteStaleCounters(
386386
@VisibleForTesting
387387
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
388388
ImmutableMap.Builder<MetricName, Long> counters = new ImmutableMap.Builder<MetricName, Long>();
389-
ImmutableMap.Builder<MetricName, Long> per_worker_gauges =
389+
ImmutableMap.Builder<MetricName, Long> perWorkerGaugeUpdates =
390390
new ImmutableMap.Builder<MetricName, Long>();
391-
ImmutableMap.Builder<MetricName, LockFreeHistogram.Snapshot> per_worker_histograms =
391+
ImmutableMap.Builder<MetricName, LockFreeHistogram.Snapshot> perWorkerHistogramUpdates =
392392
new ImmutableMap.Builder<MetricName, LockFreeHistogram.Snapshot>();
393393
HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
394394
perWorkerCounters.forEach(
@@ -404,22 +404,22 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
404404
perWorkerGauges.forEach(
405405
(k, v) -> {
406406
Long val = v.getCumulative().value();
407-
per_worker_gauges.put(k, val);
407+
perWorkerGaugeUpdates.put(k, val);
408408
v.reset();
409409
});
410410

411411
perWorkerHistograms.forEach(
412412
(k, v) -> {
413-
v.getSnapshotAndReset().ifPresent(snapshot -> per_worker_histograms.put(k, snapshot));
413+
v.getSnapshotAndReset().ifPresent(snapshot -> perWorkerHistogramUpdates.put(k, snapshot));
414414
});
415415

416416
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
417417

418418
return MetricsToPerStepNamespaceMetricsConverter.convert(
419419
stepName,
420420
counters.build(),
421-
per_worker_gauges.build(),
422-
per_worker_histograms.build(),
421+
perWorkerGaugeUpdates.build(),
422+
perWorkerHistogramUpdates.build(),
423423
parsedPerWorkerMetricsCache);
424424
}
425425

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java

-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ private Optional<Histogram> getHistogram() {
5050
if (container == null) {
5151
return Optional.empty();
5252
}
53-
// LOG.info("Xxx get histogram");
5453
return Optional.of(container.getHistogram(name, bucketType));
5554
}
5655

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ static Histogram createRPCLatencyHistogram(RpcMethod method) {
114114
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
115115
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
116116
nameBuilder.addLabel(RPC_METHOD, method.toString());
117-
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
118117
nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
118+
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
119119

120120
// Create Exponential histogram buckets with the following parameters:
121121
// 0 scale, resulting in bucket widths with a size 2 growth factor.

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.beam.sdk.metrics.MetricsEnvironment;
4242
import org.apache.beam.sdk.util.HistogramData;
4343
import org.apache.beam.sdk.values.KV;
44+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
4445
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
4546
import org.hamcrest.collection.IsMapContaining;
4647
import org.junit.Test;
@@ -203,7 +204,10 @@ public void testReportSuccessfulRpcMetrics() throws Exception {
203204
MetricName counterNameDisabledDeletes =
204205
MetricName.named("BigQuerySink", "RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:OK;");
205206
MetricName histogramName =
206-
MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
207+
MetricName.named(
208+
"BigQuerySink",
209+
"RpcLatency*rpc_method:APPEND_ROWS;",
210+
ImmutableMap.of("PER_WORKER_METRIC", "true"));
207211
HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(0, 17);
208212
testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
209213
testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 3.0);
@@ -243,7 +247,10 @@ public void testReportFailedRPCMetrics_KnownGrpcError() throws Exception {
243247
MetricName.named(
244248
"BigQuerySink", "RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:NOT_FOUND;");
245249
MetricName histogramName =
246-
MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
250+
MetricName.named(
251+
"BigQuerySink",
252+
"RpcLatency*rpc_method:APPEND_ROWS;",
253+
ImmutableMap.of("PER_WORKER_METRIC", "true"));
247254
HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(0, 17);
248255
testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
249256
testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 5.0);
@@ -282,7 +289,10 @@ public void testReportFailedRPCMetrics_UnknownGrpcError() throws Exception {
282289
MetricName.named(
283290
"BigQuerySink", "RpcRequestsCount*rpc_method:APPEND_ROWS;rpc_status:UNKNOWN;");
284291
MetricName histogramName =
285-
MetricName.named("BigQuerySink", "RpcLatency*rpc_method:APPEND_ROWS;");
292+
MetricName.named(
293+
"BigQuerySink",
294+
"RpcLatency*rpc_method:APPEND_ROWS;",
295+
ImmutableMap.of("PER_WORKER_METRIC", "true"));
286296
HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(0, 17);
287297
testContainer.assertPerWorkerCounterValue(counterNameDisabledDeletes, 1L);
288298
testContainer.assertPerWorkerHistogramValues(histogramName, bucketType, 15.0);

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
7171
nameBuilder.addLabel(RPC_METHOD, method.toString());
7272
nameBuilder.addLabel(TOPIC_LABEL, topic);
7373

74-
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
7574
nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
75+
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
7676

7777
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
7878
return new DelegatingHistogram(metricName, buckets, false);

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ public void testKafkaRPCLatencyMetrics() throws Exception {
121121
results.flushBufferedMetrics();
122122
// RpcLatency*rpc_method:POLL;topic_name:test-topic
123123
MetricName histogramName =
124-
MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:test-topic;");
124+
MetricName.named(
125+
"KafkaSink",
126+
"RpcLatency*rpc_method:POLL;topic_name:test-topic;",
127+
ImmutableMap.of("PER_WORKER_METRIC", "true"));
125128
HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 17);
126129

127130
assertThat(testContainer.histograms.size(), equalTo(1));

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ public void testCreatingHistogram() throws Exception {
3838
KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, "topic1");
3939

4040
MetricName histogramName =
41-
MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:topic1;");
41+
MetricName.named(
42+
"KafkaSink",
43+
"RpcLatency*rpc_method:POLL;topic_name:topic1;",
44+
ImmutableMap.of("PER_WORKER_METRIC", "true"));
4245
assertThat(histogram.getName(), equalTo(histogramName));
4346
}
4447

0 commit comments

Comments
 (0)