Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ statement
| SHOW identifier? FUNCTIONS
(LIKE? (qualifiedName | pattern=STRING))? #showFunctions
| SHOW CREATE TABLE multipartIdentifier #showCreateTable
| SHOW CURRENT NAMESPACE #showCurrentNamespace
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val CatalogAndNamespace(catalog, namespace) = nameParts
SetCatalogAndNamespace(catalogManager, Some(catalog.name()), namespace)
}

case ShowCurrentNamespaceStatement() =>
ShowCurrentNamespace(catalogManager)
}

object NonSessionCatalog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
UseStatement(ctx.NAMESPACE != null, nameParts)
}

/**
* Create a [[ShowCurrentNamespaceStatement]].
*/
override def visitShowCurrentNamespace(
ctx: ShowCurrentNamespaceContext) : LogicalPlan = withOrigin(ctx) {
ShowCurrentNamespaceStatement()
}

/**
* Create a [[ShowTablesStatement]] command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,8 @@ case class RefreshTableStatement(tableName: Seq[String]) extends ParsedStatement
case class ShowColumnsStatement(
table: Seq[String],
namespace: Option[Seq[String]]) extends ParsedStatement

/**
* A SHOW CURRENT NAMESPACE statement, as parsed from SQL
*/
case class ShowCurrentNamespaceStatement() extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,12 @@ case class SetCatalogAndNamespace(
case class RefreshTable(
catalog: TableCatalog,
ident: Identifier) extends Command

/**
* The logical plan of the SHOW CURRENT NAMESPACE command that works for v2 catalogs.
*/
case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command {
override val output: Seq[Attribute] = Seq(
AttributeReference("catalog", StringType, nullable = false)(),
AttributeReference("namespace", StringType, nullable = false)())
}
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,12 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsed3_table, expected3_table)
}

test("show current namespace") {
comparePlans(
parsePlan("SHOW CURRENT NAMESPACE"),
ShowCurrentNamespaceStatement())
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
Expand Down Expand Up @@ -210,6 +210,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case SetCatalogAndNamespace(catalogManager, catalogName, namespace) =>
SetCatalogAndNamespaceExec(catalogManager, catalogName, namespace) :: Nil

case r: ShowCurrentNamespace =>
ShowCurrentNamespaceExec(r.output, r.catalogManager) :: Nil

case _ => Nil
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper

/**
* Physical plan node for showing current catalog/namespace.
*/
case class ShowCurrentNamespaceExec(
output: Seq[Attribute],
catalogManager: CatalogManager)
extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
val encoder = RowEncoder(schema).resolveAndBind()
Seq(encoder
.toRow(new GenericRowWithSchema(
Array(catalogManager.currentCatalog.name, catalogManager.currentNamespace.quoted), schema))
.copy())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,29 @@ class DataSourceV2SQLSuite
assert(catalogManager.currentNamespace === Array("ns1", "ns2"))
}

test("ShowCurrentNamespace: basic tests") {
def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = {
val schema = new StructType()
.add("catalog", StringType, nullable = false)
.add("namespace", StringType, nullable = false)
val df = sql("SHOW CURRENT NAMESPACE")
val rows = df.collect

assert(df.schema === schema)
assert(rows.length == 1)
assert(rows(0).getAs[String](0) === expectedCatalogName)
assert(rows(0).getAs[String](1) === expectedNamespace)
}

// Initially, the v2 session catalog is set as a current catalog.
testShowCurrentNamespace("spark_catalog", "default")

sql("USE testcat")
testShowCurrentNamespace("testcat", "")
sql("USE testcat.ns1.ns2")
testShowCurrentNamespace("testcat", "ns1.ns2")
}

test("tableCreation: partition column case insensitive resolution") {
val testCatalog = catalog("testcat").asTableCatalog
val sessionCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog
Expand Down