From 04f629dd297b86fd0890c2b09a212d8fbb99f0d4 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Wed, 22 Jul 2020 20:59:06 -0700
Subject: [PATCH 1/6] initial commit

---
 .../sql/catalyst/analysis/Analyzer.scala      |  2 +-
 .../sql/catalyst/parser/AstBuilder.scala      | 36 +++++++---
 .../catalyst/plans/logical/statements.scala   | 35 ----------
 .../catalyst/plans/logical/v2Commands.scala   | 42 +++++++++++
 .../sql/connector/catalog/LookupCatalog.scala |  7 +-
 .../sql/catalyst/parser/DDLParserSuite.scala  | 63 ++++++++++-------
 .../analysis/ResolveSessionCatalog.scala      | 70 +++++++------------
 .../sql/connector/DataSourceV2SQLSuite.scala  | 10 +--
 .../hive/execution/HiveComparisonTest.scala   |  4 +-
 9 files changed, 139 insertions(+), 130 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index cfc31858d47a5..db82af5f02d8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1894,7 +1894,7 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       // Resolve functions with concrete relations from v2 catalog.
       case UnresolvedFunc(multipartIdent) =>
-        val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup")
+        val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent)
         ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName))
 
       case q: LogicalPlan =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 5663741bae505..3b9abac34341f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3595,7 +3595,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     } else {
       Seq(describeFuncName.getText)
     }
-    DescribeFunctionStatement(functionName, EXTENDED != null)
+    DescribeFunction(UnresolvedFunc(functionName), EXTENDED != null)
   }
 
   /**
@@ -3610,8 +3610,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       case Some(x) => throw new ParseException(s"SHOW $x FUNCTIONS not supported", ctx)
     }
     val pattern = Option(ctx.pattern).map(string(_))
-    val functionName = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
-    ShowFunctionsStatement(userScope, systemScope, pattern, functionName)
+    val functionName = Option(ctx.multipartIdentifier)
+      .map(visitMultipartIdentifier)
+      .map(UnresolvedFunc(_))
+
+    ShowFunctions(functionName, userScope, systemScope, pattern)
   }
 
   /**
@@ -3624,8 +3627,8 @@
    */
  override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
    val functionName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    DropFunctionStatement(
-      functionName,
+    DropFunction(
+      UnresolvedFunc(functionName),
      ctx.EXISTS != null,
      ctx.TEMPORARY != null)
  }
@@ -3650,12 +3653,25 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       }
     }
 
-    val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
-    CreateFunctionStatement(
-      functionIdentifier,
+    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
+
+    val isTemp = ctx.TEMPORARY != null
+    val func: LogicalPlan = if (isTemp) {
+      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+      // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
+      if (nameParts.length > 2) {
+        throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'")
+      }
+      ResolvedFunc(nameParts.asIdentifier)
+    } else {
+      UnresolvedFunc(nameParts)
+    }
+
+    CreateFunction(
+      func,
       string(ctx.className),
-      resources.toSeq,
-      ctx.TEMPORARY != null,
+      resources,
+      isTemp,
       ctx.EXISTS != null,
       ctx.REPLACE != null)
   }
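With the parser changes above, each function command is parsed directly into a v2 command node whose child carries the still-unresolved function name; only CREATE TEMPORARY FUNCTION resolves eagerly, since a temporary function never belongs to a catalog. A rough sketch of the resulting plan shapes (not part of the patch; the shapes mirror the DDLParserSuite expectations further down):

    // Illustrative parser outputs on this branch:
    parsePlan("DESCRIBE FUNCTION EXTENDED a.b.c")
    // => DescribeFunction(UnresolvedFunc(Seq("a", "b", "c")), true)
    parsePlan("SHOW FUNCTIONS LIKE a.b.c")
    // => ShowFunctions(Some(UnresolvedFunc(Seq("a", "b", "c"))), true, true, None)
    parsePlan("CREATE TEMPORARY FUNCTION a.b as 'fun'")
    // => CreateFunction(ResolvedFunc(Seq("a", "b").asIdentifier), "fun", Seq(), true, false, false)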
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index b1129e741221b..4a1c8655e3ffd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -445,38 +445,3 @@ case class ShowColumnsStatement(
  * A SHOW CURRENT NAMESPACE statement, as parsed from SQL
  */
 case class ShowCurrentNamespaceStatement() extends ParsedStatement
-
-/**
- * A DESCRIBE FUNCTION statement, as parsed from SQL
- */
-case class DescribeFunctionStatement(
-    functionName: Seq[String],
-    isExtended: Boolean) extends ParsedStatement
-
-/**
- * SHOW FUNCTIONS statement, as parsed from SQL
- */
-case class ShowFunctionsStatement(
-    userScope: Boolean,
-    systemScope: Boolean,
-    pattern: Option[String],
-    functionName: Option[Seq[String]]) extends ParsedStatement
-
-/**
- * DROP FUNCTION statement, as parsed from SQL
- */
-case class DropFunctionStatement(
-    functionName: Seq[String],
-    ifExists: Boolean,
-    isTemp: Boolean) extends ParsedStatement
-
-/**
- * CREATE FUNCTION statement, as parsed from SQL
- */
-case class CreateFunctionStatement(
-    functionName: Seq[String],
-    className: String,
-    resources: Seq[FunctionResource],
-    isTemp: Boolean,
-    ignoreIfExists: Boolean,
-    replace: Boolean) extends ParsedStatement
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 137fc70397642..2e116e361ec1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
@@ -517,9 +518,50 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
 
+/**
+ * The logical plan of the CREATE FUNCTION command that works for v2 catalogs.
+ */
+case class CreateFunction(
+    child: LogicalPlan,
+    className: String,
+    resources: Seq[FunctionResource],
+    isTemp: Boolean,
+    ignoreIfExists: Boolean,
+    replace: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
 /**
  * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
  */
 case class RefreshFunction(child: LogicalPlan) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the DESCRIBE FUNCTION command that works for v2 catalogs.
+ */
+case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
+/**
+ * The logical plan of the DROP FUNCTION command that works for v2 catalogs.
+ */
+case class DropFunction(
+    child: LogicalPlan,
+    ifExists: Boolean,
+    isTemp: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
+/**
+ * The logical plan of the SHOW FUNCTIONS command that works for v2 catalogs.
+ */
+case class ShowFunctions(
+    child: Option[LogicalPlan],
+    userScope: Boolean,
+    systemScope: Boolean,
+    pattern: Option[String]) extends Command {
+  override def children: Seq[LogicalPlan] = if (child.isDefined) { child.get :: Nil } else { Nil }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index b84bf3e2786bc..d8cdecce0d172 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -156,10 +156,7 @@ private[sql] trait LookupCatalog extends Logging {
     }
   }
 
-  // TODO: move function related v2 statements to the new framework.
-  def parseSessionCatalogFunctionIdentifier(
-      nameParts: Seq[String],
-      sql: String): FunctionIdentifier = {
+  def parseSessionCatalogFunctionIdentifier(nameParts: Seq[String]): FunctionIdentifier = {
     if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) {
       return FunctionIdentifier(nameParts.head)
     }
@@ -179,7 +176,7 @@ private[sql] trait LookupCatalog extends Logging {
         }
       }
 
-      case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
+      case _ => throw new AnalysisException("function is only supported in v1 catalog")
     }
   }
 }
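Because parseSessionCatalogFunctionIdentifier no longer knows which command invoked it, the error text becomes the generic "function is only supported in v1 catalog". A sketch of the mapping the helper performs, assuming a default CatalogManager (exact results depend on the registered catalogs):

    parseSessionCatalogFunctionIdentifier(Seq("tempFn"))
    // => FunctionIdentifier("tempFn"), when "tempFn" is a registered temp function
    parseSessionCatalogFunctionIdentifier(Seq("db", "fn"))
    // => FunctionIdentifier("fn", Some("db")), resolved against the session catalog
    parseSessionCatalogFunctionIdentifier(Seq("testcat", "db", "fn"))
    // => AnalysisException("function is only supported in v1 catalog")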
FUNCTIONS LIKE a.b.c"), - ShowFunctionsStatement(true, true, None, Some(Seq("a", "b", "c")))) + ShowFunctions(Some(UnresolvedFunc(Seq("a", "b", "c"))), true, true, None)) val sql = "SHOW other FUNCTIONS" intercept(sql, s"$sql not supported") } @@ -2057,51 +2057,60 @@ class DDLParserSuite extends AnalysisTest { test("DROP FUNCTION") { comparePlans( parsePlan("DROP FUNCTION a"), - DropFunctionStatement(Seq("a"), false, false)) + DropFunction(UnresolvedFunc(Seq("a")), false, false)) comparePlans( parsePlan("DROP FUNCTION a.b.c"), - DropFunctionStatement(Seq("a", "b", "c"), false, false)) + DropFunction(UnresolvedFunc(Seq("a", "b", "c")), false, false)) comparePlans( parsePlan("DROP TEMPORARY FUNCTION a.b.c"), - DropFunctionStatement(Seq("a", "b", "c"), false, true)) + DropFunction(UnresolvedFunc(Seq("a", "b", "c")), false, true)) comparePlans( parsePlan("DROP FUNCTION IF EXISTS a.b.c"), - DropFunctionStatement(Seq("a", "b", "c"), true, false)) + DropFunction(UnresolvedFunc(Seq("a", "b", "c")), true, false)) comparePlans( parsePlan("DROP TEMPORARY FUNCTION IF EXISTS a.b.c"), - DropFunctionStatement(Seq("a", "b", "c"), true, true)) + DropFunction(UnresolvedFunc(Seq("a", "b", "c")), true, true)) } test("CREATE FUNCTION") { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + parseCompare("CREATE FUNCTION a as 'fun'", - CreateFunctionStatement(Seq("a"), "fun", Seq(), false, false, false)) + CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(), false, false, false)) parseCompare("CREATE FUNCTION a.b.c as 'fun'", - CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, false, false)) + CreateFunction(UnresolvedFunc(Seq("a", "b", "c")), "fun", Seq(), false, false, false)) parseCompare("CREATE OR REPLACE FUNCTION a.b.c as 'fun'", - CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, false, true)) + CreateFunction(UnresolvedFunc(Seq("a", "b", "c")), "fun", Seq(), false, false, true)) + + parseCompare("CREATE TEMPORARY FUNCTION a as 'fun'", + CreateFunction(ResolvedFunc(Seq("a").asIdentifier), "fun", Seq(), true, false, false)) - parseCompare("CREATE TEMPORARY FUNCTION a.b.c as 'fun'", - CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), true, false, false)) + parseCompare("CREATE TEMPORARY FUNCTION a.b as 'fun'", + CreateFunction(ResolvedFunc(Seq("a", "b").asIdentifier), "fun", Seq(), true, false, false)) + + val caught = intercept[AnalysisException]( + parsePlan("CREATE TEMPORARY FUNCTION a.b.c as 'fun'")) + assert(caught.getMessage.contains("Unsupported function name 'a.b.c'")) parseCompare("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'", - CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, true, false)) + CreateFunction(UnresolvedFunc(Seq("a", "b", "c")), "fun", Seq(), false, true, false)) parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j'", - CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(JarResource, "j")), + CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(JarResource, "j")), false, false, false)) parseCompare("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'", - CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(ArchiveResource, "a")), + CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(ArchiveResource, "a")), false, false, false)) parseCompare("CREATE FUNCTION a as 'fun' USING FILE 'f'", - CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(FileResource, "f")), + CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(FileResource, "f")), false, 
false, false)) parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'", - CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(JarResource, "j"), + CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(JarResource, "j"), FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")), false, false, false)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 5717013b2eba2..581817037594a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -577,51 +577,31 @@ class ResolveSessionCatalog( case ShowTableProperties(r: ResolvedView, propertyKey) => ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) - case DescribeFunctionStatement(nameParts, extended) => - val functionIdent = - parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION") - DescribeFunctionCommand(functionIdent, extended) - - case ShowFunctionsStatement(userScope, systemScope, pattern, fun) => - val (database, function) = fun match { - case Some(nameParts) => - val FunctionIdentifier(fn, db) = - parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS") - (db, Some(fn)) - case None => (None, pattern) - } - ShowFunctionsCommand(database, function, userScope, systemScope) - - case DropFunctionStatement(nameParts, ifExists, isTemp) => - val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION") - DropFunctionCommand(database, function, ifExists, isTemp) - - case CreateFunctionStatement(nameParts, - className, resources, isTemp, ignoreIfExists, replace) => - if (isTemp) { - // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name. 
- val database = if (nameParts.length > 2) { - throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'") - } else if (nameParts.length == 2) { - Some(nameParts.head) - } else { - None - } - CreateFunctionCommand( - database, - nameParts.last, - className, - resources, - isTemp, - ignoreIfExists, - replace) - } else { - val FunctionIdentifier(function, database) = - parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION") - CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, - replace) - } + case DescribeFunction(ResolvedFunc(identifier), extended) => + DescribeFunctionCommand(identifier.asFunctionIdentifier, extended) + + case ShowFunctions(None, userScope, systemScope, pattern) => + ShowFunctionsCommand(None, pattern, userScope, systemScope) + + case ShowFunctions(Some(ResolvedFunc(identifier)), userScope, systemScope, _) => + val funcIdentifier = identifier.asFunctionIdentifier + ShowFunctionsCommand( + funcIdentifier.database, Some(funcIdentifier.funcName), userScope, systemScope) + + case DropFunction(ResolvedFunc(identifier), ifExists, isTemp) => + val funcIdentifier = identifier.asFunctionIdentifier + DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, isTemp) + + case c @ CreateFunction(ResolvedFunc(identifier), _, _, _, _, _) => + val funcIdentifier = identifier.asFunctionIdentifier + CreateFunctionCommand( + funcIdentifier.database, + funcIdentifier.funcName, + c.className, + c.resources, + c.isTemp, + c.ignoreIfExists, + c.replace) case RefreshFunction(ResolvedFunc(identifier)) => // Fallback to v1 command diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index d6c24e47e8652..ffc115e6b7600 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2185,7 +2185,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("DESCRIBE FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("DESCRIBE FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("function is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("DESCRIBE FUNCTION default.ns1.ns2.fun") @@ -2200,14 +2200,14 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql(s"SHOW FUNCTIONS LIKE $function") } - assert(e.message.contains("SHOW FUNCTIONS is only supported in v1 catalog")) + assert(e.message.contains("function is only supported in v1 catalog")) } test("DROP FUNCTION: only support session catalog") { val e = intercept[AnalysisException] { sql("DROP FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("DROP FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("function is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("DROP FUNCTION default.ns1.ns2.fun") @@ -2220,7 +2220,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'") } - assert(e.message.contains("CREATE FUNCTION is only supported in v1 catalog")) + assert(e.message.contains("function is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'") @@ -2233,7 +2233,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { sql("REFRESH 
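Taken together, the first commit moves DESCRIBE, SHOW, DROP, and CREATE FUNCTION onto the same three-stage pipeline already used by REFRESH FUNCTION. Illustrated for DROP FUNCTION db.fn (a condensed sketch, intermediate plans abbreviated):

    // 1. AstBuilder:            DropFunction(UnresolvedFunc(Seq("db", "fn")), false, false)
    // 2. Analyzer rule above:   UnresolvedFunc => ResolvedFunc(Identifier.of(Array("db"), "fn"))
    // 3. ResolveSessionCatalog: DropFunction(ResolvedFunc(...), ...)
    //                           => DropFunctionCommand(Some("db"), "fn", false, false)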
FUNCTION testcat.ns1.ns2.fun") } - assert(e.message.contains("function lookup is only supported in v1 catalog")) + assert(e.message.contains("function is only supported in v1 catalog")) val e1 = intercept[AnalysisException] { sql("REFRESH FUNCTION default.ns1.ns2.fun") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 2e4c01830432f..7f198632a1cd6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -371,8 +371,8 @@ abstract class HiveComparisonTest extends SparkFunSuite with BeforeAndAfterAll { // We will ignore the ExplainCommand, ShowFunctions, DescribeFunction if ((!hiveQuery.logical.isInstanceOf[ExplainCommand]) && - (!hiveQuery.logical.isInstanceOf[ShowFunctionsStatement]) && - (!hiveQuery.logical.isInstanceOf[DescribeFunctionStatement]) && + (!hiveQuery.logical.isInstanceOf[ShowFunctions]) && + (!hiveQuery.logical.isInstanceOf[DescribeFunction]) && (!hiveQuery.logical.isInstanceOf[DescribeCommandBase]) && (!hiveQuery.logical.isInstanceOf[DescribeRelation]) && (!hiveQuery.logical.isInstanceOf[DescribeColumnStatement]) && From 38756fc8f0232dd62dce3cd4eb4bf6b467742061 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 22 Jul 2020 21:08:47 -0700 Subject: [PATCH 2/6] spacing --- .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 3b9abac34341f..a90679df2031a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3613,7 +3613,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val functionName = Option(ctx.multipartIdentifier) .map(visitMultipartIdentifier) .map(UnresolvedFunc(_)) - ShowFunctions(functionName, userScope, systemScope, pattern) } @@ -3654,7 +3653,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) - val isTemp = ctx.TEMPORARY != null val func: LogicalPlan = if (isTemp) { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ From 0e9f12fd718b17a67d535d642151de3239f5fd8e Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 24 Jul 2020 11:53:36 -0700 Subject: [PATCH 3/6] address PR comments --- .../sql/catalyst/parser/AstBuilder.scala | 26 ++++---------- .../catalyst/plans/logical/statements.scala | 11 ++++++ .../catalyst/plans/logical/v2Commands.scala | 13 ------- .../sql/catalyst/parser/DDLParserSuite.scala | 27 ++++++-------- .../analysis/ResolveSessionCatalog.scala | 35 +++++++++++++------ 5 files changed, 53 insertions(+), 59 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 03f1323045f77..fe99a8ea3cc12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3615,10 +3615,10 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging case Some(x) => throw new ParseException(s"SHOW $x FUNCTIONS not supported", ctx) } val pattern = Option(ctx.pattern).map(string(_)) - val functionName = Option(ctx.multipartIdentifier) + val unresolvedFuncOpt = Option(ctx.multipartIdentifier) .map(visitMultipartIdentifier) .map(UnresolvedFunc(_)) - ShowFunctions(functionName, userScope, systemScope, pattern) + ShowFunctions(unresolvedFuncOpt, userScope, systemScope, pattern) } /** @@ -3657,24 +3657,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } - val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) - val isTemp = ctx.TEMPORARY != null - val func: LogicalPlan = if (isTemp) { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name. - if (nameParts.length > 2) { - throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'") - } - ResolvedFunc(nameParts.asIdentifier) - } else { - UnresolvedFunc(nameParts) - } - - CreateFunction( - func, + val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) + CreateFunctionStatement( + functionIdentifier, string(ctx.className), - resources, - isTemp, + resources.toSeq, + ctx.TEMPORARY != null, ctx.EXISTS != null, ctx.REPLACE != null) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 4a1c8655e3ffd..19831a7b5ef84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -445,3 +445,14 @@ case class ShowColumnsStatement( * A SHOW CURRENT NAMESPACE statement, as parsed from SQL */ case class ShowCurrentNamespaceStatement() extends ParsedStatement + +/** + * CREATE FUNCTION statement, as parsed from SQL + */ +case class CreateFunctionStatement( + functionName: Seq[String], + className: String, + resources: Seq[FunctionResource], + isTemp: Boolean, + ignoreIfExists: Boolean, + replace: Boolean) extends ParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 2e116e361ec1e..da89b3fdd78b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -518,19 +518,6 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } -/** - * The logical plan of the CREATE FUNCTION command that works for v2 catalogs. - */ -case class CreateFunction( - child: LogicalPlan, - className: String, - resources: Seq[FunctionResource], - isTemp: Boolean, - ignoreIfExists: Boolean, - replace: Boolean) extends Command { - override def children: Seq[LogicalPlan] = child :: Nil -} - /** * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 3814d4314a3ab..1ae30a5a56c40 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2086,41 +2086,34 @@ class DDLParserSuite extends AnalysisTest {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
     parseCompare("CREATE FUNCTION a as 'fun'",
-      CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(), false, false, false))
+      CreateFunctionStatement(Seq("a"), "fun", Seq(), false, false, false))
     parseCompare("CREATE FUNCTION a.b.c as 'fun'",
-      CreateFunction(UnresolvedFunc(Seq("a", "b", "c")), "fun", Seq(), false, false, false))
+      CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, false, false))
     parseCompare("CREATE OR REPLACE FUNCTION a.b.c as 'fun'",
-      CreateFunction(UnresolvedFunc(Seq("a", "b", "c")), "fun", Seq(), false, false, true))
+      CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, false, true))
 
-    parseCompare("CREATE TEMPORARY FUNCTION a as 'fun'",
-      CreateFunction(ResolvedFunc(Seq("a").asIdentifier), "fun", Seq(), true, false, false))
-
-    parseCompare("CREATE TEMPORARY FUNCTION a.b as 'fun'",
-      CreateFunction(ResolvedFunc(Seq("a", "b").asIdentifier), "fun", Seq(), true, false, false))
-
-    val caught = intercept[AnalysisException](
-      parsePlan("CREATE TEMPORARY FUNCTION a.b.c as 'fun'"))
-    assert(caught.getMessage.contains("Unsupported function name 'a.b.c'"))
+    parseCompare("CREATE TEMPORARY FUNCTION a.b.c as 'fun'",
+      CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), true, false, false))
 
     parseCompare("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'",
-      CreateFunction(UnresolvedFunc(Seq("a", "b", "c")), "fun", Seq(), false, true, false))
+      CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, true, false))
     parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j'",
-      CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(JarResource, "j")),
+      CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(JarResource, "j")),
        false, false, false))
     parseCompare("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'",
-      CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(ArchiveResource, "a")),
+      CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(ArchiveResource, "a")),
        false, false, false))
     parseCompare("CREATE FUNCTION a as 'fun' USING FILE 'f'",
-      CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(FileResource, "f")),
+      CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(FileResource, "f")),
        false, false, false))
     parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'",
-      CreateFunction(UnresolvedFunc(Seq("a")), "fun", Seq(FunctionResource(JarResource, "j"),
+      CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(JarResource, "j"),
        FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
        false, false, false))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 581817037594a..8923d5c86e19a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -592,16 +592,31 @@ class ResolveSessionCatalog(
       val funcIdentifier = identifier.asFunctionIdentifier
       DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, isTemp)
 
-    case c @ CreateFunction(ResolvedFunc(identifier), _, _, _, _, _) =>
-      val funcIdentifier = identifier.asFunctionIdentifier
-      CreateFunctionCommand(
-        funcIdentifier.database,
-        funcIdentifier.funcName,
-        c.className,
-        c.resources,
-        c.isTemp,
-        c.ignoreIfExists,
-        c.replace)
+    case CreateFunctionStatement(nameParts,
+        className, resources, isTemp, ignoreIfExists, replace) =>
+      if (isTemp) {
+        // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
+        val database = if (nameParts.length > 2) {
+          throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'")
+        } else if (nameParts.length == 2) {
+          Some(nameParts.head)
+        } else {
+          None
+        }
+        CreateFunctionCommand(
+          database,
+          nameParts.last,
+          className,
+          resources,
+          isTemp,
+          ignoreIfExists,
+          replace)
+      } else {
+        val FunctionIdentifier(function, database) =
+          parseSessionCatalogFunctionIdentifier(nameParts)
+        CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
+          replace)
+      }
 
     case RefreshFunction(ResolvedFunc(identifier)) =>
       // Fallback to v1 command
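The third commit pulls CREATE FUNCTION back out of the child-resolution framework: a temporary function has no catalog to resolve against, so treating its name as an already-resolved identifier in the parser only moved the special case around. The statement form is restored, and ResolveSessionCatalog again derives the optional database from the raw name parts. Sketching the restored branch above:

    // CREATE TEMPORARY FUNCTION fn     => CreateFunctionCommand(None, "fn", ...)
    // CREATE TEMPORARY FUNCTION db.fn  => CreateFunctionCommand(Some("db"), "fn", ...)
    // CREATE TEMPORARY FUNCTION a.b.c  => AnalysisException("Unsupported function name 'a.b.c'")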
From daf4d6bb3e3f31ea90e071668245da682accae3b Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 24 Jul 2020 11:54:48 -0700
Subject: [PATCH 4/6] remove unused import

---
 .../org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index da89b3fdd78b2..c91deba355f27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
From 27f2fb1088ca5f45731a9a6c1d40b02cd76577f0 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 24 Jul 2020 11:55:53 -0700
Subject: [PATCH 5/6] remove unused import

---
 .../org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 1ae30a5a56c40..ac6af4f4e3231 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, ResolvedFunc, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -2083,8 +2083,6 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("CREATE FUNCTION") {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
     parseCompare("CREATE FUNCTION a as 'fun'",
       CreateFunctionStatement(Seq("a"), "fun", Seq(), false, false, false))
 
From c662619bd00e74d4f683fcb565868bf72e4890d6 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 28 Jul 2020 09:26:16 -0700
Subject: [PATCH 6/6] Address PR comment

---
 .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index c91deba355f27..70e03c23fd115 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -549,5 +549,5 @@ case class ShowFunctions(
     userScope: Boolean,
     systemScope: Boolean,
     pattern: Option[String]) extends Command {
-  override def children: Seq[LogicalPlan] = if (child.isDefined) { child.get :: Nil } else { Nil }
+  override def children: Seq[LogicalPlan] = child.toSeq
 }
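The final commit is a small idiom cleanup: for an optional child, Option.toSeq already yields Nil or a single-element Seq, so the explicit if/else is redundant. For reference (plain Scala semantics, not specific to this patch):

    val child: Option[LogicalPlan] = None
    child.toSeq         // Nil, i.e. no children to resolve
    Some(plan).toSeq    // Seq(plan), a single child, exactly what the if/else produced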