
Commit 751e4ee

zhedoubushishi and Wenning Ding authored
[HUDI-1396] Fix for preventing bootstrap datasource jobs from hanging via spark-submit (#2253)
Co-authored-by: Wenning Ding <[email protected]>
1 parent d9411c3 commit 751e4ee

File tree

2 files changed (+69 lines, -7 lines)


hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
(9 additions, 3 deletions)

@@ -221,7 +221,8 @@ private[hudi] object HoodieSparkSqlWriter {
                 mode: SaveMode,
                 parameters: Map[String, String],
                 df: DataFrame,
-                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty): Boolean = {
+                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
 
     val sparkContext = sqlContext.sparkContext
     val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
@@ -263,8 +264,13 @@
     }
 
     val jsc = new JavaSparkContext(sqlContext.sparkContext)
-    val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path, tableName, mapAsJavaMap(parameters))
-    writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+      schema, path, tableName, mapAsJavaMap(parameters)))
+    try {
+      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+    } finally {
+      writeClient.close()
+    }
     val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration)
     metaSyncSuccess
   }
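The substance of the fix is the try/finally above: the bootstrap write client is now always closed once the bootstrap finishes or fails, so a spark-submit driver no longer hangs on resources the client leaves running. The new optional hoodieWriteClient parameter also lets a caller (notably the test added below) supply its own client instead of having one created internally. A minimal sketch of the same close-after-use pattern, using a hypothetical withWriteClient helper that is not part of the Hudi API:

import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model.HoodieRecordPayload

object WriteClientLoan {
  // Hypothetical helper (not part of Hudi) illustrating the pattern this commit
  // applies to the bootstrap path: run a write operation, then always close the client.
  def withWriteClient[R](client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]])
                        (body: SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => R): R = {
    try {
      body(client)   // e.g. client.bootstrap(org.apache.hudi.common.util.Option.empty())
    } finally {
      client.close() // release the client even if the operation throws
    }
  }
}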

hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
(60 additions, 4 deletions)

@@ -17,17 +17,18 @@
 
 package org.apache.hudi.functional
 
+import java.time.Instant
 import java.util
-import java.util.{Date, UUID}
+import java.util.{Collections, Date, UUID}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
 import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.SparkContext
@@ -341,6 +342,61 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   })
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test HoodieSparkSqlWriter functionality with datasource bootstrap for " + tableType) {
+        initSparkContext("test_bootstrap_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
+
+        try {
+
+          val hoodieFooTableName = "hoodie_foo_tbl"
+
+          val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
+            spark.sqlContext)
+
+          // Write source data non-partitioned
+          sourceDF.write
+            .format("parquet")
+            .mode(SaveMode.Overwrite)
+            .save(srcPath.toAbsolutePath.toString)
+
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP -> srcPath.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          val client = spy(DataSourceUtils.createHoodieClient(
+            new JavaSparkContext(sc),
+            null,
+            path.toAbsolutePath.toString,
+            hoodieFooTableName,
+            mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
+
+          HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableParams, spark.emptyDataFrame, Option.empty,
+            Option(client))
+
+          // Verify that HoodieWriteClient is closed correctly
+          verify(client, times(1)).close()
+
+          // fetch all records from parquet files generated from write to hudi
+          val actualDf = sqlContext.read.parquet(path.toAbsolutePath.toString)
+          assert(actualDf.count == 100)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+          FileUtils.deleteDirectory(srcPath.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
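The new test passes a Mockito spy of the real write client through the added hoodieWriteClient parameter, then verifies that close() was invoked exactly once. A small self-contained sketch of that spy-and-verify technique, using a stand-in FakeClient class (not the real SparkRDDWriteClient) purely for illustration:

import org.mockito.Mockito.{spy, times, verify}

// Stand-in for the write client: any non-final class with a close() method works.
class FakeClient {
  def close(): Unit = ()
}

object SpyCloseCheck {
  def main(args: Array[String]): Unit = {
    val client = spy(new FakeClient)  // wraps the object so calls can be verified later
    try {
      // ... code under test would use `client` here ...
    } finally {
      client.close()                  // the behaviour the production fix guarantees
    }
    verify(client, times(1)).close()  // fails if close() was not called exactly once
  }
}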
