Skip to content

Commit 366f891

Browse files
committed
Add describe command.
1 parent 74bd1d4 commit 366f891

File tree

6 files changed

+225
-37
lines changed

6 files changed

+225
-37
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,16 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
6060
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
6161
*/
6262
case class CacheCommand(tableName: String, doCache: Boolean) extends Command
63+
64+
/**
65+
* Returned for the "DESCRIBE [FORMATTED|EXTENDED] tableName" command. PRETTY is not handled.
66+
*/
67+
case class DescribeCommand(
68+
table: LogicalPlan,
69+
isFormatted: Boolean,
70+
isExtended: Boolean) extends Command {
71+
override def output = Seq(
72+
BoundReference(0, AttributeReference("name", StringType, nullable = false)()),
73+
BoundReference(1, AttributeReference("type", StringType, nullable = false)()),
74+
BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
75+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,22 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
113113

114114
override def output: Seq[Attribute] = Seq.empty
115115
}
116+
117+
/**
118+
* :: DeveloperApi ::
119+
*/
120+
@DeveloperApi
121+
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
122+
@transient context: SQLContext)
123+
extends LeafNode with Command {
124+
125+
override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] =
126+
child.output.map(field => (field.name, field.dataType.toString, None.toString))
127+
128+
override def execute(): RDD[Row] = {
129+
val rows = sideEffectResult.map {
130+
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
131+
}
132+
context.sparkContext.parallelize(rows, 1)
133+
}
134+
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ private[hive] case class AddFile(filePath: String) extends Command
5252
private[hive] object HiveQl {
5353
protected val nativeCommands = Seq(
5454
"TOK_DESCFUNCTION",
55-
"TOK_DESCTABLE",
5655
"TOK_DESCDATABASE",
5756
"TOK_SHOW_TABLESTATUS",
5857
"TOK_SHOWDATABASES",
@@ -120,6 +119,12 @@ private[hive] object HiveQl {
120119
"TOK_SWITCHDATABASE"
121120
)
122121

122+
// Commands that we do not need to explain.
123+
protected val noExplainCommands = Seq(
124+
"TOK_CREATETABLE",
125+
"TOK_DESCTABLE"
126+
) ++ nativeCommands
127+
123128
/**
124129
* A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations
125130
* similar to [[catalyst.trees.TreeNode]].
@@ -362,13 +367,19 @@ private[hive] object HiveQl {
362367
}
363368
}
364369

370+
protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
371+
val (db, tableName) =
372+
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
373+
case Seq(tableOnly) => (None, tableOnly)
374+
case Seq(databaseName, table) => (Some(databaseName), table)
375+
}
376+
377+
(db, tableName)
378+
}
379+
365380
protected def nodeToPlan(node: Node): LogicalPlan = node match {
366381
// Just fake explain for any of the native commands.
367-
case Token("TOK_EXPLAIN", explainArgs) if nativeCommands contains explainArgs.head.getText =>
368-
ExplainCommand(NoRelation)
369-
// Create tables aren't native commands due to CTAS queries, but we still don't need to
370-
// explain them.
371-
case Token("TOK_EXPLAIN", explainArgs) if explainArgs.head.getText == "TOK_CREATETABLE" =>
382+
case Token("TOK_EXPLAIN", explainArgs) if noExplainCommands contains explainArgs.head.getText =>
372383
ExplainCommand(NoRelation)
373384
case Token("TOK_EXPLAIN", explainArgs) =>
374385
// Ignore FORMATTED if present.
@@ -377,6 +388,34 @@ private[hive] object HiveQl {
377388
// TODO: support EXTENDED?
378389
ExplainCommand(nodeToPlan(query))
379390

391+
case Token("TOK_DESCTABLE", describeArgs) =>
392+
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
393+
val Some(tableType) :: formatted :: extended :: _ :: Nil =
394+
getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs)
395+
// TODO: support PRETTY?
396+
tableType match {
397+
case Token("TOK_TABTYPE", nameParts) if nameParts.size == 1 => {
398+
nameParts.head match {
399+
case Token(".", dbName :: tableName :: Nil) =>
400+
// It is describing a table with the format like "describe db.table".
401+
val (db, tableName) = extractDbNameTableName(nameParts.head)
402+
DescribeCommand(
403+
UnresolvedRelation(db, tableName, None), formatted.isDefined, extended.isDefined)
404+
case Token(".", dbName :: tableName :: colName :: Nil) =>
405+
// It is describing a column with the format like "describe db.table column".
406+
NativePlaceholder
407+
case tableName =>
408+
// It is describing a table with the format like "describe table".
409+
DescribeCommand(
410+
UnresolvedRelation(None, tableName.getText, None),
411+
formatted.isDefined,
412+
extended.isDefined)
413+
}
414+
}
415+
// All other cases.
416+
case _ => NativePlaceholder
417+
}
418+
380419
case Token("TOK_CREATETABLE", children)
381420
if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
382421
// TODO: Parse other clauses.
@@ -414,11 +453,8 @@ private[hive] object HiveQl {
414453
s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
415454
}
416455

417-
val (db, tableName) =
418-
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
419-
case Seq(tableOnly) => (None, tableOnly)
420-
case Seq(databaseName, table) => (Some(databaseName), table)
421-
}
456+
val (db, tableName) = extractDbNameTableName(tableNameParts)
457+
422458
InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
423459

424460
// If it's not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
@@ -736,11 +772,7 @@ private[hive] object HiveQl {
736772
val Some(tableNameParts) :: partitionClause :: Nil =
737773
getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
738774

739-
val (db, tableName) =
740-
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
741-
case Seq(tableOnly) => (None, tableOnly)
742-
case Seq(databaseName, table) => (Some(databaseName), table)
743-
}
775+
val (db, tableName) = extractDbNameTableName(tableNameParts)
744776

745777
val partitionKeys = partitionClause.map(_.getChildren.map {
746778
// Parse partitions. We also make keys case insensitive.

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.hive
1919

20-
import org.apache.spark.sql.SQLContext
20+
import org.apache.spark.sql.{SQLContext}
2121
import org.apache.spark.sql.catalyst.expressions._
2222
import org.apache.spark.sql.catalyst.planning._
2323
import org.apache.spark.sql.catalyst.plans._
@@ -81,6 +81,20 @@ private[hive] trait HiveStrategies {
8181
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
8282
case logical.NativeCommand(sql) =>
8383
NativeCommand(sql, plan.output)(context) :: Nil
84+
case describe: logical.DescribeCommand => {
85+
val resolvedTable = context.executePlan(describe.table).analyzed
86+
resolvedTable match {
87+
case t: MetastoreRelation =>
88+
Seq(DescribeHiveTableCommand(
89+
t, describe.output, describe.isFormatted, describe.isExtended)(context))
90+
case o: LogicalPlan =>
91+
if (describe.isFormatted)
92+
logger.info("Formatted is ignored because it is not defined for non-Hive tables.")
93+
if (describe.isExtended)
94+
logger.info("Extended is ignored because it is not defined for non-Hive tables.")
95+
Seq(DescribeCommand(planLater(o), describe.output)(context))
96+
}
97+
}
8498
case _ => Nil
8599
}
86100
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ package org.apache.spark.sql.hive.execution
1919

2020
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
2121
import org.apache.hadoop.hive.conf.HiveConf
22+
import org.apache.hadoop.hive.metastore.api.FieldSchema
2223
import org.apache.hadoop.hive.metastore.MetaStoreUtils
2324
import org.apache.hadoop.hive.ql.Context
25+
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
2426
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
2527
import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
2628
import org.apache.hadoop.hive.serde.serdeConstants
@@ -445,22 +447,55 @@ case class NativeCommand(
445447
if (sideEffectResult.size == 0) {
446448
context.emptyResult
447449
} else {
448-
// TODO: Need a better way to handle the result of a native command.
449-
// We may want to consider to use JsonMetaDataFormatter in Hive.
450-
val isDescribe = sql.trim.startsWith("describe")
451-
val rows = if (isDescribe) {
452-
// TODO: If we upgrade Hive to 0.13, we need to check the results of
453-
// context.sessionState.isHiveServerQuery() to determine how to split the result.
454-
// This method is introduced by https://issues.apache.org/jira/browse/HIVE-4545.
455-
// Right now, we split every string by any number of consecutive spaces.
456-
sideEffectResult.map(
457-
r => r.trim.split("\\s+", 3)).map(r => new GenericRow(r.asInstanceOf[Array[Any]]))
458-
} else {
459-
sideEffectResult.map(r => new GenericRow(Array[Any](r)))
460-
}
450+
val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
461451
context.sparkContext.parallelize(rows, 1)
462452
}
463453
}
464454

465455
override def otherCopyArgs = context :: Nil
466456
}
457+
458+
/**
459+
* :: DeveloperApi ::
460+
*/
461+
@DeveloperApi
462+
case class DescribeHiveTableCommand(
463+
table: MetastoreRelation,
464+
output: Seq[Attribute],
465+
isFormatted: Boolean,
466+
isExtended: Boolean)(
467+
@transient context: HiveContext)
468+
extends LeafNode with Command {
469+
470+
override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
471+
val cols: Seq[FieldSchema] = table.hiveQlTable.getCols
472+
val parCols: Seq[FieldSchema] = table.hiveQlTable.getPartCols
473+
val columnInfo = cols.map(field => (field.getName, field.getType, field.getComment))
474+
val partColumnInfo = parCols.map(field => (field.getName, field.getType, field.getComment))
475+
476+
val formattedPart = if (isFormatted) {
477+
(MetaDataFormatUtils.getTableInformation(table.hiveQlTable), null, null) :: Nil
478+
} else {
479+
Nil
480+
}
481+
482+
val extendedPart = if (isExtended) {
483+
("Detailed Table Information", table.hiveQlTable.getTTable.toString, null) :: Nil
484+
} else {
485+
Nil
486+
}
487+
488+
// Try to mimic the format of Hive's output, although it is not 100% the same.
489+
columnInfo ++ partColumnInfo ++ Seq(("# Partition Information", null, null)) ++
490+
partColumnInfo ++ formattedPart ++ extendedPart
491+
}
492+
493+
override def execute(): RDD[Row] = {
494+
val rows = sideEffectResult.map {
495+
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
496+
}
497+
context.sparkContext.parallelize(rows, 1)
498+
}
499+
500+
override def otherCopyArgs = context :: Nil
501+
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,6 @@ class HiveQuerySuite extends HiveComparisonTest {
237237
.map(_.getString(0))
238238
.contains(tableName))
239239

240-
assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
241-
hql(s"DESCRIBE $tableName")
242-
.select('result)
243-
.collect()
244-
.map(_.getString(0).split("\t").map(_.trim))
245-
}
246-
247240
assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
248241

249242
TestHive.reset()
@@ -260,6 +253,88 @@ class HiveQuerySuite extends HiveComparisonTest {
260253
assert(Try(q0.count()).isSuccess)
261254
}
262255

256+
test("Describe commands") {
257+
hql(s"CREATE TABLE test_describe_commands (key INT, value STRING) PARTITIONED BY (dt STRING)")
258+
259+
hql(
260+
"""FROM src INSERT OVERWRITE TABLE test_describe_commands PARTITION (dt='2008-06-08')
261+
|SELECT key, value
262+
""".stripMargin)
263+
264+
// Describe a table
265+
assertResult(
266+
Array(
267+
Array("key", "int", null),
268+
Array("value", "string", null),
269+
Array("dt", "string", null),
270+
Array("# Partition Information", null, null),
271+
Array("dt", "string", null))
272+
) {
273+
hql("DESCRIBE test_describe_commands")
274+
.select('name, 'type, 'comment)
275+
.collect()
276+
}
277+
278+
// Describe a table with keyword FORMATTED
279+
// We only check the number of rows returned.
280+
assertResult(6) {
281+
hql("DESCRIBE FORMATTED test_describe_commands").count()
282+
}
283+
284+
// Describe a table with keyword EXTENDED
285+
assertResult(6) {
286+
hql("DESCRIBE EXTENDED test_describe_commands").count()
287+
}
288+
289+
// Describe a table with a fully qualified table name
290+
assertResult(
291+
Array(
292+
Array("key", "int", null),
293+
Array("value", "string", null),
294+
Array("dt", "string", null),
295+
Array("# Partition Information", null, null),
296+
Array("dt", "string", null))
297+
) {
298+
hql("DESCRIBE default.test_describe_commands")
299+
.select('name, 'type, 'comment)
300+
.collect()
301+
}
302+
303+
// Describing a column is a native command
304+
assertResult(Array(Array("value", "string", "from deserializer"))) {
305+
hql("DESCRIBE test_describe_commands value")
306+
.select('result)
307+
.collect()
308+
.map(_.getString(0).split("\t").map(_.trim))
309+
}
310+
311+
// Describing a column with a fully qualified table name is a native command
312+
assertResult(Array(Array("value", "string", "from deserializer"))) {
313+
hql("DESCRIBE default.test_describe_commands value")
314+
.select('result)
315+
.collect()
316+
.map(_.getString(0).split("\t").map(_.trim))
317+
}
318+
319+
// Describing a partition is a native command
320+
assertResult(
321+
Array(
322+
Array("key", "int", "None"),
323+
Array("value", "string", "None"),
324+
Array("dt", "string", "None"),
325+
Array("", "", ""),
326+
Array("# Partition Information", "", ""),
327+
Array("# col_name", "data_type", "comment"),
328+
Array("", "", ""),
329+
Array("dt", "string", "None"))
330+
) {
331+
hql("DESCRIBE test_describe_commands PARTITION (dt='2008-06-08')")
332+
.select('result)
333+
.collect()
334+
.map(_.getString(0).split("\t").map(_.trim))
335+
}
336+
}
337+
263338
test("parse HQL set commands") {
264339
// Adapted from its SQL counterpart.
265340
val testKey = "spark.sql.key.usedfortestonly"

0 commit comments

Comments
 (0)