diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index a1e9947cafacf..f7415f93f6b2d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -91,6 +91,9 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     // Get required schemas for column pruning
     var requiredDataSchema = StructType(Seq())
     var requiredSkeletonSchema = StructType(Seq())
+    // requiredColsSchema is the schema of requiredColumns. Note that requiredColumns is in arbitrary order,
+    // so requiredColsSchema is not always equal to (requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
+    var requiredColsSchema = StructType(Seq())
     requiredColumns.foreach(col => {
       var field = dataSchema.find(_.name == col)
       if (field.isDefined) {
@@ -99,6 +102,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
         field = skeletonSchema.find(_.name == col)
         requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
       }
+      requiredColsSchema = requiredColsSchema.add(field.get)
     })

     // Prepare readers for reading data file and skeleton files
@@ -129,7 +133,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
       sparkSession = _sqlContext.sparkSession,
       dataSchema = fullSchema,
       partitionSchema = StructType(Seq.empty),
-      requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
+      requiredSchema = requiredColsSchema,
       filters = filters,
       options = Map.empty,
       hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
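Reviewer note on the fix above: Spark hands `buildScan` its `requiredColumns` in whatever order the query projects them, while the old code always emitted the pruned skeleton fields ahead of the pruned data fields. Whenever the pushed-down column order interleaved the two groups, the schema passed to the parquet read function disagreed with the order Spark expected. A self-contained sketch of the mismatch follows; the simplified schemas and field names are illustrative, not taken from the patch:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RequiredColsOrderSketch extends App {
  // Hypothetical stand-ins for the skeleton (Hudi metadata) and data schemas.
  val skeletonSchema = StructType(Seq(StructField("_hoodie_record_key", StringType)))
  val dataSchema = StructType(Seq(StructField("driver", StringType), StructField("rider", StringType)))

  // Spark may request columns in any order, e.g. a data column before a skeleton column.
  val requiredColumns = Array("driver", "_hoodie_record_key")

  var requiredDataSchema = StructType(Seq())
  var requiredSkeletonSchema = StructType(Seq())
  var requiredColsSchema = StructType(Seq())
  requiredColumns.foreach(col => {
    var field = dataSchema.find(_.name == col)
    if (field.isDefined) {
      requiredDataSchema = requiredDataSchema.add(field.get)
    } else {
      field = skeletonSchema.find(_.name == col)
      requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
    }
    // The fix: also record the field in the order Spark asked for it.
    requiredColsSchema = requiredColsSchema.add(field.get)
  })

  // Old required schema: skeleton-first, ignores the requested order.
  println(StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields).fieldNames.mkString(","))
  // prints: _hoodie_record_key,driver
  // New required schema: matches requiredColumns exactly.
  println(requiredColsSchema.fieldNames.mkString(","))
  // prints: driver,_hoodie_record_key
}
```

Tracking the third `requiredColsSchema` inside the same `foreach` leaves the existing pruning of data and skeleton files untouched while giving the reader a schema in exactly the order Spark requested.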
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index f24e5ad5bacc1..2a6a0a71360af 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,9 +17,6 @@
 package org.apache.hudi.functional

-import java.time.Instant
-import java.util.Collections
-
 import collection.JavaConverters._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
@@ -37,10 +34,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.io.TempDir

+import java.time.Instant
+import java.util.Collections
+
 class TestDataSourceForBootstrap {

   var spark: SparkSession = _
-  val commonOpts = Map(
+  val commonOpts: Map[String, String] = Map(
     HoodieWriteConfig.INSERT_PARALLELISM -> "4",
     HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
     HoodieWriteConfig.DELETE_PARALLELISM -> "4",
@@ -56,6 +56,14 @@ class TestDataSourceForBootstrap {
   var srcPath: String = _
   var fs: FileSystem = _

+  val partitionPaths: List[String] = List("2020-04-01", "2020-04-02", "2020-04-03")
+  val numRecords: Int = 100
+  val numRecordsUpdate: Int = 10
+  val verificationRowKey: String = "trip_0"
+  val verificationCol: String = "driver"
+  val originalVerificationVal: String = "driver_0"
+  val updatedVerificationVal: String = "driver_update"
+
   @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
     spark = SparkSession.builder
       .appName("Hoodie Datasource test")
@@ -83,7 +91,6 @@ class TestDataSourceForBootstrap {
   @Test def testMetadataBootstrapCOWNonPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, Collections.emptyList(), jsc,
       spark.sqlContext)
@@ -96,20 +103,7 @@ class TestDataSourceForBootstrap {
       .save(srcPath)

     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)

     // Read bootstrapped table and verify count
     var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -117,18 +111,14 @@ class TestDataSourceForBootstrap {

     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
       Collections.emptyList(), jsc, spark.sqlContext)

     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
       .mode(SaveMode.Append)
       .save(basePath)
@@ -141,36 +131,11 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF1.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())

-    // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
-    // during bootstrap
-    val hoodieIncViewDF1 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath)
-
-    assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
-
-    // incrementally pull only changes in the latest commit, which would pull only the updated records in the
-    // latest commit
-    val hoodieIncViewDF2 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath);
-
-    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
   }

   @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
@@ -184,20 +149,7 @@ class TestDataSourceForBootstrap {
       .save(srcPath)

     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)

     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -205,18 +157,14 @@ class TestDataSourceForBootstrap {

     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
       partitionPaths.asJava, jsc, spark.sqlContext)

     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       // Required because source data is hive style partitioned
       .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
@@ -231,49 +179,14 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())

-    // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
-    // during bootstrap
-    val hoodieIncViewDF1 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath)
-
-    assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
-
-    // incrementally pull only changes in the latest commit, which would pull only the updated records in the
-    // latest commit
-    val hoodieIncViewDF2 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath);
-
-    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/datestr=2020-04-02/*")
-      .load(basePath)
-
-    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
-      hoodieIncViewDF3.count())
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
   }

   @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

-    var sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
+    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)

     // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
@@ -288,92 +201,56 @@ class TestDataSourceForBootstrap {
     })

     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)

     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())

-    // Perform upsert
-    val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
-    var updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
-      jsc, spark.sqlContext)
-
-    updateDF.write
+    // Perform upsert based on the written bootstrap table
+    val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+    updateDf1.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option("hoodie.upsert.shuffle.parallelism", "4")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)

-    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    // Read table after upsert and verify the updated value
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
-
-    // Read table after upsert and verify count
     val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
-
-    // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
-    // during bootstrap
-    val hoodieIncViewDF1 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath)
+    hoodieROViewDF2.collect()
+    assertEquals(updatedVerificationVal, hoodieROViewDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))

-    assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+    // Perform upsert based on the source data
+    val updateTimestamp = Instant.now.toEpochMilli
+    val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
+      jsc, spark.sqlContext)

-    // incrementally pull only changes in the latest commit, which would pull only the updated records in the
-    // latest commit
-    val hoodieIncViewDF2 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath);
+    updateDF2.write
+      .format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+      .mode(SaveMode.Append)
+      .save(basePath)

-    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+    val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("hudi")
spark.read.format("hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) - .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*") - .load(basePath) + // Read table after upsert and verify count + val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF3.count()) + assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count()) - assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(), - hoodieIncViewDF3.count()) + verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false) } @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = { val timestamp = Instant.now.toEpochMilli - val numRecords = 100 - val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, @@ -391,21 +268,7 @@ class TestDataSourceForBootstrap { }) // Perform bootstrap - val bootstrapDF = spark.emptyDataFrame - bootstrapDF.write - .format("hudi") - .options(commonOpts) - .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") - .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) - .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator") - .mode(SaveMode.Overwrite) - .save(basePath) - - val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) - assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi") @@ -416,18 +279,14 @@ class TestDataSourceForBootstrap { // Perform upsert val updateTimestamp = Instant.now.toEpochMilli - val numRecordsUpdate = 10 val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, jsc, spark.sqlContext) updateDF.write .format("hudi") .options(commonOpts) - .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") - .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true") .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1") @@ -449,8 +308,6 @@ class TestDataSourceForBootstrap { @Test def testMetadataBootstrapMORPartitioned(): Unit = { val timestamp = Instant.now.toEpochMilli - val numRecords = 100 - val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") val jsc = 

     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
@@ -468,64 +325,64 @@ class TestDataSourceForBootstrap {
     })

     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+
+    // Read bootstrapped table and verify count
+    val hoodieROViewDF1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF1.count())
+
+    // Perform upsert based on the written bootstrap table
+    val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+    updateDf1.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+      .mode(SaveMode.Append)
       .save(basePath)

-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
-
-    // Read bootstrapped table and verify count
-    val hoodieROViewDF1 = spark.read.format("hudi")
+    // Read table after upsert and verify the value
+    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+    val hoodieROViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
         DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
       .load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF1.count())
+    hoodieROViewDF2.collect()
+    assertEquals(originalVerificationVal, hoodieROViewDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))

-    // Perform upsert
+    // Perform upsert based on the source data
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
-    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
+    val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
       partitionPaths.asJava, jsc, spark.sqlContext)

-    updateDF.write
+    updateDF2.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)

-    // Expect 1 new commit since meta bootstrap - delta commit (because inline compaction is off)
-    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+    // Expect 2 new commits since meta bootstrap - 2 delta commits (because inline compaction is off)
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

     // Read table after upsert and verify count. Since we have inline compaction off the RO view will have
     // no updated rows.
-    val hoodieROViewDF2 = spark.read.format("hudi")
+    val hoodieROViewDF3 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
         DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
       .load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(0, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(0, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
   }

   @Test def testFullBootstrapCOWPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
@@ -547,11 +404,8 @@ class TestDataSourceForBootstrap {
     bootstrapDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
       .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
       .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, classOf[FullRecordBootstrapModeSelector].getName)
@@ -568,18 +422,14 @@ class TestDataSourceForBootstrap {

     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
       partitionPaths.asJava, jsc, spark.sqlContext)

     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
@@ -592,39 +442,64 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())

+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false)
+  }
+
+  def runMetadataBootstrapAndVerifyCommit(tableType: String): String = {
+    val bootstrapDF = spark.emptyDataFrame
+    bootstrapDF.write
+      .format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
+      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+    commitInstantTime1
+  }
+
+  def verifyIncrementalViewResult(bootstrapCommitInstantTime: String, latestCommitInstantTime: String,
+                                  isPartitioned: Boolean, isHiveStylePartitioned: Boolean): Unit = {
     // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
     // during bootstrap
     val hoodieIncViewDF1 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
       .load(basePath)

     assertEquals(numRecords, hoodieIncViewDF1.count())
     var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
     assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+    assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))

-    // incrementally pull only changes in the latest commit, which would pull only the updated records in the
-    // latest commit
+    // incrementally pull only changes after the bootstrap commit, which would pull only the updated records in the
+    // later commits
     val hoodieIncViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
       .load(basePath);

     assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
     countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
     assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
-      .load(basePath)
-
-    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
-      hoodieIncViewDF3.count())
+    assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
+
+    if (isPartitioned) {
+      val relativePartitionPath = if (isHiveStylePartitioned) "/datestr=2020-04-02/*" else "/2020-04-02/*"
+      // pull the update commits within certain partitions
+      val hoodieIncViewDF3 = spark.read.format("hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
+        .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, relativePartitionPath)
+        .load(basePath)
+
+      assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+        hoodieIncViewDF3.count())
+    }
   }
 }
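For reviewers exercising the refactored tests locally, the incremental-pull pattern that `verifyIncrementalViewResult` now centralizes reduces to the two reads below. This is a sketch, assuming `spark`, `basePath`, and a `bootstrapCommitInstantTime` from a completed metadata bootstrap are already in scope; the option keys are the ones used in the tests above:

```scala
import org.apache.hudi.DataSourceReadOptions

// All records written by the metadata bootstrap commit itself:
// begin at "000" and end at the bootstrap instant.
val bootstrapChanges = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
  .load(basePath)

// Only records touched by commits after the bootstrap, optionally narrowed
// to one partition via a path glob (non-hive-style layout shown here).
val laterChanges = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
  .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
  .load(basePath)
```

The path glob is the only piece that differs between the two partition layouts, which is why the helper takes `isHiveStylePartitioned` and switches between `/datestr=2020-04-02/*` and `/2020-04-02/*`.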