@@ -19,7 +19,7 @@ package org.apache.hudi

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -33,6 +33,8 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

import scala.collection.JavaConverters._

/**
* Hoodie Spark Datasource, for reading and writing hoodie tables
*
@@ -119,12 +121,14 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)

if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, dfWithoutMetaCols)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, dfWithoutMetaCols)
}
new HoodieEmptyRelation(sqlContext, df.schema)
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}

override def createSink(sqlContext: SQLContext,
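Taken together, the change above makes the datasource strip Hudi's generated metadata columns from the incoming DataFrame before bootstrap or write, so a frame read back from an existing Hudi table can be re-written directly. A minimal self-contained sketch of the same idea, with the column names spelled out for illustration (the PR itself takes them from HoodieRecord.HOODIE_META_COLUMNS via JavaConverters, and the helper name here is made up for the sketch):

import org.apache.spark.sql.DataFrame

// Hudi's generated metadata columns (the contents of HoodieRecord.HOODIE_META_COLUMNS),
// listed explicitly so the sketch stands alone.
val hoodieMetaColumns = Seq(
  "_hoodie_commit_time",
  "_hoodie_commit_seqno",
  "_hoodie_record_key",
  "_hoodie_partition_path",
  "_hoodie_file_name")

// DataFrame.drop ignores column names that are not present, so this is a
// no-op for fresh input and removes the metadata columns for round-tripped input.
def withoutMetaColumns(df: DataFrame): DataFrame =
  df.drop(hoodieMetaColumns: _*)

Dropping rather than rejecting these columns keeps the write path backward compatible: input that never had metadata columns is untouched.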
@@ -30,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -52,6 +52,9 @@ class TestCOWDataSource extends HoodieClientTestBase
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
)

val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"

@BeforeEach override def setUp() {
initPath()
initSparkContexts()
@@ -96,23 +99,36 @@
val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
assertEquals(100, snapshotDF1.count())

// Upsert based on the written table with Hudi metadata columns
val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0)
val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))

updateDf.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
.save(basePath)

val snapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
assertEquals(100, snapshotDF2.count())
assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))

// Upsert Operation without Hudi metadata columns
val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()

// Upsert Operation
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
.save(basePath)

val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())

// Snapshot Query
val snapshotDF2 = spark.read.format("org.apache.hudi")
val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*/*/*")
assertEquals(100, snapshotDF2.count()) // still 100, since we only updated
assertEquals(100, snapshotDF3.count()) // still 100, since we only updated

// Read Incremental Query
// we now have 3 commits, try pulling the first commit (which is not the latest)
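The new COW assertions cover the round trip this enables: take a snapshot read (which carries the _hoodie_* columns), modify one field, and upsert it straight back. A hedged usage sketch of that pattern outside the test harness; the base path is illustrative and the option values mirror the test's commonOpts and data generator fields (_row_key, timestamp, partition), which are assumptions here rather than part of this diff:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().appName("hudi-metadata-roundtrip").getOrCreate()
val basePath = "file:///tmp/hoodie_test"  // illustrative location

// Snapshot read of an existing table; the result includes columns such as _hoodie_commit_time.
val snapshot = spark.read.format("hudi").load(basePath + "/*/*/*/*")

// Pick one record, change its 'driver' value, and write the slice back as an upsert.
// With this change the metadata columns are dropped automatically before the write.
val rowKey = snapshot.limit(1).select("_row_key").first.getString(0)
val updated = snapshot
  .filter(col("_row_key") === rowKey)
  .withColumn("driver", lit("driver_update"))

updated.write.format("hudi")
  .option("hoodie.datasource.write.recordkey.field", "_row_key")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .option("hoodie.datasource.write.partitionpath.field", "partition")
  .option("hoodie.table.name", "hoodie_test")
  .mode(SaveMode.Append)
  .save(basePath)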
@@ -47,6 +47,9 @@ class TestMORDataSource extends HoodieClientTestBase
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
)

val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"

@BeforeEach override def setUp() {
initPath()
initSparkContexts()
@@ -86,7 +89,7 @@
val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
assertEquals(List(insertCommitTime), insertCommitTimes)

// Upsert operation
// Upsert operation without Hudi metadata columns
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
@@ -101,6 +104,19 @@
.load(basePath + "/*/*/*/*")
val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
assertEquals(List(updateCommitTime), updateCommitTimes)

// Upsert based on the written table with Hudi metadata columns
val verificationRowKey = hudiSnapshotDF2.limit(1).select("_row_key").first.getString(0)
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))

inputDF3.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
.save(basePath)

val hudiSnapshotDF3 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
assertEquals(100, hudiSnapshotDF3.count())
assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
}

@Test def testCount() {