Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, false),
ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, ""),
ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false)
ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false),
// params => key=value, key2=value2
ProcedureParameter.optional(17, "options", DataTypes.StringType, None)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
Expand Down Expand Up @@ -84,6 +86,7 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]
val options = getArgValueOrDefault(args, PARAMETERS(17))

val configs: util.List[String] = new util.ArrayList[String]

Expand Down Expand Up @@ -120,6 +123,19 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log

// add session bootstrap conf
properties.putAll(spark.sqlContext.conf.getAllConfs.asJava)

// add conf from procedure, may overwrite session conf
options match {
case Some(p) =>
val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
paramPairs.foreach { pair =>
val values = StringUtils.split(pair, "=")
properties.setProperty(values.get(0), values.get(1))
}
case _ =>
logInfo("No options")
}

new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
Seq(Row(0))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package org.apache.spark.sql.hudi.procedure

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Dataset, Row}

Expand Down Expand Up @@ -86,6 +88,73 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
}
}

test("Test Call run_bootstrap Procedure with properties") {
  withTempDir { tmp =>
    val NUM_OF_RECORDS = 100
    val PARTITION_FIELD = "datestr"
    val RECORD_KEY_FIELD = "_row_key"

    val tableName = generateTableName
    val basePath = s"${tmp.getCanonicalPath}"

    val srcName: String = "source"
    val sourcePath = basePath + Path.SEPARATOR + srcName
    val tablePath = basePath + Path.SEPARATOR + tableName
    val jsc = new JavaSparkContext(spark.sparkContext)

    // Generate test data: one parquet directory per partition value,
    // laid out as <source>/datestr=<year> so bootstrap can discover partitions.
    val partitions = util.Arrays.asList("2018", "2019", "2020")
    val timestamp: Long = Instant.now.toEpochMilli
    for (i <- 0 until partitions.size) {
      val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
      df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
    }

    spark.sql("set hoodie.bootstrap.parallelism = 20")
    // Pass a writer config through the new `options` parameter; it should
    // override/extend the session conf when the bootstrap executor runs.
    checkAnswer(
      s"""call run_bootstrap(
         |table => '$tableName',
         |base_path => '$tablePath',
         |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
         |bootstrap_path => '$sourcePath',
         |rowKey_field => '$RECORD_KEY_FIELD',
         |partition_path_field => '$PARTITION_FIELD',
         |options => 'hoodie.datasource.write.hive_style_partitioning=true',
         |bootstrap_overwrite => true)""".stripMargin) {
      Seq(0)
    }

    // Register the bootstrapped table so the show_* procedures can resolve it.
    spark.sql(
      s"""
         |create table $tableName using hudi
         |location '$tablePath'
         |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
         |""".stripMargin)

    // Bootstrap index should contain exactly the three generated partitions.
    var result = spark.sql(s"""call show_bootstrap_partitions(table => '$tableName')""".stripMargin).collect()
    assertResult(3) {
      result.length
    }

    // show_bootstrap_mapping returns a default-limited page of index mappings.
    result = spark.sql(
      s"""call show_bootstrap_mapping(table => '$tableName')""".stripMargin).collect()
    assertResult(10) {
      result.length
    }

    // Verify the `options` value was actually applied: the table config must
    // have been written with hive-style partitioning enabled.
    val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
      .setConf(spark.sessionState.newHadoopConf()).build()

    assertResult("true") {
      metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
    }
  }
}

test("Test Call run_bootstrap Procedure with no-partitioned") {
withTempDir { tmp =>
val NUM_OF_RECORDS = 100
Expand Down