
Commit 319d45b

DescribeTable based on CatalogTable
1 parent 30efbce commit 319d45b

File tree: 6 files changed (+176, -155 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 0 additions & 17 deletions
@@ -320,23 +320,6 @@ class SessionCatalog(
     alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
   }
 
-  /**
-   * Describes a table by returning various metadata pertaining to table/partitions/columns.
-   */
-  def describeTable(
-      table: TableIdentifier,
-      partSpec: Option[TablePartitionSpec],
-      colPath: Option[String],
-      isExtended: Boolean,
-      output: Seq[Attribute]): Seq[(String, String, String)] = {
-    val relation = lookupRelation(table)
-    relation.schema.fields.map { field =>
-      val cmtKey = "comment"
-      val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
-      (field.name, field.dataType.simpleString, comment)
-    }
-  }
-
   /**
    * Return whether a table with the specified name exists.
    *
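
The schema-based fallback deleted here does not go away: it reappears as the generic `case relation =>` branch of DescribeTableCommand.run in tables.scala below. A minimal, self-contained sketch of that fallback, using a made-up two-column schema (the FallbackDemo object and its data are illustrative only):

import org.apache.spark.sql.types._

object FallbackDemo extends App {
  // One field carries a "comment" metadata entry, the other does not.
  val commented = new MetadataBuilder().putString("comment", "primary key").build()
  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = false, commented),
    StructField("name", StringType)))

  // Extract (name, type, comment) triples exactly as the fallback does.
  val rows = schema.fields.map { field =>
    val comment =
      if (field.metadata.contains("comment")) field.metadata.getString("comment") else ""
    (field.name, field.dataType.simpleString, comment)
  }
  rows.foreach(println) // (id,int,primary key) then (name,string,)
}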

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 8 additions & 0 deletions
@@ -110,6 +110,14 @@ case class CatalogTable(
   def partitionColumns: Seq[CatalogColumn] =
     schema.filter { c => partitionColumnNames.contains(c.name) }
 
+  /** Columns this table is bucketed by. */
+  def bucketColumns: Seq[CatalogColumn] =
+    schema.filter { c => bucketColumnNames.contains(c.name) }
+
+  /** Columns this table is sorted by. */
+  def sortColumns: Seq[CatalogColumn] =
+    schema.filter { c => sortColumnNames.contains(c.name) }
+
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify database")
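
The two new helpers follow the same name-based filter as the existing partitionColumns. A quick standalone sketch of the pattern, with simplified stand-ins for CatalogColumn and CatalogTable (the Column/Table classes and sample data below are made up; Spark's real classes carry more fields):

case class Column(name: String, dataType: String)

case class Table(
    schema: Seq[Column],
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String]) {
  // Same filter pattern the commit adds to CatalogTable.
  def bucketColumns: Seq[Column] = schema.filter(c => bucketColumnNames.contains(c.name))
  def sortColumns: Seq[Column] = schema.filter(c => sortColumnNames.contains(c.name))
}

object BucketDemo extends App {
  val t = Table(
    schema = Seq(Column("id", "int"), Column("ts", "timestamp"), Column("v", "string")),
    bucketColumnNames = Seq("id"),
    sortColumnNames = Seq("ts"))
  println(t.bucketColumns.map(_.name)) // List(id)
  println(t.sortColumns.map(_.name))   // List(ts)
}

Because the helpers filter the schema rather than look names up, results keep schema order, and any name that does not match a schema column is silently dropped.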

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 157 additions & 21 deletions
@@ -20,15 +20,17 @@ package org.apache.spark.sql.execution.command
 import java.io.File
 import java.net.URI
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
@@ -274,6 +276,7 @@ case class LoadData(
  * {{{
  *   DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec]
  * }}}
+ * Note: the FORMATTED option is not supported yet.
  * @param table table to be described.
  * @param partSpec If specified, the specified partition is described. It is effective only
  *                 when the table is a Hive table.
@@ -289,7 +292,7 @@ case class DescribeTableCommand(
     isExtended: Boolean)
   extends RunnableCommand {
 
-  override val output: Seq[Attribute] = Seq(
+  override val output: Seq[Attribute] = Seq(
     // Column names are based on Hive.
     AttributeReference("col_name", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "name of the column").build())(),
@@ -299,28 +302,161 @@ case class DescribeTableCommand(
       new MetadataBuilder().putString("comment", "comment of the column").build())()
   )
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
-    // Check that the supplied partition columns are valid.
-    if (partSpec.isDefined && !catalog.isTemporaryTable(table)) {
-      val tab = catalog.getTableMetadata(table)
-      val badColumns = partSpec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
-      if (badColumns.nonEmpty) {
-        throw new AnalysisException(
-          s"Non-partitioned column(s) [${badColumns.mkString(", ")}] are " +
-            s"specified for DESCRIBE command")
+  private def formatColumns(cols: Seq[CatalogColumn]): String = {
+    cols.map { col =>
+      s"""
+         |${col.getClass.getSimpleName}
+         |(name:${col.name}
+         |type:${col.dataType}
+         |comment:${col.comment.orNull})
+       """.stripMargin
+    }.mkString(",")
+  }
+
+  private def formatProperties(props: Map[String, String]): String = {
+    props.map {
+      case (k, v) => s"$k=$v"
+    }.mkString("{", ", ", "}")
+  }
+
+  private def getPartValues(part: CatalogTablePartition, cols: Seq[String]): String = {
+    cols.map { name =>
+      PartitioningUtils.escapePathName(part.spec(name))
+    }.mkString(", ")
+  }
+
+  private def descColPath(table: CatalogTable, colPath: String): Array[Row] = {
+    val names = colPath.split("\\.")
+    val lastName = names(names.length - 1)
+    val fields = table.schema.map { c =>
+      StructField(c.name, CatalystSqlParser.parseDataType(c.dataType), c.nullable)
+    }
+    var dataType: DataType = StructType(fields)
+    for (i <- names.indices) {
+      dataType match {
+        case s: StructType =>
+          try {
+            dataType = s.apply(names(i)).dataType
+          } catch {
+            case _: Exception =>
+              throw new AnalysisException(s"Column name/path: $colPath does not exist.")
+          }
+        case m: MapType if names(i) == "$key$" => dataType = m.keyType
+        case m: MapType if names(i) == "$value$" => dataType = m.valueType
+        case a: ArrayType if names(i) == "$value$" => dataType = a.elementType
+        case _ => throw new AnalysisException(s"Column name/path: $colPath does not exist.")
       }
     }
 
-    val results =
-      sparkSession.sessionState.catalog.describeTable(table, partSpec, colPath, isExtended, output)
-    val rows = results.map { case (name, dataType, comment) =>
-      Row(name, dataType, comment)
+    val result: Seq[Row] = dataType match {
+      case s: StructType =>
+        s.map { f => Row(f.name, f.dataType.simpleString, "from deserializer") }
+      case _ => Seq(Row(lastName, dataType.simpleString, "from deserializer"))
     }
-    rows
+    result.toArray
+  }
+
+  private def descStorageFormat(
+      table: CatalogTable,
+      storage: CatalogStorageFormat): String = {
+    // TODO - check with Lian - compress, skewedInfo and storedAsSubDirectories from
+    // StorageDesc are not available, so they are dropped from the output.
+    val storageLocationStr =
+      s"""
+         |${storage.getClass.getSimpleName}(location:${storage.locationUri.orNull},
+         | inputFormat:${storage.inputFormat.orNull},
+         | outputFormat:${storage.outputFormat.orNull},
+         | numBuckets:${table.numBuckets},
+         | serializationLib=${storage.serde.orNull},
+         | parameters=${formatProperties(storage.serdeProperties)},
+         | bucketCols:[${formatColumns(table.bucketColumns)}],
+         | sortCols=[${formatColumns(table.sortColumns)}])
+       """.stripMargin.replaceAll("\n", "").trim
+    storageLocationStr
+  }
+
+  private def descPartExtended(table: CatalogTable, part: CatalogTablePartition): String = {
+    val result = StringBuilder.newBuilder
+    val clsName = part.getClass.getSimpleName
+    result ++= s"${clsName}(values:[${getPartValues(part, table.partitionColumnNames)}], "
+    result ++= s"dbName:${table.database}, "
+    // TODO - check with Lian - no owner info available.
+    result ++= s"createTime:${table.createTime}, "
+    result ++= s"lastAccessTime:${table.lastAccessTime}, "
+    // TODO - check with Lian - no retention info available.
+
+    result ++= s"sd:${descStorageFormat(table, part.storage)}, "
+    // TODO - check with Lian - Hive prints partition keys here. Since we already output
+    // the partitioning keys and schema at the start, we don't output them here again.
+    result ++= s"parameters:${formatProperties(table.properties)}, "
+    result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, "
+    result ++= s"viewExpandedText:${table.viewText.orNull}, "
+    result ++= s"tableType:${table.tableType})"
+    result.toString
   }
-}
 
+  private def descTableExtended(table: CatalogTable): String = {
+    val result = StringBuilder.newBuilder
+    result ++= s"${table.getClass.getSimpleName}(tableName:${table.identifier.table}, "
+    result ++= s"dbName:${table.database}, "
+    // TODO - check with Lian - no owner info available.
+    result ++= s"createTime:${table.createTime}, "
+    result ++= s"lastAccessTime:${table.lastAccessTime}, "
+    // TODO - check with Lian - no retention info available.
+
+    result ++= s"sd:${descStorageFormat(table, table.storage)}, "
+    // TODO - check with Lian - Hive prints partition keys here. Since we already output
+    // the partitioning keys and schema at the start, we don't output them here again.
+    result ++= s"parameters:${formatProperties(table.properties)}, "
+    result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, "
+    result ++= s"viewExpandedText:${table.viewText.orNull}, "
+    result ++= s"tableType:${table.tableType})"
+    result.toString
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val result = new ArrayBuffer[Row]
+    val catalog = sparkSession.sessionState.catalog
+    catalog.lookupRelation(table) match {
+      case catalogRelation: CatalogRelation =>
+        val tab = catalogRelation.catalogTable
+        val part = partSpec.map(p => Option(catalog.getPartition(table, p))).getOrElse(None)
+        if (colPath.nonEmpty) {
+          result ++= descColPath(tab, colPath.get)
+        } else {
+          catalogRelation.catalogTable.schema.foreach { column =>
+            result += Row(column.name, column.dataType, column.comment.orNull)
+          }
+          if (tab.partitionColumns.nonEmpty) {
+            result += Row("# Partition Information", "", "")
+            result += Row(s"# ${output(0).name}", output(1).name, output(2).name)
+
+            tab.partitionColumns.foreach { col =>
+              result += Row(col.name, col.dataType, col.comment.orNull)
+            }
+          }
+          if (isExtended) {
+            if (partSpec.isEmpty) {
+              result += Row("Detailed Table Information", descTableExtended(tab), "")
+            } else {
+              result +=
+                Row("Detailed Partition Information", descPartExtended(tab, part.get), "")
+            }
+          }
+        }
+
+      case relation =>
+        relation.schema.fields.foreach { field =>
+          val comment =
+            if (field.metadata.contains("comment")) field.metadata.getString("comment") else ""
+          result += Row(field.name, field.dataType.simpleString, comment)
+        }
+    }
+
+    result
+  }
+}
 
 /**
  * A command for users to get tables in the given database.
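
The most intricate piece of the new command is descColPath's walk through nested column types. Below is a minimal, self-contained sketch of the same walk outside the command machinery; the schema, the dotted path, and the ColumnPathDemo/resolve names are made up for illustration, while "$key$" and "$value$" are the Hive-style tokens the code above accepts for map keys, map values, and array elements.

import org.apache.spark.sql.types._

object ColumnPathDemo extends App {
  // events: map<string, struct<ts: bigint, msg: string>>
  val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("events", MapType(StringType, StructType(Seq(
      StructField("ts", LongType),
      StructField("msg", StringType)))))))

  // Fold the dotted path over the type tree, mirroring descColPath's loop.
  def resolve(root: DataType, path: String): DataType =
    path.split("\\.").foldLeft(root) { (current, name) =>
      (current, name) match {
        case (s: StructType, _) => s(name).dataType // throws if the field is missing
        case (m: MapType, "$key$") => m.keyType
        case (m: MapType, "$value$") => m.valueType
        case (a: ArrayType, "$value$") => a.elementType
        case _ => sys.error(s"Column name/path: $name does not exist.")
      }
    }

  println(resolve(schema, "events.$value$.ts").simpleString) // prints: bigint
}

At the SQL level the command would then serve statements like the following (the table, column, and partition names are hypothetical):

// Assuming `spark` is a SparkSession with a table `mydb.sales`:
// spark.sql("DESCRIBE mydb.sales").show()                                       // columns + partition block
// spark.sql("DESCRIBE EXTENDED mydb.sales").show()                              // adds Detailed Table Information
// spark.sql("DESCRIBE EXTENDED mydb.sales PARTITION (dt='2016-04-26')").show()  // adds Detailed Partition Information
// spark.sql("DESCRIBE mydb.sales events.$value$.ts").show()                     // column path into nested types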

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 1 addition & 22 deletions
@@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
@@ -67,26 +66,6 @@ private[sql] class HiveSessionCatalog(
     }
   }
 
-  /**
-   * Describes a table by returning various metadata pertaining to table/partitions/columns.
-   */
-  override def describeTable(
-      table: TableIdentifier,
-      partSpec: Option[TablePartitionSpec],
-      colPath: Option[String],
-      isExtended: Boolean,
-      output: Seq[Attribute]): Seq[(String, String, String)] = {
-    val relation = lookupRelation(table)
-    relation match {
-      case r: MetastoreRelation =>
-        val db = table.database.getOrElse(currentDb)
-        val tableName = formatTableName(table.table)
-        client.describeTable(db, tableName, partSpec, colPath, isExtended, output)
-      case o: LogicalPlan =>
-        super.describeTable(table, partSpec, colPath, isExtended, output)
-    }
-  }
-
   // ----------------------------------------------------------------
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
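
The override above can be removed because dispatch now lives in DescribeTableCommand itself: it pattern matches on whatever lookupRelation returns, so Hive and non-Hive tables share one code path. A schematic sketch of that dispatch (it compiles against the catalyst module; the describe helper name is made up):

import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Catalog-backed relations expose the full CatalogTable metadata; any other
// plan falls back to its schema, as in DescribeTableCommand.run above.
def describe(plan: LogicalPlan): String = plan match {
  case r: CatalogRelation => s"catalog table: ${r.catalogTable.identifier}"
  case other => s"generic relation: ${other.schema.simpleString}"
}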

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 0 additions & 84 deletions
@@ -478,90 +478,6 @@ private[hive] class HiveClientImpl(
     client.getTablesByPattern(dbName, pattern).asScala
   }
 
-  /**
-   * Describes a Hive table.
-   * The syntax of using this command in SQL is:
-   * {{{
-   *   DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec]
-   * }}}
-   * @param table The table to be described.
-   * @param partSpec If specified, the specified partition is described.
-   * @param colPath If specified, only the specified column is described.
-   * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
-   */
-  override def describeTable(
-      db: String,
-      table: String,
-      partSpec: Option[TablePartitionSpec],
-      colPath: Option[String],
-      isExtended: Boolean,
-      output: Seq[Attribute]): Seq[(String, String, String)] = withHiveState {
-
-    // Get partition columns or table columns based on the supplied partition spec.
-    def getCols(tab: HiveTable, part: Option[HivePartition]): Seq[FieldSchema] = {
-      if (partSpec.nonEmpty && tab.getTableType() != HiveTableType.VIRTUAL_VIEW) {
-        part.get.getCols.asScala
-      } else {
-        tab.getCols.asScala
-      }
-    }
-    // Formats the column metadata as per the output schema.
-    def formatColumns(cols: Seq[FieldSchema]): Seq[(String, String, String)] = {
-      cols.map(field => (field.getName, field.getType, field.getComment))
-    }
-
-    var results: Seq[(String, String, String)] = Nil
-    // Get the table.
-    val tab =
-      Option(client.getTable(db, table, false)).
-        getOrElse(throw new NoSuchTableException(db, table))
-
-    // Get partition info.
-    val part = partSpec.map(p => Option(client.getPartition(tab, p.asJava, false))).getOrElse(None)
-    if (partSpec.nonEmpty && part.isEmpty) {
-      throw new AnalysisException(
-        s"partition to describe '${partSpec.get}' does not exist" +
-          s" in table '$table' database '$db'")
-    }
-
-    // Get columns if colPath is specified.
-    val cols = colPath.map { p =>
-      val qualifiedColPath = if (!p.startsWith(table)) s"${table}.${p}" else p
-      try {
-        Hive.getFieldsFromDeserializer(qualifiedColPath, tab.getDeserializer(true)).asScala
-      } catch {
-        case e: Exception => throw new AnalysisException(e.getMessage)
-      }
-    }.getOrElse(getCols(tab, part))
-
-    if (colPath.isEmpty) {
-      // Describe all the columns in the table first.
-      results ++= formatColumns(getCols(tab, part) ++ tab.getPartCols().asScala)
-
-      // Describe partition columns.
-      val partitionColumns = tab.getPartCols.asScala
-      if (partitionColumns.nonEmpty) {
-        results ++=
-          Seq(("# Partition Information", "", "")) ++
-            Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
-            formatColumns(partitionColumns)
-      }
-
-      // Describe additional table/partition details.
-      if (isExtended) {
-        if (partSpec.isEmpty) {
-          results ++= Seq(("Detailed Table Information", tab.getTTable.toString, ""))
-        } else {
-          results ++= Seq(("Detailed Partition Information", part.get.getTPartition.toString, ""))
-        }
-      }
-    } else {
-      results ++= formatColumns(cols)
-    }
-    results
-  }
-
   /**
    * Runs the specified SQL query using Hive.
    */
