diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index bb93cf1a4857..458d618e92b2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -52,7 +52,9 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
     ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
     ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, false),
     ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, ""),
-    ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false),
+    // options format: comma-separated key=value pairs, e.g. key1=value1,key2=value2
+    ProcedureParameter.optional(17, "options", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,6 +85,7 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
     val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
     val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
     val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]
+    val options = getArgValueOrDefault(args, PARAMETERS(17))
 
     val (tableName, database) = HoodieCLIUtils.getTableIdentifier(table.get.asInstanceOf[String])
     val configs: util.List[String] = new util.ArrayList[String]
@@ -121,6 +124,18 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
 
     // add session bootstrap conf
     TypedProperties.putAll(properties, spark.sqlContext.conf.getAllConfs.asJava)
+
+    // add configs passed via the procedure's `options` parameter, which may override session configs
+    options match {
+      case Some(p) =>
+        val paramPairs = HoodieCLIUtils.extractOptions(p.asInstanceOf[String])
+        paramPairs.foreach { pair =>
+          properties.setProperty(pair._1, pair._2)
+        }
+      case _ =>
+        logInfo("No extra options provided")
+    }
+
     new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
     Seq(Row(0))
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index 31a1de80da7b..a8ac9b5e3176 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.hudi.procedure
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.functional.TestBootstrap
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.{Dataset, Row}
 
@@ -93,6 +95,73 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_bootstrap Procedure with properties") {
+    withTempDir { tmp =>
+      val NUM_OF_RECORDS = 100
+      val PARTITION_FIELD = "datestr"
+      val RECORD_KEY_FIELD = "_row_key"
+
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}"
+
+      val srcName: String = "source"
+      val sourcePath = basePath + Path.SEPARATOR + srcName
+      val tablePath = basePath + Path.SEPARATOR + tableName
+      val jsc = new JavaSparkContext(spark.sparkContext)
+
+      // generate test data
+      val partitions = util.Arrays.asList("2018", "2019", "2020")
+      val timestamp: Long = Instant.now.toEpochMilli
+      for (i <- 0 until partitions.size) {
+        val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+        df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
+      }
+
+      spark.sql("set hoodie.bootstrap.parallelism = 20")
+      checkAnswer(
+        s"""call run_bootstrap(
+           |table => '$tableName',
+           |base_path => '$tablePath',
+           |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |bootstrap_path => '$sourcePath',
+           |rowKey_field => '$RECORD_KEY_FIELD',
+           |partition_path_field => '$PARTITION_FIELD',
+           |options => 'hoodie.datasource.write.hive_style_partitioning=true',
+           |bootstrap_overwrite => true)""".stripMargin) {
+        Seq(0)
+      }
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |location '$tablePath'
+           |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+           |""".stripMargin)
+
+      // show bootstrap's index partitions
+      var result = spark.sql(s"""call show_bootstrap_partitions(table => '$tableName')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+
+      // show bootstrap's index mapping
+      result = spark.sql(
+        s"""call show_bootstrap_mapping(table => '$tableName')""".stripMargin).collect()
+      assertResult(10) {
+        result.length
+      }
+
+      val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf()).build()
+
+      assertResult("true") {
+        metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
+      }
+
+    }
+  }
+
   test("Test Call run_bootstrap Procedure with no-partitioned") {
     withTempDir { tmp =>
       val NUM_OF_RECORDS = 100
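For reference, a minimal usage sketch of the new `options` parameter, assuming the comma-separated key=value format described in the parameter comment. The test above exercises a single pair; a call passing several pairs would look like the following (illustrative only, not part of the patch; the `$tableName`/`$tablePath`/`$sourcePath` bindings are assumed to come from a surrounding test):

    // Sketch: pass several configs through `options`; per this patch each pair
    // is parsed by HoodieCLIUtils.extractOptions and set on the bootstrap
    // properties, overriding any matching session-level config.
    spark.sql(
      s"""call run_bootstrap(
         |table => '$tableName',
         |base_path => '$tablePath',
         |table_type => 'COPY_ON_WRITE',
         |bootstrap_path => '$sourcePath',
         |rowKey_field => '_row_key',
         |partition_path_field => 'datestr',
         |options => 'hoodie.datasource.write.hive_style_partitioning=true,hoodie.bootstrap.parallelism=20',
         |bootstrap_overwrite => true)""".stripMargin)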