From e18a7a52ccf3da3689b6bfc3a623c8d608814ab4 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Fri, 19 Jun 2020 17:51:02 +0800 Subject: [PATCH 1/5] [SPARK-32030][SQL] Support unlimited MATCHED and NOT MATCHED clauses in MERGE INTO --- .../spark/sql/catalyst/parser/SqlBase.g4 | 8 +- .../sql/catalyst/parser/AstBuilder.scala | 41 +++++---- .../catalyst/plans/logical/v2Commands.scala | 8 +- .../sql/catalyst/parser/DDLParserSuite.scala | 84 +++++++++---------- 4 files changed, 71 insertions(+), 70 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 691fde8d48f94..a5123d1f291f5 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -437,8 +437,7 @@ dmlStatementNoWith USING (source=multipartIdentifier | '(' sourceQuery=query')') sourceAlias=tableAlias ON mergeCondition=booleanExpression - matchedClause* - notMatchedClause* #mergeIntoTable + matchedOrNotMatchedClause* #mergeIntoTable ; queryOrganization @@ -526,6 +525,11 @@ setClause : SET assignmentList ; +matchedOrNotMatchedClause + : matchedClause + | notMatchedClause + ; + matchedClause : WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 03571a740df3e..47e3447efb2d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -411,12 +411,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val mergeCondition = expression(ctx.mergeCondition) - val matchedClauses = ctx.matchedClause() - if (matchedClauses.size() > 2) { - throw new ParseException("There should be at most 2 'WHEN MATCHED' clauses.", - matchedClauses.get(2)) - } - val matchedActions = matchedClauses.asScala.map { + val whenClauses = ctx.matchedOrNotMatchedClause() + val matchedClauses = whenClauses.asScala.map(_.matchedClause()).filter(_ != null) + val matchedActions = matchedClauses.map { clause => { if (clause.matchedAction().DELETE() != null) { DeleteAction(Option(clause.matchedCond).map(expression)) @@ -435,12 +432,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } } - val notMatchedClauses = ctx.notMatchedClause() - if (notMatchedClauses.size() > 1) { - throw new ParseException("There should be at most 1 'WHEN NOT MATCHED' clause.", - notMatchedClauses.get(1)) - } - val notMatchedActions = notMatchedClauses.asScala.map { + val notMatchedClauses = whenClauses.asScala.map(_.notMatchedClause()).filter(_ != null) + val notMatchedActions = notMatchedClauses.map { clause => { if (clause.notMatchedAction().INSERT() != null) { val condition = Option(clause.notMatchedCond).map(expression) @@ -468,13 +461,25 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException("There must be at least one WHEN clause in a MERGE statement", ctx) } // children being empty means that the condition is not set - if (matchedActions.length == 2 && matchedActions.head.children.isEmpty) { - throw new ParseException("When there are 2 MATCHED clauses in a MERGE statement, " + - "the first MATCHED clause must have a condition", ctx) - } - if (matchedActions.groupBy(_.getClass).mapValues(_.size).exists(_._2 > 1)) { + val matchedActionSize = matchedActions.length + if (matchedActionSize >= 2 && !matchedActions.init.forall(_.condition.nonEmpty)) { + throw new ParseException( + s"When there are $matchedActionSize MATCHED clauses in a MERGE statement, " + + s"${if (matchedActionSize == 2) { + "the first MATCHED clause must have a condition" + } else { + s"the first ${matchedActionSize - 1} MATCHED clauses must have conditions" + }}", ctx) + } + val notMatchedActionSize = notMatchedActions.length + if (notMatchedActionSize >= 2 && !notMatchedActions.init.forall(_.condition.nonEmpty)) { throw new ParseException( - "UPDATE and DELETE can appear at most once in MATCHED clauses in a MERGE statement", ctx) + s"When there are $notMatchedActionSize NOT MATCHED clauses in a MERGE statement, " + + s"${if (notMatchedActionSize == 2) { + "the first NOT MATCHED clause must have a condition" + } else { + s"the first ${notMatchedActionSize - 1} NOT MATCHED clauses must have conditions" + }}", ctx) } MergeIntoTable( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 579157a6f2f2e..b6a92307d1798 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -347,23 +347,23 @@ case class MergeIntoTable( } sealed abstract class MergeAction( - condition: Option[Expression]) extends Expression with Unevaluable { + val condition: Option[Expression]) extends Expression with Unevaluable { override def foldable: Boolean = false override def nullable: Boolean = false override def dataType: DataType = throw new UnresolvedException(this, "nullable") override def children: Seq[Expression] = condition.toSeq } -case class DeleteAction(condition: Option[Expression]) extends MergeAction(condition) +case class DeleteAction(override val condition: Option[Expression]) extends MergeAction(condition) case class UpdateAction( - condition: Option[Expression], + override val condition: Option[Expression], assignments: Seq[Assignment]) extends MergeAction(condition) { override def children: Seq[Expression] = condition.toSeq ++ assignments } case class InsertAction( - condition: Option[Expression], + override val condition: Option[Expression], assignments: Seq[Assignment]) extends MergeAction(condition) { override def children: Seq[Expression] = condition.toSeq ++ assignments } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 6499b5d8e7974..27519e884dde8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1134,58 +1134,70 @@ class DDLParserSuite extends AnalysisTest { } } - test("merge into table: at most two matched clauses") { - val exc = intercept[ParseException] { - parsePlan( - """ - |MERGE INTO testcat1.ns1.ns2.tbl AS target - |USING testcat2.ns1.ns2.tbl AS source - |ON target.col1 = source.col1 - |WHEN MATCHED AND (target.col2='delete') THEN DELETE - |WHEN MATCHED AND (target.col2='update1') THEN UPDATE SET target.col2 = source.col2 - |WHEN MATCHED AND (target.col2='update2') THEN UPDATE SET target.col2 = source.col2 - |WHEN NOT MATCHED AND (target.col2='insert') - |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) - """.stripMargin) - } - - assert(exc.getMessage.contains("There should be at most 2 'WHEN MATCHED' clauses.")) + test("merge into table: multi matched and not matched clauses") { + parseCompare( + """ + |MERGE INTO testcat1.ns1.ns2.tbl AS target + |USING testcat2.ns1.ns2.tbl AS source + |ON target.col1 = source.col1 + |WHEN MATCHED AND (target.col2='delete') THEN DELETE + |WHEN MATCHED AND (target.col2='update to 1') THEN UPDATE SET target.col2 = 1 + |WHEN MATCHED AND (target.col2='update to 2') THEN UPDATE SET target.col2 = 2 + |WHEN NOT MATCHED AND (target.col2='insert 1') + |THEN INSERT (target.col1, target.col2) values (source.col1, 1) + |WHEN NOT MATCHED AND (target.col2='insert 2') + |THEN INSERT (target.col1, target.col2) values (source.col1, 2) + """.stripMargin, + MergeIntoTable( + SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))), + SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))), + EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), + Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))), + UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update to 1"))), + Seq(Assignment(UnresolvedAttribute("target.col2"), Literal(1)))), + UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update to 2"))), + Seq(Assignment(UnresolvedAttribute("target.col2"), Literal(2))))), + Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert 1"))), + Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), + Assignment(UnresolvedAttribute("target.col2"), Literal(1)))), + InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert 2"))), + Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), + Assignment(UnresolvedAttribute("target.col2"), Literal(2))))))) } - test("merge into table: at most one not matched clause") { + test("merge into table: the first matched clause must have a condition if there's a second") { val exc = intercept[ParseException] { parsePlan( """ |MERGE INTO testcat1.ns1.ns2.tbl AS target |USING testcat2.ns1.ns2.tbl AS source |ON target.col1 = source.col1 - |WHEN MATCHED AND (target.col2='delete') THEN DELETE - |WHEN MATCHED AND (target.col2='update1') THEN UPDATE SET target.col2 = source.col2 - |WHEN NOT MATCHED AND (target.col2='insert1') - |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) - |WHEN NOT MATCHED AND (target.col2='insert2') + |WHEN MATCHED THEN DELETE + |WHEN MATCHED THEN UPDATE SET target.col2 = source.col2 + |WHEN NOT MATCHED AND (target.col2='insert') |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) """.stripMargin) } - assert(exc.getMessage.contains("There should be at most 1 'WHEN NOT MATCHED' clause.")) + assert(exc.getMessage.contains("the first MATCHED clause must have a condition")) } - test("merge into table: the first matched clause must have a condition if there's a second") { + test("merge into table: the first 2 matched clause must have conditions if there's a third") { val exc = intercept[ParseException] { parsePlan( """ |MERGE INTO testcat1.ns1.ns2.tbl AS target |USING testcat2.ns1.ns2.tbl AS source |ON target.col1 = source.col1 + |WHEN MATCHED AND (source.col2 == '1') THEN UPDATE SET target.col2 = 1 + |WHEN MATCHED THEN UPDATE SET target.col2 = 2 |WHEN MATCHED THEN DELETE - |WHEN MATCHED THEN UPDATE SET target.col2 = source.col2 |WHEN NOT MATCHED AND (target.col2='insert') |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) """.stripMargin) } - assert(exc.getMessage.contains("the first MATCHED clause must have a condition")) + assert(exc.getMessage.contains("the first 2 MATCHED clauses must have conditions")) } test("merge into table: there must be a when (not) matched condition") { @@ -1201,26 +1213,6 @@ class DDLParserSuite extends AnalysisTest { assert(exc.getMessage.contains("There must be at least one WHEN clause in a MERGE statement")) } - test("merge into table: there can be only a single use DELETE or UPDATE") { - Seq("UPDATE SET *", "DELETE").foreach { op => - val exc = intercept[ParseException] { - parsePlan( - s""" - |MERGE INTO testcat1.ns1.ns2.tbl AS target - |USING testcat2.ns1.ns2.tbl AS source - |ON target.col1 = source.col1 - |WHEN MATCHED AND (target.col2='delete') THEN $op - |WHEN MATCHED THEN $op - |WHEN NOT MATCHED AND (target.col2='insert') - |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) - """.stripMargin) - } - - assert(exc.getMessage.contains( - "UPDATE and DELETE can appear at most once in MATCHED clauses")) - } - } - test("show tables") { comparePlans( parsePlan("SHOW TABLES"), From a6ac36320c43450e3450972da27ad5af9c5abf54 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Tue, 23 Jun 2020 10:53:15 +0800 Subject: [PATCH 2/5] Preserve the order of the MATCHED and NOT MATCHED clauses --- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 8 ++------ .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 5 ++--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a5123d1f291f5..691fde8d48f94 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -437,7 +437,8 @@ dmlStatementNoWith USING (source=multipartIdentifier | '(' sourceQuery=query')') sourceAlias=tableAlias ON mergeCondition=booleanExpression - matchedOrNotMatchedClause* #mergeIntoTable + matchedClause* + notMatchedClause* #mergeIntoTable ; queryOrganization @@ -525,11 +526,6 @@ setClause : SET assignmentList ; -matchedOrNotMatchedClause - : matchedClause - | notMatchedClause - ; - matchedClause : WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 47e3447efb2d2..47e7de8bd91cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -411,8 +411,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val mergeCondition = expression(ctx.mergeCondition) - val whenClauses = ctx.matchedOrNotMatchedClause() - val matchedClauses = whenClauses.asScala.map(_.matchedClause()).filter(_ != null) + val matchedClauses = ctx.matchedClause().asScala.filter(_ != null) val matchedActions = matchedClauses.map { clause => { if (clause.matchedAction().DELETE() != null) { @@ -432,7 +431,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } } - val notMatchedClauses = whenClauses.asScala.map(_.notMatchedClause()).filter(_ != null) + val notMatchedClauses = ctx.notMatchedClause().asScala.filter(_ != null) val notMatchedActions = notMatchedClauses.map { clause => { if (clause.notMatchedAction().INSERT() != null) { From 1d39c925d257555f737ca4505a87b45139e68c74 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Tue, 23 Jun 2020 11:04:25 +0800 Subject: [PATCH 3/5] Optimize the code --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 47e7de8bd91cd..14c05028f5ed3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -411,8 +411,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val mergeCondition = expression(ctx.mergeCondition) - val matchedClauses = ctx.matchedClause().asScala.filter(_ != null) - val matchedActions = matchedClauses.map { + val matchedActions = ctx.matchedClause().asScala.map { clause => { if (clause.matchedAction().DELETE() != null) { DeleteAction(Option(clause.matchedCond).map(expression)) @@ -431,8 +430,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } } - val notMatchedClauses = ctx.notMatchedClause().asScala.filter(_ != null) - val notMatchedActions = notMatchedClauses.map { + val notMatchedActions = ctx.notMatchedClause().asScala.map { clause => { if (clause.notMatchedAction().INSERT() != null) { val condition = Option(clause.notMatchedCond).map(expression) From ab97e31041091b4592f86349eaa81e379022b725 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Tue, 23 Jun 2020 14:48:02 +0800 Subject: [PATCH 4/5] Update according to review comments --- .../sql/catalyst/parser/AstBuilder.scala | 18 ++++------------ .../sql/catalyst/parser/DDLParserSuite.scala | 21 ++----------------- 2 files changed, 6 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 14c05028f5ed3..d08bcb1420176 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -460,23 +460,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // children being empty means that the condition is not set val matchedActionSize = matchedActions.length if (matchedActionSize >= 2 && !matchedActions.init.forall(_.condition.nonEmpty)) { - throw new ParseException( - s"When there are $matchedActionSize MATCHED clauses in a MERGE statement, " + - s"${if (matchedActionSize == 2) { - "the first MATCHED clause must have a condition" - } else { - s"the first ${matchedActionSize - 1} MATCHED clauses must have conditions" - }}", ctx) + throw new ParseException("When there are more than one MATCHED clauses in a MERGE " + + "statement, only the last MATCHED clause can omit the condition.", ctx) } val notMatchedActionSize = notMatchedActions.length if (notMatchedActionSize >= 2 && !notMatchedActions.init.forall(_.condition.nonEmpty)) { - throw new ParseException( - s"When there are $notMatchedActionSize NOT MATCHED clauses in a MERGE statement, " + - s"${if (notMatchedActionSize == 2) { - "the first NOT MATCHED clause must have a condition" - } else { - s"the first ${notMatchedActionSize - 1} NOT MATCHED clauses must have conditions" - }}", ctx) + throw new ParseException("When there are more than one NOT MATCHED clauses in a MERGE " + + "statement, only the last NOT MATCHED clause can omit the condition.", ctx) } MergeIntoTable( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 27519e884dde8..957b5a09afe82 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1165,24 +1165,7 @@ class DDLParserSuite extends AnalysisTest { Assignment(UnresolvedAttribute("target.col2"), Literal(2))))))) } - test("merge into table: the first matched clause must have a condition if there's a second") { - val exc = intercept[ParseException] { - parsePlan( - """ - |MERGE INTO testcat1.ns1.ns2.tbl AS target - |USING testcat2.ns1.ns2.tbl AS source - |ON target.col1 = source.col1 - |WHEN MATCHED THEN DELETE - |WHEN MATCHED THEN UPDATE SET target.col2 = source.col2 - |WHEN NOT MATCHED AND (target.col2='insert') - |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) - """.stripMargin) - } - - assert(exc.getMessage.contains("the first MATCHED clause must have a condition")) - } - - test("merge into table: the first 2 matched clause must have conditions if there's a third") { + test("merge into table: only the last matched clause can omit the condition") { val exc = intercept[ParseException] { parsePlan( """ @@ -1197,7 +1180,7 @@ class DDLParserSuite extends AnalysisTest { """.stripMargin) } - assert(exc.getMessage.contains("the first 2 MATCHED clauses must have conditions")) + assert(exc.getMessage.contains("only the last MATCHED clause can omit the condition")) } test("merge into table: there must be a when (not) matched condition") { From d5edef3c2b950440614fc5c9ee1e770bcd0b9884 Mon Sep 17 00:00:00 2001 From: xy_xin Date: Sun, 28 Jun 2020 10:38:52 +0800 Subject: [PATCH 5/5] Update accordding to comments --- .../catalyst/plans/logical/v2Commands.scala | 14 +++---- .../sql/catalyst/parser/DDLParserSuite.scala | 39 ++++++++++++++----- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b6a92307d1798..b4120d9f64cc5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -346,25 +346,25 @@ case class MergeIntoTable( override def children: Seq[LogicalPlan] = Seq(targetTable, sourceTable) } -sealed abstract class MergeAction( - val condition: Option[Expression]) extends Expression with Unevaluable { +sealed abstract class MergeAction extends Expression with Unevaluable { + def condition: Option[Expression] override def foldable: Boolean = false override def nullable: Boolean = false override def dataType: DataType = throw new UnresolvedException(this, "nullable") override def children: Seq[Expression] = condition.toSeq } -case class DeleteAction(override val condition: Option[Expression]) extends MergeAction(condition) +case class DeleteAction(condition: Option[Expression]) extends MergeAction case class UpdateAction( - override val condition: Option[Expression], - assignments: Seq[Assignment]) extends MergeAction(condition) { + condition: Option[Expression], + assignments: Seq[Assignment]) extends MergeAction { override def children: Seq[Expression] = condition.toSeq ++ assignments } case class InsertAction( - override val condition: Option[Expression], - assignments: Seq[Assignment]) extends MergeAction(condition) { + condition: Option[Expression], + assignments: Seq[Assignment]) extends MergeAction { override def children: Seq[Expression] = condition.toSeq ++ assignments } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 957b5a09afe82..e802449a69743 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1141,11 +1141,11 @@ class DDLParserSuite extends AnalysisTest { |USING testcat2.ns1.ns2.tbl AS source |ON target.col1 = source.col1 |WHEN MATCHED AND (target.col2='delete') THEN DELETE - |WHEN MATCHED AND (target.col2='update to 1') THEN UPDATE SET target.col2 = 1 - |WHEN MATCHED AND (target.col2='update to 2') THEN UPDATE SET target.col2 = 2 - |WHEN NOT MATCHED AND (target.col2='insert 1') + |WHEN MATCHED AND (target.col2='update1') THEN UPDATE SET target.col2 = 1 + |WHEN MATCHED AND (target.col2='update2') THEN UPDATE SET target.col2 = 2 + |WHEN NOT MATCHED AND (target.col2='insert1') |THEN INSERT (target.col1, target.col2) values (source.col1, 1) - |WHEN NOT MATCHED AND (target.col2='insert 2') + |WHEN NOT MATCHED AND (target.col2='insert2') |THEN INSERT (target.col1, target.col2) values (source.col1, 2) """.stripMargin, MergeIntoTable( @@ -1153,14 +1153,14 @@ class DDLParserSuite extends AnalysisTest { SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))), EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))), - UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update to 1"))), + UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update1"))), Seq(Assignment(UnresolvedAttribute("target.col2"), Literal(1)))), - UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update to 2"))), + UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update2"))), Seq(Assignment(UnresolvedAttribute("target.col2"), Literal(2))))), - Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert 1"))), + Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert1"))), Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), Assignment(UnresolvedAttribute("target.col2"), Literal(1)))), - InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert 2"))), + InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert2"))), Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), Assignment(UnresolvedAttribute("target.col2"), Literal(2))))))) } @@ -1172,7 +1172,7 @@ class DDLParserSuite extends AnalysisTest { |MERGE INTO testcat1.ns1.ns2.tbl AS target |USING testcat2.ns1.ns2.tbl AS source |ON target.col1 = source.col1 - |WHEN MATCHED AND (source.col2 == '1') THEN UPDATE SET target.col2 = 1 + |WHEN MATCHED AND (target.col2 == 'update1') THEN UPDATE SET target.col2 = 1 |WHEN MATCHED THEN UPDATE SET target.col2 = 2 |WHEN MATCHED THEN DELETE |WHEN NOT MATCHED AND (target.col2='insert') @@ -1183,6 +1183,27 @@ class DDLParserSuite extends AnalysisTest { assert(exc.getMessage.contains("only the last MATCHED clause can omit the condition")) } + test("merge into table: only the last not matched clause can omit the condition") { + val exc = intercept[ParseException] { + parsePlan( + """ + |MERGE INTO testcat1.ns1.ns2.tbl AS target + |USING testcat2.ns1.ns2.tbl AS source + |ON target.col1 = source.col1 + |WHEN MATCHED AND (target.col2 == 'update') THEN UPDATE SET target.col2 = source.col2 + |WHEN MATCHED THEN DELETE + |WHEN NOT MATCHED AND (target.col2='insert1') + |THEN INSERT (target.col1, target.col2) values (source.col1, 1) + |WHEN NOT MATCHED + |THEN INSERT (target.col1, target.col2) values (source.col1, 2) + |WHEN NOT MATCHED + |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2) + """.stripMargin) + } + + assert(exc.getMessage.contains("only the last NOT MATCHED clause can omit the condition")) + } + test("merge into table: there must be a when (not) matched condition") { val exc = intercept[ParseException] { parsePlan(