@@ -262,7 +262,7 @@ describeFuncName
     ;
 
 describeColName
-    : identifier ('.' (identifier | STRING))*
+    : nameParts+=identifier ('.' nameParts+=identifier)*
     ;
 
 ctes
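
With the labeled list syntax, ANTLR appends every matched identifier to a single `nameParts` collection on the generated context, so the visitor can consume a multi-part column name directly instead of re-splitting raw text. A minimal sketch of how such a context is read (assuming the ANTLR-generated parser classes, e.g. `SqlBaseParser.DescribeTableContext`, as used by the visitor below):

    import scala.collection.JavaConverters._

    // nameParts is a java.util.List of identifier contexts; "col.x" parses
    // into two entries, yielding Seq("col", "x").
    def colNameParts(ctx: SqlBaseParser.DescribeTableContext): Seq[String] =
      ctx.describeColName.nameParts.asScala.map(_.getText)
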
@@ -320,10 +320,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[DescribeTableCommand]] logical plan.
    */
   override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    // Describe column are not supported yet. Return null and let the parser decide
-    // what to do with this (create an exception or pass it on to a different system).
+    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
     if (ctx.describeColName != null) {
-      null
+      if (ctx.partitionSpec != null) {
+        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
+      } else {
+        DescribeColumnCommand(
+          visitTableIdentifier(ctx.tableIdentifier),
+          ctx.describeColName.nameParts.asScala.map(_.getText),
+          isExtended)
+      }
     } else {
       val partitionSpec = if (ctx.partitionSpec != null) {
         // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
@@ -338,7 +344,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       DescribeTableCommand(
         visitTableIdentifier(ctx.tableIdentifier),
         partitionSpec,
-        ctx.EXTENDED != null || ctx.FORMATTED != null)
+        isExtended)
     }
   }
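
As a rough illustration of the dispatch above (hypothetical table and column names), each DESC variant now maps to a distinct outcome:

    // DESC EXTENDED t             -> DescribeTableCommand(t, Map.empty, isExtended = true)
    // DESC t PARTITION (p=1)      -> DescribeTableCommand(t, Map("p" -> "1"), isExtended = false)
    // DESC t key                  -> DescribeColumnCommand(t, Seq("key"), isExtended = false)
    // DESC t PARTITION (p=1) key  -> ParseException("DESC TABLE COLUMN for a specific
    //                                partition is not supported")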

@@ -20,24 +20,22 @@ package org.apache.spark.sql.execution.command
 import java.io.File
 import java.net.URI
 import java.nio.file.FileSystems
-import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
-import scala.util.Try
 
 import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -626,6 +624,74 @@ case class DescribeTableCommand(
}
}

/**
 * A command to list the info for a column, including name, data type, column stats and comment.
 * This function creates a [[DescribeColumnCommand]] logical plan.

[Review] Contributor: This comment line seems not needed.
[Review] Author: There are two other similar comments (ShowPartitionsCommand, ShowColumnsCommand) in this file; shall I remove them all?
[Review] Author: A followup PR to improve the comments is sent: #19213

 *
 * The syntax of using this command in SQL is:
 * {{{
 *   DESCRIBE [EXTENDED|FORMATTED] table_name column_name;
 * }}}
 */
case class DescribeColumnCommand(
    table: TableIdentifier,
    colNameParts: Seq[String],
    isExtended: Boolean)
  extends RunnableCommand {

  override val output: Seq[Attribute] = {
    Seq(
      AttributeReference("info_name", StringType, nullable = false,
        new MetadataBuilder().putString("comment", "name of the column info").build())(),
      AttributeReference("info_value", StringType, nullable = false,
        new MetadataBuilder().putString("comment", "value of the column info").build())()
    )
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val resolver = sparkSession.sessionState.conf.resolver
    val relation = sparkSession.table(table).queryExecution.analyzed

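    // Build a user-facing name from the parts; UnresolvedAttribute.name
    // re-quotes a part that itself contains dots, e.g. Seq("a.b") -> `a.b`.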
    val colName = UnresolvedAttribute(colNameParts).name
    val field = {
      relation.resolve(colNameParts, resolver).getOrElse {
        throw new AnalysisException(s"Column $colName does not exist")
      }
    }
    if (!field.isInstanceOf[Attribute]) {
      // If the field is not an attribute after `resolve`, then it's a nested field.
      throw new AnalysisException(
        s"DESC TABLE COLUMN command does not support nested data types: $colName")
    }

    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
    val colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty)
    val cs = colStats.get(field.name)

[Review] Contributor: nit: val colStats = catalogTable.stats.flatMap(_.colStats.get(field.name))

    val comment = if (field.metadata.contains("comment")) {
      Option(field.metadata.getString("comment"))
    } else {
      None
    }

    val buffer = ArrayBuffer[Row](
      Row("col_name", field.name),
      Row("data_type", field.dataType.catalogString),
      Row("comment", comment.getOrElse("NULL"))
    )
    if (isExtended) {
      // Show column stats when EXTENDED or FORMATTED is specified.
      buffer += Row("min", cs.flatMap(_.min.map(_.toString)).getOrElse("NULL"))
      buffer += Row("max", cs.flatMap(_.max.map(_.toString)).getOrElse("NULL"))
      buffer += Row("num_nulls", cs.map(_.nullCount.toString).getOrElse("NULL"))
      buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL"))
      buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL"))
      buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL"))
    }
    buffer
  }
}
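
A quick usage sketch (hypothetical table name, mirroring the SQL tests below): once column statistics have been collected, DESC EXTENDED surfaces them; before that, the stat rows fall back to NULL.

    // Assumes an active SparkSession `spark`; names are illustrative.
    spark.sql("CREATE TABLE t (key INT COMMENT 'column_comment') USING PARQUET")
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key")
    spark.sql("DESC EXTENDED t key").show()
    // Rows: col_name, data_type, comment, min, max, num_nulls,
    // distinct_count, avg_col_len, max_col_len (one info_name/info_value pair each)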

/**
 * A command for users to get tables in the given database.
@@ -0,0 +1,35 @@
-- Test temp table
CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET;

DESC desc_col_temp_table key;

DESC EXTENDED desc_col_temp_table key;

DESC FORMATTED desc_col_temp_table key;

-- Describe a column with qualified name
DESC FORMATTED desc_col_temp_table desc_col_temp_table.key;

-- Describe a non-existent column
DESC desc_col_temp_table key1;

-- Test persistent table
CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET;

[Review] Contributor: Shall we drop these testing tables at the end?
[Review] Author: Yes, we should. I'll drop them in the followup PR.


ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key;

DESC desc_col_table key;

DESC EXTENDED desc_col_table key;

DESC FORMATTED desc_col_table key;

-- Test complex columns
CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET;

DESC FORMATTED desc_col_complex_table `a.b`;

DESC FORMATTED desc_col_complex_table col;

-- Describe a nested column
DESC FORMATTED desc_col_complex_table col.x;
@@ -0,0 +1,184 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 15


-- !query 0
CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
DESC desc_col_temp_table key
-- !query 1 schema
struct<info_name:string,info_value:string>
-- !query 1 output
col_name key
data_type int
comment column_comment


-- !query 2
DESC EXTENDED desc_col_temp_table key
-- !query 2 schema
struct<info_name:string,info_value:string>
-- !query 2 output
col_name key
data_type int
comment column_comment
min NULL
max NULL
num_nulls NULL
distinct_count NULL
avg_col_len NULL
max_col_len NULL


-- !query 3
DESC FORMATTED desc_col_temp_table key
-- !query 3 schema
struct<info_name:string,info_value:string>
-- !query 3 output
col_name key
data_type int
comment column_comment
min NULL
max NULL
num_nulls NULL
distinct_count NULL
avg_col_len NULL
max_col_len NULL


-- !query 4
DESC FORMATTED desc_col_temp_table desc_col_temp_table.key
-- !query 4 schema
struct<info_name:string,info_value:string>
-- !query 4 output
col_name key
data_type int
comment column_comment
min NULL
max NULL
num_nulls NULL
distinct_count NULL
avg_col_len NULL
max_col_len NULL


-- !query 5
DESC desc_col_temp_table key1
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Column key1 does not exist;


-- !query 6
CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET
-- !query 6 schema
struct<>
-- !query 6 output



-- !query 7
ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key
-- !query 7 schema
struct<>
-- !query 7 output



-- !query 8
DESC desc_col_table key
-- !query 8 schema
struct<info_name:string,info_value:string>
-- !query 8 output
col_name key
data_type int
comment column_comment


-- !query 9
DESC EXTENDED desc_col_table key
-- !query 9 schema
struct<info_name:string,info_value:string>
-- !query 9 output
col_name key
data_type int
comment column_comment
min NULL
max NULL

[Review] Contributor: Why are min and max NULL here?
[Review] Author: Because the table is empty.

num_nulls 0
distinct_count 0
avg_col_len 4
max_col_len 4


-- !query 10
DESC FORMATTED desc_col_table key
-- !query 10 schema
struct<info_name:string,info_value:string>
-- !query 10 output
col_name key
data_type int
comment column_comment
min NULL
max NULL
num_nulls 0
distinct_count 0
avg_col_len 4
max_col_len 4


-- !query 11
CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET
-- !query 11 schema
struct<>
-- !query 11 output



-- !query 12
DESC FORMATTED desc_col_complex_table `a.b`
-- !query 12 schema
struct<info_name:string,info_value:string>
-- !query 12 output
col_name a.b
data_type int
comment NULL
min NULL
max NULL
num_nulls NULL
distinct_count NULL
avg_col_len NULL
max_col_len NULL


-- !query 13
DESC FORMATTED desc_col_complex_table col
-- !query 13 schema
struct<info_name:string,info_value:string>
-- !query 13 output
col_name col
data_type struct<x:int,y:string>
comment NULL
min NULL
max NULL
num_nulls NULL
distinct_count NULL
avg_col_len NULL
max_col_len NULL


-- !query 14
DESC FORMATTED desc_col_complex_table col.x
-- !query 14 schema
struct<>
-- !query 14 output
org.apache.spark.sql.AnalysisException
DESC TABLE COLUMN command does not support nested data types: col.x;
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
-import org.apache.spark.sql.execution.command.DescribeTableCommand
+import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeTableCommand}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType

@@ -214,11 +214,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
   /** Executes a query and returns the result as (schema of the output, normalized output). */
   private def getNormalizedResult(session: SparkSession, sql: String): (StructType, Seq[String]) = {
     // Returns true if the plan is supposed to be sorted.
-    def needSort(plan: LogicalPlan): Boolean = plan match {
+    def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case _: DescribeTableCommand => true
+      case _: DescribeTableCommand | _: DescribeColumnCommand => true
       case PhysicalOperation(_, _, Sort(_, true, _)) => true
-      case _ => plan.children.iterator.exists(needSort)
+      case _ => plan.children.iterator.exists(isSorted)
     }
 
     try {
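
The rename matters for the new command: a describe command emits rows in a fixed, meaningful order (col_name, data_type, comment, then the stat rows), so the harness must record that order verbatim instead of sorting it the way ordinary query output is normalized. A small sketch of the intent (hypothetical answer lines):

    // isSorted is true for Describe*Command, so the golden file keeps
    //   col_name key / data_type int / comment column_comment
    // as emitted; answer.sorted would interleave these lines alphabetically
    // and scramble the report.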
@@ -232,7 +232,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           .replaceAll("Last Access.*", s"Last Access $notIncludedMsg"))
 
       // If the output is not pre-sorted, sort it.
-      if (needSort(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
+      if (isSorted(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
 
     } catch {
       case a: AnalysisException =>