7 changes: 7 additions & 0 deletions .github/workflows/bot.yml
@@ -63,6 +63,10 @@ jobs:
sparkProfile: "spark3.1"
sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.0"
sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
@@ -159,6 +163,9 @@ jobs:
- flinkProfile: 'flink1.13'
sparkProfile: 'spark3.1'
sparkRuntime: 'spark3.1.3'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.0'
sparkRuntime: 'spark3.0.2'
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -37,8 +37,10 @@ object SparkAdapterSupport {
"org.apache.spark.sql.adapter.Spark3_3Adapter"
} else if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
} else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
} else if (HoodieSparkUtils.isSpark3_1) {
"org.apache.spark.sql.adapter.Spark3_1Adapter"
} else if (HoodieSparkUtils.isSpark3_0) {
"org.apache.spark.sql.adapter.Spark3_0Adapter"
} else {
"org.apache.spark.sql.adapter.Spark2Adapter"
}
@@ -84,6 +84,8 @@ object HoodieAnalysis extends SparkAdapterSupport {
"org.apache.spark.sql.hudi.Spark32ResolveHudiAlterTableCommand"
} else if (HoodieSparkUtils.gteqSpark3_1) {
"org.apache.spark.sql.hudi.Spark31ResolveHudiAlterTableCommand"
} else if (HoodieSparkUtils.gteqSpark3_0) {
"org.apache.spark.sql.hudi.Spark30ResolveHudiAlterTableCommand"
} else {
throw new IllegalStateException("Unsupported Spark version")
}
@@ -126,15 +128,18 @@ object HoodieAnalysis extends SparkAdapterSupport {
// Default rules
)

if (HoodieSparkUtils.gteqSpark3_1) {
if (HoodieSparkUtils.gteqSpark3_0) {
val nestedSchemaPruningClass =
if (HoodieSparkUtils.gteqSpark3_3) {
"org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
} else if (HoodieSparkUtils.gteqSpark3_2) {
"org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
} else {
} else if (HoodieSparkUtils.gteqSpark3_1) {
// spark 3.1
"org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
} else {
// spark 3.0
"org.apache.spark.sql.execution.datasources.Spark30NestedSchemaPruning"
}

val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
import org.apache.avro.generic.GenericData
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, IntWrapper}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters.SchemaType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -114,58 +114,65 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
})
}

test("Test MergeInto with more than once update actions") {
withRecordType()(withTempDir {tmp =>
val targetTable = generateTableName
spark.sql(
s"""
|create table ${targetTable} (
| id int,
| name string,
| data int,
| country string,
| ts bigint
|) using hudi
|tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|partitioned by (country)
|location '${tmp.getCanonicalPath}/$targetTable'
|""".stripMargin)
spark.sql(
s"""
|merge into ${targetTable} as target
|using (
|select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 1646643193 as ts
|) source
|on source.id = target.id
|when matched then
|update set *
|when not matched then
|insert *
|""".stripMargin)
spark.sql(
s"""
|merge into ${targetTable} as target
|using (
|select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 1646643196 as ts
|) source
|on source.id = target.id
|when matched and source.data > target.data then
|update set target.data = source.data, target.ts = source.ts
|when matched and source.data = 5 then
|update set target.data = source.data, target.ts = source.ts
|when not matched then
|insert *
|""".stripMargin)

checkAnswer(s"select id, name, data, country, ts from $targetTable")(
Seq(1, "lb", 5, "shu", 1646643196L)
)
/**
* In Spark 3.0.x, UPDATE and DELETE can appear at most once in MATCHED clauses in a MERGE INTO statement.
* Refer to: `org.apache.spark.sql.catalyst.parser.AstBuilder#visitMergeIntoTable`
*
*/
test("Test MergeInto with more than once update actions for spark >= 3.1.x") {

if (HoodieSparkUtils.gteqSpark3_1) {
withRecordType()(withTempDir { tmp =>
val targetTable = generateTableName
spark.sql(
s"""
|create table ${targetTable} (
| id int,
| name string,
| data int,
| country string,
| ts bigint
|) using hudi
|tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|partitioned by (country)
|location '${tmp.getCanonicalPath}/$targetTable'
|""".stripMargin)
spark.sql(
s"""
|merge into ${targetTable} as target
|using (
|select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 1646643193 as ts
|) source
|on source.id = target.id
|when matched then
|update set *
|when not matched then
|insert *
|""".stripMargin)
spark.sql(
s"""
|merge into ${targetTable} as target
|using (
|select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 1646643196 as ts
|) source
|on source.id = target.id
|when matched and source.data > target.data then
|update set target.data = source.data, target.ts = source.ts
|when matched and source.data = 5 then
|update set target.data = source.data, target.ts = source.ts
|when not matched then
|insert *
|""".stripMargin)

})
checkAnswer(s"select id, name, data, country, ts from $targetTable")(
Seq(1, "lb", 5, "shu", 1646643196L)
)
})
}
}

test("Test MergeInto with ignored record") {
@@ -17,16 +17,21 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Predicate, PredicateHelper}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
abstract class HoodieSpark3CatalystExpressionUtils extends HoodieCatalystExpressionUtils {

trait HoodieSpark3CatalystExpressionUtils extends HoodieCatalystExpressionUtils
with PredicateHelper {
/**
* The attribute name may differ from the one in the schema if the query analyzer
* is case insensitive. We should change attribute names to match the ones in the schema,
* so we do not need to worry about case sensitivity anymore
*/
def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression]

override def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] =
Contributor Author:
DataSourceStrategy.normalizeExprs(exprs, attributes) is not available in Spark 3.0, so we need to remove this specific implementation from the spark3-common module.
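Not part of this PR: a minimal sketch of how a Spark 3.0-specific module could provide the same normalization itself, assuming spark-catalyst 3.0.x on the classpath. The object name Spark30ExprNormalization is made up for illustration; the body mirrors roughly what DataSourceStrategy.normalizeExprs does on newer Spark versions.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}

// Hypothetical Spark 3.0-side helper: re-case attribute names so they match the
// schema attributes, even when the analyzer resolved them case-insensitively.
object Spark30ExprNormalization {
  def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] =
    exprs.map(_.transform {
      case a: AttributeReference =>
        // Prefer the schema attribute's spelling; fall back to the original name
        a.withName(attributes.find(_.semanticEquals(a)).getOrElse(a).name)
    })
}
```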

DataSourceStrategy.normalizeExprs(exprs, attributes)

override def extractPredicatesWithinOutputSet(condition: Expression,
Contributor Author:
extractPredicatesWithinOutputSet is not available in Spark 3.0's PredicateHelper, so we need to remove this specific implementation from the spark3-common module.
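A simplified sketch (not from the PR) of how the same behavior could be written without the newer PredicateHelper method, assuming spark-catalyst on the classpath; the object name Spark30PredicateExtraction is hypothetical, and the built-in method in newer Spark releases also handles OR branches, which this sketch skips.

```scala
import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, PredicateHelper}

// Keep only the conjuncts of `condition` that reference nothing outside
// `outputSet`, then AND them back together for predicate push-down.
object Spark30PredicateExtraction extends PredicateHelper {
  def extractPredicatesWithinOutputSet(condition: Expression,
                                       outputSet: AttributeSet): Option[Expression] =
    splitConjunctivePredicates(condition)
      .filter(_.references.subsetOf(outputSet))
      .reduceOption(And(_, _))
}
```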

outputSet: AttributeSet): Option[Expression] =
super[PredicateHelper].extractPredicatesWithinOutputSet(condition, outputSet)
/**
* Returns a filter that its reference is a subset of `outputSet` and it contains the maximum
* constraints from `condition`. This is used for predicate push-down
* When there is no such filter, `None` is returned.
*/
def extractPredicatesWithinOutputSet(condition: Expression,
outputSet: AttributeSet): Option[Expression]
}
@@ -25,22 +25,15 @@ import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark3RowSerDe}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._

import java.time.ZoneId
import java.util.TimeZone
@@ -63,20 +56,6 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
new Spark3RowSerDe(encoder)
}

override def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
super.resolveHoodieTable(plan).orElse {
EliminateSubqueryAliases(plan) match {
// First, we need to weed out unresolved plans
case plan if !plan.resolved => None
// NOTE: When resolving Hudi table we allow [[Filter]]s and [[Project]]s be applied
// on top of it
case PhysicalOperation(_, _, DataSourceV2Relation(v2: V2TableWithV1Fallback, _, _, _, _)) if isHoodieTable(v2.v1Table) =>
Contributor Author:
V2TableWithV1Fallback is not available in Spark 3.0, so we need to remove this specific implementation from the spark3-common module.

Some(v2.v1Table)
case _ => None
}
}
}

override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters

override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark3ParsePartitionUtil
@@ -108,23 +87,5 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
DefaultSource.createRelation(sqlContext, metaClient, dataSchema, globPaths, parameters.asScala.toMap)
}

/**
* Converts instance of [[StorageLevel]] to a corresponding string
*/
override def convertStorageLevelToString(level: StorageLevel): String = level match {
case NONE => "NONE"
case DISK_ONLY => "DISK_ONLY"
case DISK_ONLY_2 => "DISK_ONLY_2"
case DISK_ONLY_3 => "DISK_ONLY_3"
Contributor Author:
DISK_ONLY_3 is not available in Spark 3.0, so we need to remove this specific implementation from the spark3-common module.
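With the common implementation removed, each version-specific adapter has to supply its own mapping. A sketch (not from the PR, object name hypothetical) of what the Spark 3.0 variant could look like: identical to the removed common code except that DISK_ONLY_3, which per the comment above is not available on Spark 3.0, is dropped.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._

// Hypothetical Spark 3.0-specific mapping: same cases as the removed common
// code, minus DISK_ONLY_3.
object Spark30StorageLevels {
  def convertStorageLevelToString(level: StorageLevel): String = level match {
    case NONE => "NONE"
    case DISK_ONLY => "DISK_ONLY"
    case DISK_ONLY_2 => "DISK_ONLY_2"
    case MEMORY_ONLY => "MEMORY_ONLY"
    case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
    case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
    case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
    case MEMORY_AND_DISK => "MEMORY_AND_DISK"
    case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
    case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
    case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
    case OFF_HEAP => "OFF_HEAP"
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
  }
}
```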

case MEMORY_ONLY => "MEMORY_ONLY"
case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
case MEMORY_AND_DISK => "MEMORY_AND_DISK"
case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
}
override def convertStorageLevelToString(level: StorageLevel): String
}