Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;

import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricType;
Expand All @@ -35,13 +41,16 @@
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Metrics sink for prometheus exporter.
* <p>
* Stores the metric data in-memory and return with it on request.
*/
public class PrometheusMetricsSink implements MetricsSink {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsSink.class);

/**
* Cached output lines for each metrics.
Expand All @@ -62,6 +71,17 @@ public class PrometheusMetricsSink implements MetricsSink {
Pattern
.compile("^op=(?<op>\\w+)(.user=(?<user>.*)|)\\.(TotalCount|count)$");

/**
* A fixed cache for Hadoop metric to Prometheus metric name conversion.
*/
private static final int NORMALIZED_NAME_CACHE_MAX_SIZE = 100_000;
private static final CacheLoader<String, String> NORMALIZED_NAME_CACHE_LOADER =
CacheLoader.from(PrometheusMetricsSink::normalizeImpl);
private static final LoadingCache<String, String> NORMALIZED_NAME_CACHE =
CacheBuilder.newBuilder()
.maximumSize(NORMALIZED_NAME_CACHE_MAX_SIZE)
.build(NORMALIZED_NAME_CACHE_LOADER);

public PrometheusMetricsSink() {
}

Expand All @@ -83,7 +103,21 @@ public void putMetrics(MetricsRecord metricsRecord) {

/**
* Convert CamelCase based names to lower-case names where the separator
* is the underscore, to follow prometheus naming conventions.
* is the underscore, to follow prometheus naming conventions. This method
* utilizes a cache to improve performance.
*
* <p>
* Reference:
* <ul>
* <li>
* <a href="https://prometheus.io/docs/practices/naming/">
* Metrics and Label Naming</a>
* </li>
* <li>
* <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">
* Exposition formats</a>
* </li>
* </ul>
*
* @param metricName metricName.
* @param recordName recordName.
Expand All @@ -93,6 +127,22 @@ public String prometheusName(String recordName,
String metricName) {
String baseName = StringUtils.capitalize(recordName)
+ StringUtils.capitalize(metricName);
try {
return NORMALIZED_NAME_CACHE.get(baseName);
} catch (ExecutionException | UncheckedExecutionException e) {
// This should not happen since normalization function do not throw any exception
// Nevertheless, we can fall back to uncached implementation if it somehow happens.
LOG.warn("Exception encountered when loading metric with base name {} from cache, " +
"fall back to uncached normalization implementation", baseName, e);
return normalizeImpl(baseName);
}
}

/**
* Underlying Prometheus normalization implementation.
* See {@link PrometheusMetricsSink#prometheusName(String, String)} for more information.
*/
private static String normalizeImpl(String baseName) {
String[] parts = SPLIT_PATTERN.split(baseName);
String joined = String.join("_", parts).toLowerCase();
return DELIMITERS.matcher(joined).replaceAll("_");
Expand Down