diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 85c64f826541e..8981d1447b12f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,11 +18,13 @@
 package org.apache.hudi.functional

 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -31,7 +33,9 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}

 import scala.collection.JavaConversions._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -99,8 +103,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     (sourcePath, destPath)
   }

-  @Test
-  def testStructuredStreaming(): Unit = {
+  def getOptsWithTableType(tableType: HoodieTableType): Map[String, String] = {
+    commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+  }
+
+  def getClusteringOpts(tableType: HoodieTableType, isInlineClustering: String,
+                        isAsyncClustering: String, clusteringNumCommit: String,
+                        fileMaxRecordNum: Int): Map[String, String] = {
+    getOptsWithTableType(tableType) + (
+      HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
+      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
+      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+    )
+  }
+
+  def getCompactionOpts(tableType: HoodieTableType, isAsyncCompaction: Boolean): Map[String, String] = {
+    getOptsWithTableType(tableType) + (
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction.toString,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+    )
+  }
+
+  def structuredStreamingTestRunner(tableType: HoodieTableType, addCompactionConfigs: Boolean, isAsyncCompaction: Boolean): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
     // First chunk of data
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -111,7 +137,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()

-    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, commonOpts)
+    val hudiOptions = if (addCompactionConfigs) {
+      getCompactionOpts(tableType, isAsyncCompaction)
+    } else {
+      getOptsWithTableType(tableType)
+    }
+    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)

     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -125,16 +156,23 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       assert(hoodieROViewDF1.count() == 100)

       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
-      // wait for spark streaming to process second microbatch
-      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
-      val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
-      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+      // When the compaction configs are added, one more commit of the compaction is expected
+      val numExpectedCommits = if (addCompactionConfigs) currNumCommits + 2 else currNumCommits + 1
+      waitTillAtleastNCommits(fs, destPath, numExpectedCommits, 120, 5)
+
+      val commitInstantTime2 = if (tableType == HoodieTableType.MERGE_ON_READ) {
+        // For the records that are processed by the compaction in MOR table
+        // the "_hoodie_commit_time" still reflects the latest delta commit
+        latestInstant(fs, destPath, HoodieTimeline.DELTA_COMMIT_ACTION)
+      } else {
+        HoodieDataSourceHelpers.latestCommit(fs, destPath)
+      }
+      assertEquals(numExpectedCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())

       // Read RO View
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
         .load(destPath + "/*/*/*/*")
       assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
-
       // Read Incremental View
       // we have 2 commits, try pulling the first commit (which is not the latest)
       val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
@@ -163,6 +201,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
   }

+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testStructuredStreaming(tableType: HoodieTableType): Unit = {
+    structuredStreamingTestRunner(tableType, false, false)
+  }
+
   @throws[InterruptedException]
   private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
                                       numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
@@ -178,8 +222,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         numInstants = timeline.countInstants
         success = true
       }
-      val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
-        .setLoadActiveTimelineOnLoad(true).build()
     } catch {
       case te: TableNotFoundException =>
         log.info("Got table not found exception. Retrying")
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     numInstants
   }

-  def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
-                        clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
-    commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
-      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
-      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
-      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
-      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
-      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
-    )
-  }
-
-  @Test
-  def testStructuredStreamingWithInlineClustering(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")

-    def checkClusteringResult(destPath: String):Unit = {
+    def checkClusteringResult(destPath: String): Unit = {
       // check have schedule clustering and clustering file group to one
       waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }

-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
-  }
-
-  @Test
-  def testStructuredStreamingWithAsyncClustering(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
-    def checkClusteringResult(destPath: String):Unit = {
-      // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
-      metaClient.reloadActiveTimeline()
-      assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, HoodieTableType.COPY_ON_WRITE,
+      !isAsyncClustering, isAsyncClustering, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
   }

-  @Test
-  def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
-
-    def checkClusteringResult(destPath: String):Unit = {
-      // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
-      metaClient.reloadActiveTimeline()
-      assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStructuredStreamingWithCompaction(isAsyncCompaction: Boolean): Unit = {
+    structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction)
   }

-  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
-                                                 isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
+  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
+                                                 isInlineClustering: Boolean, isAsyncClustering: Boolean,
                                                  partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
     // First insert of data
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList
@@ -257,8 +268,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))

-    val hudiOptions = getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString,
-      isAsyncCompaction.toString, "2", 100)
+    val hudiOptions = getClusteringOpts(
+      tableType, isInlineClustering.toString, isAsyncClustering.toString, "2", 100)
     val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)

     val f2 = Future {
@@ -270,28 +281,24 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process second microbatch
       currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
-      // for inline clustering, clustering may be complete along with 2nd commit
-      if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline.countInstants() > 0) {
-        assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-        // check have at least one file group
-        this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-          .setLoadActiveTimelineOnLoad(true).build()
-        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
-      } else {
-        assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-        // check have more than one file group
-        this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-          .setLoadActiveTimelineOnLoad(true).build()
-        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
-      }

-      // check clustering result
+      // Wait for the clustering to finish
+      this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+        .setLoadActiveTimelineOnLoad(true).build()
       checkClusteringResult(destPath)

-      // check data correct after clustering
+      assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+      // Check have at least one file group
+      assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
+
+      // Validate data after clustering
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
         .load(destPath + "/*/*/*/*")
       assertEquals(200, hoodieROViewDF2.count())
+      val countsPerCommit = hoodieROViewDF2.groupBy("_hoodie_commit_time").count().collect()
+      assertEquals(2, countsPerCommit.length)
+      val commitInstantTime2 = latestInstant(fs, destPath, HoodieTimeline.COMMIT_ACTION)
+      assertEquals(commitInstantTime2, countsPerCommit.maxBy(row => row.getAs[String](0)).get(0))
     }
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
   }
@@ -327,4 +334,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     if (!success) throw new IllegalStateException("Timed-out waiting for completing replace instant appear in " + tablePath)
   }

+  private def latestInstant(fs: FileSystem, basePath: String, instantAction: String): String = {
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(fs.getConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build
+    metaClient.getActiveTimeline
+      .getTimelineOfActions(CollectionUtils.createSet(instantAction))
+      .filterCompletedInstants
+      .lastInstant
+      .get.getTimestamp
+  }
 }