Skip to content

Commit

Permalink
Stop using KV as the Key for Histogram Metrics (#31166)
Browse files Browse the repository at this point in the history
  • Loading branch information
JayajP authored May 7, 2024
1 parent c0c8d78 commit 815639b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray;

/**
Expand All @@ -54,9 +53,9 @@ public final class LockFreeHistogram implements Histogram {
private final AtomicBoolean dirty;

/** Create a histogram. */
public LockFreeHistogram(KV<MetricName, HistogramData.BucketType> kv) {
this.name = kv.getKey();
this.bucketType = kv.getValue();
public LockFreeHistogram(MetricName name, HistogramData.BucketType bucketType) {
this.name = name;
this.bucketType = bucketType;
this.buckets = new AtomicLongArray(bucketType.getNumBuckets());
this.underflowStatistic =
new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
Expand Down Expand Up @@ -71,8 +70,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
private MetricsMap<MetricName, DeltaDistributionCell> distributions =
new MetricsMap<>(DeltaDistributionCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, LockFreeHistogram>
perWorkerHistograms = new MetricsMap<>(LockFreeHistogram::new);
private final ConcurrentHashMap<MetricName, LockFreeHistogram> perWorkerHistograms =
new ConcurrentHashMap<>();

private final Map<MetricName, Instant> perWorkerCountersByFirstStaleTime;

Expand Down Expand Up @@ -163,11 +162,17 @@ public Gauge getGauge(MetricName metricName) {
@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
if (enablePerWorkerMetrics) {
return perWorkerHistograms.get(KV.of(metricName, bucketType));
} else {
if (!enablePerWorkerMetrics) {
return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
}

LockFreeHistogram val = perWorkerHistograms.get(metricName);
if (val != null) {
return val;
}

return perWorkerHistograms.computeIfAbsent(
metricName, name -> new LockFreeHistogram(metricName, bucketType));
}

public Iterable<CounterUpdate> extractUpdates() {
Expand Down Expand Up @@ -316,7 +321,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
});
perWorkerHistograms.forEach(
(k, v) -> {
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k.getKey(), snapshot));
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
});

deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.Future;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -43,7 +42,7 @@ public class LockFreeHistogramTest {
public void testUpdate_OverflowValues() {
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 10, 3);
LockFreeHistogram histogram =
new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType));
new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType);
histogram.update(35, 40, 45);
Optional<LockFreeHistogram.Snapshot> snapshot = histogram.getSnapshotAndReset();

Expand All @@ -65,7 +64,7 @@ public void testUpdate_OverflowValues() {
public void testUpdate_UnderflowValues() {
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(100, 10, 3);
LockFreeHistogram histogram =
new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType));
new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType);
histogram.update(35, 40, 45);
Optional<LockFreeHistogram.Snapshot> snapshot = histogram.getSnapshotAndReset();

Expand All @@ -86,7 +85,7 @@ public void testUpdate_UnderflowValues() {
public void testUpdate_InBoundsValues() {
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 10, 3);
LockFreeHistogram histogram =
new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType));
new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType);
histogram.update(5, 15, 25);
Optional<LockFreeHistogram.Snapshot> snapshot = histogram.getSnapshotAndReset();

Expand All @@ -105,7 +104,7 @@ public void testUpdate_InBoundsValues() {
public void testUpdate_EmptySnapshot() {
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 10, 3);
LockFreeHistogram histogram =
new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType));
new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType);
histogram.update(5, 15, 25);
Optional<LockFreeHistogram.Snapshot> snapshot_1 = histogram.getSnapshotAndReset();

Expand Down Expand Up @@ -155,7 +154,7 @@ public void testUpdateAndSnapshots_MultipleThreads() {

HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 10);
LockFreeHistogram histogram =
new LockFreeHistogram(KV.of(MetricName.named("name", "namespace"), bucketType));
new LockFreeHistogram(MetricName.named("name", "namespace"), bucketType);

List<UpdateHistogramCallable> callables = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray;
import org.hamcrest.collection.IsMapContaining;
Expand Down Expand Up @@ -139,8 +138,7 @@ public void testConvert_skipInvalidMetricNames() {

Map<MetricName, LockFreeHistogram.Snapshot> histograms = new HashMap<>();
MetricName invalidName2 = MetricName.named("BigQuerySink", "****");
LockFreeHistogram nonEmptyLinearHistogram =
new LockFreeHistogram(KV.of(invalidName2, lienarBuckets));
LockFreeHistogram nonEmptyLinearHistogram = new LockFreeHistogram(invalidName2, lienarBuckets);
nonEmptyLinearHistogram.update(-5.0);
histograms.put(invalidName2, nonEmptyLinearHistogram.getSnapshotAndReset().get());

Expand All @@ -162,12 +160,12 @@ public void testConvert_successfulConvertHistograms() {
MetricName bigQueryMetric3 = MetricName.named("BigQuerySink", "zeroValue");

LockFreeHistogram nonEmptyLinearHistogram =
new LockFreeHistogram(KV.of(bigQueryMetric1, lienarBuckets));
new LockFreeHistogram(bigQueryMetric1, lienarBuckets);
nonEmptyLinearHistogram.update(-5.0, 15.0, 25.0, 35.0, 105.0);
histograms.put(bigQueryMetric1, nonEmptyLinearHistogram.getSnapshotAndReset().get());

LockFreeHistogram noEmptyExponentialHistogram =
new LockFreeHistogram(KV.of(bigQueryMetric2, exponentialBuckets));
new LockFreeHistogram(bigQueryMetric2, exponentialBuckets);
noEmptyExponentialHistogram.update(-5.0, 15.0, 25.0, 35.0, 105.0);
histograms.put(bigQueryMetric2, noEmptyExponentialHistogram.getSnapshotAndReset().get());

Expand Down Expand Up @@ -267,8 +265,7 @@ public void testConvert_skipUnknownHistogramBucketType() {
Map<MetricName, LockFreeHistogram.Snapshot> histograms = new HashMap<>();

MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel");
LockFreeHistogram histogram =
new LockFreeHistogram(KV.of(bigQueryMetric1, new TestBucketType()));
LockFreeHistogram histogram = new LockFreeHistogram(bigQueryMetric1, new TestBucketType());
histogram.update(1.0, 2.0);
histograms.put(bigQueryMetric1, histogram.getSnapshotAndReset().get());

Expand All @@ -290,8 +287,7 @@ public void testConvert_convertCountersAndHistograms() {
counters.put(counterMetricName, 3L);

MetricName histogramMetricName = MetricName.named("BigQuerySink", "histogram*label2:val2;");
LockFreeHistogram linearHistogram =
new LockFreeHistogram(KV.of(histogramMetricName, lienarBuckets));
LockFreeHistogram linearHistogram = new LockFreeHistogram(histogramMetricName, lienarBuckets);
linearHistogram.update(5.0);
histograms.put(histogramMetricName, linearHistogram.getSnapshotAndReset().get());

Expand Down

0 comments on commit 815639b

Please sign in to comment.