From a28633361ac5bd4eb41c5d8157fdcce99a8b78f2 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sun, 3 Nov 2019 09:48:55 -0800 Subject: [PATCH 1/4] initial checkin --- docs/sql-keywords.md | 1 + .../spark/sql/catalyst/parser/SqlBase.g4 | 4 ++ .../catalyst/analysis/ResolveCatalogs.scala | 3 ++ .../sql/catalyst/parser/AstBuilder.scala | 8 ++++ .../catalyst/plans/logical/statements.scala | 5 +++ .../catalyst/plans/logical/v2Commands.scala | 8 ++++ .../sql/catalyst/parser/DDLParserSuite.scala | 6 +++ .../datasources/v2/DataSourceV2Strategy.scala | 5 ++- .../v2/ShowCurrentCatalogExec.scala | 38 +++++++++++++++++++ .../sql/connector/DataSourceV2SQLSuite.scala | 18 +++++++++ 10 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala diff --git a/docs/sql-keywords.md b/docs/sql-keywords.md index b4f8d8be11c4..1ef360dcddce 100644 --- a/docs/sql-keywords.md +++ b/docs/sql-keywords.md @@ -57,6 +57,7 @@ Below is a list of all the keywords in Spark SQL. CASCADEnon-reservednon-reservedreserved CASEreservednon-reservedreserved CASTreservednon-reservedreserved + CATALOGnon-reservednon-reservednon-reserved CHANGEnon-reservednon-reservednon-reserved CHECKreservednon-reservedreserved CLEARnon-reservednon-reservednon-reserved diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 11b5d4ae5ebf..a784062ffddb 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -202,6 +202,7 @@ statement | SHOW identifier? FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions | SHOW CREATE TABLE multipartIdentifier #showCreateTable + | SHOW CURRENT CATALOG #showCurrentCatalog | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction | (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)? @@ -938,6 +939,7 @@ ansiNonReserved | BY | CACHE | CASCADE + | CATALOG | CHANGE | CLEAR | CLUSTER @@ -1161,6 +1163,7 @@ nonReserved | CASCADE | CASE | CAST + | CATALOG | CHANGE | CHECK | CLEAR @@ -1419,6 +1422,7 @@ CACHE: 'CACHE'; CASCADE: 'CASCADE'; CASE: 'CASE'; CAST: 'CAST'; +CATALOG: 'CATALOG'; CHANGE: 'CHANGE'; CHECK: 'CHECK'; CLEAR: 'CLEAR'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index ddd60fa5ec11..a21ffb019912 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -204,6 +204,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager) val CatalogAndNamespace(catalog, namespace) = nameParts SetCatalogAndNamespace(catalogManager, Some(catalog.name()), namespace) } + + case ShowCurrentCatalogStatement() => + ShowCurrentCatalog(catalogManager) } object NonSessionCatalog { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 911a5b3aa36a..bb1fee5d83e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2951,4 +2951,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) { AlterTableRecoverPartitionsStatement(visitMultipartIdentifier(ctx.multipartIdentifier)) } + + /** + * Create a [[ShowCurrentCatalogStatement]]. + */ + override def visitShowCurrentCatalog( + ctx: ShowCurrentCatalogContext) : LogicalPlan = withOrigin(ctx) { + ShowCurrentCatalogStatement() + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 41c7438eaa9e..1b728e7ac5ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -401,3 +401,8 @@ case class RefreshTableStatement(tableName: Seq[String]) extends ParsedStatement case class ShowColumnsStatement( table: Seq[String], namespace: Option[Seq[String]]) extends ParsedStatement + +/** + * A SHOW CURRENT CATALOG statement, as parsed from SQL + */ +case class ShowCurrentCatalogStatement() extends ParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 3c625e9acb5a..fbdaf230cca2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -362,3 +362,11 @@ case class SetCatalogAndNamespace( case class RefreshTable( catalog: TableCatalog, ident: Identifier) extends Command + +/** + * The logical plan of the SHOW CURRENT CATALOG command that works for v2 catalogs. + */ +case class ShowCurrentCatalog(catalogManager: CatalogManager) extends Command { + override val output: Seq[Attribute] = Seq( + AttributeReference("catalogName", StringType, nullable = false)()) +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 53da49ead4e4..2b3b25eccd10 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1202,6 +1202,12 @@ class DDLParserSuite extends AnalysisTest { AlterTableRecoverPartitionsStatement(Seq("a", "b", "c"))) } + test("show current catalog") { + comparePlans( + parsePlan("SHOW CURRENT CATALOG"), + ShowCurrentCatalogStatement()) + } + private case class TableSpec( name: Seq[String], schema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index bc66c154b57a..031b38839233 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentCatalog, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} @@ -210,6 +210,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case SetCatalogAndNamespace(catalogManager, catalogName, namespace) => SetCatalogAndNamespaceExec(catalogManager, catalogName, namespace) :: Nil + case r: ShowCurrentCatalog => + ShowCurrentCatalogExec(r.output, r.catalogManager) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala new file mode 100644 index 000000000000..e222aff29ca2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} +import org.apache.spark.sql.connector.catalog.CatalogManager + +/** + * Physical plan node for showing current catalog. + */ +case class ShowCurrentCatalogExec( + output: Seq[Attribute], + catalogManager: CatalogManager) + extends V2CommandExec { + override protected def run(): Seq[InternalRow] = { + val encoder = RowEncoder(schema).resolveAndBind() + Seq(encoder + .toRow(new GenericRowWithSchema(Array(catalogManager.currentCatalog.name), schema)) + .copy()) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 6e20b248ebc1..cc15888279c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -990,6 +990,24 @@ class DataSourceV2SQLSuite assert(catalogManager.currentNamespace === Array("ns1", "ns2")) } + test("ShowCurrentCatalog: basic tests") { + def testShowCurrentCatalog(expectedCatalogName: String): Unit = { + val schema = new StructType().add("catalogName", StringType, nullable = false) + val df = sql("SHOW CURRENT CATALOG") + val rows = df.collect + + assert(df.schema === schema) + assert(rows.length == 1) + assert(rows(0).getAs[String](0) === expectedCatalogName) + } + + // Initially, the v2 session catalog is set as a current catalog. + testShowCurrentCatalog("spark_catalog") + + sql("Use testcat") + testShowCurrentCatalog("testcat") + } + test("tableCreation: partition column case insensitive resolution") { val testCatalog = catalog("testcat").asTableCatalog val sessionCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog From 506ce2a4ec8546b18d1fe0a2f2d4d249cc32592a Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 4 Nov 2019 10:44:54 -0500 Subject: [PATCH 2/4] address PR comments --- docs/sql-keywords.md | 1 - .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 5 +---- .../sql/catalyst/analysis/ResolveCatalogs.scala | 4 ++-- .../spark/sql/catalyst/parser/AstBuilder.scala | 8 ++++---- .../sql/catalyst/plans/logical/statements.scala | 4 ++-- .../sql/catalyst/plans/logical/v2Commands.scala | 7 ++++--- .../sql/catalyst/parser/DDLParserSuite.scala | 6 +++--- .../datasources/v2/DataSourceV2Strategy.scala | 6 +++--- ...xec.scala => ShowCurrentNamespaceExec.scala} | 8 +++++--- .../sql/connector/DataSourceV2SQLSuite.scala | 17 +++++++++++------ 10 files changed, 35 insertions(+), 31 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{ShowCurrentCatalogExec.scala => ShowCurrentNamespaceExec.scala} (80%) diff --git a/docs/sql-keywords.md b/docs/sql-keywords.md index 1ef360dcddce..b4f8d8be11c4 100644 --- a/docs/sql-keywords.md +++ b/docs/sql-keywords.md @@ -57,7 +57,6 @@ Below is a list of all the keywords in Spark SQL. CASCADEnon-reservednon-reservedreserved CASEreservednon-reservedreserved CASTreservednon-reservedreserved - CATALOGnon-reservednon-reservednon-reserved CHANGEnon-reservednon-reservednon-reserved CHECKreservednon-reservedreserved CLEARnon-reservednon-reservednon-reserved diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index fa4e7668c8be..b2b611e417c1 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -200,7 +200,7 @@ statement | SHOW identifier? FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions | SHOW CREATE TABLE multipartIdentifier #showCreateTable - | SHOW CURRENT CATALOG #showCurrentCatalog + | SHOW CURRENT NAMESPACE #showCurrentNamespace | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction | (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)? @@ -949,7 +949,6 @@ ansiNonReserved | BY | CACHE | CASCADE - | CATALOG | CHANGE | CLEAR | CLUSTER @@ -1173,7 +1172,6 @@ nonReserved | CASCADE | CASE | CAST - | CATALOG | CHANGE | CHECK | CLEAR @@ -1432,7 +1430,6 @@ CACHE: 'CACHE'; CASCADE: 'CASCADE'; CASE: 'CASE'; CAST: 'CAST'; -CATALOG: 'CATALOG'; CHANGE: 'CHANGE'; CHECK: 'CHECK'; CLEAR: 'CLEAR'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index a21ffb019912..bca07262a6f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -205,8 +205,8 @@ class ResolveCatalogs(val catalogManager: CatalogManager) SetCatalogAndNamespace(catalogManager, Some(catalog.name()), namespace) } - case ShowCurrentCatalogStatement() => - ShowCurrentCatalog(catalogManager) + case ShowCurrentNamespaceStatement() => + ShowCurrentNamespace(catalogManager) } object NonSessionCatalog { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 34df09e367e8..0ad4fc8d63af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2593,11 +2593,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a [[ShowCurrentCatalogStatement]]. + * Create a [[ShowCurrentNamespaceStatement]]. */ - override def visitShowCurrentCatalog( - ctx: ShowCurrentCatalogContext) : LogicalPlan = withOrigin(ctx) { - ShowCurrentCatalogStatement() + override def visitShowCurrentNamespace( + ctx: ShowCurrentNamespaceContext) : LogicalPlan = withOrigin(ctx) { + ShowCurrentNamespaceStatement() } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index b578873f39c3..e67d50437256 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -421,6 +421,6 @@ case class ShowColumnsStatement( namespace: Option[Seq[String]]) extends ParsedStatement /** - * A SHOW CURRENT CATALOG statement, as parsed from SQL + * A SHOW CURRENT NAMESPACE statement, as parsed from SQL */ -case class ShowCurrentCatalogStatement() extends ParsedStatement +case class ShowCurrentNamespaceStatement() extends ParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index fbdaf230cca2..e27d052ae88e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -364,9 +364,10 @@ case class RefreshTable( ident: Identifier) extends Command /** - * The logical plan of the SHOW CURRENT CATALOG command that works for v2 catalogs. + * The logical plan of the SHOW CURRENT NAMESPACE command that works for v2 catalogs. */ -case class ShowCurrentCatalog(catalogManager: CatalogManager) extends Command { +case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command { override val output: Seq[Attribute] = Seq( - AttributeReference("catalogName", StringType, nullable = false)()) + AttributeReference("catalogName", StringType, nullable = false)(), + AttributeReference("namespace", StringType, nullable = false)()) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 471740b4db4a..2e6073501f28 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1278,10 +1278,10 @@ class DDLParserSuite extends AnalysisTest { comparePlans(parsed3_table, expected3_table) } - test("show current catalog") { + test("show current namespace") { comparePlans( - parsePlan("SHOW CURRENT CATALOG"), - ShowCurrentCatalogStatement()) + parsePlan("SHOW CURRENT NAMESPACE"), + ShowCurrentNamespaceStatement()) } private case class TableSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 031b38839233..0a7785b0e088 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentCatalog, ShowNamespaces, ShowTables} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} @@ -210,8 +210,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case SetCatalogAndNamespace(catalogManager, catalogName, namespace) => SetCatalogAndNamespaceExec(catalogManager, catalogName, namespace) :: Nil - case r: ShowCurrentCatalog => - ShowCurrentCatalogExec(r.output, r.catalogManager) :: Nil + case r: ShowCurrentNamespace => + ShowCurrentNamespaceExec(r.output, r.catalogManager) :: Nil case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentNamespaceExec.scala similarity index 80% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentNamespaceExec.scala index e222aff29ca2..42b80a15080a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentCatalogExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCurrentNamespaceExec.scala @@ -21,18 +21,20 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper /** - * Physical plan node for showing current catalog. + * Physical plan node for showing current catalog/namespace. */ -case class ShowCurrentCatalogExec( +case class ShowCurrentNamespaceExec( output: Seq[Attribute], catalogManager: CatalogManager) extends V2CommandExec { override protected def run(): Seq[InternalRow] = { val encoder = RowEncoder(schema).resolveAndBind() Seq(encoder - .toRow(new GenericRowWithSchema(Array(catalogManager.currentCatalog.name), schema)) + .toRow(new GenericRowWithSchema( + Array(catalogManager.currentCatalog.name, catalogManager.currentNamespace.quoted), schema)) .copy()) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index d3c051433e1d..62b9c4778e53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -990,22 +990,27 @@ class DataSourceV2SQLSuite assert(catalogManager.currentNamespace === Array("ns1", "ns2")) } - test("ShowCurrentCatalog: basic tests") { - def testShowCurrentCatalog(expectedCatalogName: String): Unit = { - val schema = new StructType().add("catalogName", StringType, nullable = false) - val df = sql("SHOW CURRENT CATALOG") + test("ShowCurrentNamespace: basic tests") { + def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = { + val schema = new StructType() + .add("catalogName", StringType, nullable = false) + .add("namespace", StringType, nullable = false) + val df = sql("SHOW CURRENT NAMESPACE") val rows = df.collect assert(df.schema === schema) assert(rows.length == 1) assert(rows(0).getAs[String](0) === expectedCatalogName) + assert(rows(0).getAs[String](1) === expectedNamespace) } // Initially, the v2 session catalog is set as a current catalog. - testShowCurrentCatalog("spark_catalog") + testShowCurrentNamespace("spark_catalog", "default") sql("Use testcat") - testShowCurrentCatalog("testcat") + testShowCurrentNamespace("testcat", "") + sql("Use testcat.ns1.ns2") + testShowCurrentNamespace("testcat", "ns1.ns2") } test("tableCreation: partition column case insensitive resolution") { From aed9bc8e01add5b8c2e9c49a5aa173cf7e29ebb2 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 4 Nov 2019 12:13:29 -0500 Subject: [PATCH 3/4] resolve PR comments --- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +- .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index e27d052ae88e..a25224a6c299 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -368,6 +368,6 @@ case class RefreshTable( */ case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command { override val output: Seq[Attribute] = Seq( - AttributeReference("catalogName", StringType, nullable = false)(), + AttributeReference("catalog", StringType, nullable = false)(), AttributeReference("namespace", StringType, nullable = false)()) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 62b9c4778e53..cd678b3d351e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -993,7 +993,7 @@ class DataSourceV2SQLSuite test("ShowCurrentNamespace: basic tests") { def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = { val schema = new StructType() - .add("catalogName", StringType, nullable = false) + .add("catalog", StringType, nullable = false) .add("namespace", StringType, nullable = false) val df = sql("SHOW CURRENT NAMESPACE") val rows = df.collect From a667321f277e727fb12bf2f1e544febd1c814d57 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 4 Nov 2019 17:34:04 -0500 Subject: [PATCH 4/4] address PR comments --- .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index cd678b3d351e..3a3d22bc059a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1007,9 +1007,9 @@ class DataSourceV2SQLSuite // Initially, the v2 session catalog is set as a current catalog. testShowCurrentNamespace("spark_catalog", "default") - sql("Use testcat") + sql("USE testcat") testShowCurrentNamespace("testcat", "") - sql("Use testcat.ns1.ns2") + sql("USE testcat.ns1.ns2") testShowCurrentNamespace("testcat", "ns1.ns2") }