Changes from 12 commits
@@ -86,7 +86,18 @@ object CatalogStorageFormat {
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
     storage: CatalogStorageFormat,
-    parameters: Map[String, String] = Map.empty)
+    parameters: Map[String, String] = Map.empty) {
+
+  override def toString: String = {
+    val output =
+      Seq(
+        s"Partition Values: [${spec.values.mkString(", ")}]",
+        s"$storage",
+        s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
+
+    output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
+  }
+}
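To make the rendering concrete, a minimal sketch of the new toString in action. The parameters value is a made-up timestamp, and CatalogStorageFormat.empty is assumed to be the blank storage format from this file's companion object:

    val part = CatalogTablePartition(
      spec = Map("c" -> "Us", "d" -> "1"),
      storage = CatalogStorageFormat.empty,  // assumed: empty storage format from this file
      parameters = Map("transient_lastDdlTime" -> "1475067550"))  // illustrative value

    println(part)
    // Prints roughly:
    // CatalogPartition(
    //     Partition Values: [Us, 1]
    //     Storage(...)
    //     Partition Parameters:{transient_lastDdlTime=1475067550})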


/**
@@ -276,13 +276,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   * Create a [[DescribeTableCommand]] logical plan.
   */
  override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    // Describe partition and column are not supported yet. Return null and let the parser decide
+    // Describe column is not supported yet. Return null and let the parser decide
     // what to do with this (create an exception or pass it on to a different system).
-    if (ctx.describeColName != null || ctx.partitionSpec != null) {
+    if (ctx.describeColName != null) {
       null
     } else {
+      val partitionSpec = if (ctx.partitionSpec != null) {
+        // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
+        visitPartitionSpec(ctx.partitionSpec).map {
+          case (key, Some(value)) => key -> value
+          case (key, _) =>
+            throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx)
+        }
+      } else {
+        Map.empty[String, String]
+      }
       DescribeTableCommand(
         visitTableIdentifier(ctx.tableIdentifier),
+        partitionSpec,
         ctx.EXTENDED != null,
         ctx.FORMATTED != null)
     }
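The map above is the heart of the change: the grammar yields a Map[String, Option[String]], and every key must carry a value to form a TablePartitionSpec. The same flattening in isolation, as a sketch (ParseException needs the ANTLR parser context, so a plain IllegalArgumentException stands in here):

    val raw: Map[String, Option[String]] = Map("c" -> Some("Us"), "d" -> None)

    val spec: Map[String, String] = raw.map {
      case (key, Some(value)) => key -> value
      case (key, None) =>
        // stand-in for ParseException, which requires the parser context
        throw new IllegalArgumentException(s"PARTITION specification is incomplete: `$key`")
    }
    // java.lang.IllegalArgumentException: PARTITION specification is incomplete: `d`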
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -390,10 +390,14 @@ case class TruncateTableCommand(
/**
* Command that looks like
* {{{
- *   DESCRIBE [EXTENDED|FORMATTED] table_name;
+ *   DESCRIBE [EXTENDED|FORMATTED] table_name partitionSpec?;
* }}}
*/
-case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
+case class DescribeTableCommand(
+    table: TableIdentifier,
+    partitionSpec: TablePartitionSpec,
+    isExtended: Boolean,
+    isFormatted: Boolean)
extends RunnableCommand {

override val output: Seq[Attribute] = Seq(
@@ -411,17 +415,25 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
     val catalog = sparkSession.sessionState.catalog

     if (catalog.isTemporaryTable(table)) {
+      if (partitionSpec.nonEmpty) {
+        throw new AnalysisException(
+          s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}")
+      }
       describeSchema(catalog.lookupRelation(table).schema, result)
     } else {
       val metadata = catalog.getTableMetadata(table)
       describeSchema(metadata.schema, result)

-      if (isExtended) {
-        describeExtended(metadata, result)
-      } else if (isFormatted) {
-        describeFormatted(metadata, result)
+      describePartitionInfo(metadata, result)
+
+      if (partitionSpec.isEmpty) {
+        if (isExtended) {
+          describeExtendedTableInfo(metadata, result)
+        } else if (isFormatted) {
+          describeFormattedTableInfo(metadata, result)
+        }
+      } else {
+        // Partition info was already appended unconditionally above; repeating
+        // describePartitionInfo here would emit "# Partition Information" twice,
+        // which the golden output file below does not expect.
+        describeDetailedPartitionInfo(catalog, metadata, result)
       }

@@ -436,16 +448,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
}
}
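Spelled out, each DESC variant now maps to exactly one helper. Illustrative spark.sql calls against the partitioned table t defined in the tests further down:

    spark.sql("DESC t")                                    // schema + "# Partition Information"
    spark.sql("DESC EXTENDED t")                           // ... + describeExtendedTableInfo
    spark.sql("DESC FORMATTED t")                          // ... + describeFormattedTableInfo
    spark.sql("DESC EXTENDED t PARTITION (c='Us', d=1)")   // ... + describeDetailedPartitionInfo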

-  private def describeExtended(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    describePartitionInfo(table, buffer)
-
+  private def describeExtendedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", table.toString, "")
}

-  private def describeFormatted(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    describePartitionInfo(table, buffer)
-
+  private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", "", "")
append(buffer, "Database:", table.database, "")
@@ -499,6 +507,53 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
}
}

private def describeDetailedPartitionInfo(
catalog: SessionCatalog,
metadata: CatalogTable,
result: ArrayBuffer[Row]): Unit = {
if (metadata.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"DESC PARTITION is not allowed on a view: ${table.identifier}")
}
if (DDLUtils.isDatasourceTable(metadata)) {
throw new AnalysisException(
s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
}
val partition = catalog.getPartition(table, partitionSpec)
if (isExtended) {
describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
} else if (isFormatted) {
describeFormattedDetailedPartitionInfo(table, metadata, partition, result)
describeStorageInfo(metadata, result)
}
}
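Existence and shape checking is delegated to catalog.getPartition. A sketch of its behavior for the table t used in the tests, with exception types taken from the expected-output file below:

    catalog.getPartition(table, Map("c" -> "Us", "d" -> "1"))  // returns the CatalogTablePartition
    catalog.getPartition(table, Map("c" -> "Us", "d" -> "2"))  // NoSuchPartitionException: Partition not found
    catalog.getPartition(table, Map("c" -> "Us"))              // AnalysisException: Partition spec is invalid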

private def describeExtendedDetailedPartitionInfo(
tableIdentifier: TableIdentifier,
table: CatalogTable,
partition: CatalogTablePartition,
buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "Detailed Partition Information " + partition.toString, "", "")
}

private def describeFormattedDetailedPartitionInfo(
tableIdentifier: TableIdentifier,
table: CatalogTable,
partition: CatalogTablePartition,
buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Detailed Partition Information", "", "")
append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
append(buffer, "Database:", table.database, "")
append(buffer, "Table:", tableIdentifier.table, "")
append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
append(buffer, "Partition Parameters:", "", "")
partition.parameters.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
}
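For the partition added in describe.sql below, these appends yield rows along the following lines (the Location and parameter values are environment-dependent placeholders):

    # Detailed Partition Information
    Partition Value:        [Us, 1]
    Database:               default
    Table:                  t
    Location:               file:/user/hive/warehouse/t/c=Us/d=1
    Partition Parameters:
      transient_lastDdlTime 1475067550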

private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
24 changes: 24 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/describe.sql
@@ -0,0 +1,24 @@
CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING);
Member: Drop the table?

Member Author: Done. Thank you!

ALTER TABLE t ADD PARTITION (c='Us', d=1);

DESC t;

-- Ignore these because the output contains timestamps, e.g., `Create Table`.
-- DESC EXTENDED t;
-- DESC FORMATTED t;

DESC t PARTITION (c='Us', d=1);

-- Ignore these because the output contains timestamps, e.g., transient_lastDdlTime.
-- DESC EXTENDED t PARTITION (c='Us', d=1);
-- DESC FORMATTED t PARTITION (c='Us', d=1);

-- NoSuchPartitionException: Partition not found in table
DESC t PARTITION (c='Us', d=2);

-- AnalysisException: Partition spec is invalid
DESC t PARTITION (c='Us');

-- ParseException: PARTITION specification is incomplete
DESC t PARTITION (c='Us', d);
82 changes: 82 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -0,0 +1,82 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 7


-- !query 0
CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
ALTER TABLE t ADD PARTITION (c='Us', d=1)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
DESC t
-- !query 2 schema
struct<col_name:string,data_type:string,comment:string>
-- !query 2 output
# Partition Information
# col_name data_type comment
a string
b int
c string
c string
d string
d string


-- !query 3
DESC t PARTITION (c='Us', d=1)
-- !query 3 schema
struct<col_name:string,data_type:string,comment:string>
-- !query 3 output
# Partition Information
# col_name data_type comment
a string
b int
c string
c string
d string
d string


-- !query 4
DESC t PARTITION (c='Us', d=2)
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
Partition not found in table 't' database 'default':
c -> Us
d -> 2;


-- !query 5
DESC t PARTITION (c='Us')
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Partition spec is invalid. The spec (c) must match the partition spec (c, d) defined in table '`default`.`t`';


-- !query 6
DESC t PARTITION (c='Us', d)
-- !query 6 schema
struct<>
-- !query 6 output
org.apache.spark.sql.catalyst.parser.ParseException

PARTITION specification is incomplete: `d`(line 1, pos 0)

== SQL ==
DESC t PARTITION (c='Us', d)
^^^
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -341,6 +341,81 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("describe partition") {
Contributor: Is it possible to test the output with SQLQueryTestSuite? Or does that have portability issues?

withTable("partitioned_table") {
sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")

checkKeywordsExist(sql("DESC partitioned_table PARTITION (c='Us', d=1)"),
"# Partition Information",
"# col_name")

checkKeywordsExist(sql("DESC EXTENDED partitioned_table PARTITION (c='Us', d=1)"),
"# Partition Information",
"# col_name",
"Detailed Partition Information CatalogPartition(",
"Partition Values: [Us, 1]",
"Storage(Location:",
"Partition Parameters")

checkKeywordsExist(sql("DESC FORMATTED partitioned_table PARTITION (c='Us', d=1)"),
"# Partition Information",
"# col_name",
"# Detailed Partition Information",
"Partition Value:",
"Database:",
"Table:",
"Location:",
"Partition Parameters:",
"# Storage Information")
}
}

test("describe partition - error handling") {
withTable("partitioned_table", "datasource_table") {
sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")

val m = intercept[NoSuchPartitionException] {
sql("DESC partitioned_table PARTITION (c='Us', d=2)")
}.getMessage()
assert(m.contains("Partition not found in table"))

val m2 = intercept[AnalysisException] {
sql("DESC partitioned_table PARTITION (c='Us')")
}.getMessage()
assert(m2.contains("Partition spec is invalid"))

val m3 = intercept[ParseException] {
sql("DESC partitioned_table PARTITION (c='Us', d)")
}.getMessage()
assert(m3.contains("PARTITION specification is incomplete: `d`"))

spark
.range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
.partitionBy("d")
.saveAsTable("datasource_table")
val m4 = intercept[AnalysisException] {
sql("DESC datasource_table PARTITION (d=2)")
}.getMessage()
assert(m4.contains("DESC PARTITION is not allowed on a datasource table"))

val m5 = intercept[AnalysisException] {
spark.range(10).select('id as 'a, 'id as 'b).createTempView("view1")
sql("DESC view1 PARTITION (c='Us', d=1)")
}.getMessage()
assert(m5.contains("DESC PARTITION is not allowed on a temporary view"))

withView("permanent_view") {
val m = intercept[AnalysisException] {
sql("CREATE VIEW permanent_view AS SELECT * FROM partitioned_table")
sql("DESC permanent_view PARTITION (c='Us', d=1)")
}.getMessage()
assert(m.contains("DESC PARTITION is not allowed on a view"))
}
}
}
Member: Could you split the test case into two? One covering the positive cases, another covering the negative cases. We normally do not like a large test case.

Member Author: No problem!


test("SPARK-5371: union with null and sum") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.createOrReplaceTempView("table1")