@@ -18,11 +18,13 @@
package org.apache.hudi.functional

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.common.util.CollectionUtils
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -31,7 +33,9 @@ import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
@@ -99,8 +103,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
(sourcePath, destPath)
}

@Test
def testStructuredStreaming(): Unit = {
def getOptsWithTableType(tableType: HoodieTableType): Map[String, String] = {
commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
}

def getClusteringOpts(tableType: HoodieTableType, isInlineClustering: String,
isAsyncClustering: String, clusteringNumCommit: String,
fileMaxRecordNum: Int): Map[String, String] = {
getOptsWithTableType(tableType) + (
HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
)
}

def getCompactionOpts(tableType: HoodieTableType, isAsyncCompaction: Boolean): Map[String, String] = {
getOptsWithTableType(tableType) + (
DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction.toString,
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
)
}

def structuredStreamingTestRunner(tableType: HoodieTableType, addCompactionConfigs: Boolean, isAsyncCompaction: Boolean): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
// First chunk of data
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -111,7 +137,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()

val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, commonOpts)
val hudiOptions = if (addCompactionConfigs) {
getCompactionOpts(tableType, isAsyncCompaction)
} else {
getOptsWithTableType(tableType)
}
val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)

val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -125,16 +156,23 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assert(hoodieROViewDF1.count() == 100)

inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process second microbatch
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// When the compaction configs are added, one more commit of the compaction is expected
val numExpectedCommits = if (addCompactionConfigs) currNumCommits + 2 else currNumCommits + 1
waitTillAtleastNCommits(fs, destPath, numExpectedCommits, 120, 5)

val commitInstantTime2 = if (tableType == HoodieTableType.MERGE_ON_READ) {
// For the records that are processed by the compaction in MOR table
// the "_hoodie_commit_time" still reflects the latest delta commit
latestInstant(fs, destPath, HoodieTimeline.DELTA_COMMIT_ACTION)
} else {
HoodieDataSourceHelpers.latestCommit(fs, destPath)
}
assertEquals(numExpectedCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// Read RO View
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
.load(destPath + "/*/*/*/*")
assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated


// Read Incremental View
// we have 2 commits, try pulling the first commit (which is not the latest)
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
@@ -163,6 +201,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
}

@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testStructuredStreaming(tableType: HoodieTableType): Unit = {
structuredStreamingTestRunner(tableType, false, false)
}

@throws[InterruptedException]
private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
@@ -178,8 +222,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants = timeline.countInstants
success = true
}
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
.setLoadActiveTimelineOnLoad(true).build()
} catch {
case te: TableNotFoundException =>
log.info("Got table not found exception. Retrying")
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants
}

def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
)
}

@Test
def testStructuredStreamingWithInlineClustering(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")

def checkClusteringResult(destPath: String):Unit = {
def checkClusteringResult(destPath: String): Unit = {
// check that clustering has been scheduled and the file groups were clustered into one
waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}

@Test
def testStructuredStreamingWithAsyncClustering(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")

def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
structuredStreamingForTestClusteringRunner(sourcePath, destPath, HoodieTableType.COPY_ON_WRITE,
!isAsyncClustering, isAsyncClustering, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}

@Test
def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
Contributor Author:
This does not trigger compaction as COW table is written.
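A minimal sketch of the point above, for illustration only and not part of this change: the compaction-related options used by getCompactionOpts in this file only take effect on a MERGE_ON_READ table, because compaction merges log files into base files; a COPY_ON_WRITE table has no log files, so with COW these settings are inert and no compaction instant appears on the timeline.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieCompactionConfig

// Compaction settings as wired up by getCompactionOpts; swapping the table type
// to COPY_ON_WRITE would leave nothing to compact, so compaction never triggers.
val compactionOpts: Map[String, String] = Map(
  DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
  DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "true",
  HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
)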
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")

def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStructuredStreamingWithCompaction(isAsyncCompaction: Boolean): Unit = {
Contributor Author:
After this change, compaction is tested on MOR, and clustering is tested on COW.
structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction)
}
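To make the note above concrete, here is a minimal sketch of how a completed compaction can be detected on the MERGE_ON_READ table in this test; it reuses only calls that already appear in this file and is not part of the diff. Streaming micro-batches land as delta commits, while a finished compaction is recorded as a commit instant on the timeline.

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CollectionUtils

// Count completed COMMIT instants; in this MOR setup they come from compaction,
// since the streaming writes themselves are recorded as DELTA_COMMIT instants.
def completedCompactionCommits(fs: FileSystem, basePath: String): Int = {
  val metaClient = HoodieTableMetaClient.builder()
    .setConf(fs.getConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
  metaClient.getActiveTimeline
    .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION))
    .filterCompletedInstants()
    .countInstants()
}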

def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
isInlineClustering: Boolean, isAsyncClustering: Boolean,
partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
// First insert of data
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList
@@ -257,8 +268,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))

val hudiOptions = getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString,
isAsyncCompaction.toString, "2", 100)
val hudiOptions = getClusteringOpts(
tableType, isInlineClustering.toString, isAsyncClustering.toString, "2", 100)
val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)

val f2 = Future {
@@ -270,28 +281,24 @@ class TestStructuredStreaming extends HoodieClientTestBase {
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process second microbatch
currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
// for inline clustering, clustering may be complete along with 2nd commit
if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline.countInstants() > 0) {
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// check have at least one file group
this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
.setLoadActiveTimelineOnLoad(true).build()
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
} else {
assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// check have more than one file group
this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
.setLoadActiveTimelineOnLoad(true).build()
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
}

// check clustering result
// Wait for the clustering to finish
this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
.setLoadActiveTimelineOnLoad(true).build()
checkClusteringResult(destPath)

// check data correct after clustering
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// Check that at least one file group exists
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)

// Validate data after clustering
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
.load(destPath + "/*/*/*/*")
assertEquals(200, hoodieROViewDF2.count())
val countsPerCommit = hoodieROViewDF2.groupBy("_hoodie_commit_time").count().collect()
assertEquals(2, countsPerCommit.length)
val commitInstantTime2 = latestInstant(fs, destPath, HoodieTimeline.COMMIT_ACTION)
assertEquals(commitInstantTime2, countsPerCommit.maxBy(row => row.getAs[String](0)).get(0))
}
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
}
@@ -327,4 +334,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
if (!success) throw new IllegalStateException("Timed-out waiting for completing replace instant appear in " + tablePath)
}

private def latestInstant(fs: FileSystem, basePath: String, instantAction: String): String = {
val metaClient = HoodieTableMetaClient.builder
.setConf(fs.getConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build
metaClient.getActiveTimeline
.getTimelineOfActions(CollectionUtils.createSet(instantAction))
.filterCompletedInstants
.lastInstant
.get.getTimestamp
}
}