[SPARK-28856][SQL] Implement SHOW DATABASES for Data Source V2 Tables #25601
Changes from all commits: 1f3b8b9, bab1a8f, 87fe6ec, da65365, 9974a58, ba1e7f4, 9f738cf, 7d606e1, 672f526, 6256aeb, 2295f91, 9a55a03
SQL grammar (SqlBase.g4):

@@ -92,6 +92,8 @@ statement
     | DROP database (IF EXISTS)? db=errorCapturingIdentifier
         (RESTRICT | CASCADE)?                                          #dropDatabase
     | SHOW DATABASES (LIKE? pattern=STRING)?                           #showDatabases
+    | SHOW NAMESPACES ((FROM | IN) multipartIdentifier)?
+        (LIKE? pattern=STRING)?                                        #showNamespaces
     | createTableHeader ('(' colTypeList ')')? tableProvider
         ((OPTIONS options=tablePropertyList) |
         (PARTITIONED BY partitioning=transformList) |

Review thread on the added SHOW NAMESPACES rule:
- Author: I put both …
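The new rule accepts an optional FROM/IN multi-part identifier and an optional LIKE pattern. As a rough usage sketch, assuming a SparkSession named `spark` and a v2 catalog registered as `testcat` via the `spark.sql.catalog.testcat` configuration (neither is part of this diff):

```scala
// All of these parse into ShowNamespacesStatement; resolving the catalog name
// happens later, in DataSourceResolution (see below).
spark.sql("SHOW NAMESPACES")                             // default/current catalog
spark.sql("SHOW NAMESPACES FROM testcat")                // explicit catalog
spark.sql("SHOW NAMESPACES IN testcat.ns1")              // namespaces under testcat.ns1
spark.sql("SHOW NAMESPACES IN testcat.ns1 LIKE '*tmp*'") // with a LIKE pattern
```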
@@ -1006,6 +1008,7 @@ ansiNonReserved
     | MINUTES
     | MONTHS
     | MSCK
+    | NAMESPACES
     | NO
     | NULLS
     | OF

@@ -1255,6 +1258,7 @@ nonReserved
     | MONTH
     | MONTHS
     | MSCK
+    | NAMESPACES
     | NO
     | NOT
     | NULL

@@ -1515,6 +1519,7 @@ MINUTES: 'MINUTES';
 MONTH: 'MONTH';
 MONTHS: 'MONTHS';
 MSCK: 'MSCK';
+NAMESPACES: 'NAMESPACES';
 NATURAL: 'NATURAL';
 NO: 'NO';
 NOT: 'NOT' | '!';

(imback82 marked a review conversation on the NAMESPACES token as resolved.)
Logical plans (org.apache.spark.sql.catalyst.plans.logical):
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.plans.logical

-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.{Identifier, SupportsNamespaces, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.AliasIdentifier

@@ -560,6 +560,17 @@ object OverwritePartitionsDynamic {
   }
 }

+/**
+ * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
+ */
+case class ShowNamespaces(
+    catalog: SupportsNamespaces,
+    namespace: Option[Seq[String]],
+    pattern: Option[String]) extends Command {
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("namespace", StringType, nullable = false)())
+}
+
 case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {

   override def children: Seq[LogicalPlan] = Seq(table)

Review thread on the `namespace: Option[Seq[String]]` parameter:
- Contributor: After reading the code, it's actually …
- Author: This is just …
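To make the resolved shape of this plan concrete, here is a small illustrative sketch; the catalog value and the `testcat` name are stand-ins, and the rule that actually produces this plan is shown later in DataSourceResolution:

```scala
import org.apache.spark.sql.catalog.v2.SupportsNamespaces
import org.apache.spark.sql.catalyst.plans.logical.ShowNamespaces

// Illustration only: `catalog` stands in for whatever SupportsNamespaces
// implementation the session resolved the name "testcat" to.
def planFor(catalog: SupportsNamespaces): ShowNamespaces = {
  // SHOW NAMESPACES FROM testcat.ns1.ns2  ->  namespace = Seq("ns1", "ns2") under "testcat"
  ShowNamespaces(catalog, namespace = Some(Seq("ns1", "ns2")), pattern = None)
}
// planFor(...).output is a single non-nullable string column named "namespace".
```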
New file: ShowNamespacesStatement (org.apache.spark.sql.catalyst.plans.logical.sql):
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+/**
+ * A SHOW NAMESPACES statement, as parsed from SQL.
+ */
+case class ShowNamespacesStatement(namespace: Option[Seq[String]], pattern: Option[String])
+  extends ParsedStatement

Review thread on the `namespace` parameter:
- Contributor: ditto
- Author: Yes, this is catalog + namespace, but I followed the same convention as other statements, i.e. …
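The parser visitor that builds this statement is part of the PR but is not included in the hunks shown here. A minimal sketch of what it might look like, assuming the ANTLR-generated context class is `ShowNamespacesContext` and the usual `AstBuilder` helpers (`withOrigin`, `visitMultipartIdentifier`, `string`) are in scope:

```scala
// Sketch only; the actual AstBuilder change is not part of this diff view.
override def visitShowNamespaces(ctx: ShowNamespacesContext): LogicalPlan = withOrigin(ctx) {
  ShowNamespacesStatement(
    Option(ctx.multipartIdentifier).map(visitMultipartIdentifier), // FROM/IN qualifier, if any
    Option(ctx.pattern).map(string))                               // unquoted LIKE pattern, if any
}
```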
Parser tests (DDLParserSuite):
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransf
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String

@@ -779,6 +779,21 @@ class DDLParserSuite extends AnalysisTest {
       ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))
   }

+  test("show namespaces") {
+    comparePlans(
+      parsePlan("SHOW NAMESPACES"),
+      ShowNamespacesStatement(None, None))
+    comparePlans(
+      parsePlan("SHOW NAMESPACES FROM testcat.ns1.ns2"),
+      ShowNamespacesStatement(Some(Seq("testcat", "ns1", "ns2")), None))
+    comparePlans(
+      parsePlan("SHOW NAMESPACES IN testcat.ns1.ns2"),
+      ShowNamespacesStatement(Some(Seq("testcat", "ns1", "ns2")), None))
+    comparePlans(
+      parsePlan("SHOW NAMESPACES IN testcat.ns1 LIKE '*pattern*'"),
+      ShowNamespacesStatement(Some(Seq("testcat", "ns1")), Some("*pattern*")))
+  }
+
 private case class TableSpec(
     name: Seq[String],
     schema: Option[StructType],

Review thread on the new test:
- Contributor: cc @xianyinxin can you add similar parser tests for DELETE/UPDATE as well?
Analyzer rule (DataSourceResolution, org.apache.spark.sql.execution.datasources):
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable

 import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, TableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowTables, SubqueryAlias}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowDatabasesCommand, ShowTablesCommand}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}

@@ -169,6 +169,24 @@ case class DataSourceResolution(
       val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
       DeleteFromTable(aliased, delete.condition)

+    case ShowNamespacesStatement(None, pattern) =>
+      defaultCatalog match {
+        case Some(catalog) =>
+          ShowNamespaces(catalog.asNamespaceCatalog, None, pattern)
+        case None =>
+          throw new AnalysisException("No default v2 catalog is set.")
+      }
+
+    case ShowNamespacesStatement(Some(namespace), pattern) =>
+      val CatalogNamespace(maybeCatalog, ns) = namespace
+      maybeCatalog match {
+        case Some(catalog) =>
+          ShowNamespaces(catalog.asNamespaceCatalog, Some(ns), pattern)
+        case None =>
+          throw new AnalysisException(
+            s"No v2 catalog is available for ${namespace.quoted}")
+      }
+
     case ShowTablesStatement(None, pattern) =>
       defaultCatalog match {
         case Some(catalog) =>

Review thread on `defaultCatalog match {` in the first case:
- Contributor: I think this should be …
- Contributor: yea
- Contributor: Let's implement switching the current catalog first, otherwise we are not able to test it.
- Contributor: @imback82 are you working on it?
- Author: yes, I am working on …
- Author: I should be able to send out the PR sometime tomorrow.

Review thread on `throw new AnalysisException(` in the second case:
- Contributor: I think this needs to distinguish between the case where the catalog is None and the catalog does not support namespaces. For the second case, this should report that the catalog doesn't support namespaces. You can also add a conversion method, …
- Author: Using …
- Contributor: Why not use the current catalog instead of failing?
- Contributor: If the catalog name is specified, but the catalog doesn't support namespaces, I think we should fail instead of falling back to the current catalog. It's similar to: if the catalog name is specified but doesn't contain the table we need, we should fail instead of falling back to the current catalog.
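The review thread above asks for a conversion helper that raises a clear error when a catalog plugin does not implement SupportsNamespaces. The `asNamespaceCatalog` name is used by the resolution code above, but the helper itself is not in the hunks shown here; a minimal sketch of what such a CatalogV2Implicits-style implicit could look like (the exact placement and error message are assumptions):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, SupportsNamespaces}

// Sketch only: likely lives alongside the existing asTableCatalog helper.
implicit class NamespaceCatalogHelper(plugin: CatalogPlugin) {
  def asNamespaceCatalog: SupportsNamespaces = plugin match {
    case namespaceCatalog: SupportsNamespaces =>
      namespaceCatalog
    case _ =>
      // Distinguishes "catalog exists but has no namespace support" from "no catalog found".
      throw new AnalysisException(s"Catalog ${plugin.name} does not support namespaces")
  }
}
```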
New file: ShowNamespacesExec (org.apache.spark.sql.execution.datasources.v2):
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.NamespaceHelper
+import org.apache.spark.sql.catalog.v2.SupportsNamespaces
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for showing namespaces.
+ */
+case class ShowNamespacesExec(
+    output: Seq[Attribute],
+    catalog: SupportsNamespaces,
+    namespace: Option[Seq[String]],
+    pattern: Option[String])
+  extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    val namespaces = namespace.map { ns =>
+        if (ns.nonEmpty) {
+          catalog.listNamespaces(ns.toArray)
+        } else {
+          catalog.listNamespaces()
+        }
+      }
+      .getOrElse(catalog.listNamespaces())
+
+    val rows = new ArrayBuffer[InternalRow]()
+    val encoder = RowEncoder(schema).resolveAndBind()
+
+    namespaces.map(_.quoted).map { ns =>
+      if (pattern.map(StringUtils.filterPattern(Seq(ns), _).nonEmpty).getOrElse(true)) {
+        rows += encoder
+          .toRow(new GenericRowWithSchema(Array(ns), schema))
+          .copy()
+      }
+    }
+
+    sparkContext.parallelize(rows, 1)
+  }
+}

Review thread on the `if (ns.nonEmpty)` branch:
- Author: @rdblue @cloud-fan this is for handling the case …
- Contributor: I think these 2 are the same?
- Author: From the SPIP, I see the following: Since the behavior of …
- Contributor: Calling …
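For context on the LIKE filtering above, `StringUtils.filterPattern` is the same helper SHOW TABLES uses: the pattern is matched case-insensitively against the whole name, with `*` as a wildcard and `|` separating alternatives. A small illustration, with namespace names chosen here purely for the example:

```scala
import org.apache.spark.sql.catalyst.util.StringUtils

val names = Seq("ns1", "ns2", "tmp_ns")
StringUtils.filterPattern(names, "ns*")     // Seq("ns1", "ns2")   -- prefix wildcard
StringUtils.filterPattern(names, "*tmp*")   // Seq("tmp_ns")       -- substring match
StringUtils.filterPattern(names, "ns1|ns2") // Seq("ns1", "ns2")   -- alternatives
```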
Follow-up discussion:
- cc @xianyinxin, we should also add DELETE and UPDATE. Can you open a PR to do it?
- ok, will open a pr.
- DELETE is already there. UPDATE is included in #25626.