diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java index e77553ea3a41..320f92efc79e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java @@ -45,7 +45,9 @@ public class PrometheusMetricsSink implements MetricsSink { /** * Cached output lines for each metrics. */ - private final Map> metricLines = + private Map> metricLines = + Collections.synchronizedSortedMap(new TreeMap<>()); + private Map> nextMetricLines = Collections.synchronizedSortedMap(new TreeMap<>()); private static final Pattern SPLIT_PATTERN = @@ -59,15 +61,15 @@ public PrometheusMetricsSink() { @Override public void putMetrics(MetricsRecord metricsRecord) { - for (AbstractMetric metrics : metricsRecord.metrics()) { - if (metrics.type() == MetricType.COUNTER - || metrics.type() == MetricType.GAUGE) { + for (AbstractMetric metric : metricsRecord.metrics()) { + if (metric.type() == MetricType.COUNTER + || metric.type() == MetricType.GAUGE) { String metricName = DecayRpcSchedulerUtil - .splitMetricNameIfNeeded(metricsRecord.name(), metrics.name()); + .splitMetricNameIfNeeded(metricsRecord.name(), metric.name()); // If there is no username this should be null String username = DecayRpcSchedulerUtil - .checkMetricNameForUsername(metricsRecord.name(), metrics.name()); + .checkMetricNameForUsername(metricsRecord.name(), metric.name()); String key = prometheusName( metricsRecord.name(), metricName); @@ -78,11 +80,13 @@ public void putMetrics(MetricsRecord metricsRecord) { String metricKey = "# TYPE " + key + " " - + metrics.type().toString().toLowerCase(); + + metric.type().toString().toLowerCase(); - metricLines.computeIfAbsent(metricKey, - any -> Collections.synchronizedSortedMap(new TreeMap<>())) - .put(prometheusMetricKeyAsString, String.valueOf(metrics.value())); + synchronized (this) { + nextMetricLines.computeIfAbsent(metricKey, + any -> Collections.synchronizedSortedMap(new TreeMap<>())) + .put(prometheusMetricKeyAsString, String.valueOf(metric.value())); + } } } } @@ -146,7 +150,11 @@ public static String normalizeName(String baseName) { @Override public void flush() { - + synchronized (this) { + metricLines = nextMetricLines; + nextMetricLines = Collections + .synchronizedSortedMap(new TreeMap<>()); + } } @Override @@ -154,7 +162,8 @@ public void init(SubsetConfiguration subsetConfiguration) { } - public void writeMetrics(Writer writer) throws IOException { + public synchronized void writeMetrics(Writer writer) + throws IOException { for (Map.Entry> metricsEntry : metricLines.entrySet()) { writer.write(metricsEntry.getKey() + "\n"); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java new file mode 100644 index 000000000000..bc2ccef57dad --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.http; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Test prometheus Metrics. + */ +public class TestPrometheusMetricsIntegration { + + private MetricsSystem metrics; + private PrometheusMetricsSink sink; + + private static final MetricsInfo PORT_INFO = new MetricsInfo() { + @Override + public String name() { + return "PORT"; + } + + @Override + public String description() { + return "port"; + } + }; + + private static final MetricsInfo COUNTER_INFO = new MetricsInfo() { + @Override + public String name() { + return "COUNTER"; + } + + @Override + public String description() { + return "counter"; + } + }; + + private static final int COUNTER_1 = 123; + private static final int COUNTER_2 = 234; + + @BeforeEach + public void init() { + metrics = DefaultMetricsSystem.instance(); + + metrics.init("test"); + sink = new PrometheusMetricsSink(); + metrics.register("Prometheus", "Prometheus", sink); + } + + @AfterEach + public void tearDown() { + metrics.stop(); + metrics.shutdown(); + } + + @Test + public void testPublish() + throws InterruptedException, TimeoutException { + //GIVEN + TestMetrics testMetrics = metrics + .register("TestMetrics", "Testing metrics", new TestMetrics()); + + testMetrics.numBucketCreateFails.incr(); + + String writtenMetrics = waitForMetricsToPublish("test_metrics_num"); + + //THEN + Assertions.assertTrue( + writtenMetrics.contains( + "test_metrics_num_bucket_create_fails{context=\"dfs\""), + "The expected metric line is missing from prometheus metrics output" + ); + + metrics.unregisterSource("TestMetrics"); + } + + @Test + public void testPublishWithSameName() + throws InterruptedException, TimeoutException { + // GIVEN + metrics.register("FooBar", "fooBar", (MetricsSource) (collector, all) -> { + collector.addRecord("RpcMetrics").add(new MetricsTag(PORT_INFO, "1234")) + .addGauge(COUNTER_INFO, COUNTER_1).endRecord(); + + collector.addRecord("RpcMetrics").add(new MetricsTag( + PORT_INFO, "2345")).addGauge(COUNTER_INFO, COUNTER_2).endRecord(); + }); + + String writtenMetrics = waitForMetricsToPublish("rpc_metrics_counter"); + + // THEN + Assertions.assertTrue( + writtenMetrics.contains("rpc_metrics_counter{port=\"2345\""), + "The expected metric line is missing from prometheus metrics output"); + + Assertions.assertTrue( + writtenMetrics.contains("rpc_metrics_counter{port=\"1234\""), + "The expected metric line is missing from prometheus metrics output"); + + metrics.unregisterSource("FooBar"); + } + + @Test + public void testTypeWithSameNameButDifferentLabels() + throws InterruptedException, TimeoutException { + // GIVEN + metrics.register("SameName", "sameName", + (MetricsSource) (collector, all) -> { + collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, "1234")) + .addGauge(COUNTER_INFO, COUNTER_1).endRecord(); + collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, "2345")) + .addGauge(COUNTER_INFO, COUNTER_2).endRecord(); + }); + + // WHEN + String writtenMetrics = waitForMetricsToPublish("same_name_counter"); + + // THEN + Assertions.assertEquals(1, StringUtils.countMatches(writtenMetrics, + "# TYPE same_name_counter")); + + // both metrics should be present + Assertions.assertTrue( + writtenMetrics.contains("same_name_counter{port=\"1234\""), + "The expected metric line is present in prometheus metrics output"); + Assertions.assertTrue( + writtenMetrics.contains("same_name_counter{port=\"2345\""), + "The expected metric line is present in prometheus metrics output"); + + metrics.unregisterSource("SameName"); + } + + /** + * Make sure Prometheus metrics start fresh after each flush. + * Publish the metrics and flush them, + * then unregister one of them and register another. + * Publish and flush the metrics again + * and then check that the unregistered metric is not present. + */ + @Test + public void testRemovingStaleMetricsOnFlush() + throws InterruptedException, TimeoutException { + // GIVEN + metrics.register("StaleMetric", "staleMetric", + (MetricsSource) (collector, all) -> + collector.addRecord("StaleMetric") + .add(new MetricsTag(PORT_INFO, "1234")) + .addGauge(COUNTER_INFO, COUNTER_1).endRecord()); + + waitForMetricsToPublish("stale_metric_counter"); + + // unregister the metric + metrics.unregisterSource("StaleMetric"); + + metrics.register("SomeMetric", "someMetric", + (MetricsSource) (collector, all) -> + collector.addRecord("SomeMetric") + .add(new MetricsTag(PORT_INFO, "4321")) + .addGauge(COUNTER_INFO, COUNTER_2).endRecord()); + + String writtenMetrics = waitForMetricsToPublish("some_metric_counter"); + + // THEN + // The first metric shouldn't be present + Assertions.assertFalse( + writtenMetrics.contains("stale_metric_counter{port=\"1234\""), + "The expected metric line is present in prometheus metrics output"); + Assertions.assertTrue( + writtenMetrics.contains("some_metric_counter{port=\"4321\""), + "The expected metric line is present in prometheus metrics output"); + + metrics.unregisterSource("SomeMetric"); + } + + private String publishMetricsAndGetOutput() throws IOException { + metrics.publishMetricsNow(); + + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8); + + sink.writeMetrics(writer); + writer.flush(); + return stream.toString(UTF_8.name()); + } + + /** + * metrics.publishMetricsNow() might not finish in a reasonable + * amount of time leading to a full queue and any further attempt + * for publishing to fail. Wrapping the call with + * GenericTestUtils.waitFor() to retry until the queue has been + * cleared and publish is a success. + * + * @param registeredMetric to check if it's published + * @return all published metrics + */ + private String waitForMetricsToPublish(String registeredMetric) + throws InterruptedException, TimeoutException { + + final String[] writtenMetrics = new String[1]; + + GenericTestUtils.waitFor(() -> { + try { + writtenMetrics[0] = publishMetricsAndGetOutput(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return writtenMetrics[0].contains(registeredMetric); + }, 1000, 120000); + + return writtenMetrics[0]; + } + + /** + * Example metric pojo. + */ + @Metrics(about = "Test Metrics", context = "dfs") + private static class TestMetrics { + + @Metric + private MutableCounterLong numBucketCreateFails; + } + +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java index 5339a06bbb61..dc15d805d670 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java @@ -17,23 +17,8 @@ */ package org.apache.hadoop.hdds.server.http; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.metrics2.MetricsInfo; -import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; /** @@ -41,111 +26,11 @@ */ public class TestPrometheusMetricsSink { - private MetricsSystem metrics; - private PrometheusMetricsSink sink; - - private static final MetricsInfo PORT_INFO = new MetricsInfo() { - @Override - public String name() { - return "PORT"; - } - - @Override - public String description() { - return "port"; - } - }; - - private static final MetricsInfo COUNTER_INFO = new MetricsInfo() { - @Override - public String name() { - return "COUNTER"; - } - - @Override - public String description() { - return "counter"; - } - }; - - private static final int COUNTER_1 = 123; - private static final int COUNTER_2 = 234; + private static PrometheusMetricsSink sink; - @BeforeEach - public void init() { - metrics = DefaultMetricsSystem.instance(); - - metrics.init("test"); + @BeforeAll + public static void setUp() { sink = new PrometheusMetricsSink(); - metrics.register("Prometheus", "Prometheus", sink); - } - - @AfterEach - public void tearDown() { - metrics.stop(); - metrics.shutdown(); - } - - @Test - public void testPublish() throws IOException { - //GIVEN - TestMetrics testMetrics = metrics - .register("TestMetrics", "Testing metrics", new TestMetrics()); - - testMetrics.numBucketCreateFails.incr(); - - //WHEN - String writtenMetrics = publishMetricsAndGetOutput(); - - //THEN - Assertions.assertTrue( - writtenMetrics.contains( - "test_metrics_num_bucket_create_fails{context=\"dfs\""), - "The expected metric line is missing from prometheus metrics output" - ); - } - - @Test - public void testPublishWithSameName() throws IOException { - //GIVEN - metrics.register("FooBar", "fooBar", (MetricsSource) (collector, all) -> { - collector.addRecord("RpcMetrics").add(new MetricsTag(PORT_INFO, "1234")) - .addGauge(COUNTER_INFO, COUNTER_1).endRecord(); - - collector.addRecord("RpcMetrics").add(new MetricsTag( - PORT_INFO, "2345")).addGauge(COUNTER_INFO, COUNTER_2).endRecord(); - }); - - // WHEN - String writtenMetrics = publishMetricsAndGetOutput(); - - // THEN - Assertions.assertTrue( - writtenMetrics.contains("rpc_metrics_counter{port=\"2345\""), - "The expected metric line is missing from prometheus metrics output"); - - Assertions.assertTrue( - writtenMetrics.contains("rpc_metrics_counter{port=\"1234\""), - "The expected metric line is missing from prometheus metrics output"); - } - - @Test - public void testTypeWithSameNameButDifferentLabels() throws IOException { - //GIVEN - metrics.register("SameName", "sameName", - (MetricsSource) (collector, all) -> { - collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, "1234")) - .addGauge(COUNTER_INFO, COUNTER_1).endRecord(); - collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, "2345")) - .addGauge(COUNTER_INFO, COUNTER_2).endRecord(); - }); - - // WHEN - String writtenMetrics = publishMetricsAndGetOutput(); - - // THEN - Assertions.assertEquals(1, StringUtils.countMatches(writtenMetrics, - "# TYPE same_name_counter")); } @Test @@ -195,27 +80,4 @@ public void testNamingSpaces() { "jvm_metrics_gc_time_millis_g1_young_generation", sink.prometheusName(recordName, metricName)); } - - private String publishMetricsAndGetOutput() throws IOException { - metrics.publishMetricsNow(); - - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8); - - sink.writeMetrics(writer); - writer.flush(); - - return stream.toString(UTF_8.name()); - } - - /** - * Example metric pojo. - */ - @Metrics(about = "Test Metrics", context = "dfs") - private static class TestMetrics { - - @Metric - private MutableCounterLong numBucketCreateFails; - } - }