Metrics.java
@@ -53,14 +53,15 @@ private Metrics(HoodieWriteConfig metricConfig) {
     }
     reporter.start();
 
-    Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
+    Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
   }
 
   private void reportAndCloseReporter() {
     try {
       registerHoodieCommonMetrics();
       reporter.report();
       if (getReporter() != null) {
+        LOG.info("Closing metrics reporter...");
         getReporter().close();
       }
     } catch (Exception e) {
@@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
   public Closeable getReporter() {
     return reporter.getReporter();
   }
+
+  public static boolean isInitialized() {
+    return initialized;
+  }
 }
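
The hunks above re-point the JVM shutdown hook at a static Metrics.shutdown() and expose the initialized flag through isInitialized(), but shutdown()'s body falls outside the visible diff. A minimal sketch of what it plausibly does, assuming a static singleton field (called metrics here, which is an assumption) alongside the initialized flag:

  // Sketch only: shutdown()'s body is not shown in this diff. The singleton
  // field name `metrics` is an assumption; shutdown(), isInitialized(),
  // initialized, and reportAndCloseReporter() all appear in the hunks above.
  public static synchronized void shutdown() {
    if (!initialized) {
      return; // idempotent: safe to reach from both cleanup() callers and the shutdown hook
    }
    metrics.reportAndCloseReporter(); // report registered metrics, then close the reporter
    initialized = false;              // after this, isInitialized() returns false
  }

Making the hook static and idempotent matters because the same teardown can now be triggered twice: once explicitly by the callers added below, and once by the JVM shutdown hook.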
DefaultSource.scala
@@ -181,6 +181,7 @@ class DefaultSource extends RelationProvider
         HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
       } else {
         HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+        HoodieSparkSqlWriter.cleanup()
       }
       new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
     }
HoodieSparkSqlWriter.scala
@@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.table.BulkInsertPartitioner
@@ -593,6 +594,10 @@ object HoodieSparkSqlWriter {
     (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
   }
 
+  def cleanup() : Unit = {
+    Metrics.shutdown()
+  }
+
   private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
                               operation: WriteOperationType, fs: FileSystem): Unit = {
     if (mode == SaveMode.Append && tableExists) {
TestCOWDataSource.scala
@@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
@@ -738,6 +739,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
+    assertEquals(false, Metrics.isInitialized)
   }
 
   @Test def testSchemaEvolution(): Unit = {
HoodieDeltaStreamer.java
@@ -53,6 +53,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -208,6 +209,7 @@ public void sync() throws Exception {
       throw ex;
     } finally {
       deltaSyncService.ifPresent(DeltaSyncService::close);
+      Metrics.shutdown();
       LOG.info("Shut down delta streamer");
     }
   }
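
The net effect across the files so far, condensed into the caller-side pattern (illustrative only; runOneRound() is a stand-in, not an actual Hudi method): non-continuous runs now tear metrics down explicitly instead of relying solely on the JVM shutdown hook.

  // Hypothetical condensed pattern, not code from this diff.
  public void runAndCleanUp() throws Exception {
    try {
      runOneRound(); // the actual write / sync work
    } finally {
      Metrics.shutdown();              // explicit teardown for non-continuous runs
      assert !Metrics.isInitialized(); // the invariant the new tests check
    }
  }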
TestHoodieDeltaStreamer.java
@@ -56,6 +56,7 @@
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
@@ -739,30 +740,36 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
 
   @Test
   public void testUpsertsCOWContinuousMode() throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
+    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
   }
 
+  @Test
+  public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
+    testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
+  }
+
   @Test
   public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
+    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
   }
 
   @Test
   public void testUpsertsMORContinuousMode() throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
+    testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
   }
 
-  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
-    testUpsertsContinuousMode(tableType, tempDir, false);
+  @Test
+  public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
+    testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
   }
 
-  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
+  private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
     String tableBasePath = dfsBasePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
    int totalRecords = 3000;
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
-    cfg.continuousMode = true;
+    cfg.continuousMode = continuousMode;
     if (testShutdownGracefully) {
       cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
     }
@@ -782,6 +789,10 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
       if (testShutdownGracefully) {
         TestDataSource.returnEmptyBatch = true;
       }
+
+      if (!cfg.continuousMode) {
+        assertFalse(Metrics.isInitialized());
+      }
       return true;
     });
   }