diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5e732edb17ba..a87fc6e08d9e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -58,6 +58,10 @@ singleTableIdentifier
     : tableIdentifier EOF
     ;
 
+singleCatalogTableIdentifier
+    : catalogTableIdentifier EOF
+    ;
+
 singleFunctionIdentifier
     : functionIdentifier EOF
     ;
@@ -538,6 +542,10 @@ rowFormat
         (NULL DEFINED AS nullDefinedAs=STRING)?                            #rowFormatDelimited
     ;
 
+catalogTableIdentifier
+    : ((catalog=identifier '.')? db=identifier '.')? table=identifier
+    ;
+
 tableIdentifier
     : (db=identifier '.')? table=identifier
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 36cad3cf7478..9fd8c70264cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -38,15 +38,17 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 /**
  * Holds the name of a relation that has yet to be looked up in a catalog.
  *
- * @param tableIdentifier table name
+ * @param table a [[CatalogTableIdentifier]]
  */
-case class UnresolvedRelation(tableIdentifier: TableIdentifier)
-  extends LeafNode {
+case class UnresolvedRelation(table: CatalogTableIdentifier) extends LeafNode {
 
   /** Returns a `.` separated name for this relation. */
-  def tableName: String = tableIdentifier.unquotedString
+  def tableName: String = table.unquotedString
 
-  override def output: Seq[Attribute] = Nil
+  /** Returns the table identifier without the catalog element */
+  def tableIdentifier: TableIdentifier = table.asTableIdentifier
+
+  override def output: Seq[AttributeReference] = Nil
 
   override lazy val resolved = false
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index deceec73dda3..5aaa102d6615 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -18,30 +18,32 @@
 package org.apache.spark.sql.catalyst
 
 /**
- * An identifier that optionally specifies a database.
+ * An identifier that optionally specifies a database and catalog.
  *
  * Format (unquoted): "name" or "db.name"
  * Format (quoted): "`name`" or "`db`.`name`"
  */
-sealed trait IdentifierWithDatabase {
+sealed trait IdentifierWithOptionalDatabaseAndCatalog {
   val identifier: String
 
   def database: Option[String]
 
+  def catalog: Option[String]
+
   /*
    * Escapes back-ticks within the identifier name with double-back-ticks.
   */
   private def quoteIdentifier(name: String): String = name.replace("`", "``")
 
   def quotedString: String = {
-    val replacedId = quoteIdentifier(identifier)
-    val replacedDb = database.map(quoteIdentifier(_))
-
-    if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else s"`$replacedId`"
+    // database is required if catalog is present
+    assert(database.isDefined || catalog.isEmpty)
+    def q(s: String): String = s"`${quoteIdentifier(s)}`"
+    Seq(catalog.map(q), database.map(q), Some(q(identifier))).flatten.mkString(".")
   }
 
   def unquotedString: String = {
-    if (database.isDefined) s"${database.get}.$identifier" else identifier
+    Seq(catalog, database, Some(identifier)).flatten.mkString(".")
   }
 
   override def toString: String = quotedString
@@ -64,18 +66,74 @@ object AliasIdentifier {
   def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier)
 }
 
+object CatalogTableIdentifier {
+  def apply(table: String): CatalogTableIdentifier =
+    new CatalogTableIdentifier(table, None, None)
+
+  def apply(table: String, database: String): CatalogTableIdentifier =
+    new CatalogTableIdentifier(table, Some(database), None)
+
+  def apply(table: String, database: String, catalog: String): CatalogTableIdentifier =
+    new CatalogTableIdentifier(table, Some(database), Some(catalog))
+}
+
 /**
- * Identifies a table in a database.
- * If `database` is not defined, the current database is used.
- * When we register a permanent function in the FunctionRegistry, we use
- * unquotedString as the function name.
+ * Identifies a table in a database and catalog.
+ * If `database` is not defined, the current catalog's default database is used.
+ * If `catalog` is not defined, the current catalog is used.
  */
-case class TableIdentifier(table: String, database: Option[String])
-  extends IdentifierWithDatabase {
+case class CatalogTableIdentifier(table: String, database: Option[String], catalog: Option[String])
+  extends IdentifierWithOptionalDatabaseAndCatalog {
+
+  // ensure database is present if catalog is defined
+  assert(database.isDefined || catalog.isEmpty)
 
   override val identifier: String = table
 
+  /**
+   * Returns this as a TableIdentifier if its catalog is not set, and fails otherwise.
+   *
+   * This is used to provide a TableIdentifier for paths that do not support the catalog element.
+   * To ensure the identifier is compatible, this asserts that the catalog element is not defined.
+   */
+  lazy val asTableIdentifier: TableIdentifier = {
+    assert(catalog.isEmpty, s"Cannot convert to TableIdentifier: catalog is ${catalog.get} != None")
+    new TableIdentifier(table, database)
+  }
+
+  /**
+   * Returns this CatalogTableIdentifier without the catalog.
+   *
+   * This is used for code paths where the catalog has already been used.
+   */
+  lazy val dropCatalog: CatalogTableIdentifier = catalog match {
+    case Some(_) => CatalogTableIdentifier(table, database, None)
+    case _ => this
+  }
+}
+
+
+/**
+ * Identifies a table in a database.
+ * If `database` is not defined, the current database is used.
+ *
+ * This class is used instead of CatalogTableIdentifier in paths that do not yet support table
+ * identifiers with catalogs.
+ */
+class TableIdentifier(table: String, db: Option[String])
+  extends CatalogTableIdentifier(table, db, None) {
+  def this(table: String) = this(table, None)
+
+  override lazy val asTableIdentifier: TableIdentifier = this
+
+  override def copy(
+      name: String = this.table,
+      database: Option[String] = this.db,
+      catalog: Option[String] = None): TableIdentifier = {
+    assert(catalog.isEmpty, "Cannot add a catalog to a TableIdentifier using copy")
+    new TableIdentifier(name, database)
+  }
 }
 
 /** A fully qualified identifier for a table (i.e., database.tableName) */
@@ -84,19 +142,27 @@ case class QualifiedTableName(database: String, name: String) {
 }
 
 object TableIdentifier {
-  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+  def apply(table: String): TableIdentifier =
+    new TableIdentifier(table)
+
+  def apply(table: String, database: Option[String]): TableIdentifier =
+    new TableIdentifier(table, database)
 }
 
 /**
  * Identifies a function in a database.
  * If `database` is not defined, the current database is used.
+ * When we register a permanent function in the FunctionRegistry, we use
+ * unquotedString as the function name.
  */
 case class FunctionIdentifier(funcName: String, database: Option[String])
-  extends IdentifierWithDatabase {
+  extends IdentifierWithOptionalDatabaseAndCatalog {
 
   override val identifier: String = funcName
 
+  override val catalog: Option[String] = None
+
   def this(funcName: String) = this(funcName, None)
 
   override def toString: String = unquotedString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 672bffcfc0ca..17d239d2778a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -29,7 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions._
@@ -81,6 +81,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     visitTableIdentifier(ctx.tableIdentifier)
   }
 
+  override def visitSingleCatalogTableIdentifier(
+      ctx: SingleCatalogTableIdentifierContext): CatalogTableIdentifier = withOrigin(ctx) {
+    visitCatalogTableIdentifier(ctx.catalogTableIdentifier)
+  }
+
   override def visitSingleFunctionIdentifier(
       ctx: SingleFunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) {
     visitFunctionIdentifier(ctx.functionIdentifier)
@@ -945,6 +950,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))
   }
 
+  /**
+   * Create a [[CatalogTableIdentifier]] from a 'tableName', 'databaseName'.'tableName', or
+   * 'catalogName'.'databaseName'.'tableName' pattern.
+   */
+  override def visitCatalogTableIdentifier(
+      ctx: CatalogTableIdentifierContext): CatalogTableIdentifier = withOrigin(ctx) {
+    CatalogTableIdentifier(
+      ctx.table.getText,
+      Option(ctx.db).map(_.getText),
+      Option(ctx.catalog).map(_.getText))
+  }
+
   /**
    * Create a [[FunctionIdentifier]] from a 'functionName' or 'databaseName'.'functionName' pattern.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 7d8cb1f18b4b..cd3c7a8ddc75 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -23,7 +23,7 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -50,6 +50,13 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
     astBuilder.visitSingleTableIdentifier(parser.singleTableIdentifier())
   }
 
+  /** Creates a CatalogTableIdentifier for a given SQL string. */
+  override def parseCatalogTableIdentifier(sqlText: String): CatalogTableIdentifier = {
+    parse(sqlText) { parser =>
+      astBuilder.visitSingleCatalogTableIdentifier(parser.singleCatalogTableIdentifier())
+    }
+  }
+
   /** Creates FunctionIdentifier for a given SQL string. */
   override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
     parse(sqlText) { parser =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala
index 75240d219622..f7f548b3401e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.parser
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -46,6 +46,12 @@ trait ParserInterface {
   @throws[ParseException]("Text cannot be parsed to a TableIdentifier")
   def parseTableIdentifier(sqlText: String): TableIdentifier
 
+  /**
+   * Parse a string to a [[CatalogTableIdentifier]].
+   */
+  @throws[ParseException]("Text cannot be parsed to a CatalogTableIdentifier")
+  def parseCatalogTableIdentifier(sqlText: String): CatalogTableIdentifier
+
   /**
    * Parse a string to a [[FunctionIdentifier]].
   */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2e9f9f53e94a..5c713e1abd46 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -675,7 +675,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     try {
       val fieldNames = getConstructorParameterNames(p.getClass)
       val fieldValues = p.productIterator.toSeq
-      assert(fieldNames.length == fieldValues.length)
+      assert(fieldNames.length <= fieldValues.length)
       ("product-class" -> JString(p.getClass.getName)) :: fieldNames.zip(fieldValues).map {
         case (name, value) => name -> parseToJson(value)
       }.toList
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 234711ecdb66..4612f9fa3446 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -138,6 +138,9 @@ case class MyParser(spark: SparkSession, delegate: ParserInterface) extends Pars
   override def parseTableIdentifier(sqlText: String): TableIdentifier =
     delegate.parseTableIdentifier(sqlText)
 
+  override def parseCatalogTableIdentifier(sqlText: String): CatalogTableIdentifier =
+    delegate.parseCatalogTableIdentifier(sqlText)
+
   override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
     delegate.parseFunctionIdentifier(sqlText)
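
Example (not part of the patch): a minimal sketch of the identifier semantics the
identifiers.scala changes above are expected to give, assuming the patch is applied.
The catalog/database/table names (prod, db, events) are made up for illustration.

import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier}

// A three-part identifier: table "events" in database "db" of catalog "prod".
val id = CatalogTableIdentifier("events", "db", "prod")
assert(id.unquotedString == "prod.db.events")
assert(id.quotedString == "`prod`.`db`.`events`")

// dropCatalog keeps only the database and table elements.
assert(id.dropCatalog.unquotedString == "db.events")

// asTableIdentifier is only legal once the catalog element is gone; calling it
// on `id` directly would trip the catalog.isEmpty assertion.
val tableId: TableIdentifier = id.dropCatalog.asTableIdentifier
assert(tableId == TableIdentifier("events", Some("db")))

// A database is required whenever a catalog is present, so this would fail the
// constructor assertion:
// CatalogTableIdentifier("events", None, Some("prod"))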
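
Example (not part of the patch): a similar sketch of the new parser entry point, assuming
the patch is applied; the expected results follow from the catalogTableIdentifier grammar
rule and visitCatalogTableIdentifier above.

import org.apache.spark.sql.catalyst.CatalogTableIdentifier
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// One-, two-, and three-part names are all accepted by the new grammar rule:
//   ((catalog=identifier '.')? db=identifier '.')? table=identifier
assert(CatalystSqlParser.parseCatalogTableIdentifier("events") ==
  CatalogTableIdentifier("events", None, None))
assert(CatalystSqlParser.parseCatalogTableIdentifier("db.events") ==
  CatalogTableIdentifier("events", Some("db"), None))
assert(CatalystSqlParser.parseCatalogTableIdentifier("prod.db.events") ==
  CatalogTableIdentifier("events", Some("db"), Some("prod")))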