@@ -18,17 +18,15 @@

package org.apache.hudi

import java.sql.{Date, Timestamp}

import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.DataFrame
import org.junit.jupiter.api.Test

import org.junit.jupiter.api.{BeforeEach, Test}
import java.sql.{Date, Timestamp}

class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
class TestGenericRecordAndRowConsistency extends SparkClientFunctionalTestHarness {

var spark: SparkSession = _
val commonOpts = Map(
HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
"hoodie.insert.shuffle.parallelism" -> "1",
@@ -40,16 +38,6 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator"
)

/**
* Setup method running before each test.
*/
@BeforeEach override def setUp() {
setTableName("hoodie_type_consistency_tbl")
initPath()
initSparkContexts()
spark = sqlContext.sparkSession
}

@Test
def testTimestampTypeConsistency(): Unit = {
val _spark = spark
@@ -60,7 +48,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
(1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
(2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
).toDF("typeId","eventTime", "str")
).toDF("typeId", "eventTime", "str")

testConsistencyBetweenGenericRecordAndRow(df)
}
@@ -75,7 +63,7 @@ class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
(1, Date.valueOf("2014-11-30"), "abc"),
(2, Date.valueOf("2016-12-29"), "def"),
(2, Date.valueOf("2016-05-09"), "def")
).toDF("typeId","eventTime", "str")
).toDF("typeId", "eventTime", "str")

testConsistencyBetweenGenericRecordAndRow(df)
}
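The change above swaps HoodieClientTestBase for SparkClientFunctionalTestHarness and drops the test class's own spark field and @BeforeEach setup; the untouched val _spark = spark lines imply the harness now supplies the shared session. A minimal, hypothetical test written against that pattern (the spark accessor is assumed from those lines, not verified here):

import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.SparkSession
import org.junit.jupiter.api.Test

// Hypothetical example, not part of this PR: with the functional-test harness the subclass
// declares no `var spark` and no @BeforeEach setup; the session comes from the base class.
class ExampleHarnessBackedTest extends SparkClientFunctionalTestHarness {
  @Test
  def sessionIsProvidedByHarness(): Unit = {
    val session: SparkSession = spark // assumed accessor exposed by SparkClientFunctionalTestHarness
    assert(session != null)
  }
}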
@@ -19,10 +19,10 @@ package org.apache.hudi

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, INSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, OPERATION, PAYLOAD_CLASS_NAME, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -35,10 +35,12 @@ import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach, Test}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.any
@@ -47,15 +49,13 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, inte

import java.time.Instant
import java.util.{Collections, Date, UUID}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters
import scala.util.control.NonFatal

/**
* Test suite for SparkSqlWriter class.
*/
class HoodieSparkSqlWriterSuite {
class TestHoodieSparkSqlWriter {
var spark: SparkSession = _
var sqlContext: SQLContext = _
var sc: SparkContext = _
@@ -70,7 +70,7 @@ class HoodieSparkSqlWriterSuite {
* Setup method running before each test.
*/
@BeforeEach
def setUp() {
def setUp(): Unit = {
initSparkContext()
tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
@@ -95,6 +95,7 @@ class HoodieSparkSqlWriterSuite {
spark = SparkSession.builder()
.appName(hoodieFooTableName)
.master("local[2]")
.withExtensions(new HoodieSparkSessionExtension)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
sc = spark.sparkContext
@@ -250,12 +251,14 @@ class HoodieSparkSqlWriterSuite {
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))

// on the same path, try an append with the delete operation and a different table name ("hoodie_bar_tbl"), which should throw an exception
val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
assert(deleteCmdException.getMessage.contains("Config conflict"))
assert(deleteCmdException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
}

/**
@@ -266,7 +269,7 @@ class HoodieSparkSqlWriterSuite {
@ParameterizedTest
@EnumSource(value = classOf[BulkInsertSortMode])
def testBulkInsertForSortMode(sortMode: BulkInsertSortMode): Unit = {
testBulkInsertWithSortMode(sortMode, true)
testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
}

/**
@@ -287,12 +290,13 @@ class HoodieSparkSqlWriterSuite {
@Test
def testDisableAndEnableMetaFields(): Unit = {
try {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, false)
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
//create a new table
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
.updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")

// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
Expand All @@ -302,9 +306,10 @@ class HoodieSparkSqlWriterSuite {
try {
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
Assertions.fail("Should have thrown exception")
fail("Should have thrown exception")
} catch {
case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict"))
Review comment on lines -307 to +311 (Member Author):
@nsivabalan please note that conflict config like populate MetaFields will be captured in org.apache.hudi.HoodieWriterUtils#validateTableConfig

case e: Exception => fail(e);
}
}
}
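The inline review comment above points to org.apache.hudi.HoodieWriterUtils#validateTableConfig as the place where conflicts such as a re-enabled populate-meta-fields flag are caught. A hypothetical sketch of that style of check, assuming it simply compares incoming write options against the persisted table config (the real method covers more keys and may word its message differently):

import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException

// Illustrative helper only, not Hudi's implementation: any mismatch between the incoming
// options and the existing table config produces a HoodieException whose message starts with
// "Config conflict" and lists each clash as <key>:\t<incoming>\t<existing>, the shape the
// updated assertions in this suite look for.
object TableConfigValidationSketch {
  def validate(incoming: Map[String, String], existing: Map[String, String]): Unit = {
    val checkedKeys = Seq(HoodieWriteConfig.TBL_NAME.key, HoodieTableConfig.POPULATE_META_FIELDS.key)
    val conflicts = checkedKeys.flatMap { key =>
      (incoming.get(key), existing.get(key)) match {
        case (Some(in), Some(ex)) if in != ex => Some(s"$key:\t$in\t$ex")
        case _ => None
      }
    }
    if (conflicts.nonEmpty) {
      throw new HoodieException(("Config conflict(key\tcurrent value\texisting value):" +: conflicts).mkString("\n"))
    }
  }
}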
@@ -439,7 +444,7 @@ class HoodieSparkSqlWriterSuite {
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
initializeMetaClientForBootstrap(fooTableParams, tableType, false)
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -496,7 +501,7 @@ class HoodieSparkSqlWriterSuite {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
initializeMetaClientForBootstrap(fooTableParams, tableType, true)
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)

val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
@@ -526,7 +531,8 @@ class HoodieSparkSqlWriterSuite {
.setTableType(tableType)
.setTableName(hoodieFooTableName)
.setRecordKeyFields(fooTableParams(DataSourceWriteOptions.RECORDKEY_FIELD.key))
.setBaseFileFormat(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name())
.setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
.setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(fooTableParams(PAYLOAD_CLASS_NAME.key))
.setPreCombineField(fooTableParams(PRECOMBINE_FIELD.key))
@@ -873,18 +879,15 @@ class HoodieSparkSqlWriterSuite {

val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// raise an exception when the write params do not match the existing HoodieTableConfig
try {
val configConflictException = intercept[HoodieException] {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName2)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
.mode(SaveMode.Append).save(tablePath2)
} catch {
case NonFatal(e) =>
assert(e.getMessage.contains("Config conflict"))
assert(e.getMessage.contains(
s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
}
assert(configConflictException.getMessage.contains("Config conflict"))
assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))

// do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params
df2.write.format("hudi")
Expand All @@ -893,6 +896,24 @@ class HoodieSparkSqlWriterSuite {
.mode(SaveMode.Append).save(tablePath2)
val data = spark.read.format("hudi").load(tablePath2 + "/*")
assert(data.count() == 2)
assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16")
assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
}

@Test
def testGetOriginKeyGenerator(): Unit = {
// for dataframe write
val m1 = Map(
HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName
)
val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1)
assertTrue(kg1 == classOf[ComplexKeyGenerator].getName)

// for sql write
val m2 = Map(
HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
)
val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
}
}
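The new testGetOriginKeyGenerator test documents how the origin key generator is resolved for DataFrame writes versus SQL writes. A rough sketch of that rule, under the assumption that HoodieWriterUtils.getOriginKeyGenerator simply unwraps SqlKeyGenerator (the actual implementation may differ):

import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.hudi.command.SqlKeyGenerator

// Hypothetical sketch, not HoodieWriterUtils itself: SQL writes configure SqlKeyGenerator and
// stash the user's generator under ORIGIN_KEYGEN_CLASS_NAME, so unwrap it; DataFrame writes
// configure the user's generator directly, so return it as-is.
object OriginKeyGeneratorSketch {
  def resolve(params: Map[String, String]): String = {
    val configured = params.getOrElse(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, "")
    if (configured == classOf[SqlKeyGenerator].getName) {
      params.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, configured)
    } else {
      configured
    }
  }
}

With the test's inputs, resolve(m1) returns the ComplexKeyGenerator class name and resolve(m2) returns the SimpleKeyGenerator class name, matching the assertions above.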
style/scalastyle.xml (2 changes: 1 addition & 1 deletion)
@@ -27,7 +27,7 @@
<check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"/>
<check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true">
<parameters>
<parameter name="maxFileLength"><![CDATA[900]]></parameter>
<parameter name="maxFileLength"><![CDATA[1000]]></parameter>
</parameters>
</check>
<check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>