@@ -231,7 +231,7 @@ describeFuncName
;

describeColName
: identifier ('.' (identifier | STRING))*
: identifier ('.' colpathIdentifier)*
;

ctes
@@ -454,6 +454,10 @@ tableIdentifier
: (db=identifier '.')? table=identifier
;

colpathIdentifier
: identifier | ELEM_TYPE | KEY_TYPE | VALUE_TYPE
;

namedExpression
: expression (AS? (identifier | identifierList))?
;
@@ -902,6 +906,9 @@ OPTION: 'OPTION';
ANTI: 'ANTI';
LOCAL: 'LOCAL';
INPATH: 'INPATH';
KEY_TYPE: '$KEY$';
VALUE_TYPE: '$VALUE$';
ELEM_TYPE: '$ELEM$';
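
For reference, a few column paths the revised describeColName rule accepts once these tokens exist (illustrative paths only, not taken from the test suite):

// Matched by: identifier ('.' colpathIdentifier)*
//   people                    plain top-level column
//   people.address            field of a struct column
//   props.$key$               key type of a map column
//   props.$value$.$elem$      element type of an array stored as a map value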

STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.util.StringUtils

@@ -110,6 +110,14 @@ case class CatalogTable(
def partitionColumns: Seq[CatalogColumn] =
schema.filter { c => partitionColumnNames.contains(c.name) }

/** Columns this table is bucketed by. */
def bucketColumns: Seq[CatalogColumn] =
schema.filter { c => bucketColumnNames.contains(c.name) }

/** Columns this table is sorted by. */
def sortColumns: Seq[CatalogColumn] =
schema.filter { c => sortColumnNames.contains(c.name) }

/** Return the database this table was specified to belong to, assuming it exists. */
def database: String = identifier.database.getOrElse {
throw new AnalysisException(s"table $identifier did not specify database")
@@ -218,32 +218,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

// Create the explain comment.
val statement = plan(ctx.statement)
if (isExplainableStatement(statement)) {
ExplainCommand(statement, extended = options.exists(_.EXTENDED != null),
codegen = options.exists(_.CODEGEN != null))
} else {
ExplainCommand(OneRowRelation)
}
}

/**
* Determine if a plan should be explained at all.
*/
protected def isExplainableStatement(plan: LogicalPlan): Boolean = plan match {
case _: DescribeTableCommand => false
case _ => true
ExplainCommand(statement, extended = options.exists(_.EXTENDED != null),
codegen = options.exists(_.CODEGEN != null))
}

/**
* Create a [[DescribeTableCommand]] logical plan.
* A command for users to describe a table in the given database. If a databaseName is not given,
* the current database will be used.
* The syntax of this command in SQL is:
* {{{
* DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec]
* }}}
*/
override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
// FORMATTED and columns are not supported. Return null and let the parser decide what to do
// with this (create an exception or pass it on to a different system).
if (ctx.describeColName != null || ctx.FORMATTED != null || ctx.partitionSpec != null) {
if (ctx.FORMATTED != null) {
null
} else {
DescribeTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXTENDED != null)
val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
val columnPath = Option(ctx.describeColName).map(visitDescribeColName)
DescribeTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
partitionKeys,
columnPath,
ctx.EXTENDED != null)
}
}
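
For orientation, a minimal sketch of the plans this method now produces (the concrete cases are exercised by the new DDLCommandSuite test further down; table and column names are illustrative):

// DESCRIBE t                      -> DescribeTableCommand(t, None, None, isExtended = false)
// DESCRIBE t PARTITION (c1 = 'v') -> DescribeTableCommand(t, Some(Map("c1" -> "v")), None, false)
// DESCRIBE t col.$key$            -> DescribeTableCommand(t, None, Some("col.$key$"), false)
// DESCRIBE FORMATTED t            -> null, handed back to the parser as unsupported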

@@ -349,6 +348,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}

/**
* A column path can be specified as a parameter to the DESCRIBE command. It is a dot-separated
* list of identifiers that may contain three special identifiers, '$elem$', '$key$' and
* '$value$', which denote the array element type, map key type and map value type respectively.
*/
override def visitDescribeColName(ctx: DescribeColNameContext): String = {
(ctx.identifier.getText +: ctx.colpathIdentifier.asScala.map(_.getText)).mkString(".")
}
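
A worked example, assuming a hypothetical parse of "DESCRIBE tab1 col1.field1.$elem$":

// ctx.identifier.getText        == "col1"
// ctx.colpathIdentifier texts   == Seq("field1", "$elem$")
// visitDescribeColName(ctx)     == "col1.field1.$elem$"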

/**
* Create a [[CreateDatabase]] command.
*
@@ -24,11 +24,13 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

case class CreateTableAsSelectLogicalPlan(
@@ -268,12 +270,26 @@ case class LoadData(
}

/**
* Command that looks like
* A command for users to describe a table in the given database. If a databaseName is not given,
* the current database will be used.
* The syntax of this command in SQL is:
* {{{
* DESCRIBE (EXTENDED) table_name;
* DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec]
* }}}
* Note: the FORMATTED option is not supported.
* @param table the table to be described.
* @param partSpec If specified, only the given partition is described. It is effective only
* when the table is a Hive table.
* @param colPath If specified, only the given column is described. It is effective only
* when the table is a Hive table.
* @param isExtended True if "DESCRIBE EXTENDED" was used, false otherwise. It is effective
* only when the table is a Hive table.
*/
case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
case class DescribeTableCommand(
table: TableIdentifier,
partSpec: Option[TablePartitionSpec],
colPath: Option[String],
isExtended: Boolean)
extends RunnableCommand {

override val output: Seq[Attribute] = Seq(
@@ -286,20 +302,147 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
new MetadataBuilder().putString("comment", "comment of the column").build())()
)

private def formatColumns(cols: Seq[CatalogColumn]): String = {
cols.map { col =>
s"${col.getClass.getSimpleName}(name:${col.name}, " +
s"type:${col.dataType}, comment:${col.comment.orNull})"
}.mkString(",")
}

private def formatProperties(props: Map[String, String]): String = {
props.map {
case (k, v) => s"$k=$v"
}.mkString("{", ", ", "}")
}
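
A tiny usage sketch with an invented property map, just to pin down the output shape:

// formatProperties(Map("k1" -> "v1", "k2" -> "v2")) == "{k1=v1, k2=v2}"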

private def getPartValues(part: CatalogTablePartition, cols: Seq[String]): String = {
cols.map { name =>
PartitioningUtils.escapePathName(part.spec(name))
}.mkString(", ")
}

private def descColPath(table: CatalogTable, colPath: String): Array[Row] = {
val names = colPath.split("\\.")
val lastName = names.last
val fields = table.schema.map { c =>
StructField(c.name, CatalystSqlParser.parseDataType(c.dataType), c.nullable)
}
var dataType: DataType = StructType(fields)
for (name <- names) {
dataType = dataType match {
case s: StructType =>
try {
s(name).dataType
} catch {
case _: Exception =>
throw new AnalysisException(s"Column name/path: $colPath does not exist.")
}
case m: MapType if name == "$key$" => m.keyType
case m: MapType if name == "$value$" => m.valueType
case a: ArrayType if name == "$elem$" => a.elementType
case _ => throw new AnalysisException(s"Column name/path: $colPath does not exist.")
}
}

val result: Seq[Row] = dataType match {
case s: StructType =>
s.map { f => Row(f.name, f.dataType.simpleString, "from deserializer") }
case _ => Seq(Row(lastName, dataType.simpleString, "from deserializer"))
}
result.toArray
}
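
To make the traversal above concrete, here is a self-contained sketch of the same walk over a hypothetical schema (names invented for illustration; the command itself builds the type from the catalog schema):

import org.apache.spark.sql.types._

// Hypothetical table type: props is a map from string to array of int.
val schema: DataType = StructType(Seq(
  StructField("props", MapType(StringType, ArrayType(IntegerType)))))

// Resolve the column path "props.$value$.$elem$" step by step,
// mirroring the match in descColPath above.
val resolved = "props.$value$.$elem$".split("\\.").foldLeft(schema) {
  case (s: StructType, name) => s(name).dataType   // struct field lookup
  case (m: MapType, "$key$") => m.keyType
  case (m: MapType, "$value$") => m.valueType
  case (a: ArrayType, "$elem$") => a.elementType
  case (_, name) => throw new IllegalArgumentException(s"No such path element: $name")
}
// resolved == IntegerType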

private def descStorageFormat(
table: CatalogTable,
storage: CatalogStorageFormat): String = {
// TODO - check with Lian - compress, skewedInfo and storedAsSubDirectories from Hive's
// StorageDescriptor are not available here, so they are dropped from the output.
val storageLocationStr =
s"""
|${storage.getClass.getSimpleName}(location:${storage.locationUri.orNull},
| inputFormat:${storage.inputFormat.orNull},
| outputFormat:${storage.outputFormat.orNull},
| numBuckets:${table.numBuckets},
| serializationLib=${storage.serde.orNull},
| parameters=${formatProperties(storage.serdeProperties)},
| bucketCols:[${formatColumns(table.bucketColumns)}],
| sortCols=[${formatColumns(table.sortColumns)}])
""".stripMargin.replaceAll("\n", "").trim
storageLocationStr
}

private def descPartExtended(table: CatalogTable, part: CatalogTablePartition): String = {
val result = StringBuilder.newBuilder
val clsName = part.getClass.getSimpleName
result ++= s"${clsName}(values:[${getPartValues(part, table.partitionColumnNames)}], "
result ++= s"dbName:${table.database}, "
// TODO - check with Lian - no owner info available.
result ++= s"createTime:${table.createTime}, "
result ++= s"lastAccessTime:${table.lastAccessTime}, "
// TODO - check with Lian - no retention info available.

result ++= s"sd:${descStorageFormat(table, part.storage)}, "
// TODO - check with Lian - Hive prints partition keys here. Since we already output the
// partitioning keys and the schema at the start, they are not repeated here.
result ++= s"parameters:${formatProperties(table.properties)}, "
result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, "
result ++= s"viewExpandedText:${table.viewText.orNull}, "
result ++= s"tableType:${table.tableType})"
result.toString
}

private def descTableExtended(table: CatalogTable): String = {
val result = StringBuilder.newBuilder
result ++= s"${table.getClass.getSimpleName}(tableName:${table.identifier.table}, "
result ++= s"dbName:${table.database}, "
// TODO - check with Lian - no owner info available.
result ++= s"createTime:${table.createTime}, "
result ++= s"lastAccessTime:${table.lastAccessTime}, "
// TODO - check with Lian - no retention info available.

result ++= s"sd:${descStorageFormat(table, table.storage)}, "
// TODO - check with Lian - Hive prints partition keys here. Since we already output the
// partitioning keys and the schema, they are not repeated here.
result ++= s"parameters:${formatProperties(table.properties)}, "
result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, "
result ++= s"viewExpandedText:${table.viewText.orNull}, "
result ++= s"tableType:${table.tableType})"
result.toString
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val result = new ArrayBuffer[Row]
sparkSession.sessionState.catalog.lookupRelation(table) match {
val catalog = sparkSession.sessionState.catalog
catalog.lookupRelation(table) match {
case catalogRelation: CatalogRelation =>
catalogRelation.catalogTable.schema.foreach { column =>
result += Row(column.name, column.dataType, column.comment.orNull)
}

if (catalogRelation.catalogTable.partitionColumns.nonEmpty) {
result += Row("# Partition Information", "", "")
result += Row(s"# ${output(0).name}", output(1).name, output(2).name)
val tab = catalogRelation.catalogTable
val part = partSpec.flatMap(p => Option(catalog.getPartition(table, p)))
if (colPath.isDefined) {
result ++= descColPath(tab, colPath.get)
} else {
catalogRelation.catalogTable.schema.foreach { column =>
result += Row(column.name, column.dataType, column.comment.orNull)
}
if (tab.partitionColumns.nonEmpty) {
result += Row("# Partition Information", "", "")
result += Row(s"# ${output(0).name}", output(1).name, output(2).name)

catalogRelation.catalogTable.partitionColumns.foreach { col =>
result += Row(col.name, col.dataType, col.comment.orNull)
tab.partitionColumns.foreach { col =>
result += Row(col.name, col.dataType, col.comment.orNull)
}
}
if (isExtended) {
if (partSpec.isEmpty) {
result += Row("Detailed Table Information", descTableExtended(tab), "")
} else {
result +=
Row("Detailed Partition Information", descPartExtended(tab, part.get), "")
}
}
}

@@ -315,7 +458,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
}
}


/**
* A command for users to get tables in the given database.
* If a databaseName is not given, the current database will be used.
@@ -757,4 +757,28 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}

test("describe table") {
val parsed1 = parser.parsePlan("DESCRIBE tab1")
val expected1 = DescribeTableCommand(TableIdentifier("tab1", None), None, None, false)
val parsed2 = parser.parsePlan("DESCRIBE db1.tab1")
val expected2 = DescribeTableCommand(TableIdentifier("tab1", Some("db1")), None, None, false)
val parsed3 = parser.parsePlan("DESCRIBE tab1 col1")
val expected3 = DescribeTableCommand(TableIdentifier("tab1", None), None, Some("col1"), false)
val parsed4 = parser.parsePlan("DESCRIBE tab1 PARTITION (c1 = 'val1')")
val expected4 = DescribeTableCommand(TableIdentifier("tab1", None),
Some(Map("c1" -> "val1")), None, false)
val parsed5 = parser.parsePlan("DESCRIBE EXTENDED tab1 PARTITION (c1 = 'val1')")
val expected5 = DescribeTableCommand(TableIdentifier("tab1", None),
Some(Map("c1" -> "val1")), None, true)
val parsed6 = parser.parsePlan("DESCRIBE EXTENDED tab1 tab1.col1.field1.$elem$")
val expected6 = DescribeTableCommand(TableIdentifier("tab1", None),
None, Some("tab1.col1.field1.$elem$"), true)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
comparePlans(parsed5, expected5)
comparePlans(parsed6, expected6)
}
}