diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 4a7837816976b..0e322e2ecc801 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -33,6 +33,8 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
+import scala.collection.JavaConverters._
+
 /**
   * Hoodie Spark Datasource, for reading and writing hoodie tables
   *
@@ -119,12 +121,14 @@ class DefaultSource extends RelationProvider
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
     val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
+    val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+
     if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
-      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
+      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, dfWithoutMetaCols)
     } else {
-      HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
+      HoodieSparkSqlWriter.write(sqlContext, mode, parameters, dfWithoutMetaCols)
     }
-    new HoodieEmptyRelation(sqlContext, df.schema)
+    new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
 
   override def createSink(sqlContext: SQLContext,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 3db89487ffb75..dd83bf836433a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -52,6 +52,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
     HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
   )
 
+  val verificationCol: String = "driver"
+  val updatedVerificationVal: String = "driver_update"
+
   @BeforeEach override def setUp() {
     initPath()
     initSparkContexts()
@@ -96,23 +99,36 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF1.count())
 
+    // Upsert based on the written table with Hudi metadata columns
+    val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0)
+    val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+
+    updateDf.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
+    assertEquals(100, snapshotDF2.count())
+    assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
+
+    // Upsert Operation without Hudi metadata columns
     val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
-    // Upsert Operation
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
       .mode(SaveMode.Append)
       .save(basePath)
 
     val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
 
     // Snapshot Query
-    val snapshotDF2 = spark.read.format("org.apache.hudi")
+    val snapshotDF3 = spark.read.format("org.apache.hudi")
       .load(basePath + "/*/*/*/*")
-    assertEquals(100, snapshotDF2.count()) // still 100, since we only updated
+    assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
 
     // Read Incremental Query
     // we have 2 commits, try pulling the first commit (which is not the latest)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 5938ee54e110b..7c73669a06cd7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -47,6 +47,9 @@ class TestMORDataSource extends HoodieClientTestBase {
     HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
   )
 
+  val verificationCol: String = "driver"
+  val updatedVerificationVal: String = "driver_update"
+
   @BeforeEach override def setUp() {
     initPath()
     initSparkContexts()
@@ -86,7 +89,7 @@ class TestMORDataSource extends HoodieClientTestBase {
     val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
     assertEquals(List(insertCommitTime), insertCommitTimes)
 
-    // Upsert operation
+    // Upsert operation without Hudi metadata columns
     val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
@@ -101,6 +104,19 @@
       .load(basePath + "/*/*/*/*")
     val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
     assertEquals(List(updateCommitTime), updateCommitTimes)
+
+    // Upsert based on the written table with Hudi metadata columns
+    val verificationRowKey = hudiSnapshotDF2.limit(1).select("_row_key").first.getString(0)
+    val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+
+    inputDF3.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val hudiSnapshotDF3 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiSnapshotDF3.count())
+    assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
   }
 
   @Test def testCount() {
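
A minimal round-trip sketch of what this patch enables, kept outside the diff: a DataFrame read back from a Hudi table still carries the _hoodie_* metadata columns, and since DefaultSource now drops HoodieRecord.HOODIE_META_COLUMNS before delegating to HoodieSparkSqlWriter, that frame can be upserted straight back without first projecting the metadata columns away. The base path, record key field, precombine field, and table name below are illustrative assumptions, not values taken from the patch.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object HudiRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-round-trip").getOrCreate()
    val basePath = "/tmp/hoodie_test"   // assumed location of an existing Hudi table

    // Snapshot-read the table; the result includes the _hoodie_* metadata columns.
    val snapshot = spark.read.format("hudi").load(basePath + "/*/*/*/*")

    // Modify one record; the metadata columns are deliberately left in place.
    val rowKey = snapshot.limit(1).select("_row_key").first.getString(0)
    val updated = snapshot
      .filter(col("_row_key") === rowKey)
      .withColumn("driver", lit("driver_update"))

    // Upsert the frame back as-is; the datasource drops the metadata columns before writing.
    updated.write.format("hudi")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")   // assumed key field
      .option("hoodie.datasource.write.precombine.field", "timestamp") // assumed precombine field
      .option("hoodie.table.name", "hoodie_test")
      .mode(SaveMode.Append)
      .save(basePath)

    spark.stop()
  }
}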