@@ -60,3 +60,16 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

/**
[Contributor] remove this block

* Returned for the "Describe tableName" command.
[Contributor] to be consistent either lowercase D, or uppercase the whole DESCRIBE

*/
case class DescribeCommand(
[Contributor] would be great to explain isFormatted / isExtended in @param.

table: LogicalPlan,
isFormatted: Boolean,
isExtended: Boolean) extends Command {
override def output = Seq(
BoundReference(0, AttributeReference("name", StringType, nullable = false)()),
BoundReference(1, AttributeReference("type", StringType, nullable = false)()),
BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
}
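
As the reviewer asks, the @param docs could spell out the two flags. A hedged sketch of such a doc comment (wording illustrative; the semantics are taken from DescribeHiveTableCommand further down in this diff):

/**
 * Returned for the "DESCRIBE [FORMATTED|EXTENDED] tableName" command.
 *
 * @param table the table to be described.
 * @param isFormatted true if "DESCRIBE FORMATTED" was used, in which case Hive's
 *                    formatted table information is appended to the output.
 * @param isExtended true if "DESCRIBE EXTENDED" was used, in which case Hive's
 *                   detailed table information is appended to the output.
 */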
@@ -121,3 +121,22 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:

override def output: Seq[Attribute] = Seq.empty
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] =
child.output.map(field => (field.name, field.dataType.toString, None.toString))

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala (48 additions & 16 deletions)
@@ -52,7 +52,6 @@ private[hive] case class AddFile(filePath: String) extends Command
private[hive] object HiveQl {
protected val nativeCommands = Seq(
"TOK_DESCFUNCTION",
"TOK_DESCTABLE",
"TOK_DESCDATABASE",
"TOK_SHOW_TABLESTATUS",
"TOK_SHOWDATABASES",
@@ -120,6 +119,12 @@ private[hive] object HiveQl {
"TOK_SWITCHDATABASE"
)

// Commands that we do not need to explain.
protected val noExplainCommands = Seq(
"TOK_CREATETABLE",
[Contributor] does ctas fall in here?
[Contributor Author] noExplainCommands is for those commands which we do not need to explain. For example, we will basically do nothing for "EXPLAIN CTAS". A regular CTAS query will not fall in here.

"TOK_DESCTABLE"
) ++ nativeCommands

/**
* A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations
* similar to [[catalyst.trees.TreeNode]].
@@ -362,13 +367,19 @@
}
}

protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
[Contributor] space after map, and before the closing }

case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}

(db, tableName)
}

protected def nodeToPlan(node: Node): LogicalPlan = node match {
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs) if nativeCommands contains explainArgs.head.getText =>
ExplainCommand(NoRelation)
// Create tables aren't native commands due to CTAS queries, but we still don't need to
// explain them.
case Token("TOK_EXPLAIN", explainArgs) if explainArgs.head.getText == "TOK_CREATETABLE" =>
case Token("TOK_EXPLAIN", explainArgs) if noExplainCommands contains explainArgs.head.getText =>
[Contributor] avoid infix contains here, i.e. noExplainCommands.contains(explainArgs.head.getText)

ExplainCommand(NoRelation)
case Token("TOK_EXPLAIN", explainArgs) =>
// Ignore FORMATTED if present.
@@ -377,6 +388,34 @@
// TODO: support EXTENDED?
ExplainCommand(nodeToPlan(query))

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val Some(tableType) :: formatted :: extended :: _ :: Nil =
getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs)
// TODO: support PRETTY?
tableType match {
case Token("TOK_TABTYPE", nameParts) if nameParts.size == 1 => {
nameParts.head match {
case Token(".", dbName :: tableName :: Nil) =>
// It is describing a table with the format like "describe db.table".
val (db, tableName) = extractDbNameTableName(nameParts.head)
DescribeCommand(
UnresolvedRelation(db, tableName, None), formatted.isDefined, extended.isDefined)
case Token(".", dbName :: tableName :: colName :: Nil) =>
// It is describing a column with the format like "describe db.table column".
NativePlaceholder
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
UnresolvedRelation(None, tableName.getText, None),
formatted.isDefined,
extended.isDefined)
}
}
// All other cases.
case _ => NativePlaceholder
}
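
A minimal standalone model of the dispatch above (plain Scala sketch; the names are hypothetical and the real code matches on Hive AST nodes, not strings): a one- or two-part target stays in Spark SQL as a DescribeCommand, while a column describe such as "DESCRIBE db.table column" and every other form falls back to Hive as a native command.

// Illustrative sketch, not part of the PR.
object DescribeDispatchSketch extends App {
  sealed trait DescribePlan
  case class SparkDescribe(db: Option[String], table: String) extends DescribePlan
  case object HiveNative extends DescribePlan

  // Mirrors the TOK_TABTYPE match: one part = table, two parts = db.table, else native.
  def dispatch(nameParts: Seq[String]): DescribePlan = nameParts match {
    case Seq(table)     => SparkDescribe(None, table)
    case Seq(db, table) => SparkDescribe(Some(db), table)
    case _              => HiveNative // e.g. "DESCRIBE db.table column"
  }

  assert(dispatch(Seq("src")) == SparkDescribe(None, "src"))
  assert(dispatch(Seq("db1", "src")) == SparkDescribe(Some("db1"), "src"))
  assert(dispatch(Seq("db1", "src", "value")) == HiveNative)
}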

case Token("TOK_CREATETABLE", children)
if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
// TODO: Parse other clauses.
@@ -414,11 +453,8 @@
s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
}

val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}
val (db, tableName) = extractDbNameTableName(tableNameParts)

InsertIntoCreatedTable(db, tableName, nodeToPlan(query))

// If it's not a "CREATE TABLE AS" like above then just pass it back to Hive as a native command.
@@ -736,11 +772,7 @@
val Some(tableNameParts) :: partitionClause :: Nil =
getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)

val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}
val (db, tableName) = extractDbNameTableName(tableNameParts)

val partitionKeys = partitionClause.map(_.getChildren.map {
// Parse partitions. We also make keys case insensitive.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SQLContext}
[Contributor] no need to change this

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -81,6 +81,20 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
case describe: logical.DescribeCommand => {
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
Seq(DescribeHiveTableCommand(
t, describe.output, describe.isFormatted, describe.isExtended)(context))
case o: LogicalPlan =>
if (describe.isFormatted)
[Contributor] Maybe for non-metastore tables, we can just add some formatted/extended information saying they are registered as temporary tables? Then we can get rid of the extra lines here ...

logger.info("Formatted is ignored because it is not defined for non-Hive tables.")
if (describe.isExtended)
logger.info("Extended is ignored because it is not defined for non-Hive tables.")
Seq(DescribeCommand(planLater(o), describe.output)(context))
}
}
case _ => Nil
}
}
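
At the usage level this strategy means, in a hypothetical session (hql is the TestHive helper used by HiveQuerySuite at the end of this diff; temp_table is an assumed registered table):

// Metastore-backed table: planned as DescribeHiveTableCommand.
hql("DESCRIBE src")
// Non-metastore (e.g. temporary) table: planned as the generic DescribeCommand;
// FORMATTED/EXTENDED are only logged as ignored.
hql("DESCRIBE FORMATTED temp_table")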
@@ -19,8 +19,10 @@ package org.apache.spark.sql.hive.execution

import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
[Contributor] api should go after MetaStoreUtils since api is a package

import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
[Contributor] this one also should go after the next line

import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde.serdeConstants
@@ -452,3 +454,48 @@ case class NativeCommand(

override def otherCopyArgs = context :: Nil
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeHiveTableCommand(
table: MetastoreRelation,
output: Seq[Attribute],
isFormatted: Boolean,
isExtended: Boolean)(
@transient context: HiveContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
val cols: Seq[FieldSchema] = table.hiveQlTable.getCols
val parCols: Seq[FieldSchema] = table.hiveQlTable.getPartCols
val columnInfo = cols.map(field => (field.getName, field.getType, field.getComment))
val partColumnInfo = parCols.map(field => (field.getName, field.getType, field.getComment))

val formattedPart = if (isFormatted) {
(MetaDataFormatUtils.getTableInformation(table.hiveQlTable), null, null) :: Nil
} else {
Nil
}

val extendedPart = if (isExtended) {
("Detailed Table Information", table.hiveQlTable.getTTable.toString, null) :: Nil
} else {
Nil
}

// Trying to mimic the format of Hive's output. But not 100% the same.
columnInfo ++ partColumnInfo ++ Seq(("# Partition Information", null, null)) ++
partColumnInfo ++ formattedPart ++ extendedPart
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}
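
For the partitioned table created in the test below (key INT, value STRING, partitioned by dt STRING) and no FORMATTED or EXTENDED clause, sideEffectResult produces the following (name, type, comment) triples, matching the expected values in HiveQuerySuite:

// columnInfo ++ partColumnInfo ++ the "# Partition Information" header ++ partColumnInfo
Seq(
  ("key", "int", null),
  ("value", "string", null),
  ("dt", "string", null),
  ("# Partition Information", null, null),
  ("dt", "string", null))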
@@ -234,13 +234,6 @@ class HiveQuerySuite extends HiveComparisonTest {
.map(_.getString(0))
.contains(tableName))

assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
hql(s"DESCRIBE $tableName")
.select('result)
.collect()
.map(_.getString(0).split("\t").map(_.trim))
}

assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))

TestHive.reset()
@@ -257,6 +250,88 @@
assert(Try(q0.count()).isSuccess)
}

test("Describe commands") {
[Contributor] to be consistent either lowercase D, or uppercase the whole DESCRIBE

hql(s"CREATE TABLE test_describe_commands (key INT, value STRING) PARTITIONED BY (dt STRING)")

hql(
"""FROM src INSERT OVERWRITE TABLE test_describe_commands PARTITION (dt='2008-06-08')
|SELECT key, value
""".stripMargin)

// Describe a table
assertResult(
Array(
Array("key", "int", null),
Array("value", "string", null),
Array("dt", "string", null),
Array("# Partition Information", null, null),
Array("dt", "string", null))
) {
hql("DESCRIBE test_describe_commands")
.select('name, 'type, 'comment)
.collect()
}

// Describe a table with keyword FORMATTED
// We only check the number of rows returned here.
assertResult(6) {
hql("DESCRIBE FORMATTED test_describe_commands").count()
}

// Describe a table with keyword EXTENDED
assertResult(6) {
hql("DESCRIBE EXTENDED test_describe_commands").count()
}

// Describe a table with a fully qualified table name
assertResult(
Array(
Array("key", "int", null),
Array("value", "string", null),
Array("dt", "string", null),
Array("# Partition Information", null, null),
Array("dt", "string", null))
) {
hql("DESCRIBE default.test_describe_commands")
.select('name, 'type, 'comment)
.collect()
}

// Describing a column is a native command
assertResult(Array(Array("value", "string", "from deserializer"))) {
hql("DESCRIBE test_describe_commands value")
.select('result)
.collect()
.map(_.getString(0).split("\t").map(_.trim))
}

// Describing a column is a native command
assertResult(Array(Array("value", "string", "from deserializer"))) {
hql("DESCRIBE default.test_describe_commands value")
.select('result)
.collect()
.map(_.getString(0).split("\t").map(_.trim))
}

// Describing a partition is a native command
assertResult(
Array(
Array("key", "int", "None"),
Array("value", "string", "None"),
Array("dt", "string", "None"),
Array("", "", ""),
Array("# Partition Information", "", ""),
Array("# col_name", "data_type", "comment"),
Array("", "", ""),
Array("dt", "string", "None"))
) {
hql("DESCRIBE test_describe_commands PARTITION (dt='2008-06-08')")
.select('result)
.collect()
.map(_.getString(0).split("\t").map(_.trim))
}
}

test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"