diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala
deleted file mode 100644
index e64f96ff8d0f4..0000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
-
-import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-
-class HoodieSparkSqlWriterSuite2 {
-
- @Test
- def testGetOriginKeyGenerator(): Unit = {
- // for dataframe write
- val m1 = Map(
- HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName
- )
- val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1)
- assertTrue(kg1 == classOf[ComplexKeyGenerator].getName)
-
- // for sql write
- val m2 = Map(
- HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
- SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
- )
- val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
- assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
index 2caf4cc20eaa4..985bf2e9408bd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
@@ -18,17 +18,15 @@
package org.apache.hudi
-import java.sql.{Date, Timestamp}
-
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.DataFrame
+import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.{BeforeEach, Test}
+import java.sql.{Date, Timestamp}
-class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
+class TestGenericRecordAndRowConsistency extends SparkClientFunctionalTestHarness {
- var spark: SparkSession = _
val commonOpts = Map(
HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
"hoodie.insert.shuffle.parallelism" -> "1",
@@ -40,16 +38,6 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator"
)
- /**
- * Setup method running before each test.
- */
- @BeforeEach override def setUp() {
- setTableName("hoodie_type_consistency_tbl")
- initPath()
- initSparkContexts()
- spark = sqlContext.sparkSession
- }
-
@Test
def testTimestampTypeConsistency(): Unit = {
val _spark = spark
@@ -60,7 +48,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
(1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
(2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
- ).toDF("typeId","eventTime", "str")
+ ).toDF("typeId", "eventTime", "str")
testConsistencyBetweenGenericRecordAndRow(df)
}
@@ -75,7 +63,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
(1, Date.valueOf("2014-11-30"), "abc"),
(2, Date.valueOf("2016-12-29"), "def"),
(2, Date.valueOf("2016-05-09"), "def")
- ).toDF("typeId","eventTime", "str")
+ ).toDF("typeId", "eventTime", "str")
testConsistencyBetweenGenericRecordAndRow(df)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
similarity index 94%
rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index ad372c4620642..fbdfb699bb89f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -19,10 +19,10 @@ package org.apache.hudi
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, INSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, OPERATION, PAYLOAD_CLASS_NAME, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -35,10 +35,12 @@ import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{expr, lit}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.any
@@ -47,15 +49,13 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, inte
import java.time.Instant
import java.util.{Collections, Date, UUID}
-
import scala.collection.JavaConversions._
import scala.collection.JavaConverters
-import scala.util.control.NonFatal
/**
* Test suite for SparkSqlWriter class.
*/
-class HoodieSparkSqlWriterSuite {
+class TestHoodieSparkSqlWriter {
var spark: SparkSession = _
var sqlContext: SQLContext = _
var sc: SparkContext = _
@@ -70,7 +70,7 @@ class HoodieSparkSqlWriterSuite {
* Setup method running before each test.
*/
@BeforeEach
- def setUp() {
+ def setUp(): Unit = {
initSparkContext()
tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
@@ -95,6 +95,7 @@ class HoodieSparkSqlWriterSuite {
spark = SparkSession.builder()
.appName(hoodieFooTableName)
.master("local[2]")
+ .withExtensions(new HoodieSparkSessionExtension)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
sc = spark.sparkContext
@@ -250,12 +251,14 @@ class HoodieSparkSqlWriterSuite {
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
- assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+ assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
+ assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
//on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
- assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
+ assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
+ assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
}
/**
@@ -266,7 +269,7 @@ class HoodieSparkSqlWriterSuite {
@ParameterizedTest
@EnumSource(value = classOf[BulkInsertSortMode])
def testBulkInsertForSortMode(sortMode: BulkInsertSortMode): Unit = {
- testBulkInsertWithSortMode(sortMode, true)
+ testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
}
/**
@@ -287,12 +290,13 @@ class HoodieSparkSqlWriterSuite {
@Test
def testDisableAndEnableMetaFields(): Unit = {
try {
- testBulkInsertWithSortMode(BulkInsertSortMode.NONE, false)
+ testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
//create a new table
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
+ .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -302,9 +306,10 @@ class HoodieSparkSqlWriterSuite {
try {
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
- Assertions.fail("Should have thrown exception")
+ fail("Should have thrown exception")
} catch {
- case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
+ case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict"))
+ case e: Exception => fail(e);
}
}
}
@@ -439,7 +444,7 @@ class HoodieSparkSqlWriterSuite {
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
- initializeMetaClientForBootstrap(fooTableParams, tableType, false)
+ initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -496,7 +501,7 @@ class HoodieSparkSqlWriterSuite {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
- initializeMetaClientForBootstrap(fooTableParams, tableType, true)
+ initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
@@ -526,7 +531,8 @@ class HoodieSparkSqlWriterSuite {
.setTableType(tableType)
.setTableName(hoodieFooTableName)
.setRecordKeyFields(fooTableParams(DataSourceWriteOptions.RECORDKEY_FIELD.key))
- .setBaseFileFormat(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name())
+ .setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
+ HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key))
.setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key))
@@ -873,18 +879,15 @@ class HoodieSparkSqlWriterSuite {
val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// raise exception when use params which is not same with HoodieTableConfig
- try {
+ val configConflictException = intercept[HoodieException] {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName2)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
.mode(SaveMode.Append).save(tablePath2)
- } catch {
- case NonFatal(e) =>
- assert(e.getMessage.contains("Config conflict"))
- assert(e.getMessage.contains(
- s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
}
+ assert(configConflictException.getMessage.contains("Config conflict"))
+ assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
// do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params
df2.write.format("hudi")
@@ -893,6 +896,24 @@ class HoodieSparkSqlWriterSuite {
.mode(SaveMode.Append).save(tablePath2)
val data = spark.read.format("hudi").load(tablePath2 + "/*")
assert(data.count() == 2)
- assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16")
+ assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
+ }
+
+ @Test
+ def testGetOriginKeyGenerator(): Unit = {
+ // for dataframe write
+ val m1 = Map(
+ HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName
+ )
+ val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1)
+ assertTrue(kg1 == classOf[ComplexKeyGenerator].getName)
+
+ // for sql write
+ val m2 = Map(
+ HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+ )
+ val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
+ assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
}
}
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index 89306f36e1c36..2ba4042be0ca4 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -27,7 +27,7 @@
-
+