.../org/apache/hudi/metrics/Metrics.java
@@ -53,14 +53,15 @@ private Metrics(HoodieWriteConfig metricConfig) {
}
reporter.start();

-    Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
+    Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
}

private void reportAndCloseReporter() {
try {
registerHoodieCommonMetrics();
reporter.report();
if (getReporter() != null) {
LOG.info("Closing metrics reporter...");
getReporter().close();
}
} catch (Exception e) {
@@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
public Closeable getReporter() {
return reporter.getReporter();
}

public static boolean isInitialized() {
return initialized;
}
}
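Note on the change above: the JVM shutdown hook now targets the static Metrics::shutdown rather than an instance method, and the new isInitialized() flag lets explicit cleanup and the hook coordinate. A minimal sketch of that idempotent-shutdown pattern (only shutdown() and isInitialized() mirror the diff; every other name here is an assumption, not Hudi's actual internals):

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of an idempotent, statically reachable shutdown path, assuming
// a single static reporter; not Hudi's actual Metrics implementation.
public class MetricsSketch {
  private static volatile boolean initialized = false;
  private static Closeable reporter; // stands in for the metrics reporter

  private MetricsSketch(Closeable r) {
    reporter = r;
    initialized = true;
    // A static target keeps the hook valid even after explicit cleanup
    // has already torn the instance down.
    Runtime.getRuntime().addShutdownHook(new Thread(MetricsSketch::shutdown));
  }

  public static synchronized void shutdown() {
    if (!initialized) {
      return; // explicit cleanup and the JVM hook may both land here
    }
    try {
      reporter.close();
    } catch (IOException e) {
      // best effort on shutdown
    } finally {
      initialized = false;
    }
  }

  public static boolean isInitialized() {
    return initialized;
  }
}
```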
.../org/apache/hudi/DefaultSource.scala
@@ -182,6 +182,8 @@ class DefaultSource extends RelationProvider
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
}

HoodieSparkSqlWriter.cleanup()
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}

.../org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
@@ -594,6 +595,10 @@ object HoodieSparkSqlWriter {
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

def cleanup() : Unit = {
Contributor: @junyuc25 Since we already have a shutdown hook to close the metrics, is explicitly shutting them down here still necessary?

I found an issue: if the metrics server experiences problems such as high load and fails to respond quickly, this Metrics.shutdown call can block the query from finishing until the connection times out.

23/07/03 14:16:31 WARN GraphiteReporter: Unable to report to Graphite
java.net.ConnectException: Connection timed out (Connection timed out)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:607)
        at java.net.Socket.connect(Socket.java:556)
        at java.net.Socket.<init>(Socket.java:452)
        at java.net.Socket.<init>(Socket.java:262)
        at javax.net.DefaultSocketFactory.createSocket(SocketFactory.java:277)
        at org.apache.hudi.com.codahale.metrics.graphite.Graphite.connect(Graphite.java:130)
        at org.apache.hudi.com.codahale.metrics.graphite.GraphiteReporter.report(GraphiteReporter.java:263)
        at org.apache.hudi.com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:237)
        at org.apache.hudi.metrics.MetricsGraphiteReporter.report(MetricsGraphiteReporter.java:74)
        at org.apache.hudi.metrics.Metrics.reportAndStopReporter(Metrics.java:61)
        at org.apache.hudi.metrics.Metrics.shutdown(Metrics.java:105)
        at java.lang.Thread.run(Thread.java:748)

Contributor: Nice catch! Can you file a fix then?

Contributor: Sure, let me fix it.

Metrics.shutdown()
}

private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
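A side note on the blocking concern in the thread above: one common way to keep a slow metrics sink from stalling the write path is to run the final flush/close on a daemon thread and bound the wait. This is only an illustration of that idea, not the fix that was eventually filed, and every name in it is an assumption:

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: bound the time spent flushing/closing a metrics
// reporter so an unresponsive sink (e.g. an overloaded Graphite host)
// cannot block the caller until the socket timeout.
public final class BoundedMetricsShutdown {
  public static void shutdownWithTimeout(Closeable reporter, long timeoutMs) {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "metrics-shutdown");
      t.setDaemon(true); // never keep the JVM alive for metrics teardown
      return t;
    });
    executor.submit(() -> {
      try {
        reporter.close(); // the final report may block on the network
      } catch (Exception e) {
        // best effort: a failed final report should not fail the job
      }
    });
    executor.shutdown();
    try {
      if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow(); // give up; the daemon thread dies with the JVM
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```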
.../org/apache/hudi/functional/TestCOWDataSource.scala
@@ -19,6 +19,7 @@ package org.apache.hudi.functional

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -28,12 +29,14 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrin
import org.apache.hudi.common.util
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.metrics.HoodieMetricsConfig
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -738,6 +741,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
assertEquals(false, Metrics.isInitialized)
}

@Test def testSchemaEvolution(): Unit = {
@@ -1024,4 +1028,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
.saveAsTable("hoodie_test")
assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
}

@Test
def testMetricsReporterViaDataSource(): Unit = {
val dataGenerator = new QuickstartUtils.DataGenerator()
val records = convertToStringList(dataGenerator.generateInserts(10))
val recordsRDD = spark.sparkContext.parallelize(records, 2)
val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "uuid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionpath")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
.option(HoodieMetricsConfig.TURN_METRICS_ON.key(), "true")
.option(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE")
.mode(SaveMode.Overwrite)
.save(basePath)

assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
}
}
.../org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -53,6 +53,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -208,6 +209,7 @@ public void sync() throws Exception {
throw ex;
} finally {
deltaSyncService.ifPresent(DeltaSyncService::close);
Metrics.shutdown();
Member: This only works in sync-once mode. Can we also hook it up to continuous mode's shutdown callback?

Contributor: The Metrics reporter already has a shutdown hook of its own that should run when the JVM exits. That should suffice for the continuous-mode case, right?

Contributor (author): Please correct me if I'm wrong, but it seems like the DeltaStreamer continuous mode does not have this issue.

Member: Functionality-wise it should be OK. I think it's a question of where the single place to clean up all resources should be; org.apache.hudi.utilities.deltastreamer.DeltaSync#close looks like the right place. It looks like a minor cleanup to me now.

LOG.info("Shut down delta streamer");
}
}
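On the review point about a single cleanup site: routing both the sync-once finally block and continuous mode's shutdown callback through one close() keeps the resource lifecycle in one place. A hypothetical sketch of that shape (not Hudi's actual DeltaSync):

```java
// Hypothetical consolidation point, per the review discussion above: every
// mode tears down through one close(), which releases each resource in turn.
public class SyncServiceSketch implements AutoCloseable {
  private final AutoCloseable writeClient; // stands in for the Hudi write client

  public SyncServiceSketch(AutoCloseable writeClient) {
    this.writeClient = writeClient;
  }

  @Override
  public void close() {
    try {
      writeClient.close();
    } catch (Exception e) {
      // log and continue: teardown should release everything it can
    }
    // Metrics.shutdown() would live here too, so sync-once and continuous
    // mode both shut the reporter down exactly once via the same path.
  }
}
```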
.../org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -56,6 +56,7 @@
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
@@ -134,6 +135,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
@@ -742,6 +745,20 @@ public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}

@Test
public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
String tableBasePath = dfsBasePath + "/non_continuous_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
}

@Test
public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
@@ -752,6 +769,20 @@ public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}

@Test
public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
String tableBasePath = dfsBasePath + "/non_continuous_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
}

private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
testUpsertsContinuousMode(tableType, tempDir, false);
}