diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java index c4107ce02f742..5667a66a54934 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -38,13 +38,15 @@ public class Metrics { private static final Logger LOG = LogManager.getLogger(Metrics.class); private static volatile boolean initialized = false; - private static Metrics metrics = null; + private static Metrics instance = null; + private final MetricRegistry registry; private MetricsReporter reporter; + private final String commonMetricPrefix; private Metrics(HoodieWriteConfig metricConfig) { registry = new MetricRegistry(); - + commonMetricPrefix = metricConfig.getTableName(); reporter = MetricsReporterFactory.createReporter(metricConfig, registry); if (reporter == null) { throw new RuntimeException("Cannot initialize Reporter."); @@ -68,13 +70,24 @@ private void reportAndCloseReporter() { } } + private void reportAndFlushMetrics() { + try { + LOG.info("Reporting and flushing all metrics"); + this.registerHoodieCommonMetrics(); + this.reporter.report(); + this.registry.getNames().forEach(this.registry::remove); + } catch (Exception e) { + LOG.error("Error while reporting and flushing metrics", e); + } + } + private void registerHoodieCommonMetrics() { - registerGauges(Registry.getAllMetrics(true, true), Option.empty()); + registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix)); } public static Metrics getInstance() { assert initialized; - return metrics; + return instance; } public static synchronized void init(HoodieWriteConfig metricConfig) { @@ -82,7 +95,7 @@ public static synchronized void init(HoodieWriteConfig metricConfig) { return; } try { - metrics = new Metrics(metricConfig); + instance = new Metrics(metricConfig); } catch (Exception e) { throw new HoodieException(e); } @@ -93,10 +106,17 @@ public static synchronized void shutdown() { if (!initialized) { return; } - metrics.reportAndCloseReporter(); + instance.reportAndCloseReporter(); initialized = false; } + public static synchronized void flush() { + if (!Metrics.initialized) { + return; + } + instance.reportAndFlushMetrics(); + } + public static void registerGauges(Map metricsMap, Option prefix) { String metricPrefix = prefix.isPresent() ? prefix.get() + "." : ""; metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v)); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index 22b9ff49e4243..bf6f5da53e0a5 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.hudi.metrics.Metrics; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WriterContext; @@ -95,6 +96,9 @@ private void execute(ExecutorService service, List nodes) throws Except for (Future future : futures) { future.get(1, TimeUnit.HOURS); } + + // After each level, report and flush the metrics + Metrics.flush(); } while (queue.size() > 0); log.info("Finished workloads"); }