Merged (changes from all commits)
2 changes: 1 addition & 1 deletion .github/workflows/bot.yml

@@ -69,4 +69,4 @@ jobs:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=org.apache.spark.sql.hudi.Test*' -pl hudi-spark-datasource/hudi-spark
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-spark-datasource/hudi-spark

@@ -471,10 +471,12 @@ private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
- .withFileId(fileId).overBaseCommit(baseCommitTime)
+ .withFileId(fileId)
+ .overBaseCommit(baseCommitTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
- .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
+ .withSizeThreshold(config.getLogFileMaxSize())
+ .withFs(fs)

Comment on lines -474 to +479

Member: should not include unrelated style changes

Contributor Author: This was accidental (had to switch back and forth between different branches while cherry-picking)

.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

@@ -54,6 +54,7 @@ private[hudi] trait SparkVersionsSupport {
def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")

+ def gteqSpark3_0: Boolean = getSparkVersion >= "3.0"
def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"

@@ -69,7 +69,7 @@ class TestConvertFilterToCatalystExpression {
private def checkConvertFilter(filter: Filter, expectExpression: String): Unit = {
// [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute,
// AttributeReference and Alias don't quote qualified names properly
- val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) {
+ val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.gteqSpark3_2) {
expectExpression.replace("`", "")
} else {
expectExpression

@@ -86,7 +86,7 @@
private def checkConvertFilters(filters: Array[Filter], expectExpression: String): Unit = {
// [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute,
// AttributeReference and Alias don't quote qualified names properly
- val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) {
+ val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.gteqSpark3_2) {
expectExpression.replace("`", "")
} else {
expectExpression
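
Note on the gate change above (editor's sketch, not part of the diff): isSpark3_2 matches only the 3.2.x line, while gteqSpark3_2 compares the version string lexicographically, so Spark 3.3.x now also takes the backtick-stripping branch. With an assumed version value for illustration:

// Behavior of the two helpers on a hypothetical Spark 3.3.0 build.
object VersionGateSketch extends App {
  val sparkVersion = "3.3.0"                        // assumed value, for illustration only

  val isSpark3_2   = sparkVersion.startsWith("3.2") // false: 3.3.x is not matched
  val gteqSpark3_2 = sparkVersion >= "3.2"          // true: lexicographic compare covers 3.2 and later

  // The test above strips backticks whenever the gate holds, since the expected strings are
  // written with quoted qualified names and Spark 3.2+ renders them unquoted (see the SPARK
  // tickets cited in the comment).
  println(s"isSpark3_2=$isSpark3_2, gteqSpark3_2=$gteqSpark3_2")
}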

@@ -22,6 +22,7 @@ import java.time.Instant
import java.util.{Collections, Date, UUID}
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions._
+ import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}

@@ -41,7 +41,8 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
- import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
+ import org.junit.jupiter.params.provider.Arguments.arguments
+ import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Assertions.assertThrows

@@ -485,11 +487,8 @@ class TestHoodieSparkSqlWriter {
* @param populateMetaFields Flag for populating meta fields
*/
@ParameterizedTest
- @CsvSource(
-   Array("COPY_ON_WRITE,parquet,true", "COPY_ON_WRITE,parquet,false", "MERGE_ON_READ,parquet,true", "MERGE_ON_READ,parquet,false",
-     "COPY_ON_WRITE,orc,true", "COPY_ON_WRITE,orc,false", "MERGE_ON_READ,orc,true", "MERGE_ON_READ,orc,false"
-   ))
- def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, baseFileFormat: String, populateMetaFields: Boolean): Unit = {
+ @MethodSource(Array("testDatasourceInsert"))
+ def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, populateMetaFields: Boolean, baseFileFormat: String): Unit = {
val hoodieFooTableName = "hoodie_foo_tbl"
val fooTableModifier = Map("path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,

@@ -1069,3 +1068,26 @@
assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
}
}

+ object TestHoodieSparkSqlWriter {
+   def testDatasourceInsert: java.util.stream.Stream[Arguments] = {
+     val scenarios = Array(
+       Seq("COPY_ON_WRITE", true),
+       Seq("COPY_ON_WRITE", false),
+       Seq("MERGE_ON_READ", true),
+       Seq("MERGE_ON_READ", false)
+     )
+
+     val parquetScenarios = scenarios.map { _ :+ "parquet" }
+     val orcScenarios = scenarios.map { _ :+ "orc" }
+
+     // TODO(HUDI-4496) Fix Orc support in Spark 3.x
+     val targetScenarios = if (gteqSpark3_0) {
+       parquetScenarios
+     } else {
+       parquetScenarios ++ orcScenarios
+     }
+
+     java.util.Arrays.stream(targetScenarios.map(as => arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
+   }
+ }
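
How the factory above feeds the test (editor's note, not part of the diff): JUnit 5 resolves @MethodSource(Array("testDatasourceInsert")) to a static, no-argument testDatasourceInsert() method returning a Stream of Arguments; Scala exposes the companion-object method through the static forwarder it generates on the test class, and each Arguments row is applied positionally to (tableType, populateMetaFields, baseFileFormat). A self-contained sketch of the same pattern, with illustrative names:

import java.util.stream.Stream

import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.{Arguments, MethodSource}

class MethodSourceSketch {
  @ParameterizedTest
  @MethodSource(Array("insertScenarios")) // resolved against the companion's static forwarder
  def runScenario(tableType: String, populateMetaFields: Boolean, baseFileFormat: String): Unit = {
    // e.g. arguments("COPY_ON_WRITE", true, "parquet") arrives here positionally
    assert(Set("COPY_ON_WRITE", "MERGE_ON_READ").contains(tableType))
    assert(Set("parquet", "orc").contains(baseFileFormat))
  }
}

object MethodSourceSketch {
  // Must be resolvable as a static no-arg method returning Stream[Arguments].
  def insertScenarios: Stream[Arguments] = Stream.of(
    arguments("COPY_ON_WRITE", Boolean.box(true), "parquet"),
    arguments("MERGE_ON_READ", Boolean.box(false), "parquet")
  )
}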

@@ -45,8 +45,13 @@ import java.util.TimeZone
* A serializer to serialize data in catalyst format to data in avro format.
*
* NOTE: This code is borrowed from Spark 3.2.1
 * This code is borrowed, so that we can better control compatibility w/in Spark minor
 * branches (3.2.x, 3.1.x, etc)
+ *
+ * NOTE: THIS IMPLEMENTATION HAS BEEN MODIFIED FROM ITS ORIGINAL VERSION WITH THE MODIFICATION
+ * BEING EXPLICITLY ANNOTATED INLINE. PLEASE MAKE SURE TO UNDERSTAND PROPERLY ALL THE
+ * MODIFICATIONS.
+ *
*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
*/

@@ -211,11 +216,20 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
val numFields = st.length
(getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))

+ ////////////////////////////////////////////////////////////////////////////////////////////
+ // Following section is amended to the original (Spark's) implementation
+ // >>> BEGINS
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
case (st: StructType, UNION) =>
val unionConverter = newUnionConverter(st, avroType, catalystPath, avroPath)
val numFields = st.length
(getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))

+ ////////////////////////////////////////////////////////////////////////////////////////////
+ // <<< ENDS
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
val valueConverter = newConverter(
vt, resolveNullableType(avroType.getValueType, valueContainsNull),

@@ -293,6 +307,11 @@
result
}

+ ////////////////////////////////////////////////////////////////////////////////////////////
+ // Following section is amended to the original (Spark's) implementation
+ // >>> BEGINS
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
private def newUnionConverter(catalystStruct: StructType,
avroUnion: Schema,
catalystPath: Seq[String],

@@ -337,6 +356,10 @@
avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
}

+ ////////////////////////////////////////////////////////////////////////////////////////////
+ // <<< ENDS
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
/**
* Resolve a possibly nullable Avro Type.
*
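
For orientation on the sections marked as amended from Spark's implementation (editor's reading of the visible code, not code from this PR): the case (st: StructType, UNION) path serializes a Catalyst struct whose fields stand in for the non-null branches of an Avro union, and the partially visible check near the end enforces that the number of union branches matches the number of struct fields, allowing for one extra null branch. A hedged sketch of that shape invariant, with illustrative field names:

import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.spark.sql.types._

object UnionShapeSketch {
  // A struct is considered compatible with a union when it has one field per non-null branch.
  def structMatchesUnion(catalystStruct: StructType, avroUnion: Schema): Boolean = {
    val branches = avroUnion.getTypes.asScala
    val hasNullBranch = branches.exists(_.getType == Schema.Type.NULL)
    if (hasNullBranch) branches.size - 1 == catalystStruct.length
    else branches.size == catalystStruct.length
  }

  def main(args: Array[String]): Unit = {
    // union { null, int, string }  <->  struct<member0: int, member1: string>
    val union = Schema.createUnion(java.util.Arrays.asList(
      Schema.create(Schema.Type.NULL),
      Schema.create(Schema.Type.INT),
      Schema.create(Schema.Type.STRING)))
    val struct = StructType(Seq(
      StructField("member0", IntegerType, nullable = true),
      StructField("member1", StringType, nullable = true)))
    println(structMatchesUnion(struct, union)) // prints: true
  }
}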

@@ -48,17 +48,15 @@ import java.util.TimeZone
*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
*/
- private[sql] class AvroDeserializer(
-     rootAvroType: Schema,
-     rootCatalystType: DataType,
-     positionalFieldMatch: Boolean,
-     datetimeRebaseSpec: RebaseSpec,
-     filters: StructFilters) {
-
-   def this(
-       rootAvroType: Schema,
-       rootCatalystType: DataType,
-       datetimeRebaseMode: String) = {
+ private[sql] class AvroDeserializer(rootAvroType: Schema,
+                                     rootCatalystType: DataType,
+                                     positionalFieldMatch: Boolean,
+                                     datetimeRebaseSpec: RebaseSpec,
+                                     filters: StructFilters) {
+
+   def this(rootAvroType: Schema,
+            rootCatalystType: DataType,
+            datetimeRebaseMode: String) = {
this(
rootAvroType,
rootCatalystType,

@@ -69,11 +67,9 @@

private lazy val decimalConversions = new DecimalConversion()

- private val dateRebaseFunc = createDateRebaseFuncInRead(
-   datetimeRebaseSpec.mode, "Avro")
+ private val dateRebaseFunc = createDateRebaseFuncInRead(datetimeRebaseSpec.mode, "Avro")

- private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
-   datetimeRebaseSpec, "Avro")
+ private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")

private val converter: Any => Option[Any] = try {
rootCatalystType match {

@@ -112,11 +108,10 @@
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
*/
- private def newWriter(
-     avroType: Schema,
-     catalystType: DataType,
-     avroPath: Seq[String],
-     catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
+ private def newWriter(avroType: Schema,
+                       catalystType: DataType,
+                       avroPath: Seq[String],
+                       catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
val errorPrefix = s"Cannot convert Avro ${toFieldStr(avroPath)} to " +
s"SQL ${toFieldStr(catalystPath)} because "
val incompatibleMsg = errorPrefix +