@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
@@ -155,6 +155,18 @@ object HoodieSqlUtils extends SparkAdapterSupport {
}
}

/**
* Get the TableIdentifier of the target table in a MergeIntoTable plan.
*/
def getMergeIntoTargetTableId(mergeInto: MergeIntoTable): TableIdentifier = {
val aliasId = mergeInto.targetTable match {
case SubqueryAlias(_, SubqueryAlias(tableId, _)) => tableId
case SubqueryAlias(tableId, _) => tableId
case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target")
}
sparkAdapter.toTableIdentify(aliasId)
}
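For context, a minimal sketch of the two target-plan shapes this helper unwraps (the exact nesting depends on how Spark resolves the target, so treat these shapes as illustrative, not definitive):

// Aliased target, e.g. "merge into h0 t0 ...", typically resolves to a nested alias
// (outer user alias, inner table name), matched by the first case:
//   SubqueryAlias(t0, SubqueryAlias(h0, <relation>))
// Unaliased target, e.g. "merge into h0 ...", resolves to a single alias,
// matched by the second case:
//   SubqueryAlias(h0, <relation>)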

/**
* Split an expression into a sequence of sub-expressions by the AND operator.
* @param expression
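The body of this splitter is collapsed in the diff view. A minimal sketch of the usual recursive implementation (the name splitByAnd and the exact signature are assumptions based on the doc comment and the And import above):

def splitByAnd(expression: Expression): Seq[Expression] = expression match {
  // Recurse into both operands of an AND and concatenate the resulting conjuncts.
  case And(left, right) => splitByAnd(left) ++ splitByAnd(right)
  // Anything else is already an atomic conjunct.
  case exp => Seq(exp)
}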
@@ -17,10 +17,11 @@

package org.apache.spark.sql.hudi.analysis

import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.SparkAdapterSupport

import scala.collection.JavaConverters._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -31,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, De
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand, TruncateTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.types.StringType
@@ -102,7 +103,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
// Resolve merge into
case MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
case mergeInto @ MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
if isHoodieTable(target, sparkSession) && target.resolved =>

val resolvedSource = analyzer.execute(source)
@@ -164,7 +165,47 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case UpdateAction(condition, assignments) =>
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(condition, assignments)
UpdateAction(resolvedCondition, resolvedAssignments)

// Get the target table type and pre-combine field.
val targetTableId = getMergeIntoTargetTableId(mergeInto)
val targetTable =
sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
val targetTableType = HoodieOptionConfig.getTableType(targetTable.storage.properties)
val preCombineField = HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)

// Build a map from each target attribute name to its assigned value in the update assignments.
val target2Values = resolvedAssignments.map {
case Assignment(attr: AttributeReference, value) =>
attr.name -> value
case o => throw new IllegalArgumentException(s"Assignment key must be an attribute, found: ${o.key}")
}.toMap

// Validate that every assigned target attribute exists in the target table.
val unknownTargets = target2Values.keys
.filterNot(removeMetaFields(target.output).map(_.name).contains(_))
if (unknownTargets.nonEmpty) {
throw new AnalysisException(s"Cannot find target attributes: ${unknownTargets.mkString(",")}.")
}

// Fill in the missing target attributes in the update action for a COW table to support partial update.
// e.g. if the update action is missing the 'id' attribute, we add an "id = target.id" assignment to it.
val newAssignments = removeMetaFields(target.output)
.map(attr => {
// TODO support partial update for MOR.
Contributor: For partial update, do we need another payload like this? (#2666)

Author: No, we can support partial update for COW tables via the ExpressionPayload added in the SQL support.

if (!target2Values.contains(attr.name) && targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
throw new AnalysisException(s"Missing the value for the target field: '${attr.name}' in the merge-into update action" +
s" for a MOR table. Partial update is not supported for MOR yet," +
s" please set all the target fields, e.g. '... update set id = s0.id, name = s0.name ...'")
}
if (preCombineField.isDefined && preCombineField.get.equalsIgnoreCase(attr.name)
Contributor: Why do we need to specify a value for the preCombineField in the merge-into update action? Maybe we can delete this check.

Author: If the table defines a preCombineField, we must know how the preCombineField in the target table maps to a source field. So the value for the preCombineField must be specified; otherwise we cannot read the preCombineField from the source.

&& !target2Values.contains(attr.name)) {
throw new AnalysisException(s"Missing specify value for the preCombineField:" +
s" ${preCombineField.get} in merge-into update action. You should add" +
s" '... update set ${preCombineField.get} = xx....' to the when-matched clause.")
}
Assignment(attr, target2Values.getOrElse(attr.name, attr))
})
UpdateAction(resolvedCondition, newAssignments)
case DeleteAction(condition) =>
val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
DeleteAction(resolvedCondition)
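To make the fill-in behavior concrete, a hedged walk-through using the schema from the test added in this PR (id, name, price, _ts); the completed assignment list below is paraphrased from the rule above, not captured from actual rule output:

// A partial update written against a COW table:
//   when matched then update set price = s0.price, _ts = s0.ts
// is resolved by HoodieResolveReferences as if the user had written:
//   when matched then update set id = t0.id, name = t0.name, price = s0.price, _ts = s0.ts
// so columns not mentioned in the set clause keep their current target values.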
@@ -61,14 +61,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
/**
* The target table identifier.
*/
private lazy val targetTableIdentify: TableIdentifier = {
val aliaId = mergeInto.targetTable match {
case SubqueryAlias(_, SubqueryAlias(tableId, _)) => tableId
case SubqueryAlias(tableId, _) => tableId
case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target")
}
sparkAdapter.toTableIdentify(aliaId)
}
private lazy val targetTableIdentify: TableIdentifier = getMergeIntoTargetTableId(mergeInto)

/**
* The target table schema without hoodie meta fields.
@@ -124,7 +117,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
*/
private lazy val target2SourcePreCombineFiled: Option[(String, Expression)] = {
val updateActions = mergeInto.matchedActions.collect { case u: UpdateAction => u }
assert(updateActions.size <= 1, s"Only support one updateAction, current is: ${updateActions.size}")
assert(updateActions.size <= 1, s"Only one update action is supported currently, found: ${updateActions.size}")

val updateAction = updateActions.headOption
HoodieOptionConfig.getPreCombineField(targetTable.storage.properties).map(preCombineField => {
@@ -151,6 +144,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab

// Create the write parameters
val parameters = buildMergeIntoConfig(mergeInto)

val sourceDF = buildSourceDF(sparkSession)

if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi

class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {

test("Test Partial Update") {
withTempDir { tmp =>
// TODO: after we support partial update for MOR, add a test case for 'mor'.
Seq("cow").foreach { tableType =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| _ts long
|) using hudi
|options(
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = '_ts'
|)
|location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")

spark.sql(
s"""
|merge into $tableName t0
|using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts) s0
|on t0.id = s0.id
|when matched then update set price = s0.price, _ts = s0.ts
|""".stripMargin)
checkAnswer(s"select id, name, price, _ts from $tableName")(
Seq(1, "a1", 12.0, 1001)
)

val tableName2 = generateTableName
spark.sql(
s"""
|create table $tableName2 (
| id int,
| name string,
| price double
|) using hudi
|options(
| type = '$tableType',
| primaryKey = 'id'
|)
|location '${tmp.getCanonicalPath}/$tableName2'
""".stripMargin)
spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")

spark.sql(
s"""
|merge into $tableName2 t0
|using ( select 1 as id, 'a1' as name, 12 as price) s0
|on t0.id = s0.id
|when matched then update set price = s0.price
|""".stripMargin)
checkAnswer(s"select id, name, price from $tableName2")(
Seq(1, "a1", 12.0)
)
}
}
}

test("Test MergeInto Exception") {
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| _ts long
|) using hudi
|options(
| type = 'cow',
| primaryKey = 'id',
| preCombineField = '_ts'
|)""".stripMargin)

checkException(
s"""
|merge into $tableName t0
|using ( select 1 as id, 'a1' as name, 12 as price) s0
|on t0.id = s0.id
|when matched then update set price = s0.price
""".stripMargin)(
"Missing specify value for the preCombineField: _ts in merge-into update action. " +
"You should add '... update set _ts = xx....' to the when-matched clause.;")

val tableName2 = generateTableName
spark.sql(
s"""
|create table $tableName2 (
| id int,
| name string,
| price double,
| _ts long
|) using hudi
|options(
| type = 'mor',
| primaryKey = 'id',
| preCombineField = '_ts'
|)""".stripMargin)

checkException(
s"""
|merge into $tableName2 t0
|using ( select 1 as id, 'a1' as name, 12 as price) s0
|on t0.id = s0.id
|when matched then update set price = s0.price
""".stripMargin)(
"Missing specify the value for target field: 'id' in merge into update action for MOR table. " +
"Currently we cannot support partial update for MOR, please complete all the target fields " +
"just like '...update set id = s0.id, name = s0.name ....';")
}
}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.hudi.parser.HoodieSqlParser
import org.apache.spark.sql.internal.SQLConf

/**
* A sql adapter for spark2.
* The adapter for Spark 2.
*/
class Spark2Adapter extends SparkAdapter {

@@ -32,7 +32,7 @@ import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf

/**
* A sql adapter for spark3.
* The adapter for Spark 3.
*/
class Spark3Adapter extends SparkAdapter {
