diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index b570f512f371d..d9f22bca01ed4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -53,7 +53,7 @@ private Metrics(HoodieWriteConfig metricConfig) {
     }
     reporter.start();

-    Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
+    Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
   }

   private void reportAndCloseReporter() {
@@ -61,6 +61,7 @@ private void reportAndCloseReporter() {
       registerHoodieCommonMetrics();
       reporter.report();
       if (getReporter() != null) {
+        LOG.info("Closing metrics reporter...");
         getReporter().close();
       }
     } catch (Exception e) {
@@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
   public Closeable getReporter() {
     return reporter.getReporter();
   }
+
+  public static boolean isInitialized() {
+    return initialized;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index a141c60f3dd81..348a6056c8883 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -182,6 +182,8 @@ class DefaultSource extends RelationProvider
     } else {
       HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
     }
+
+    HoodieSparkSqlWriter.cleanup()
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fed3353552155..028de00f7ff47 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.table.BulkInsertPartitioner
@@ -594,6 +595,10 @@ object HoodieSparkSqlWriter {
     (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
   }

+  def cleanup(): Unit = {
+    Metrics.shutdown()
+  }
+
   private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig,
                               tableName: String, operation: WriteOperationType, fs: FileSystem): Unit = {
     if (mode == SaveMode.Append && tableExists) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6697ec1514cdd..241223be7e333 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -19,6 +19,7 @@ package org.apache.hudi.functional

 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -28,12 +29,14 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrin
 import org.apache.hudi.common.util
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -738,6 +741,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
+    assertEquals(false, Metrics.isInitialized)
   }

   @Test def testSchemaEvolution(): Unit = {
@@ -1024,4 +1028,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .saveAsTable("hoodie_test")
     assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
   }
+
+  @Test
+  def testMetricsReporterViaDataSource(): Unit = {
+    val dataGenerator = new QuickstartUtils.DataGenerator()
+    val records = convertToStringList(dataGenerator.generateInserts(10))
+    val recordsRDD = spark.sparkContext.parallelize(records, 2)
+    val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
+    inputDF.write.format("hudi")
+      .options(getQuickstartWriteConfigs)
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "uuid")
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionpath")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+      .option(HoodieMetricsConfig.TURN_METRICS_ON.key(), "true")
+      .option(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0f403cd266028..367a121e9f783 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -53,6 +53,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -208,6 +209,7 @@ public void sync() throws Exception {
       throw ex;
     } finally {
       deltaSyncService.ifPresent(DeltaSyncService::close);
+      Metrics.shutdown();
       LOG.info("Shut down delta streamer");
     }
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 69d6dd7d3b298..741ff8010a9ef 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -56,6 +56,7 @@
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
@@ -134,6 +135,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
@@ -742,6 +745,20 @@ public void testUpsertsCOWContinuousMode() throws Exception {
     testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
   }

+  @Test
+  public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
+    String tableBasePath = dfsBasePath + "/non_continuous_cow";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
+    cfg.continuousMode = false;
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+  }
+
   @Test
   public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
     testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
@@ -752,6 +769,20 @@ public void testUpsertsMORContinuousMode() throws Exception {
     testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
   }

+  @Test
+  public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
+    String tableBasePath = dfsBasePath + "/non_continuous_mor";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
+    cfg.continuousMode = false;
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+  }
+
   private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
     testUpsertsContinuousMode(tableType, tempDir, false);
   }