@@ -90,10 +90,9 @@ case class SetCommand(
       throw new IllegalArgumentException()
   }
 
-  def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
-    context.sparkContext.parallelize(rows, 1)
-  }
+  def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)
+
+  override def executeCollect(): Array[Row] = sideEffectResult.map(Row(_)).toArray
**Contributor:** Is there a reason we can't just define these in the superclass `Command`?

**Contributor (Author):** Good idea. Refactored a bit; now `Command.sideEffectResult` returns `Seq[Row]` and `Command.executeCollect()` simply returns `sideEffectResult.toArray`.


   override def otherCopyArgs = context :: Nil
 }
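
Following up on this exchange, here is a minimal sketch of what the shared superclass could look like once `sideEffectResult` is widened to `Seq[Row]` as the author describes. This is a reconstruction for illustration only, assuming `sqlContext` is reachable from `SparkPlan` (as the `AnalyzeTable` hunk further down already does):

```scala
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row

trait Command {
  this: SparkPlan =>

  // Concrete commands override this lazy val; forcing it runs the side effect
  // exactly once, and its rows back both executeCollect() and execute().
  protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]

  // Serves collect() locally, without launching a Spark job.
  override def executeCollect(): Array[Row] = sideEffectResult.toArray

  // Still exposes the same rows as a single-partition RDD when one is needed.
  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
```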
@@ -123,10 +122,9 @@ case class ExplainCommand(
       ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
   }
 
-  def execute(): RDD[Row] = {
-    val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
-    context.sparkContext.parallelize(explanation, 1)
-  }
+  def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)
+
+  override def executeCollect(): Array[Row] = sideEffectResult.map(Row(_)).toArray
 
   override def otherCopyArgs = context :: Nil
 }
@@ -147,6 +145,8 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
     Seq.empty[Any]
   }
 
+  override def executeCollect(): Array[Row] = Array.empty[Row]
+
   override def execute(): RDD[Row] = {
     sideEffectResult
     context.emptyResult
@@ -168,10 +168,9 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
     child.output.map(field => (field.name, field.dataType.toString, null))
   }
 
-  override def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map {
-      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
-    }
-    context.sparkContext.parallelize(rows, 1)
-  }
+  override def executeCollect(): Array[Row] = sideEffectResult.map {
+    case (name, dataType, comment) => Row(name, dataType, comment)
+  }.toArray
+
+  override def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)
 }
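
All four commands in this file now share the same two-method shape: do the work once in `sideEffectResult`, serve `executeCollect()` from it directly, and parallelize only inside `execute()`. To make the pattern concrete, here is a hypothetical command built the same way (`EchoCommand` is invented for illustration and is not part of this PR):

```scala
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}

case class EchoCommand(message: String)(@transient context: SQLContext)
  extends LeafNode with Command {

  override def output: Seq[Attribute] = Seq.empty

  // The side effect runs at most once, on first access; later calls reuse it.
  override protected[sql] lazy val sideEffectResult: Seq[String] = {
    println(message)
    Seq(message)
  }

  // collect() is answered locally from the cached result ...
  override def executeCollect(): Array[Row] = sideEffectResult.map(Row(_)).toArray

  // ... while execute() parallelizes the same rows for RDD consumers.
  override def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)

  override def otherCopyArgs = context :: Nil
}
```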
@@ -18,17 +18,19 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.catalyst.types.StringType
 import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.hive
+import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
 
 import scala.collection.JavaConversions._

@@ -196,9 +198,9 @@ private[hive] trait HiveStrategies {
       case logical.NativeCommand(sql) =>
         NativeCommand(sql, plan.output)(context) :: Nil
 
-      case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
+      case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
 
-      case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
+      case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
 
       case describe: logical.DescribeCommand =>
         val resolvedTable = context.executePlan(describe.table).analyzed
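
The new `hive.` qualifier is what keeps this match unambiguous: the logical plan node `org.apache.spark.sql.hive.DropTable` and the physical operator `org.apache.spark.sql.hive.execution.DropTable` share a simple name, and `hive.execution._` is imported wholesale. A toy, self-contained illustration of the same disambiguation (all names invented):

```scala
object Disambiguation {
  // Stand-ins for the logical plan node and the physical operator.
  object logical { case class DropTable(table: String) }
  object physical { case class DropTable(table: String) }

  import physical._ // analogous to `import org.apache.spark.sql.hive.execution._`

  def toPhysical(node: Any): Any = node match {
    // An unqualified `DropTable` pattern would resolve to physical.DropTable
    // here, so the logical node must be named explicitly, as the diff does.
    case logical.DropTable(table) => DropTable(table)
    case other => other
  }
}
```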
@@ -23,7 +23,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.execution.{Command, LeafNode}
 import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}

@@ -41,16 +41,11 @@ case class DescribeHiveTableCommand(
   extends LeafNode with Command {
 
   // Strings with the format like Hive. It is used for result comparison in our unit tests.
-  lazy val hiveString: Seq[String] = {
-    val alignment = 20
-    val delim = "\t"
-
-    sideEffectResult.map {
-      case (name, dataType, comment) =>
-        String.format("%-" + alignment + "s", name) + delim +
-          String.format("%-" + alignment + "s", dataType) + delim +
-          String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
-    }
+  lazy val hiveString: Seq[String] = sideEffectResult.map {
+    case (name, dataType, comment) =>
+      Seq(name, dataType, Option(comment).getOrElse("None"))
+        .map(String.format(s"%-20s", _))
+        .mkString("\t")
   }
 
   override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
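
For reference, the rewritten `hiveString` logic can be exercised standalone. A spark-shell-style sketch with made-up column metadata (only the formatting expression is taken from the diff above):

```scala
// Made-up (name, dataType, comment) triples standing in for sideEffectResult.
val sampleResult = Seq(
  ("key", "string", null),
  ("value", "int", "a counter")
)

val hiveString = sampleResult.map {
  case (name, dataType, comment) =>
    Seq(name, dataType, Option(comment).getOrElse("None"))
      .map(String.format("%-20s", _)) // left-justify each cell to 20 characters
      .mkString("\t")                 // tab-delimited, like Hive's DESCRIBE output
}

hiveString.foreach(println)
```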
@@ -60,7 +55,7 @@ case class DescribeHiveTableCommand(
     val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
     val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
     results ++= columns.map(field => (field.getName, field.getType, field.getComment))
-    if (!partitionColumns.isEmpty) {
+    if (partitionColumns.nonEmpty) {
       val partColumnInfo =
         partitionColumns.map(field => (field.getName, field.getType, field.getComment))
       results ++=
@@ -77,12 +72,11 @@ case class DescribeHiveTableCommand(
     results
   }
 
-  override def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map {
-      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
-    }
-    context.sparkContext.parallelize(rows, 1)
-  }
+  override def executeCollect(): Array[Row] = sideEffectResult.map {
+    case (name, dataType, comment) => Row(name, dataType, comment)
+  }.toArray
+
+  override def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)
 
   override def otherCopyArgs = context :: Nil
 }
@@ -34,14 +34,9 @@ case class NativeCommand(
 
   override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
 
-  override def execute(): RDD[Row] = {
-    if (sideEffectResult.size == 0) {
-      context.emptyResult
-    } else {
-      val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
-      context.sparkContext.parallelize(rows, 1)
-    }
-  }
+  override def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)
+
+  override def executeCollect(): Array[Row] = sideEffectResult.map(Row(_)).toArray
 
   override def otherCopyArgs = context :: Nil
 }
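
One nuance of this hunk: the old `execute()` special-cased an empty `sideEffectResult` with `context.emptyResult`, while the new one-liner always parallelizes, which for an empty result simply yields an empty single-partition RDD. A quick standalone check (a sketch using an ad-hoc local `SparkContext`):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("empty-parallelize"))
val empty = sc.parallelize(Seq.empty[String], 1)
assert(empty.count() == 0)            // no elements ...
assert(empty.partitions.length == 1)  // ... but still one (empty) partition
sc.stop()
```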
@@ -33,7 +33,6 @@ import org.apache.spark.sql.hive.HiveContext
  */
 @DeveloperApi
 case class AnalyzeTable(tableName: String) extends LeafNode with Command {
-
   def hiveContext = sqlContext.asInstanceOf[HiveContext]
 
   def output = Seq.empty
@@ -43,6 +42,8 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
     Seq.empty[Any]
   }
 
+  override def executeCollect(): Array[Row] = Array.empty[Row]
+
   override def execute(): RDD[Row] = {
     sideEffectResult
     sparkContext.emptyRDD[Row]
@@ -67,6 +68,8 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
     Seq.empty
   }
 
+  override def executeCollect(): Array[Row] = Array.empty[Row]
+
   override def execute(): RDD[Row] = {
     sideEffectResult
     sparkContext.emptyRDD[Row]