Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ public class Metrics {
private static final Logger LOG = LogManager.getLogger(Metrics.class);

private static volatile boolean initialized = false;
private static Metrics metrics = null;
private static Metrics instance = null;

private final MetricRegistry registry;
private MetricsReporter reporter;
private final String commonMetricPrefix;

private Metrics(HoodieWriteConfig metricConfig) {
registry = new MetricRegistry();

commonMetricPrefix = metricConfig.getTableName();
reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
if (reporter == null) {
throw new RuntimeException("Cannot initialize Reporter.");
Expand All @@ -68,21 +70,32 @@ private void reportAndCloseReporter() {
}
}

private void reportAndFlushMetrics() {
try {
LOG.info("Reporting and flushing all metrics");
this.registerHoodieCommonMetrics();
this.reporter.report();
this.registry.getNames().forEach(this.registry::remove);
} catch (Exception e) {
LOG.error("Error while reporting and flushing metrics", e);
}
}

private void registerHoodieCommonMetrics() {
registerGauges(Registry.getAllMetrics(true, true), Option.empty());
registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
}

public static Metrics getInstance() {
assert initialized;
return metrics;
return instance;
}

public static synchronized void init(HoodieWriteConfig metricConfig) {
if (initialized) {
return;
}
try {
metrics = new Metrics(metricConfig);
instance = new Metrics(metricConfig);
} catch (Exception e) {
throw new HoodieException(e);
}
Expand All @@ -93,10 +106,17 @@ public static synchronized void shutdown() {
if (!initialized) {
return;
}
metrics.reportAndCloseReporter();
instance.reportAndCloseReporter();
initialized = false;
}

public static synchronized void flush() {
if (!Metrics.initialized) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should be the right expectation if it's not initialized ? Simply returning gives a false impression to the client that it's flushed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inline with the shutdown() and init() functions which are fail-safe. Unless metrics are initialized there is no effect of calling these.

}
instance.reportAndFlushMetrics();
}

public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
Expand Down Expand Up @@ -95,6 +96,9 @@ private void execute(ExecutorService service, List<DagNode> nodes) throws Except
for (Future future : futures) {
future.get(1, TimeUnit.HOURS);
}

// After each level, report and flush the metrics
Metrics.flush();
} while (queue.size() > 0);
log.info("Finished workloads");
}
Expand Down