From 39641ed4851cb886bce2bee2af71be297483cc34 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 17 Jan 2022 18:07:26 -0800
Subject: [PATCH 1/5] Remove DESCRIBE TABLE parser test from DDLParserSuite

The test is re-added as a dedicated DescribeRelationParserSuite later in
this series.
---
 .../sql/catalyst/parser/DDLParserSuite.scala  | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 8dd719d3e1d85..62195823bcc03 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1213,21 +1213,6 @@ class DDLParserSuite extends AnalysisTest {
         "The feature is not supported: DESC TABLE COLUMN for a specific partition."))
   }
 
-  test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
-    comparePlans(parsePlan("describe t"),
-      DescribeRelation(
-        UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = false))
-    comparePlans(parsePlan("describe table t"),
-      DescribeRelation(
-        UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = false))
-    comparePlans(parsePlan("describe table extended t"),
-      DescribeRelation(
-        UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = true))
-    comparePlans(parsePlan("describe table formatted t"),
-      DescribeRelation(
-        UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = true))
-  }
-
   test("insert table: basic append") {
     Seq(
       "INSERT INTO TABLE testcat.ns1.ns2.tbl SELECT * FROM source",

From 98372ab7c519ebb89ac43113d48110ef2b6701a9 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Thu, 20 Jan 2022 20:12:28 -0800
Subject: [PATCH 2/5] Sort DESCRIBE TABLE columns by name; drop migrated tests

---
 .../spark/sql/execution/command/tables.scala  |  2 +-
 .../datasources/v2/DescribeTableExec.scala    |  2 +-
 .../sql/connector/DataSourceV2SQLSuite.scala  | 91 -------------------
 .../command/v2/CommandSuiteBase.scala         |  4 +-
 4 files changed, 5 insertions(+), 94 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index da54efd331b52..1cf60d7b51f45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -593,7 +593,7 @@ abstract class DescribeCommandBase extends LeafRunnableCommand {
     if (header) {
       append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
     }
-    schema.foreach { column =>
+    schema.sortBy(_.name).foreach { column =>
       append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index c20189efc91fb..757a2648765ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -60,7 +60,7 @@ case class DescribeTableExec(
   }
 
   private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
-    rows ++= table.schema.map{ column =>
+    rows ++= table.schema.sortBy(_.name).map { column =>
       toCatalystRow(
         column.name, column.dataType.simpleString, column.getComment().getOrElse(""))
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 98dec46b0ae1a..6dc849f2be346 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.sources.SimpleScanSource import org.apache.spark.sql.types.{BooleanType, LongType, MetadataBuilder, StringType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.Utils class DataSourceV2SQLSuite extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true) @@ -50,7 +49,6 @@ class DataSourceV2SQLSuite private val v2Source = classOf[FakeV2Provider].getName override protected val v2Format = v2Source override protected val catalogAndNamespace = "testcat.ns1.ns2." - private val defaultUser: String = Utils.getCurrentUserName() protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { val tmpView = "tmp_view" @@ -89,71 +87,6 @@ class DataSourceV2SQLSuite checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty) } - test("DescribeTable using v2 catalog") { - spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" + - " USING foo" + - " PARTITIONED BY (id)") - val descriptionDf = spark.sql("DESCRIBE TABLE testcat.table_name") - assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === - Seq( - ("col_name", StringType), - ("data_type", StringType), - ("comment", StringType))) - val description = descriptionDf.collect() - assert(description === Seq( - Row("id", "bigint", ""), - Row("data", "string", ""), - Row("", "", ""), - Row("# Partitioning", "", ""), - Row("Part 0", "id", ""))) - - val e = intercept[AnalysisException] { - sql("DESCRIBE TABLE testcat.table_name PARTITION (id = 1)") - } - assert(e.message.contains("DESCRIBE does not support partition for v2 tables")) - } - - test("DescribeTable with v2 catalog when table does not exist.") { - intercept[AnalysisException] { - spark.sql("DESCRIBE TABLE testcat.table_name") - } - } - - test("DescribeTable extended using v2 catalog") { - spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" + - " USING foo" + - " PARTITIONED BY (id)" + - " TBLPROPERTIES ('bar'='baz')" + - " COMMENT 'this is a test table'" + - " LOCATION 'file:/tmp/testcat/table_name'") - val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name") - assert(descriptionDf.schema.map(field => (field.name, field.dataType)) - === Seq( - ("col_name", StringType), - ("data_type", StringType), - ("comment", StringType))) - assert(descriptionDf.collect() - .map(_.toSeq) - .map(_.toArray.map(_.toString.trim)) === Array( - Array("id", "bigint", ""), - Array("data", "string", ""), - Array("", "", ""), - Array("# Partitioning", "", ""), - Array("Part 0", "id", ""), - Array("", "", ""), - Array("# Metadata Columns", "", ""), - Array("index", "int", "Metadata column used to conflict with a data column"), - Array("_partition", "string", "Partition key used to store the row"), - Array("", "", ""), - Array("# Detailed Table Information", "", ""), - Array("Name", "testcat.table_name", ""), - Array("Comment", "this is a test table", ""), - Array("Location", "file:/tmp/testcat/table_name", ""), - Array("Provider", "foo", ""), - 
Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""), - Array("Table Properties", "[bar=baz]", ""))) - } - test("Describe column for v2 catalog") { val t = "testcat.tbl" withTable(t) { @@ -2408,30 +2341,6 @@ class DataSourceV2SQLSuite } } - test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") { - val tbl = s"${catalogAndNamespace}tbl" - withTable(tbl) { - sql(s"CREATE TABLE $tbl (c0 INT) USING $v2Format") - val description = sql(s"DESCRIBE TABLE $tbl") - val noCommentDataset = description.drop("comment") - val expectedSchema = new StructType() - .add( - name = "col_name", - dataType = StringType, - nullable = false, - metadata = new MetadataBuilder().putString("comment", "name of the column").build()) - .add( - name = "data_type", - dataType = StringType, - nullable = false, - metadata = new MetadataBuilder().putString("comment", "data type of the column").build()) - assert(noCommentDataset.schema === expectedSchema) - val isNullDataset = noCommentDataset - .withColumn("is_null", noCommentDataset("col_name").isNull) - assert(isNullDataset.schema === expectedSchema.add("is_null", BooleanType, false)) - } - } - test("SPARK-34576: drop/add columns to a dataset of `DESCRIBE COLUMN`") { val tbl = s"${catalogAndNamespace}tbl" withTable(tbl) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala index 1ff9e74f180e7..001c4728bea0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala @@ -33,7 +33,9 @@ trait CommandSuiteBase extends SharedSparkSession { def catalogVersion: String = "V2" // The catalog version is added to test names def commandVersion: String = "V2" // The command version is added to test names def catalog: String = "test_catalog" // The default V2 catalog for testing - def defaultUsing: String = "USING _" // The clause is used in creating v2 tables under testing + def defaultProvider: String = "_" + // The clause is used in creating v2 tables under testing + def defaultUsing: String = s"USING $defaultProvider" // V2 catalogs created and used especially for testing override def sparkConf: SparkConf = super.sparkConf From e3b46d25f137e142724c8fb6cfd3ebf217a32bba Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 20 Jan 2022 20:15:40 -0800 Subject: [PATCH 3/5] Add missing files --- .../command/DescribeRelationParserSuite.scala | 39 ++++++ .../command/DescribeTableSuiteBase.scala | 38 ++++++ .../command/v1/DescribeTableSuite.scala | 64 +++++++++ .../command/v2/DescribeTableSuite.scala | 123 ++++++++++++++++++ 4 files changed, 264 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeRelationParserSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeRelationParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeRelationParserSuite.scala new file mode 100644 index 0000000000000..f2b78943847fa --- /dev/null +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeRelationParserSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan +import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation + +class DescribeRelationParserSuite extends AnalysisTest { + test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") { + comparePlans(parsePlan("describe t"), + DescribeRelation( + UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = false)) + comparePlans(parsePlan("describe table t"), + DescribeRelation( + UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = false)) + comparePlans(parsePlan("describe table extended t"), + DescribeRelation( + UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = true)) + comparePlans(parsePlan("describe table formatted t"), + DescribeRelation( + UnresolvedTableOrView(Seq("t"), "DESCRIBE TABLE", true), Map.empty, isExtended = true)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala new file mode 100644 index 0000000000000..185cb1ffca14d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.QueryTest + +/** + * This base suite contains unified tests for the `DESCRIBE TABLE` command that check V1 and V2 + * table catalogs. 
The tests that cannot run for all supported catalogs are located in more
+ * specific test suites:
+ *
+ *   - V2 table catalog tests: `org.apache.spark.sql.execution.command.v2.DescribeTableSuite`
+ *   - V1 table catalog tests:
+ *     `org.apache.spark.sql.execution.command.v1.DescribeTableSuiteBase`
+ *     - V1 In-Memory catalog: `org.apache.spark.sql.execution.command.v1.DescribeTableSuite`
+ *     - V1 Hive External catalog:
+ *       `org.apache.spark.sql.hive.execution.command.DescribeTableSuite`
+ */
+trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
+  override val command = "DESCRIBE TABLE"
+
+  protected def namespace: String
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
new file mode 100644
index 0000000000000..0c509c99fc178
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v1
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.types.StringType
+
+/**
+ * This base suite contains unified tests for the `DESCRIBE TABLE` command that check V1
+ * table catalogs. The tests that cannot run for all V1 catalogs are located in more
+ * specific test suites:
+ *
+ *   - V1 In-Memory catalog: `org.apache.spark.sql.execution.command.v1.DescribeTableSuite`
+ *   - V1 Hive External catalog:
+ *     `org.apache.spark.sql.hive.execution.command.DescribeTableSuite`
+ */
+trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
+  with command.TestsV1AndV2Commands {
+  override def namespace: String = "db"
+
+  test("basic") {
+    withNamespaceAndTable(namespace, "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" +
+        " PARTITIONED BY (id)")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
+        Seq(
+          ("col_name", StringType),
+          ("data_type", StringType),
+          ("comment", StringType)))
+      val description = descriptionDf.collect()
+      assert(description === Seq(
+        Row("data", "string", "null"),
+        Row("id", "bigint", "null"),
+        Row("# Partition Information", "", ""),
+        Row("# col_name", "data_type", "comment"),
+        Row("id", "bigint", "null")).toArray)
+    }
+  }
+}
+
+/**
+ * The class contains tests for the `DESCRIBE TABLE` command to check the V1 In-Memory
+ * table catalog.
+ */ +class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { + override def commandVersion: String = super[DescribeTableSuiteBase].commandVersion +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala new file mode 100644 index 0000000000000..2610f2008dcfb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.v2 + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.execution.command +import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType} +import org.apache.spark.util.Utils + +/** + * The class contains tests for the `DESCRIBE TABLE` command to check V2 table catalogs. 
+ */ +class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuiteBase { + override def namespace: String = "ns1.ns2" + + test("DESCRIBE TABLE using v2 catalog") { + withNamespaceAndTable(namespace, "table") { tbl => + spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + + " PARTITIONED BY (id)") + val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl") + assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === + Seq( + ("col_name", StringType), + ("data_type", StringType), + ("comment", StringType))) + val description = descriptionDf.collect() + assert(description === Seq( + Row("data", "string", ""), + Row("id", "bigint", ""), + Row("", "", ""), + Row("# Partitioning", "", ""), + Row("Part 0", "id", ""))) + + val e = intercept[AnalysisException] { + sql(s"DESCRIBE TABLE $tbl PARTITION (id = 1)") + } + assert(e.message.contains("DESCRIBE does not support partition for v2 tables")) + } + } + + test("DESCRIBE TABLE with v2 catalog when table does not exist.") { + intercept[AnalysisException] { + spark.sql(s"DESCRIBE TABLE $catalog.$namespace.non_existence") + } + } + + test("DESCRIBE TABLE EXTENDED using v2 catalog") { + val tbl = s"$catalog.$namespace.tbl" + withTable(tbl) { + spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + + " PARTITIONED BY (id)" + + " TBLPROPERTIES ('bar'='baz')" + + " COMMENT 'this is a test table'" + + " LOCATION 'file:/tmp/testcat/table_name'") + val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl") + assert(descriptionDf.schema.map(field => (field.name, field.dataType)) + === Seq( + ("col_name", StringType), + ("data_type", StringType), + ("comment", StringType))) + assert(descriptionDf.collect() + .map(_.toSeq) + .map(_.toArray.map(_.toString.trim)) === Array( + Array("data", "string", ""), + Array("id", "bigint", ""), + Array("", "", ""), + Array("# Partitioning", "", ""), + Array("Part 0", "id", ""), + Array("", "", ""), + Array("# Metadata Columns", "", ""), + Array("index", "int", "Metadata column used to conflict with a data column"), + Array("_partition", "string", "Partition key used to store the row"), + Array("", "", ""), + Array("# Detailed Table Information", "", ""), + Array("Name", tbl, ""), + Array("Comment", "this is a test table", ""), + Array("Location", "file:/tmp/testcat/table_name", ""), + Array("Provider", defaultProvider, ""), + Array(TableCatalog.PROP_OWNER.capitalize, Utils.getCurrentUserName(), ""), + Array("Table Properties", "[bar=baz]", ""))) + } + } + + test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") { + val tbl = s"$catalog.$namespace.tbl" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (c0 INT) $defaultUsing") + val description = sql(s"DESCRIBE TABLE $tbl") + val noCommentDataset = description.drop("comment") + val expectedSchema = new StructType() + .add( + name = "col_name", + dataType = StringType, + nullable = false, + metadata = new MetadataBuilder().putString("comment", "name of the column").build()) + .add( + name = "data_type", + dataType = StringType, + nullable = false, + metadata = new MetadataBuilder().putString("comment", "data type of the column").build()) + assert(noCommentDataset.schema === expectedSchema) + val isNullDataset = noCommentDataset + .withColumn("is_null", noCommentDataset("col_name").isNull) + assert(isNullDataset.schema === expectedSchema.add("is_null", BooleanType, false)) + } + } +} From 09e21484c030c87a19d3db4ae84ff12fe74fd852 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 29 Jan 2022 
21:24:34 -0800
Subject: [PATCH 4/5] Align v2 DESCRIBE TABLE output with v1

Gate the v1 DescribeTableCommand fallback behind conf.useV1Command, and
make DescribeTableExec emit the v1-style "# Partition Information"
section when a table is partitioned only by plain columns.
---
 .../analysis/ResolveSessionCatalog.scala      |  3 +-
 .../datasources/v2/DescribeTableExec.scala    | 30 +++++++----
 .../command/DescribeTableSuiteBase.scala      | 60 ++++++++++++++++++-
 .../command/v1/DescribeTableSuite.scala       | 21 ----------
 .../command/v2/DescribeTableSuite.scala       | 22 ++++---
 5 files changed, 96 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f4df3bea532b6..dd1a7e7396aec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -125,7 +125,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
     case DescribeRelation(
-        ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) =>
+        ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output)
+        if conf.useV1Command =>
       DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended, output)
 
     case DescribeColumn(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index 757a2648765ac..d5c1daa47f2b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table}
+import org.apache.spark.sql.connector.expressions.IdentityTransform
 
 case class DescribeTableExec(
     output: Seq[Attribute],
@@ -62,7 +63,7 @@
   private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
     rows ++= table.schema.sortBy(_.name).map { column =>
       toCatalystRow(
-        column.name, column.dataType.simpleString, column.getComment().getOrElse(""))
+        column.name, column.dataType.simpleString, column.getComment().orNull)
     }
   }
 
@@ -80,13 +81,26 @@
   }
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
-    rows += emptyRow()
-    rows += toCatalystRow("# Partitioning", "", "")
-    if (table.partitioning.isEmpty) {
-      rows += toCatalystRow("Not partitioned", "", "")
-    } else {
-      rows ++= table.partitioning.zipWithIndex.map {
-        case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
+    if (table.partitioning.nonEmpty) {
+      // Tables partitioned only by plain columns get the v1-style
+      // "# Partition Information" section, so v1 and v2 output line up.
+      val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
+      if (partitionColumnsOnly) {
+        val nameToField = table.schema.map(f => (f.name, f)).toMap
+        rows += toCatalystRow("# Partition Information", "", "")
+        rows += toCatalystRow(s"# ${output(0).name}", output(1).name, output(2).name)
+        rows ++= table.partitioning.sortBy(_.describe).map {
+          case t =>
+            val field = nameToField(t.describe)
+            toCatalystRow(t.describe, field.dataType.simpleString, field.getComment().orNull)
+        }
+      } else {
+        rows += emptyRow()
+        rows += toCatalystRow("# Partitioning", "", "")
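+        // Generic layout: one "Part <N>" row per transform, e.g. "Part 0" -> "bucket(3, id)".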
+        rows ++= table.partitioning.zipWithIndex.map {
+          case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 185cb1ffca14d..41fbf8d8beb4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.types.StringType
 
 /**
  * This base suite contains unified tests for the `DESCRIBE TABLE` command that check V1 and V2
@@ -35,4 +36,61 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
   override val command = "DESCRIBE TABLE"
 
   protected def namespace: String
+
+  test("basic") {
+    withNamespaceAndTable(namespace, "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
+        Seq(
+          ("col_name", StringType),
+          ("data_type", StringType),
+          ("comment", StringType)))
+      val description = descriptionDf.collect()
+      assert(description === Seq(
+        Row("data", "string", null),
+        Row("id", "bigint", null)).toArray)
+    }
+  }
+
+  test("describe table with partition columns") {
+    withNamespaceAndTable(namespace, "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" +
+        " PARTITIONED BY (id)")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
+        Seq(
+          ("col_name", StringType),
+          ("data_type", StringType),
+          ("comment", StringType)))
+      val description = descriptionDf.collect()
+      assert(description === Seq(
+        Row("data", "string", null),
+        Row("id", "bigint", null),
+        Row("# Partition Information", "", ""),
+        Row("# col_name", "data_type", "comment"),
+        Row("id", "bigint", null)).toArray)
+    }
+  }
+
+  test("describe table extended") {
+    withNamespaceAndTable(namespace, "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" +
+        " PARTITIONED BY (id)")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl")
+      descriptionDf.show(false)
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
+        Seq(
+          ("col_name", StringType),
+          ("data_type", StringType),
+          ("comment", StringType)))
+      val description = descriptionDf.collect()
+      assert(description === Seq(
+        Row("data", "string", null),
+        Row("id", "bigint", null),
+        Row("# Partition Information", "", ""),
+        Row("# col_name", "data_type", "comment"),
+        Row("id", "bigint", null)).toArray)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index 0c509c99fc178..1f34e08440596 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql.execution.command.v1
 
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command
-import org.apache.spark.sql.types.StringType
 
 /**
  * This base suite contains unified tests for the `DESCRIBE TABLE` command that check V1
@@ -34,25 +34,6 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase
   with command.TestsV1AndV2Commands {
   override def namespace: String = "db"
 
-  test("basic") {
-    withNamespaceAndTable(namespace, "table") { tbl =>
-      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" +
-        " PARTITIONED BY (id)")
-      val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
-      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
-        Seq(
-          ("col_name", StringType),
-          ("data_type", StringType),
-          ("comment", StringType)))
-      val description = descriptionDf.collect()
-      assert(description === Seq(
-        Row("data", "string", "null"),
-        Row("id", "bigint", "null"),
-        Row("# Partition Information", "", ""),
-        Row("# col_name", "data_type", "comment"),
-        Row("id", "bigint", "null")).toArray)
-    }
-  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index 2610f2008dcfb..9dd22fd2a70f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -29,10 +29,10 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
   override def namespace: String = "ns1.ns2"
 
-  test("DESCRIBE TABLE using v2 catalog") {
+  test("DESCRIBE TABLE with non-'partitioned-by' clause") {
     withNamespaceAndTable(namespace, "table") { tbl =>
-      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" +
-        " PARTITIONED BY (id)")
+      spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing " +
+        "CLUSTERED BY (id) INTO 3 BUCKETS")
       val descriptionDf = spark.sql(s"DESCRIBE TABLE $tbl")
       assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
         Seq(
@@ -41,16 +41,20 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
         ("comment", StringType)))
       val description = descriptionDf.collect()
       assert(description === Seq(
-        Row("data", "string", ""),
-        Row("id", "bigint", ""),
+        Row("data", "string", null),
+        Row("id", "bigint", null),
         Row("", "", ""),
         Row("# Partitioning", "", ""),
-        Row("Part 0", "id", "")))
+        Row("Part 0", "bucket(3, id)", "")))
+  }
 
-      val e = intercept[AnalysisException] {
-        sql(s"DESCRIBE TABLE $tbl PARTITION (id = 1)")
+  test("Describing a partition is not supported") {
+    withNamespaceAndTable(namespace, "table") { tbl =>
+      val e = 
intercept[AnalysisException] { + sql(s"DESCRIBE TABLE $tbl PARTITION (id = 1)") + } + assert(e.message.contains("DESCRIBE does not support partition for v2 tables")) } - assert(e.message.contains("DESCRIBE does not support partition for v2 tables")) } } From 639318af69b472118c40206fc09d51c95e4e25d4 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 7 Feb 2022 21:13:26 -0800 Subject: [PATCH 5/5] WIP: evaluating the differences --- .../command/DescribeTableSuiteBase.scala | 81 ++++++++++++++++--- .../command/v2/DescribeTableSuite.scala | 52 ++++++------ 2 files changed, 97 insertions(+), 36 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala index 41fbf8d8beb4c..20cf98720e305 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala @@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.command.DDLCommandTestUtils.V1_COMMAND_VERSION +import org.apache.spark.sql.functions.when import org.apache.spark.sql.types.StringType +import org.apache.spark.util.Utils /** * This base suite contains unified tests for the `DESCRIBE TABLE` command that check V1 and V2 @@ -76,21 +79,79 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils { test("describe table extended") { withNamespaceAndTable(namespace, "table") { tbl => spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + - " PARTITIONED BY (id)") + " PARTITIONED BY (id)" + + " TBLPROPERTIES ('bar'='baz')" + + " COMMENT 'this is a test table'" + + " LOCATION 'file:/tmp/table_name'") val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl") - descriptionDf.show(false) - assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === + val cleansedDescriptionDf = descriptionDf + .withColumn("data_type", when(descriptionDf("col_name") === "Created Time", "") + .otherwise(descriptionDf("data_type"))) + assert(cleansedDescriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( ("col_name", StringType), ("data_type", StringType), ("comment", StringType))) - val description = descriptionDf.collect() - assert(description === Seq( - Row("data", "string", null), - Row("id", "bigint", null), - Row("# Partition Information", "", ""), - Row("# col_name", "data_type", "comment"), - Row("id", "bigint", null)).toArray) + + val cleansedDescription = cleansedDescriptionDf.collect + if (commandVersion == V1_COMMAND_VERSION) { + assert(cleansedDescription === Seq( + Row("data", "string", null), + Row("id", "bigint", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("id", "bigint", null), + Row("", "", ""), + Row("# Detailed Table Information", "", ""), + Row("Database", "db", ""), + Row("Table", "table", ""), + Row("Created Time", "", ""), + Row("Last Access", "UNKNOWN", ""), + Row("Created By", s"Spark ${org.apache.spark.SPARK_VERSION}", ""), + Row("Type", "EXTERNAL", ""), + Row("Provider", "parquet", ""), + Row("Comment", "this is a test table", ""), + Row("Table Properties", "[bar=baz]", ""), + Row("Location", "file:/tmp/table_name", ""), + Row("Partition Provider", "Catalog", "")).toArray) + } else { + if (catalogVersion == "V1") { + assert(cleansedDescription 
=== Seq( + Row("data", "string", null), + Row("id", "bigint", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("id", "bigint", null), + Row("", "", ""), + Row("# Detailed Table Information", "", ""), + Row("Name", "db.table", ""), + Row("Comment", "this is a test table", ""), + Row("Location", "file:/tmp/table_name", ""), + Row("Provider", "parquet", ""), + Row("Owner", "", ""), + Row("External", "true", ""), + Row("Table Properties", "[bar=baz]", "")).toArray) + } else { + assert(cleansedDescription === Seq( + Row("data", "string", null), + Row("id", "bigint", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("id", "bigint", null), + Row("", "", ""), + Row("# Metadata Columns", "", ""), + Row("index", "int", "Metadata column used to conflict with a data column"), + Row("_partition", "string", "Partition key used to store the row"), + Row("", "", ""), + Row("# Detailed Table Information", "", ""), + Row("Name", "test_catalog.ns1.ns2.table", ""), + Row("Comment", "this is a test table", ""), + Row("Location", "file:/tmp/table_name", ""), + Row("Provider", "_", ""), + Row("Owner", Utils.getCurrentUserName(), ""), + Row("Table Properties", "[bar=baz]", "")).toArray) + } + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala index 9dd22fd2a70f2..8b63e64968b88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -46,18 +46,21 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit Row("", "", ""), Row("# Partitioning", "", ""), Row("Part 0", "bucket(3, id)", ""))) + } } test("Describing a partition is not supported") { withNamespaceAndTable(namespace, "table") { tbl => - val e = intercept[AnalysisException] { - sql(s"DESCRIBE TABLE $tbl PARTITION (id = 1)") - } - assert(e.message.contains("DESCRIBE does not support partition for v2 tables")) + spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing " + + "PARTITIONED BY (id)") + val e = intercept[AnalysisException] { + sql(s"DESCRIBE TABLE $tbl PARTITION (id = 1)") } + assert(e.message.contains("DESCRIBE does not support partition for v2 tables")) } } + test("DESCRIBE TABLE with v2 catalog when table does not exist.") { intercept[AnalysisException] { spark.sql(s"DESCRIBE TABLE $catalog.$namespace.non_existence") @@ -65,8 +68,7 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit } test("DESCRIBE TABLE EXTENDED using v2 catalog") { - val tbl = s"$catalog.$namespace.tbl" - withTable(tbl) { + withNamespaceAndTable(namespace, "table") { tbl => spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + " PARTITIONED BY (id)" + " TBLPROPERTIES ('bar'='baz')" + @@ -78,26 +80,24 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit ("col_name", StringType), ("data_type", StringType), ("comment", StringType))) - assert(descriptionDf.collect() - .map(_.toSeq) - .map(_.toArray.map(_.toString.trim)) === Array( - Array("data", "string", ""), - Array("id", "bigint", ""), - Array("", "", ""), - Array("# Partitioning", "", ""), - Array("Part 0", "id", ""), - Array("", "", ""), - Array("# Metadata Columns", "", ""), - Array("index", "int", "Metadata 
column used to conflict with a data column"), - Array("_partition", "string", "Partition key used to store the row"), - Array("", "", ""), - Array("# Detailed Table Information", "", ""), - Array("Name", tbl, ""), - Array("Comment", "this is a test table", ""), - Array("Location", "file:/tmp/testcat/table_name", ""), - Array("Provider", defaultProvider, ""), - Array(TableCatalog.PROP_OWNER.capitalize, Utils.getCurrentUserName(), ""), - Array("Table Properties", "[bar=baz]", ""))) + assert(descriptionDf.collect() === Seq( + Row("data", "string", null), + Row("id", "bigint", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("id", "bigint", null), + Row("", "", ""), + Row("# Metadata Columns", "", ""), + Row("index", "int", "Metadata column used to conflict with a data column"), + Row("_partition", "string", "Partition key used to store the row"), + Row("", "", ""), + Row("# Detailed Table Information", "", ""), + Row("Name", tbl, ""), + Row("Comment", "this is a test table", ""), + Row("Location", "file:/tmp/testcat/table_name", ""), + Row("Provider", defaultProvider, ""), + Row(TableCatalog.PROP_OWNER.capitalize, Utils.getCurrentUserName(), ""), + Row("Table Properties", "[bar=baz]", "")).toArray) } }
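
Note on the core behavioral change in this series: `DescribeTableExec.addPartitioning` now renders the v1-style "# Partition Information" section whenever every partition transform is a plain column reference (an `IdentityTransform`), and keeps the generic "# Partitioning" listing otherwise. Below is a minimal, self-contained sketch of that dispatch. It uses the real `org.apache.spark.sql.connector.expressions` types, but the `DescRow` case class and `partitionSection` helper are hypothetical simplifications for illustration, not the patch's actual code:

    import org.apache.spark.sql.connector.expressions.{IdentityTransform, Transform}

    // Simplified stand-in for one (col_name, data_type) row of DESCRIBE output.
    case class DescRow(colName: String, dataType: String)

    def partitionSection(
        partitioning: Seq[Transform],
        columnTypes: Map[String, String]): Seq[DescRow] = {
      if (partitioning.isEmpty) {
        Seq.empty // the old "Not partitioned" row is no longer emitted
      } else if (partitioning.forall(_.isInstanceOf[IdentityTransform])) {
        // v1-compatible layout: a header plus one row per partition column, sorted by name.
        DescRow("# Partition Information", "") +:
          partitioning.map(_.describe).sorted.map(name => DescRow(name, columnTypes(name)))
      } else {
        // Generic v2 layout: one row per transform, e.g. DescRow("Part 0", "bucket(3, id)").
        DescRow("# Partitioning", "") +:
          partitioning.zipWithIndex.map { case (t, i) => DescRow(s"Part $i", t.describe) }
      }
    }

For an identity-partitioned table this produces the same section shape that the unified "describe table with partition columns" test asserts, which is what lets the V1 and V2 suites share expected output.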