Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.io.Writer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
Expand All @@ -42,7 +43,10 @@ public class PrometheusMetricsSink implements MetricsSink {
/**
* Cached output lines for each metrics.
*/
private final Map<String, String> metricLines = new ConcurrentHashMap<>();
private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetrics =
new ConcurrentHashMap<>();
private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> nextPromMetrics =
new ConcurrentHashMap<>();

private static final Pattern SPLIT_PATTERN =
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
Expand All @@ -53,42 +57,16 @@ public PrometheusMetricsSink() {

@Override
public void putMetrics(MetricsRecord metricsRecord) {
for (AbstractMetric metrics : metricsRecord.metrics()) {
if (metrics.type() == MetricType.COUNTER
|| metrics.type() == MetricType.GAUGE) {
for (AbstractMetric metric : metricsRecord.metrics()) {
if (metric.type() == MetricType.COUNTER
|| metric.type() == MetricType.GAUGE) {

String key = prometheusName(
metricsRecord.name(), metrics.name());

StringBuilder builder = new StringBuilder();
builder.append("# TYPE ")
.append(key)
.append(" ")
.append(metrics.type().toString().toLowerCase())
.append("\n")
.append(key)
.append("{");
String sep = "";

//add tags
for (MetricsTag tag : metricsRecord.tags()) {
String tagName = tag.name().toLowerCase();

//ignore specific tag which includes sub-hierarchy
if (!tagName.equals("numopenconnectionsperuser")) {
builder.append(sep)
.append(tagName)
.append("=\"")
.append(tag.value())
.append("\"");
sep = ",";
}
}
builder.append("} ");
builder.append(metrics.value());
builder.append("\n");
metricLines.put(key, builder.toString());
metricsRecord.name(), metric.name());

nextPromMetrics.computeIfAbsent(key,
any -> new ConcurrentHashMap<>())
.put(metricsRecord.tags(), metric);
}
}
}
Expand All @@ -108,17 +86,55 @@ public String prometheusName(String recordName,

@Override
public void flush() {

promMetrics = nextPromMetrics;
nextPromMetrics = new ConcurrentHashMap<>();
}

@Override
public void init(SubsetConfiguration subsetConfiguration) {

public void init(SubsetConfiguration conf) {
}

public void writeMetrics(Writer writer) throws IOException {
for (String line : metricLines.values()) {
writer.write(line);
for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
promMetrics.entrySet()) {
AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();

StringBuilder builder = new StringBuilder();
builder.append("# HELP ")
.append(promMetric.getKey())
.append(" ")
.append(firstMetric.description())
.append("\n")
.append("# TYPE ")
.append(promMetric.getKey())
.append(" ")
.append(firstMetric.type().toString().toLowerCase())
.append("\n");

for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
promMetric.getValue().entrySet()) {
builder.append(promMetric.getKey())
.append("{");

String sep = "";
for (MetricsTag tag : metric.getKey()) {
String tagName = tag.name().toLowerCase();

if (!tagName.equals("numopenconnectionsperuser")) {
builder.append(sep)
.append(tagName)
.append("=\"")
.append(tag.value())
.append("\"");
sep = ",";
}
}
builder.append("} ");
builder.append(metric.getValue().value());
builder.append("\n");
}

writer.write(builder.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.annotation.Metric.Type;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

Expand All @@ -48,7 +49,6 @@ public void testPublish() throws IOException {
TestMetrics testMetrics = metrics
.register("TestMetrics", "Testing metrics", new TestMetrics());

metrics.start();
testMetrics.numBucketCreateFails.incr();
metrics.publishMetricsNow();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Expand All @@ -67,6 +67,104 @@ public void testPublish() throws IOException {
"test_metrics_num_bucket_create_fails{context=\"dfs\"")
);

metrics.unregisterSource("TestMetrics");
metrics.stop();
metrics.shutdown();
}

/**
* Fix for HADOOP-17804, make sure Prometheus metrics get deduped based on metric
* and tags, not just the metric.
*/
@Test
public void testPublishMultiple() throws IOException {
//GIVEN
MetricsSystem metrics = DefaultMetricsSystem.instance();

metrics.init("test");
PrometheusMetricsSink sink = new PrometheusMetricsSink();
metrics.register("Prometheus", "Prometheus", sink);
TestMetrics testMetrics1 = metrics
.register("TestMetrics1", "Testing metrics", new TestMetrics("1"));
TestMetrics testMetrics2 = metrics
.register("TestMetrics2", "Testing metrics", new TestMetrics("2"));

testMetrics1.numBucketCreateFails.incr();
testMetrics2.numBucketCreateFails.incr();
metrics.publishMetricsNow();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);

//WHEN
sink.writeMetrics(writer);
writer.flush();

//THEN
String writtenMetrics = stream.toString(UTF_8.name());
System.out.println(writtenMetrics);
Assert.assertTrue(
"The expected first metric line is missing from prometheus metrics output",
writtenMetrics.contains(
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
);
Assert.assertTrue(
"The expected second metric line is missing from prometheus metrics output",
writtenMetrics.contains(
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
);

metrics.unregisterSource("TestMetrics1");
metrics.unregisterSource("TestMetrics2");
metrics.stop();
metrics.shutdown();
}

/**
* Fix for HADOOP-17804, make sure Prometheus metrics start fresh after each flush.
*/
@Test
public void testPublishFlush() throws IOException {
//GIVEN
MetricsSystem metrics = DefaultMetricsSystem.instance();

metrics.init("test");
PrometheusMetricsSink sink = new PrometheusMetricsSink();
metrics.register("Prometheus", "Prometheus", sink);
TestMetrics testMetrics = metrics
.register("TestMetrics", "Testing metrics", new TestMetrics("1"));

testMetrics.numBucketCreateFails.incr();
metrics.publishMetricsNow();

metrics.unregisterSource("TestMetrics");
testMetrics = metrics
.register("TestMetrics", "Testing metrics", new TestMetrics("2"));

testMetrics.numBucketCreateFails.incr();
metrics.publishMetricsNow();

ByteArrayOutputStream stream = new ByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);

//WHEN
sink.writeMetrics(writer);
writer.flush();

//THEN
String writtenMetrics = stream.toString(UTF_8.name());
System.out.println(writtenMetrics);
Assert.assertFalse(
"The first metric should not exist after flushing",
writtenMetrics.contains(
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
);
Assert.assertTrue(
"The expected metric line is missing from prometheus metrics output",
writtenMetrics.contains(
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
);

metrics.unregisterSource("TestMetrics");
metrics.stop();
metrics.shutdown();
}
Expand Down Expand Up @@ -126,6 +224,20 @@ public void testNamingWhitespaces() {
*/
@Metrics(about = "Test Metrics", context = "dfs")
private static class TestMetrics {
private String id;

TestMetrics() {
this("1");
}

TestMetrics(String id) {
this.id = id;
}

@Metric(value={"testTag", ""}, type=Type.TAG)
String testTag1() {
return "testTagValue" + id;
}

@Metric
private MutableCounterLong numBucketCreateFails;
Expand Down