From 2779ca40748e4aa90ddecb01288c61fe478767a1 Mon Sep 17 00:00:00 2001
From: KnightChess <981159963@qq.com>
Date: Fri, 2 Sep 2022 16:05:03 +0800
Subject: [PATCH] [HUDI-4776] fix merge into use unresolved assignment

---
 .../sql/hudi/analysis/HoodieAnalysis.scala  | 12 +++++-
 .../spark/sql/hudi/TestMergeIntoTable.scala | 37 ++++++++++++++++++-
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 6afa8681339ac..4f4446ac281c6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -282,13 +282,21 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
       // the hoodie's meta field in sql statement, it is a system field, cannot set the value
       // by user.
       if (HoodieSparkUtils.isSpark3) {
-        val assignmentFieldNames = assignments.map(_.key).map {
+        val resolvedAssignments = assignments.map { assign =>
+          val resolvedKey = assign.key match {
+            case c if !c.resolved =>
+              resolveExpressionFrom(target)(c)
+            case o => o
+          }
+          Assignment(resolvedKey, null)
+        }
+        val assignmentFieldNames = resolvedAssignments.map(_.key).map {
           case attr: AttributeReference => attr.name
           case _ => ""
         }.toArray
 
         val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala
-        if (metaFields.mkString(",").startsWith(assignmentFieldNames.take(metaFields.length).mkString(","))) {
+        if (assignmentFieldNames.take(metaFields.length).mkString(",").startsWith(metaFields.mkString(","))) {
           true
         } else {
           false
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 5e826973d24e0..93079ac554db9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -869,7 +869,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test MereInto With All Kinds Of DataType") {
+  test("Test MergeInto With All Kinds Of DataType") {
     withTempDir { tmp =>
       val dataAndTypes = Seq(
         ("string", "'a1'"),
@@ -914,4 +914,39 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test MergeInto with no-full fields source") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  value int,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           | select 1 as id, 1001 as ts
+           | ) s0
+           | on h0.id = s0.id
+           | when matched then update set h0.ts = s0.ts
+           |""".stripMargin)
+      checkAnswer(s"select id, name, value, ts from $tableName")(
+        Seq(1, "a1", 10, 1001)
+      )
+    }
+  }
 }
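
Illustration only, not part of the patch: the sketch below is a minimal, self-contained Scala model of the prefix check that the HoodieAnalysis.scala hunk corrects. The object name MetaFieldPrefixCheckSketch and the plain Seq[String] inputs are assumptions made for this example; the real code operates on resolved Catalyst assignment keys and HoodieRecord.HOODIE_META_COLUMNS.

// Minimal, self-contained sketch (not Hudi/Spark code) of the check fixed above.
// Assumption for illustration: field names are plain strings, and an unresolved
// assignment key is modelled as the empty string "" that the old code extracted.
object MetaFieldPrefixCheckSketch extends App {
  // Mirrors HoodieRecord.HOODIE_META_COLUMNS
  val metaFields = Seq("_hoodie_commit_time", "_hoodie_commit_seqno",
    "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

  // Old check: meta-field string startsWith the assignment-name string.
  // An unresolved key yields "", and startsWith("") is always true -> false positive.
  def oldCheck(assignmentFieldNames: Seq[String]): Boolean =
    metaFields.mkString(",").startsWith(
      assignmentFieldNames.take(metaFields.length).mkString(","))

  // New check: the assignment names must themselves begin with all meta fields.
  def newCheck(assignmentFieldNames: Seq[String]): Boolean =
    assignmentFieldNames.take(metaFields.length).mkString(",")
      .startsWith(metaFields.mkString(","))

  val partialUpdate = Seq("")  // e.g. "update set h0.ts = s0.ts" with an unresolved key
  val starUpdate = metaFields ++ Seq("id", "name", "value", "ts")

  println(oldCheck(partialUpdate))  // true  (the bug: treated as a meta-field assignment)
  println(newCheck(partialUpdate))  // false
  println(newCheck(starUpdate))     // true
}

Resolving each assignment key against the target via resolveExpressionFrom(target), as the main hunk does, is what keeps a partial UPDATE SET from degenerating to empty names in the first place; the inverted startsWith then only classifies an assignment list as meta-field-prefixed when it actually begins with all of the meta columns.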