Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ private Metrics(HoodieWriteConfig metricConfig) {
}
reporter.start();

Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
}

private void reportAndCloseReporter() {
try {
registerHoodieCommonMetrics();
reporter.report();
if (getReporter() != null) {
LOG.info("Closing metrics reporter...");
getReporter().close();
}
} catch (Exception e) {
Expand Down Expand Up @@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
public Closeable getReporter() {
return reporter.getReporter();
}

public static boolean isInitialized() {
return initialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class DefaultSource extends RelationProvider
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
HoodieSparkSqlWriter.cleanup()
}
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
Expand Down Expand Up @@ -593,6 +594,10 @@ object HoodieSparkSqlWriter {
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

def cleanup() : Unit = {
Metrics.shutdown()
}

private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
Expand All @@ -40,7 +41,7 @@ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -738,6 +739,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
assertEquals(false, Metrics.isInitialized)
}

@Test def testSchemaEvolution(): Unit = {
Expand Down