@@ -21,11 +21,13 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

trait Command {
this: SparkPlan =>

/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -35,7 +37,11 @@ trait Command {
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]

override def executeCollect(): Array[Row] = sideEffectResult.toArray

override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
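
For illustration only (not part of this patch), a hypothetical command written against the new contract only needs to override sideEffectResult; execute() and executeCollect() are inherited from Command:

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.Row

// Hypothetical example: a command that returns a single confirmation Row.
// Only sideEffectResult is overridden; the inherited execute() wraps it in a
// one-partition RDD[Row] and executeCollect() returns it as Array[Row].
case class EchoCommand(message: String) extends LeafNode with Command {

  override def output: Seq[Attribute] = Seq.empty

  // Lazy val: evaluated at most once, the first time it is referenced.
  override protected[sql] lazy val sideEffectResult: Seq[Row] = Seq(Row(message))
}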

/**
@@ -47,17 +53,17 @@ case class SetCommand(
@transient context: SQLContext)
extends LeafNode with Command with Logging {

override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
Array(s"$k=$v")
Array(Row(s"$k=$v"))
}

// Query the value bound to key k.
@@ -73,28 +79,22 @@ case class SetCommand(
"hive-0.12.0.jar").mkString(":")

Array(
"system:java.class.path=" + hiveJars,
"system:sun.java.command=shark.SharkServer2")
}
else {
Array(s"$k=${context.getConf(k, "<undefined>")}")
Row("system:java.class.path=" + hiveJars),
Row("system:sun.java.command=shark.SharkServer2"))
} else {
Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
}

// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
s"$k=$v"
Row(s"$k=$v")
}.toSeq

case _ =>
throw new IllegalArgumentException()
}

def execute(): RDD[Row] = {
val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}
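
A rough usage sketch (not part of this patch; assumes an existing SQLContext bound to sqlContext): a SET statement now comes back as single-column Rows rather than raw strings.

// Hypothetical REPL-style usage. The result shape follows from SetCommand above:
// one Row wrapping each "key=value" string.
val result = sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
result.map(_.getString(0))   // Array("spark.sql.shuffle.partitions=10")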

@@ -113,19 +113,14 @@ case class ExplainCommand(
extends LeafNode with Command {

// Run through the optimizer to generate the physical plan.
override protected[sql] lazy val sideEffectResult: Seq[String] = try {
override protected[sql] lazy val sideEffectResult: Seq[Row] = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
val queryExecution = context.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString

outputString.split("\n")
outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
("Error occurred during query planning: \n" + cause.getMessage).split("\n")
}

def execute(): RDD[Row] = {
val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
context.sparkContext.parallelize(explanation, 1)
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}

override def otherCopyArgs = context :: Nil
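
Another consumption sketch (the explainCmd value is hypothetical): each plan line is now one single-column Row, so the readable plan text is recovered with a map and mkString, mirroring the _.head.toString call HiveContext uses further down in this diff.

// Hypothetical: `explainCmd` is an ExplainCommand instance produced by the planner.
// sideEffectResult holds one Row per line of the plan string.
val planLines: Seq[String] = explainCmd.sideEffectResult.map(_.head.toString)
println(planLines.mkString("\n"))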
@@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
} else {
context.uncacheTable(tableName)
}
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
sideEffectResult
context.emptyResult
Seq.empty[Row]
}

override def output: Seq[Attribute] = Seq.empty
@@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
Seq(("# Registered as a temporary table", null, null)) ++
child.output.map(field => (field.name, field.dataType.toString, null))
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
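
For a concrete sense of the new DescribeCommand output (hypothetical column names and types), sideEffectResult would contain something like:

// A temporary table with columns key: IntegerType and value: StringType.
Seq(
  Row("# Registered as a temporary table", null, null),
  Row("key", "IntegerType", null),
  Row("value", "StringType", null))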
@@ -389,7 +389,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
case (map: Map[_,_], MapType(kType, vType, _)) =>
case (map: Map[_, _], MapType(kType, vType, _)) =>
map.map {
case (key, value) =>
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
@@ -409,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// be similar with Hive.
describeHiveTableCommand.hiveString
case command: PhysicalCommand =>
command.sideEffectResult.map(_.toString)
command.sideEffectResult.map(_.head.toString)

case other =>
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
@@ -18,17 +18,19 @@
package org.apache.spark.sql.hive

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.{SQLContext, SchemaRDD}

import scala.collection.JavaConversions._

@@ -196,9 +198,9 @@ private[hive] trait HiveStrategies {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil

case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}

@@ -41,26 +41,21 @@ case class DescribeHiveTableCommand(
extends LeafNode with Command {

// Strings formatted in the same style as Hive's output, used for result comparison in our unit tests.
lazy val hiveString: Seq[String] = {
val alignment = 20
val delim = "\t"

sideEffectResult.map {
case (name, dataType, comment) =>
String.format("%-" + alignment + "s", name) + delim +
String.format("%-" + alignment + "s", dataType) + delim +
String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
}
lazy val hiveString: Seq[String] = sideEffectResult.map {
case Row(name: String, dataType: String, comment) =>
Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("None"))
.map(s => String.format(s"%-20s", s))
.mkString("\t")
}

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Mimics the format of Hive's output, though not exactly the same.
var results: Seq[(String, String, String)] = Nil

val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
if (!partitionColumns.isEmpty) {
if (partitionColumns.nonEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
@@ -74,14 +69,9 @@ case class DescribeHiveTableCommand(
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}

results
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
results.map { case (name, dataType, comment) =>
Row(name, dataType, comment)
}
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
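
A small worked example of the hiveString formatting above (hypothetical column): the three fields are each left-padded to 20 characters and then joined with tabs.

// Mirrors the new hiveString logic for a column named "key" of type "int"
// with no comment (rendered as "None").
Seq("key", "int", "None")
  .map(s => String.format("%-20s", s))
  .mkString("\t")
// => "key" padded to 20 chars, tab, "int" padded to 20 chars, tab, "None" padded to 20 chars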
@@ -32,16 +32,7 @@ case class NativeCommand(
@transient context: HiveContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)

override def execute(): RDD[Row] = {
if (sideEffectResult.size == 0) {
context.emptyResult
} else {
val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
context.sparkContext.parallelize(rows, 1)
}
}
override protected[sql] lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))

override def otherCopyArgs = context :: Nil
}
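
As an illustration of the NativeCommand change (hypothetical Hive output): each line returned by runSqlHive is now wrapped in its own Row.

// If runSqlHive("SHOW TABLES") returned Seq("src", "src1"), sideEffectResult
// is now Seq(Row("src"), Row("src1")) rather than the raw strings.
Seq("src", "src1").map(Row(_))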
@@ -33,19 +33,13 @@ import org.apache.spark.sql.hive.HiveContext
*/
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

def hiveContext = sqlContext.asInstanceOf[HiveContext]

def output = Seq.empty

override protected[sql] lazy val sideEffectResult = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
hiveContext.analyze(tableName)
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
sideEffectResult
sparkContext.emptyRDD[Row]
Seq.empty[Row]
}
}

@@ -55,20 +49,14 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
*/
@DeveloperApi
case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {

def hiveContext = sqlContext.asInstanceOf[HiveContext]

def output = Seq.empty

override protected[sql] lazy val sideEffectResult: Seq[Any] = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(None, tableName)
Seq.empty
}

override def execute(): RDD[Row] = {
sideEffectResult
sparkContext.emptyRDD[Row]
Seq.empty[Row]
}
}
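
Finally, a sketch of what the inherited execute() does for commands like AnalyzeTable and DropTable that produce no rows (assumes an existing SparkContext bound to sc): the empty Seq[Row] is parallelized into a single empty partition, which is what lets the old emptyRDD-based execute() overrides be deleted.

import org.apache.spark.sql.catalyst.expressions.Row

// Equivalent of sqlContext.sparkContext.parallelize(sideEffectResult, 1)
// when sideEffectResult is empty.
val rdd = sc.parallelize(Seq.empty[Row], 1)
rdd.count()          // 0
rdd.partitions.size  // 1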