Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,76 @@

package org.apache.hudi.metrics.prometheus;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.MetricsReporter;

import com.codahale.metrics.MetricRegistry;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder;
import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
import io.prometheus.client.exporter.HTTPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
* Implementation of Prometheus reporter, which connects to the Http server, and get metrics
* from that server.
*/
public class PrometheusReporter extends MetricsReporter {
private static final Pattern LABEL_PATTERN = Pattern.compile("\\s*,\\s*");

private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
private static final Map<Integer, CollectorRegistry> PORT_TO_COLLECTOR_REGISTRY = new HashMap<>();
private static final Map<Integer, HTTPServer> PORT_TO_SERVER = new HashMap<>();

private HTTPServer httpServer;
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
private final int serverPort;

public PrometheusReporter(HoodieWriteConfig config, MetricRegistry registry) {
int serverPort = config.getPrometheusPort();
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
this.serverPort = config.getPrometheusPort();
if (!PORT_TO_SERVER.containsKey(serverPort) || !PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) {
startHttpServer(serverPort);
}
List<String> labelNames = new ArrayList<>();
List<String> labelValues = new ArrayList<>();
if (StringUtils.nonEmpty(config.getPushGatewayLabels())) {
LABEL_PATTERN.splitAsStream(config.getPushGatewayLabels().trim()).map(s -> s.split(":", 2))
.forEach(parts -> {
labelNames.add(parts[0]);
labelValues.add(parts[1]);
});
}
metricExports = new DropwizardExports(registry, new LabeledSampleBuilder(labelNames, labelValues));
this.collectorRegistry = PORT_TO_COLLECTOR_REGISTRY.get(serverPort);
metricExports.register(collectorRegistry);
try {
httpServer = new HTTPServer(new InetSocketAddress(serverPort), collectorRegistry);
} catch (Exception e) {
String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort;
LOG.error(msg, e);
throw new HoodieException(msg, e);
}

private static synchronized void startHttpServer(int serverPort) {
if (!PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) {
PORT_TO_COLLECTOR_REGISTRY.put(serverPort, new CollectorRegistry());
}
if (!PORT_TO_SERVER.containsKey(serverPort)) {
try {
HTTPServer server = new HTTPServer(new InetSocketAddress(serverPort), PORT_TO_COLLECTOR_REGISTRY.get(serverPort));
PORT_TO_SERVER.put(serverPort, server);
Runtime.getRuntime().addShutdownHook(new Thread(server::stop));
} catch (Exception e) {
String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort;
LOG.error(msg, e);
throw new HoodieException(msg, e);
}
}
}

Expand All @@ -68,8 +102,31 @@ public void report() {
@Override
public void stop() {
collectorRegistry.unregister(metricExports);
HTTPServer httpServer = PORT_TO_SERVER.remove(serverPort);
if (httpServer != null) {
httpServer.stop();
}
PORT_TO_COLLECTOR_REGISTRY.remove(serverPort);
}

private static class LabeledSampleBuilder implements SampleBuilder {
private final DefaultSampleBuilder defaultMetricSampleBuilder = new DefaultSampleBuilder();
private final List<String> labelNames;
private final List<String> labelValues;

public LabeledSampleBuilder(List<String> labelNames, List<String> labelValues) {
this.labelNames = labelNames;
this.labelValues = labelValues;
}

@Override
public Collector.MetricFamilySamples.Sample createSample(String dropwizardName, String nameSuffix, List<String> additionalLabelNames, List<String> additionalLabelValues, double value) {
return defaultMetricSampleBuilder.createSample(
dropwizardName,
nameSuffix,
labelNames,
labelValues,
value);
}
}
}