diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 9524adbd5efd5..af2bc69804742 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -21,6 +21,8 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
@@ -28,11 +30,14 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, ValueSource}
+import java.util
+import java.util.Arrays
+import java.util.stream.Stream
 
 import scala.collection.JavaConversions._
 
 
@@ -170,4 +175,79 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .load(basePath)
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("insert_overwrite", "delete_partition"))
+  def testArchivalWithReplaceCommitActions(writeOperation: String): Unit = {
+
+    val dataGen = new HoodieTestDataGenerator()
+    // use this to generate records only for certain partitions.
+    val dataGenPartition1 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH))
+    val dataGenPartition2 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+
+    // do one bulk insert to all partitions
+    val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    val partition1RecordCount = inputDF.filter(row => row.getAs("partition_path")
+      .equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).count()
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option("hoodie.keep.min.commits", "2")
+      .option("hoodie.keep.max.commits", "3")
+      .option("hoodie.cleaner.commits.retained", "1")
+      .option("hoodie.metadata.enable", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertRecordCount(basePath, 100)
+
+    // issue the parameterized operation (insert_overwrite or delete_partition) against partition1
+    writeRecords(2, dataGenPartition1, writeOperation, basePath)
+
+    val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL))
+    {
+      200 - partition1RecordCount
+    } else {
+      100 - partition1RecordCount
+    }
+    assertRecordCount(basePath, expectedRecCount)
+
+    // add more data to partition2.
+    for (i <- 3 to 7) {
+      writeRecords(i, dataGenPartition2, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, basePath)
+    }
+
+    assertRecordCount(basePath, expectedRecCount + 500)
+    val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+      .setLoadActiveTimelineOnLoad(true).build()
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    // assert the replace commit is archived and no longer part of the active timeline.
+    assertFalse(commits.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
+    // assert that the archived timeline has replace commit actions.
+    val archivedTimeline = metaClient.getArchivedTimeline()
+    assertTrue(archivedTimeline.getInstants.toArray.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+      .filter(action => action.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).size > 0)
+  }
+
+  def writeRecords(commitTime: Int, dataGen: HoodieTestDataGenerator, writeOperation: String, basePath: String): Unit = {
+    val records = recordsToStrings(dataGen.generateInserts("%05d".format(commitTime), 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option("hoodie.keep.min.commits", "2")
+      .option("hoodie.keep.max.commits", "3")
+      .option("hoodie.cleaner.commits.retained", "1")
+      .option("hoodie.metadata.enable", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, writeOperation)
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
+  def assertRecordCount(basePath: String, expectedRecordCount: Long): Unit = {
+    val snapshotDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(expectedRecordCount, snapshotDF.count())
+  }
 }
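
Note (not part of the patch): the archival assertions above depend on the aggressive retention settings passed in the write options (hoodie.keep.min.commits=2, hoodie.keep.max.commits=3, hoodie.cleaner.commits.retained=1), so after seven or more commits the replacecommit instant is expected to drop off the active timeline. Below is a minimal sketch of the same timeline inspection, built only from the HoodieTableMetaClient/HoodieTimeline calls already used in the test; `spark` and `basePath` are assumed to be supplied by the surrounding test harness.

  // Sketch: compare which actions remain on the active timeline vs. the archived timeline.
  // Assumes a SparkSession `spark` and a Hudi table at `basePath` (both provided by the caller).
  val metaClient = HoodieTableMetaClient.builder()
    .setConf(spark.sparkContext.hadoopConfiguration)
    .setBasePath(basePath)
    .setLoadActiveTimelineOnLoad(true)
    .build()
  val activeActions = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
    .map(instant => instant.asInstanceOf[HoodieInstant].getAction).toSet
  val archivedActions = metaClient.getArchivedTimeline().getInstants.toArray
    .map(instant => instant.asInstanceOf[HoodieInstant].getAction).toSet
  // Once archival has run, REPLACE_COMMIT_ACTION should appear only in the archived set.
  assert(!activeActions.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
  assert(archivedActions.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))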