@@ -221,7 +221,8 @@ private[hudi] object HoodieSparkSqlWriter {
                 mode: SaveMode,
                 parameters: Map[String, String],
                 df: DataFrame,
-                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty): Boolean = {
+                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
 
     val sparkContext = sqlContext.sparkContext
     val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
@@ -263,8 +264,13 @@ private[hudi] object HoodieSparkSqlWriter {
     }
 
     val jsc = new JavaSparkContext(sqlContext.sparkContext)
-    val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path, tableName, mapAsJavaMap(parameters))
-    writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+      schema, path, tableName, mapAsJavaMap(parameters)))
+    try {
+      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+    } finally {
+      writeClient.close()
Review comment (Member), on writeClient.close(): "guess, this is the real fix here."

+    }
     val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration)
     metaSyncSuccess
   }
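
The production change does two things: bootstrap() now accepts an optional caller-supplied SparkRDDWriteClient, and the bootstrap call is wrapped in try/finally so the client is closed even when bootstrapping throws. As the review comment above notes, the close() in the finally block is the substantive fix. Below is a minimal, self-contained sketch of that same guarantee; the Client trait and withClient helper are hypothetical illustration names, not Hudi API.

object WithClientSketch {
  trait Client extends AutoCloseable {
    def bootstrap(): Unit
  }

  // Loan pattern: run the action, then close the client no matter what.
  // This is the same guarantee the PR gets from its inline try/finally.
  def withClient[A](client: Client)(action: Client => A): A =
    try action(client)
    finally client.close()

  def main(args: Array[String]): Unit = {
    val client = new Client {
      override def bootstrap(): Unit = println("bootstrapping...")
      override def close(): Unit = println("closed")
    }
    // Prints "bootstrapping..." then "closed"; "closed" would still print
    // if bootstrap() threw, which is exactly the point of the finally.
    withClient(client)(_.bootstrap())
  }
}

For a single call site, the inline try/finally the PR uses is the idiomatic choice; a helper like withClient only pays off once several code paths need the same close-on-exit behavior.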
@@ -17,17 +17,18 @@

 package org.apache.hudi.functional
 
+import java.time.Instant
 import java.util
-import java.util.{Date, UUID}
+import java.util.{Collections, Date, UUID}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
 import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.SparkContext
@@ -341,6 +342,61 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
   }
   })
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test HoodieSparkSqlWriter functionality with datasource bootstrap for " + tableType) {
+        initSparkContext("test_bootstrap_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
+
+        try {
+
+          val hoodieFooTableName = "hoodie_foo_tbl"
+
+          val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
+            spark.sqlContext)
+
+          // Write source data non-partitioned
+          sourceDF.write
+            .format("parquet")
+            .mode(SaveMode.Overwrite)
+            .save(srcPath.toAbsolutePath.toString)
+
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP -> srcPath.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          val client = spy(DataSourceUtils.createHoodieClient(
+            new JavaSparkContext(sc),
+            null,
+            path.toAbsolutePath.toString,
+            hoodieFooTableName,
+            mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
+
+          HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableParams, spark.emptyDataFrame, Option.empty,
+            Option(client))
+
+          // Verify that HoodieWriteClient is closed correctly
+          verify(client, times(1)).close()
+
+          // fetch all records from parquet files generated from write to hudi
+          val actualDf = sqlContext.read.parquet(path.toAbsolutePath.toString)
+          assert(actualDf.count == 100)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+          FileUtils.deleteDirectory(srcPath.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
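
The test exercises the new injection point end to end: it builds a real SparkRDDWriteClient, wraps it in a spy, hands it to HoodieSparkSqlWriter.bootstrap via the new optional parameter, and then asserts both the observable behavior (100 rows readable from the target path) and the resource contract (close() called exactly once). The spy/times/verify calls suggest the suite uses Mockito; the sketch below shows that verification idiom in isolation, with a hypothetical Resource class standing in for the write client.

import org.mockito.Mockito.{spy, times, verify}

// Hypothetical stand-in for SparkRDDWriteClient in the test above.
class Resource {
  def close(): Unit = () // the real method still runs under a spy
}

object SpyVerifySketch {
  def main(args: Array[String]): Unit = {
    // A spy wraps a real object: calls go through to the real implementation,
    // but every invocation is recorded for later verification.
    val resource = spy(new Resource)

    resource.close() // in the real test, bootstrap() triggers this internally

    // Throws a Mockito verification error unless close() ran exactly once.
    verify(resource, times(1)).close()
  }
}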