@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowDatabasesStatement, ShowTablesStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -2296,6 +2296,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

+  /**
+   * Create a [[ShowDatabasesStatement]] command.
+   */
+  override def visitShowDatabases(ctx: ShowDatabasesContext): LogicalPlan = withOrigin(ctx) {
+    ShowDatabasesStatement(Option(ctx.pattern).map(string))
+  }

/**
* Create a table, returning a [[CreateTableStatement]] logical plan.
*
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.{Identifier, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.AliasIdentifier
@@ -560,6 +560,16 @@ object OverwritePartitionsDynamic {
}
}

+/**
+ * The logical plan of the SHOW DATABASES command that works for v2 catalogs.
+ */
+case class ShowDatabases(
+    catalog: SupportsNamespaces,
+    pattern: Option[String]) extends Command {
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("namespace", StringType, nullable = false)())
+}

case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {

override def children: Seq[LogicalPlan] = Seq(table)
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

/**
* A SHOW DATABASES statement, as parsed from SQL.
*/
case class ShowDatabasesStatement(pattern: Option[String])
  extends ParsedStatement
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransf
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowDatabasesStatement, ShowTablesStatement}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

@@ -779,6 +779,15 @@ class DDLParserSuite extends AnalysisTest {
ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))
}

test("show databases") {
comparePlans(
parsePlan("SHOW DATABASES"),
ShowDatabasesStatement(None))
comparePlans(
parsePlan("SHOW DATABASES LIKE 'defau*'"),
ShowDatabasesStatement(Some("defau*")))
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
@@ -162,17 +162,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
partitionSpec = partitionSpec)
}

-  /**
-   * Create a [[ShowDatabasesCommand]] logical plan.
-   * Example SQL:
-   * {{{
-   *   SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];
-   * }}}
-   */
-  override def visitShowDatabases(ctx: ShowDatabasesContext): LogicalPlan = withOrigin(ctx) {
-    ShowDatabasesCommand(Option(ctx.pattern).map(string))
-  }

/**
* A command for users to list the properties for a table. If propertyKey is specified, the value
* for the propertyKey is returned. If propertyKey is not specified, all the keys and their
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable

import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalog.v2.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowTables, SubqueryAlias}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowDatabases, ShowTables, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowDatabasesStatement, ShowTablesStatement}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowDatabasesCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
@@ -178,6 +178,17 @@ case class DataSourceResolution(
val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
DeleteFromTable(aliased, delete.condition)

+    case ShowDatabasesStatement(pattern) =>
+      defaultCatalog match {

[Review thread on the defaultCatalog lookup]

Contributor: I think this should be currentCatalog instead. @cloud-fan, do you agree?

Contributor: yea

Contributor: Let's implement switching the current catalog first, otherwise we are not able to test it.

Contributor: @imback82 are you working on it?

Author: yes, I am working on USE NAMESPACE.

Author: I should be able to send out the PR sometime tomorrow.

+        case Some(catalog: SupportsNamespaces) =>
+          ShowDatabases(catalog, pattern)
+        case Some(_) =>
+          throw new AnalysisException(
+            "The default v2 catalog doesn't support showing namespaces.")
+        case None =>
+          ShowDatabasesCommand(pattern)
+      }
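
A sketch of where the thread above lands, assuming a currentCatalog accessor with the same Option[CatalogPlugin] shape as defaultCatalog (hypothetical here; it was expected to arrive with the USE NAMESPACE work, and resolveShowDatabases is an illustrative helper name, not this PR's code):

// Sketch only: `currentCatalog` is assumed, not part of this PR, which still
// resolves against `defaultCatalog` until catalog switching lands.
def resolveShowDatabases(
    currentCatalog: Option[CatalogPlugin],
    pattern: Option[String]): LogicalPlan =
  currentCatalog match {
    case Some(catalog: SupportsNamespaces) =>
      ShowDatabases(catalog, pattern)          // v2 path
    case Some(_) =>
      throw new AnalysisException(
        "The current v2 catalog doesn't support showing namespaces.")
    case None =>
      ShowDatabasesCommand(pattern)            // v1 fallback
  }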

case ShowTablesStatement(None, pattern) =>
defaultCatalog match {
case Some(catalog) =>
@@ -26,7 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalog.v2.StagingTableCatalog
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, ShowTables}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, ShowDatabases, ShowTables}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -269,6 +269,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case AlterTable(catalog, ident, _, changes) =>
AlterTableExec(catalog, ident, changes) :: Nil

+    case r: ShowDatabases =>
+      ShowDatabasesExec(r.output, r.catalog, r.pattern) :: Nil

case r : ShowTables =>
ShowTablesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.catalog.v2.SupportsNamespaces
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.LeafExecNode

/**
* Physical plan node for showing databases.
*/
case class ShowDatabasesExec(
    output: Seq[Attribute],
    catalog: SupportsNamespaces,
    pattern: Option[String])
  extends LeafExecNode {
  override protected def doExecute(): RDD[InternalRow] = {
    // Collect the catalog's root namespaces, then recurse into each one.
    // (See the review thread below: the SPIP expects a single call instead.)
    val namespaces = catalog.listNamespaces().flatMap(getNamespaces(catalog, _))

[Review thread on the recursive namespace listing]

Contributor: This shouldn't list the entire space. It should only call listNamespaces once: if the current namespace is an empty array, call listNamespaces(), and if it is anything else, call listNamespaces(current). From the SPIP:

> For a given operation, Spark will call the corresponding catalog method once. For example, SHOW TABLES will return results from listTables(currentNamespace). Spark will not traverse nested namespaces with multiple calls to listNamespaces and listTables.
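
A minimal sketch of the single-call behavior this comment asks for; currentNamespace is assumed to be handed in from the session (where it would come from was still under discussion in this thread), and listOnce is an illustrative name:

// Sketch only, not this PR's code: one listNamespaces call, no recursion.
def listOnce(
    catalog: SupportsNamespaces,
    currentNamespace: Array[String]): Array[Array[String]] = {
  if (currentNamespace.isEmpty) {
    catalog.listNamespaces()                  // root namespaces
  } else {
    catalog.listNamespaces(currentNamespace)  // children of the current namespace
  }
}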


Contributor:

> If the current namespace is an empty array then call listNamespaces()

I just realized that this isn't the same behavior as v1. In v1, SHOW DATABASES ignores the current database because databases aren't nested. It always lists all databases (then filters). The proposed behavior of SHOW NAMESPACES was to respect the current namespace and list namespaces nested in it.

There are a few options to fix this:

- Add SHOW NAMESPACES that behaves differently than SHOW DATABASES
- Make SHOW NAMESPACES list all namespaces recursively, like this PR
- Make SHOW NAMESPACES list the namespace above the current. If current=a.b, then list a and show the results (including b).
- Change the behavior of SHOW DATABASES to match SHOW NAMESPACES and list the current
- Change the behavior of SHOW DATABASES to match SHOW NAMESPACES and list the current, but match v1 behavior if the current namespace is "default"

@imback82, @brkyvz, @cloud-fan, @mccheah, any opinion here?


Contributor:

> Add SHOW NAMESPACES that behaves differently than SHOW DATABASES

I prefer this.

Another idea is: SHOW NAMESPACES should list the root namespaces of the current catalog, no matter what the current namespace is.


Contributor: Another option is to ignore the current namespace entirely. SHOW NAMESPACES would list the root, and SHOW NAMESPACES IN ns1 lists namespaces in ns1. The context is always explicit.

I think I would prefer that option.


Author:

> Another option is to ignore the current namespace entirely. SHOW NAMESPACES would list the root, and SHOW NAMESPACES IN ns1 lists namespaces in ns1. The context is always explicit.

I like this idea. @cloud-fan are you OK with this approach?


Contributor (@cloud-fan, Sep 2, 2019):

> Another idea is: SHOW NAMESPACES should list the root namespaces of the current catalog, no matter what the current namespace is.

Yea, that's exactly what I mentioned before, with the addition of SHOW NAMESPACES IN ns1. +1
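
Concretely, the semantics the thread converges on would look like this (a hypothetical catalog layout with roots ns1 and ns2, and ns1.ns1_1 nested under ns1; SHOW NAMESPACES IN is the proposed syntax, not implemented in this PR):

// Hypothetical example of the agreed "explicit context" semantics:
spark.sql("SHOW NAMESPACES").show()
// lists only the roots: ns1, ns2 -- the current namespace is ignored
spark.sql("SHOW NAMESPACES IN ns1").show()
// lists namespaces directly under ns1: ns1.ns1_1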


Author: @cloud-fan @rdblue Thanks for your suggestions.

    val rows = new ArrayBuffer[InternalRow]()
    val encoder = RowEncoder(schema).resolveAndBind()

    // Render each namespace as a dotted name and keep it only if it matches
    // the optional LIKE pattern.
    namespaces.map(_.quoted).map { namespace =>
      if (pattern.map(StringUtils.filterPattern(Seq(namespace), _).nonEmpty).getOrElse(true)) {
        rows += encoder
          .toRow(new GenericRowWithSchema(Array(namespace), schema))
          .copy()
      }
    }

    sparkContext.parallelize(rows, 1)
  }

  // Depth-first traversal: returns the parent namespace plus every namespace
  // transitively nested under it.
  private def getNamespaces(
      catalog: SupportsNamespaces,
      parentNamespace: Array[String]): Array[Array[String]] = {
    val namespaces = catalog.listNamespaces(parentNamespace)
    Array(parentNamespace) ++ namespaces.flatMap(getNamespaces(catalog, _))
  }
}
@@ -802,17 +802,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
""".stripMargin)
}

test("show databases") {
val sql1 = "SHOW DATABASES"
val sql2 = "SHOW DATABASES LIKE 'defau*'"
val parsed1 = parser.parsePlan(sql1)
val expected1 = ShowDatabasesCommand(None)
val parsed2 = parser.parsePlan(sql2)
val expected2 = ShowDatabasesCommand(Some("defau*"))
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}

test("show tblproperties") {
val parsed1 = parser.parsePlan("SHOW TBLPROPERTIES tab1")
val expected1 = ShowTablePropertiesCommand(TableIdentifier("tab1", None), None)
@@ -1568,6 +1568,45 @@ class DataSourceV2SQLSuite
assert(expected === df.collect())
}

test("ShowDatabases: use v2 catalog if the default catalog is set") {
spark.conf.set("spark.sql.default.catalog", "testcat")

spark.sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns1.ns1_1.ns1_2.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns2.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns2.ns2_1.ns2_2.table (id bigint) USING foo")

runShowDatabasesSql(
"SHOW DATABASES",
Seq("ns1", "ns1.ns1_1", "ns1.ns1_1.ns1_2", "ns2", "ns2.ns2_1", "ns2.ns2_1.ns2_2"))

runShowDatabasesSql(
"SHOW DATABASES LIKE '*_*'",
Seq("ns1.ns1_1", "ns1.ns1_1.ns1_2", "ns2.ns2_1", "ns2.ns2_1.ns2_2"))
}

test("ShowDatabases: fallback to v1 catalog if no default catalog is set") {
spark.sql("CREATE TABLE testcat.ns.table (id bigint, data string) USING foo")

runShowDatabasesSql("SHOW DATABASES", Seq("default"), false)
}

private def runShowDatabasesSql(
sqlText: String,
expected: Seq[String],
expectV2Catalog: Boolean = true): Unit = {
val schema = if (expectV2Catalog) {
new StructType().add("namespace", StringType, nullable = false)
} else {
new StructType().add("databaseName", StringType, nullable = false)
}

val df = spark.sql(sqlText)
assert(df.schema === schema)
assert(df.collect().map(_.getAs[String](0)).sorted === expected.sorted)
}

test("tableCreation: partition column case insensitive resolution") {
val testCatalog = catalog("testcat").asTableCatalog
val sessionCatalog = catalog("session").asTableCatalog
@@ -135,7 +135,7 @@ class TestInMemoryTableCatalog extends TableCatalog with SupportsNamespaces {
}

override def listNamespaces: Array[Array[String]] = {
-    allNamespaces.map(_.head).distinct.map(Array(_)).toArray
+    allNamespaces.filter(_.nonEmpty).map(_.head).distinct.map(Array(_)).toArray
}

override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {