From 306bf62c3f0b80da791788ca9ddacc1aef9ea6a5 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Wed, 7 Oct 2020 17:38:05 -0700 Subject: [PATCH 1/2] [HUDI-1326] Added an API to force publish metrics and flush them. Using the added API, publish metrics after each level of the DAG completed in hudi-test-suite. --- .../java/org/apache/hudi/metrics/Metrics.java | 25 +++++++++++++++++-- .../testsuite/dag/scheduler/DagScheduler.java | 4 +++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java index c4107ce02f742..626e33b37d827 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -41,10 +41,11 @@ public class Metrics { private static Metrics metrics = null; private final MetricRegistry registry; private MetricsReporter reporter; + private final String commonMetricPrefix; private Metrics(HoodieWriteConfig metricConfig) { registry = new MetricRegistry(); - + commonMetricPrefix = metricConfig.getTableName(); reporter = MetricsReporterFactory.createReporter(metricConfig, registry); if (reporter == null) { throw new RuntimeException("Cannot initialize Reporter."); @@ -68,8 +69,20 @@ private void reportAndCloseReporter() { } } + private void reportAndFlushMetrics() { + try { + LOG.info("Reporting and flushing all metrics"); + + this.registerHoodieCommonMetrics(); + this.reporter.report(); + this.registry.getNames().forEach(name -> this.registry.remove(name)); + } catch (Exception e) { + Metrics.LOG.error((Object)"Error while reporting and flushing metrics", (Throwable)e); + } + } + private void registerHoodieCommonMetrics() { - registerGauges(Registry.getAllMetrics(true, true), Option.empty()); + registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix)); } public static Metrics getInstance() { @@ -97,6 +110,14 @@ public static synchronized void shutdown() { initialized = false; } + public static synchronized void flush() { + if (!Metrics.initialized) { + return; + } + + metrics.reportAndFlushMetrics(); + } + public static void registerGauges(Map metricsMap, Option prefix) { String metricPrefix = prefix.isPresent() ? prefix.get() + "." : ""; metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v)); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index 22b9ff49e4243..bf6f5da53e0a5 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.hudi.metrics.Metrics; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WriterContext; @@ -95,6 +96,9 @@ private void execute(ExecutorService service, List nodes) throws Except for (Future future : futures) { future.get(1, TimeUnit.HOURS); } + + // After each level, report and flush the metrics + Metrics.flush(); } while (queue.size() > 0); log.info("Finished workloads"); } From cd02a8bfd91146c3ea35988b8760799c27191769 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Sat, 24 Oct 2020 15:18:11 -0700 Subject: [PATCH 2/2] Code cleanups --- .../java/org/apache/hudi/metrics/Metrics.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java index 626e33b37d827..5667a66a54934 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -38,7 +38,8 @@ public class Metrics { private static final Logger LOG = LogManager.getLogger(Metrics.class); private static volatile boolean initialized = false; - private static Metrics metrics = null; + private static Metrics instance = null; + private final MetricRegistry registry; private MetricsReporter reporter; private final String commonMetricPrefix; @@ -72,12 +73,11 @@ private void reportAndCloseReporter() { private void reportAndFlushMetrics() { try { LOG.info("Reporting and flushing all metrics"); - this.registerHoodieCommonMetrics(); this.reporter.report(); - this.registry.getNames().forEach(name -> this.registry.remove(name)); + this.registry.getNames().forEach(this.registry::remove); } catch (Exception e) { - Metrics.LOG.error((Object)"Error while reporting and flushing metrics", (Throwable)e); + LOG.error("Error while reporting and flushing metrics", e); } } @@ -87,7 +87,7 @@ private void registerHoodieCommonMetrics() { public static Metrics getInstance() { assert initialized; - return metrics; + return instance; } public static synchronized void init(HoodieWriteConfig metricConfig) { @@ -95,7 +95,7 @@ public static synchronized void init(HoodieWriteConfig metricConfig) { return; } try { - metrics = new Metrics(metricConfig); + instance = new Metrics(metricConfig); } catch (Exception e) { throw new HoodieException(e); } @@ -106,7 +106,7 @@ public static synchronized void shutdown() { if (!initialized) { return; } - metrics.reportAndCloseReporter(); + instance.reportAndCloseReporter(); initialized = false; } @@ -114,8 +114,7 @@ public static synchronized void flush() { if (!Metrics.initialized) { return; } - - metrics.reportAndFlushMetrics(); + instance.reportAndFlushMetrics(); } public static void registerGauges(Map metricsMap, Option prefix) {