Commit 8cc2eb9

refactor histograms as per issue #34195

Author: Naireen (committed)
1 parent 8e89e97 commit 8cc2eb9

File tree: 11 files changed (+74 -127 lines)


runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java (+5 -6)

@@ -75,6 +75,11 @@ public Gauge getGauge(MetricName metricName) {
     return getCurrentContainer().getGauge(metricName);
   }
 
+  @Override
+  public Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
+    return getCurrentContainer().getHistogram(metricName, bucketType);
+  }
+
   @Override
   public StringSet getStringSet(MetricName metricName) {
     return getCurrentContainer().getStringSet(metricName);
@@ -84,10 +89,4 @@ public StringSet getStringSet(MetricName metricName) {
   public BoundedTrie getBoundedTrie(MetricName metricName) {
     return getCurrentContainer().getBoundedTrie(metricName);
   }
-
-  @Override
-  public Histogram getPerWorkerHistogram(
-      MetricName metricName, HistogramData.BucketType bucketType) {
-    return getCurrentContainer().getPerWorkerHistogram(metricName, bucketType);
-  }
 }
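With DataflowMetricsContainer now forwarding getHistogram to the per-thread container, worker code records histogram values through the same entry point as counters, gauges, and distributions. A minimal usage sketch, assuming a current MetricsContainer has been set for the executing step; the namespace and metric name below are illustrative, not names used by this commit:

import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.HistogramData;

public class HistogramUsageSketch {
  public static void recordLatency(double latencyMillis) {
    // Container for the current step; on the Dataflow worker this may be a
    // DataflowMetricsContainer delegating to the active execution state.
    MetricsContainer container = MetricsEnvironment.getCurrentContainer();
    if (container == null) {
      return; // No metrics context is set; nothing to record.
    }
    MetricName name = MetricName.named("ExampleSink", "RpcLatency"); // illustrative name
    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(0, 17);
    Histogram histogram = container.getHistogram(name, buckets);
    histogram.update(latencyMillis);
  }
}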

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java (+28 -21)

@@ -48,12 +48,14 @@
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.NoOpHistogram;
 import org.apache.beam.sdk.metrics.StringSet;
 import org.apache.beam.sdk.util.HistogramData;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
@@ -76,6 +78,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
 
   private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
 
+  // Stores metrics that will be later aggregted per worker.
+  private MetricsMap<MetricName, GaugeCell> perWorkerGauges = new MetricsMap<>(GaugeCell::new);
+
   private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);
 
   private MetricsMap<MetricName, DeltaDistributionCell> distributions =
@@ -84,6 +89,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
   private MetricsMap<MetricName, BoundedTrieCell> boundedTries =
       new MetricsMap<>(BoundedTrieCell::new);
 
+  // Stores metrics that will be later aggregted per worker.
   private final ConcurrentHashMap<MetricName, LockFreeHistogram> perWorkerHistograms =
       new ConcurrentHashMap<>();
 
@@ -173,6 +179,9 @@ public Distribution getDistribution(MetricName metricName) {
 
   @Override
   public Gauge getGauge(MetricName metricName) {
+    if (MonitoringInfoConstants.isPerWorkerMetric(metricName)) {
+      return perWorkerGauges.get(metricName);
+    }
     return gauges.get(metricName);
   }
 
@@ -187,10 +196,10 @@ public BoundedTrie getBoundedTrie(MetricName metricName) {
   }
 
   @Override
-  public Histogram getPerWorkerHistogram(
-      MetricName metricName, HistogramData.BucketType bucketType) {
-    if (!enablePerWorkerMetrics) {
-      return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
+  public Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
+    // Currently worker only supports per worker histograms
+    if (!enablePerWorkerMetrics || !MonitoringInfoConstants.isPerWorkerMetric(metricName)) {
+      return NoOpHistogram.getInstance();
     }
 
     LockFreeHistogram val = perWorkerHistograms.get(metricName);
@@ -247,7 +256,6 @@ private FluentIterable<CounterUpdate> gaugeUpdates() {
               return MetricsToCounterUpdateConverter.fromGauge(
                   MetricKey.create(stepName, entry.getKey()), value, timestamp);
             } else {
-              // add a test for this.
              return null;
            }
          }
@@ -377,11 +385,11 @@ private void deleteStaleCounters(
   */
  @VisibleForTesting
  Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
-    ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
-    ConcurrentHashMap<MetricName, Long> per_worker_gauges =
-        new ConcurrentHashMap<MetricName, Long>();
-    ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
-        new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
+    ImmutableMap.Builder<MetricName, Long> counters = new ImmutableMap.Builder<MetricName, Long>();
+    ImmutableMap.Builder<MetricName, Long> per_worker_gauges =
+        new ImmutableMap.Builder<MetricName, Long>();
+    ImmutableMap.Builder<MetricName, LockFreeHistogram.Snapshot> per_worker_histograms =
+        new ImmutableMap.Builder<MetricName, LockFreeHistogram.Snapshot>();
     HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
     perWorkerCounters.forEach(
         (k, v) -> {
@@ -393,27 +401,26 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
           counters.put(k, val);
         });
 
-    gauges.forEach(
+    perWorkerGauges.forEach(
         (k, v) -> {
-          // Check if metric name has the per worker label set.
-          // TODO(Naireen): Populate local map with perWorkerMetrics so we don't need to check each
-          // time we update the metrics.
-          if (MonitoringInfoConstants.isPerWorkerMetric(k)) {
-            Long val = v.getCumulative().value();
-            per_worker_gauges.put(k, val);
-            v.reset();
-          }
+          Long val = v.getCumulative().value();
+          per_worker_gauges.put(k, val);
+          v.reset();
         });
 
     perWorkerHistograms.forEach(
         (k, v) -> {
-          v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
+          v.getSnapshotAndReset().ifPresent(snapshot -> per_worker_histograms.put(k, snapshot));
         });
 
     deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
 
     return MetricsToPerStepNamespaceMetricsConverter.convert(
-        stepName, counters, per_worker_gauges, histograms, parsedPerWorkerMetricsCache);
+        stepName,
+        counters.build(),
+        per_worker_gauges.build(),
+        per_worker_histograms.build(),
+        parsedPerWorkerMetricsCache);
   }
 
   /**
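The streaming container now keys per-worker behaviour off the metric name itself: getGauge and getHistogram consult MonitoringInfoConstants.isPerWorkerMetric, and a histogram is only tracked in perWorkerHistograms when per-worker metrics are enabled and the name carries the PER_WORKER_METRIC label; everything else falls back to NoOpHistogram. A short sketch of both paths, assuming the three-argument MetricName.named overload used in the tests below (the "ExampleSink" names are illustrative, and the ImmutableMap import should match whatever the surrounding module uses):

import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class PerWorkerRoutingSketch {
  public static void record(MetricsContainer streamingContainer, double value) {
    HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 10, 10);

    // Carries the per-worker label: aggregated per worker and later emitted by
    // extractPerWorkerMetricUpdates() when per-worker metrics are enabled.
    MetricName perWorker =
        MetricName.named("ExampleSink", "latency", ImmutableMap.of("PER_WORKER_METRIC", "true"));
    Histogram tracked = streamingContainer.getHistogram(perWorker, buckets);
    tracked.update(value);

    // No label: the streaming container currently returns NoOpHistogram, so this
    // update is dropped rather than reported.
    MetricName unlabeled = MetricName.named("ExampleSink", "latency");
    streamingContainer.getHistogram(unlabeled, buckets).update(value);
  }
}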

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java (+22 -10)

@@ -421,17 +421,23 @@ public void testBoundedTrieUpdateExtraction() {
   public void testPerWorkerMetrics() {
     StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
     MetricsContainer metricsContainer = registry.getContainer("test_step");
+    MetricName histogramMetricName =
+        MetricName.named("BigQuerySink", "histogram", ImmutableMap.of("PER_WORKER_METRIC", "true"));
+
     assertThat(
-        metricsContainer.getPerWorkerCounter(name1), sameInstance(NoOpCounter.getInstance()));
+        metricsContainer.getPerWorkerCounter(histogramMetricName),
+        sameInstance(NoOpCounter.getInstance()));
     HistogramData.BucketType testBucket = HistogramData.LinearBuckets.of(1, 1, 1);
     assertThat(
-        metricsContainer.getPerWorkerHistogram(name1, testBucket),
+        metricsContainer.getHistogram(histogramMetricName, testBucket),
         sameInstance(NoOpHistogram.getInstance()));
 
     StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
-    assertThat(metricsContainer.getPerWorkerCounter(name1), not(instanceOf(NoOpCounter.class)));
     assertThat(
-        metricsContainer.getPerWorkerHistogram(name1, testBucket),
+        metricsContainer.getPerWorkerCounter(histogramMetricName),
+        not(instanceOf(NoOpCounter.class)));
+    assertThat(
+        metricsContainer.getHistogram(histogramMetricName, testBucket),
         not(instanceOf(NoOpHistogram.class)));
   }
 
@@ -441,9 +447,10 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() {
     MetricName counterMetricName = MetricName.named("BigQuerySink", "counter");
     c1.getPerWorkerCounter(counterMetricName).inc(3);
 
-    MetricName histogramMetricName = MetricName.named("BigQuerySink", "histogram");
+    MetricName histogramMetricName =
+        MetricName.named("BigQuerySink", "histogram", ImmutableMap.of("PER_WORKER_METRIC", "true"));
     HistogramData.LinearBuckets linearBuckets = HistogramData.LinearBuckets.of(0, 10, 10);
-    c2.getPerWorkerHistogram(histogramMetricName, linearBuckets).update(5.0);
+    c2.getHistogram(histogramMetricName, linearBuckets).update(5.0);
 
     Iterable<PerStepNamespaceMetrics> updates =
         StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
@@ -488,9 +495,10 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() {
   public void testExtractPerWorkerMetricUpdatesKafka_populatedHistogramMetrics() {
     StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
 
-    MetricName histogramMetricName = MetricName.named("KafkaSink", "histogram");
+    MetricName histogramMetricName =
+        MetricName.named("KafkaSink", "histogram", ImmutableMap.of("PER_WORKER_METRIC", "true"));
     HistogramData.LinearBuckets linearBuckets = HistogramData.LinearBuckets.of(0, 10, 10);
-    c2.getPerWorkerHistogram(histogramMetricName, linearBuckets).update(5.0);
+    c2.getHistogram(histogramMetricName, linearBuckets).update(5.0);
 
     Iterable<PerStepNamespaceMetrics> updates =
         StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
@@ -552,13 +560,17 @@ public void testExtractPerWorkerMetricUpdatesKafka_populateGaugeMetrics() {
   }
 
   @Test
-  public void testExtractPerWorkerMetricUpdatesKafka_gaugeMetricsDropped() {
+  public void testExtractPerWorkerMetricUpdatesKafka_perWorkerMetricsDropped() {
     StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
 
     MetricName gaugeMetricName =
         MetricName.named("KafkaSink", "gauge", ImmutableMap.of("PER_WORKER_METRIC", "false"));
     c2.getGauge(gaugeMetricName).set(5L);
 
+    MetricName histogramMetricName = MetricName.named("BigQuerySink", "histogram");
+    HistogramData.LinearBuckets linearBuckets = HistogramData.LinearBuckets.of(0, 10, 10);
+    c2.getHistogram(histogramMetricName, linearBuckets);
+
     Iterable<PerStepNamespaceMetrics> updates =
         StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);
     assertThat(updates, IsEmptyIterable.emptyIterable());
@@ -573,7 +585,7 @@ public void testExtractPerWorkerMetricUpdates_emptyMetrics() {
 
     MetricName histogramMetricName = MetricName.named("BigQuerySink", "histogram");
     HistogramData.LinearBuckets linearBuckets = HistogramData.LinearBuckets.of(0, 10, 10);
-    c2.getPerWorkerHistogram(histogramMetricName, linearBuckets);
+    c2.getHistogram(histogramMetricName, linearBuckets);
 
     Iterable<PerStepNamespaceMetrics> updates =
         StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfoTest.java (+1 -1)

@@ -62,7 +62,7 @@ public void testTranslateKnownPerWorkerCounters_malformedCounters() throws Excep
             .getName();
 
     HistogramData.BucketType linearBuckets = HistogramData.LinearBuckets.of(0, 10.0, 10);
-    metricsContainer.getPerWorkerHistogram(name, linearBuckets).update(10.0);
+    metricsContainer.getHistogram(name, linearBuckets).update(10.0);
 
     stageInfo.extractPerWorkerMetricValues();
     assertThat(stageInfo.throttledMsecs().getAggregate(), equalTo(0L));

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java (+2 -24)

@@ -28,37 +28,18 @@ public class DelegatingHistogram implements Metric, Histogram, Serializable {
   private final MetricName name;
   private final HistogramData.BucketType bucketType;
   private final boolean processWideContainer;
-  private final boolean perWorkerHistogram;
 
   /**
-   * Create a {@code DelegatingHistogram} with {@code perWorkerHistogram} set to false.
-   *
    * @param name Metric name for this metric.
    * @param bucketType Histogram bucketing strategy.
    * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the
    *     current thread's container.
    */
   public DelegatingHistogram(
       MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) {
-    this(name, bucketType, processWideContainer, false);
-  }
-
-  /**
-   * @param name Metric name for this metric.
-   * @param bucketType Histogram bucketing strategy.
-   * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the
-   *     current thread's container.
-   * @param perWorkerHistogram Whether this Histogram refers to a perWorker metric or not.
-   */
-  public DelegatingHistogram(
-      MetricName name,
-      HistogramData.BucketType bucketType,
-      boolean processWideContainer,
-      boolean perWorkerHistogram) {
     this.name = name;
     this.bucketType = bucketType;
     this.processWideContainer = processWideContainer;
-    this.perWorkerHistogram = perWorkerHistogram;
   }
 
   private Optional<Histogram> getHistogram() {
@@ -69,11 +50,8 @@ private Optional<Histogram> getHistogram() {
     if (container == null) {
       return Optional.empty();
     }
-    if (perWorkerHistogram) {
-      return Optional.of(container.getPerWorkerHistogram(name, bucketType));
-    } else {
-      return Optional.of(container.getHistogram(name, bucketType));
-    }
+    // LOG.info("Xxx get histogram");
+    return Optional.of(container.getHistogram(name, bucketType));
   }
 
   @Override
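DelegatingHistogram drops its perWorkerHistogram flag and always resolves container.getHistogram, so whether a histogram is aggregated per worker is now decided purely by the labels on its MetricName. A small sketch of the remaining three-argument constructor, assuming only the SDK classes shown in this diff; the metric name is illustrative:

import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

public class DelegatingHistogramSketch {
  public static void main(String[] args) {
    MetricName name = MetricName.named("ExampleSink", "RpcLatency");
    HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 10, 10);
    // The third argument selects the process-wide container instead of the
    // current thread's container.
    DelegatingHistogram histogram = new DelegatingHistogram(name, buckets, false);
    // Safe to call with no container set: the internal lookup resolves to
    // Optional.empty() and the update is dropped.
    histogram.update(42.0);
  }
}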

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java (-9)

@@ -26,7 +26,6 @@
  * appropriate metrics interface for the "current" step.
  */
 public interface MetricsContainer extends Serializable {
-
   /**
    * Return the {@link Counter} that should be used for implementing the given {@code metricName} in
    * this container.
@@ -70,14 +69,6 @@ default Counter getPerWorkerCounter(MetricName metricName) {
    * in this container.
    */
   default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
-    throw new RuntimeException("Histogram metric is not supported yet.");
-  }
-  /**
-   * Return the {@link Histogram} that should be used for implementing the given per-worker {@code
-   * metricName} in this container.
-   */
-  default Histogram getPerWorkerHistogram(
-      MetricName metricName, HistogramData.BucketType bucketType) {
     return NoOpHistogram.getInstance();
   }
 
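Since the default getHistogram no longer throws, a container that does not override it silently hands back NoOpHistogram and drops updates. Code that previously relied on the RuntimeException to detect missing histogram support can check for the no-op instance instead; a hedged sketch (the helper name is ours, not part of the commit):

import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.NoOpHistogram;
import org.apache.beam.sdk.util.HistogramData;

public class HistogramSupportCheck {
  // Returns true only if the container provides a real histogram implementation
  // for this metric name; NoOpHistogram means updates would be dropped.
  public static boolean supportsHistograms(MetricsContainer container, MetricName name) {
    Histogram h = container.getHistogram(name, HistogramData.LinearBuckets.of(0, 10, 10));
    return !(h instanceof NoOpHistogram);
  }
}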

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java (-42)

This file was deleted.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java (+3 -1)

@@ -21,6 +21,7 @@
 import java.time.Instant;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.NestedCounter;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
 import org.apache.beam.sdk.metrics.Counter;
@@ -114,13 +115,14 @@ static Histogram createRPCLatencyHistogram(RpcMethod method) {
         LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
     nameBuilder.addLabel(RPC_METHOD, method.toString());
     MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
+    nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
 
     // Create Exponential histogram buckets with the following parameters:
     // 0 scale, resulting in bucket widths with a size 2 growth factor.
     // 17 buckets, so the max latency of that can be stored is (2^17 millis ~= 130 seconds).
     HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(0, 17);
 
-    return new DelegatingHistogram(metricName, buckets, false, true);
+    return new DelegatingHistogram(metricName, buckets, false);
   }
 
   /**
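The sink now marks its RPC latency metric as per-worker through the metric name's labels and returns a plain three-argument DelegatingHistogram. A condensed sketch of the same pattern, with illustrative namespace and label values rather than the exact constants in BigQuerySinkMetrics, using the three-argument MetricName.named overload seen in the worker tests above (adjust the ImmutableMap import to your module's conventions):

import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class SinkLatencyMetricsSketch {
  public static Histogram rpcLatencyHistogram(String rpcMethod) {
    // Illustrative namespace, name, and label key; the real code builds the name
    // with LabeledMetricNameUtils and MonitoringInfoConstants.Labels.PER_WORKER_METRIC.
    MetricName name =
        MetricName.named(
            "ExampleSink",
            "RpcLatency-" + rpcMethod,
            ImmutableMap.of("PER_WORKER_METRIC", "true"));
    // Exponential buckets, scale 0: widths grow by a factor of 2; 17 buckets covers
    // up to roughly 2^17 ms (~130 s), matching the comment in the diff above.
    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(0, 17);
    return new DelegatingHistogram(name, buckets, false);
  }
}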

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java (+1 -1)

@@ -83,7 +83,7 @@ public TestMetricsContainer() {
     }
 
     @Override
-    public TestHistogramCell getPerWorkerHistogram(
+    public TestHistogramCell getHistogram(
         MetricName metricName, HistogramData.BucketType bucketType) {
       perWorkerHistograms.computeIfAbsent(
           KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv));

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java (+3 -2)

@@ -72,9 +72,10 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
     nameBuilder.addLabel(TOPIC_LABEL, topic);
 
     MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
-    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
+    nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
 
-    return new DelegatingHistogram(metricName, buckets, false, true);
+    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
+    return new DelegatingHistogram(metricName, buckets, false);
   }
 
   /**
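A histogram produced by a helper like createRPCLatencyHistogram is typically updated around the client call it measures. A small usage sketch, assuming only the Histogram interface; the timing helper below is ours, not part of the Kafka sink:

import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;
import org.apache.beam.sdk.metrics.Histogram;

public class LatencyRecordingSketch {
  // Runs the supplied RPC and records its wall-clock latency in milliseconds.
  public static <T> T timeRpc(Histogram rpcLatencyHistogram, Supplier<T> rpcCall) {
    Instant start = Instant.now();
    try {
      return rpcCall.get();
    } finally {
      long elapsedMillis = Duration.between(start, Instant.now()).toMillis();
      // Dropped silently (NoOpHistogram) when per-worker metrics are disabled on the worker.
      rpcLatencyHistogram.update(elapsedMillis);
    }
  }
}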
