From 815639b89acf9a943dae0ca8d81ae9988451579d Mon Sep 17 00:00:00 2001 From: JayajP Date: Tue, 7 May 2024 13:56:13 -0700 Subject: [PATCH] Stop using KV as the Key for Histogram Metrics (#31166) --- .../dataflow/worker/LockFreeHistogram.java | 7 +++---- .../worker/StreamingStepMetricsContainer.java | 19 ++++++++++++------- .../worker/LockFreeHistogramTest.java | 11 +++++------ ...oPerStepNamespaceMetricsConverterTest.java | 14 +++++--------- 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogram.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogram.java index bc42e1283240..07f1a9f84967 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogram.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogram.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray; /** @@ -54,9 +53,9 @@ public final class LockFreeHistogram implements Histogram { private final AtomicBoolean dirty; /** Create a histogram. */ - public LockFreeHistogram(KV kv) { - this.name = kv.getKey(); - this.bucketType = kv.getValue(); + public LockFreeHistogram(MetricName name, HistogramData.BucketType bucketType) { + this.name = name; + this.bucketType = bucketType; this.buckets = new AtomicLongArray(bucketType.getNumBuckets()); this.underflowStatistic = new AtomicReference(OutlierStatistic.EMPTY); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index f5b8185bd6be..15fdbf4ab7dd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.util.HistogramData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; @@ -71,8 +70,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private MetricsMap distributions = new MetricsMap<>(DeltaDistributionCell::new); - private MetricsMap, LockFreeHistogram> - perWorkerHistograms = new MetricsMap<>(LockFreeHistogram::new); + private final ConcurrentHashMap perWorkerHistograms = + new ConcurrentHashMap<>(); private final Map perWorkerCountersByFirstStaleTime; @@ -163,11 +162,17 @@ public Gauge getGauge(MetricName metricName) { @Override public Histogram getPerWorkerHistogram( MetricName metricName, HistogramData.BucketType bucketType) { - if (enablePerWorkerMetrics) { - return perWorkerHistograms.get(KV.of(metricName, bucketType)); - } else { + if (!enablePerWorkerMetrics) { return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType); } + + LockFreeHistogram val = perWorkerHistograms.get(metricName); + if (val != null) { + return val; + } + + return perWorkerHistograms.computeIfAbsent( + metricName, name -> new LockFreeHistogram(metricName, bucketType)); } public Iterable extractUpdates() { @@ -316,7 +321,7 @@ Iterable extractPerWorkerMetricUpdates() { }); perWorkerHistograms.forEach( (k, v) -> { - v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k.getKey(), snapshot)); + v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot)); }); deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogramTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogramTest.java index dfb63a36f836..6acb5904e21a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogramTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogramTest.java @@ -29,7 +29,6 @@ import java.util.concurrent.Future; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,7 +42,7 @@ public class LockFreeHistogramTest { public void testUpdate_OverflowValues() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 10, 3); LockFreeHistogram histogram = - new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType)); + new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType); histogram.update(35, 40, 45); Optional snapshot = histogram.getSnapshotAndReset(); @@ -65,7 +64,7 @@ public void testUpdate_OverflowValues() { public void testUpdate_UnderflowValues() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(100, 10, 3); LockFreeHistogram histogram = - new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType)); + new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType); histogram.update(35, 40, 45); Optional snapshot = histogram.getSnapshotAndReset(); @@ -86,7 +85,7 @@ public void testUpdate_UnderflowValues() { public void testUpdate_InBoundsValues() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 10, 3); LockFreeHistogram histogram = - new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType)); + new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType); histogram.update(5, 15, 25); Optional snapshot = histogram.getSnapshotAndReset(); @@ -105,7 +104,7 @@ public void testUpdate_InBoundsValues() { public void testUpdate_EmptySnapshot() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 10, 3); LockFreeHistogram histogram = - new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType)); + new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType); histogram.update(5, 15, 25); Optional snapshot_1 = histogram.getSnapshotAndReset(); @@ -155,7 +154,7 @@ public void testUpdateAndSnapshots_MultipleThreads() { HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 10); LockFreeHistogram histogram = - new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType)); + new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType); List callables = new ArrayList<>(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java index 4e77c5f92018..f9a9bb42906b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray; import org.hamcrest.collection.IsMapContaining; @@ -139,8 +138,7 @@ public void testConvert_skipInvalidMetricNames() { Map histograms = new HashMap<>(); MetricName invalidName2 = MetricName.named("BigQuerySink", "****"); - LockFreeHistogram nonEmptyLinearHistogram = - new LockFreeHistogram(KV.of(invalidName2, lienarBuckets)); + LockFreeHistogram nonEmptyLinearHistogram = new LockFreeHistogram(invalidName2, lienarBuckets); nonEmptyLinearHistogram.update(-5.0); histograms.put(invalidName2, nonEmptyLinearHistogram.getSnapshotAndReset().get()); @@ -162,12 +160,12 @@ public void testConvert_successfulConvertHistograms() { MetricName bigQueryMetric3 = MetricName.named("BigQuerySink", "zeroValue"); LockFreeHistogram nonEmptyLinearHistogram = - new LockFreeHistogram(KV.of(bigQueryMetric1, lienarBuckets)); + new LockFreeHistogram(bigQueryMetric1, lienarBuckets); nonEmptyLinearHistogram.update(-5.0, 15.0, 25.0, 35.0, 105.0); histograms.put(bigQueryMetric1, nonEmptyLinearHistogram.getSnapshotAndReset().get()); LockFreeHistogram noEmptyExponentialHistogram = - new LockFreeHistogram(KV.of(bigQueryMetric2, exponentialBuckets)); + new LockFreeHistogram(bigQueryMetric2, exponentialBuckets); noEmptyExponentialHistogram.update(-5.0, 15.0, 25.0, 35.0, 105.0); histograms.put(bigQueryMetric2, noEmptyExponentialHistogram.getSnapshotAndReset().get()); @@ -267,8 +265,7 @@ public void testConvert_skipUnknownHistogramBucketType() { Map histograms = new HashMap<>(); MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel"); - LockFreeHistogram histogram = - new LockFreeHistogram(KV.of(bigQueryMetric1, new TestBucketType())); + LockFreeHistogram histogram = new LockFreeHistogram(bigQueryMetric1, new TestBucketType()); histogram.update(1.0, 2.0); histograms.put(bigQueryMetric1, histogram.getSnapshotAndReset().get()); @@ -290,8 +287,7 @@ public void testConvert_convertCountersAndHistograms() { counters.put(counterMetricName, 3L); MetricName histogramMetricName = MetricName.named("BigQuerySink", "histogram*label2:val2;"); - LockFreeHistogram linearHistogram = - new LockFreeHistogram(KV.of(histogramMetricName, lienarBuckets)); + LockFreeHistogram linearHistogram = new LockFreeHistogram(histogramMetricName, lienarBuckets); linearHistogram.update(5.0); histograms.put(histogramMetricName, linearHistogram.getSnapshotAndReset().get());