diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 73a21884a471..690e1239fe7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -42,7 +42,7 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L * * @tparam PhysicalPlan The type of physical plan produced by this [[QueryPlanner]] */ -abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { +trait QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3ec1c4a2f102..e85ebb4ddc2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -118,7 +118,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) + @DeveloperApi @transient val queryExecution: QueryExecution) extends RDDApi[Row] with Serializable { /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1ea596dddff0..59f37c1cda36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -166,9 +166,9 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan) + protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan) @transient protected[sql] val tlSession = new ThreadLocal[SQLSession]() { @@ -784,90 +784,12 @@ class SQLContext(@transient val sparkContext: SparkContext) }.toArray } - protected[sql] class SparkPlanner extends SparkStrategies { - val sparkContext: SparkContext = self.sparkContext - - val sqlContext: SQLContext = self - - def codegenEnabled: Boolean = self.conf.codegenEnabled - - def unsafeEnabled: Boolean = self.conf.unsafeEnabled - - def numPartitions: Int = self.conf.numShufflePartitions - - def strategies: Seq[Strategy] = - experimental.extraStrategies ++ ( - DataSourceStrategy :: - DDLStrategy :: - TakeOrdered :: - HashAggregation :: - LeftSemiJoin :: - HashJoin :: - InMemoryScans :: - ParquetOperations :: - BasicOperators :: - CartesianProduct :: - BroadcastNestedLoopJoin :: Nil) - - /** - * Used to build table scan operators where complex projection and filtering are done using - * separate physical operators. This function returns the given scan operator with Project and - * Filter nodes added only when needed. For example, a Project operator is only used when the - * final desired output requires complex expressions to be evaluated or when columns can be - * further eliminated out after filtering has been done. 
- * - * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized - * away by the filter pushdown optimization. - * - * The required attributes for both filtering and expression evaluation are passed to the - * provided `scanBuilder` function so that it can avoid unnecessary column materialization. - */ - def pruneFilterProject( - projectList: Seq[NamedExpression], - filterPredicates: Seq[Expression], - prunePushedDownFilters: Seq[Expression] => Seq[Expression], - scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { - - val projectSet = AttributeSet(projectList.flatMap(_.references)) - val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) - val filterCondition = - prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) - - // Right now we still use a projection even if the only evaluation is applying an alias - // to a column. Since this is a no-op, it could be avoided. However, using this - // optimization with the current implementation would change the output schema. - // TODO: Decouple final output schema from expression evaluation so this copy can be - // avoided safely. - - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && - filterSet.subsetOf(projectSet)) { - // When it is possible to just use column pruning to get the right projection and - // when the columns of this projection are enough to evaluate all filter conditions, - // just do a scan followed by a filter, with no extra project. - val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) - filterCondition.map(Filter(_, scan)).getOrElse(scan) - } else { - val scan = scanBuilder((projectSet ++ filterSet).toSeq) - Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) - } - } - } - @transient - protected[sql] val planner = new SparkPlanner + protected[sql] val planner = new SparkPlanner(this) @transient protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1) - /** - * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. - */ - @transient - protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { - val batches = - Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil - } - protected[sql] def openSession(): SQLSession = { detachSession() val session = createSession() @@ -893,64 +815,6 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val conf: SQLConf = new SQLConf } - /** - * :: DeveloperApi :: - * The primary workflow for executing relational queries using Spark. Designed to allow easy - * access to the intermediate phases of query execution for developers. - */ - @DeveloperApi - protected[sql] class QueryExecution(val logical: LogicalPlan) { - def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) - - lazy val analyzed: LogicalPlan = analyzer.execute(logical) - lazy val withCachedData: LogicalPlan = { - assertAnalyzed() - cacheManager.useCachedData(analyzed) - } - lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) - - // TODO: Don't just pick the first one... - lazy val sparkPlan: SparkPlan = { - SparkPlan.currentContext.set(self) - planner.plan(optimizedPlan).next() - } - // executedPlan should not be used to initialize any SparkPlan. It should be - // only used for execution. - lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) - - /** Internal version of the RDD. 
Avoids copies and has no schema */ - lazy val toRdd: RDD[Row] = executedPlan.execute() - - protected def stringOrError[A](f: => A): String = - try f.toString catch { case e: Throwable => e.toString } - - def simpleString: String = - s"""== Physical Plan == - |${stringOrError(executedPlan)} - """.stripMargin.trim - - override def toString: String = { - def output = - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") - - // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) - // however, the `toRdd` will cause the real execution, which is not what we want. - // We need to think about how to avoid the side effect. - s"""== Parsed Logical Plan == - |${stringOrError(logical)} - |== Analyzed Logical Plan == - |${stringOrError(output)} - |${stringOrError(analyzed)} - |== Optimized Logical Plan == - |${stringOrError(optimizedPlan)} - |== Physical Plan == - |${stringOrError(executedPlan)} - |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} - |== RDD == - """.stripMargin.trim - } - } - /** * Parses the data type in our internal string representation. The data type string should * have the same format as the one generated by `toString` in scala. @@ -1321,3 +1185,95 @@ object SQLContext { } } } + +/** + * :: DeveloperApi :: + * The primary workflow for executing relational queries using Spark. Designed to allow easy + * access to the intermediate phases of query execution for developers. + */ +@DeveloperApi +protected[sql] class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { + def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed) + + lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical) + lazy val withCachedData: LogicalPlan = { + assertAnalyzed() + sqlContext.cacheManager.useCachedData(analyzed) + } + lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData) + + // TODO: Don't just pick the first one... + lazy val sparkPlan: SparkPlan = { + SparkPlan.currentContext.set(sqlContext) + sqlContext.planner.plan(optimizedPlan).next() + } + // executedPlan should not be used to initialize any SparkPlan. It should be + // only used for execution. + lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) + + /** + * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. + */ + @transient + protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { + val batches = + Batch("Add exchange", Once, EnsureRequirements(sqlContext)) :: Nil + } + + /** Internal version of the RDD. Avoids copies and has no schema */ + lazy val toRdd: RDD[Row] = executedPlan.execute() + + protected def stringOrError[A](f: => A): String = + try f.toString catch { case e: Throwable => e.toString } + + def simpleString: String = + s"""== Physical Plan == + |${stringOrError(executedPlan)} + """.stripMargin.trim + + override def toString: String = { + def output = + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") + + // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) + // however, the `toRdd` will cause the real execution, which is not what we want. + // We need to think about how to avoid the side effect. 
+ s"""== Parsed Logical Plan == + |${stringOrError(logical)} + |== Analyzed Logical Plan == + |${stringOrError(output)} + |${stringOrError(analyzed)} + |== Optimized Logical Plan == + |${stringOrError(optimizedPlan)} + |== Physical Plan == + |${stringOrError(executedPlan)} + |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} + |== RDD == + """.stripMargin.trim + } +} + + +protected[sql] class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies { + val sparkContext: SparkContext = sqlContext.sparkContext + + def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled + + def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled + + def numPartitions: Int = sqlContext.conf.numShufflePartitions + + def strategies: Seq[Strategy] = + sqlContext.experimental.extraStrategies ++ ( + DataSourceStrategy :: + DDLStrategy :: + TakeOrdered :: + HashAggregation :: + LeftSemiJoin :: + HashJoin :: + InMemoryScans :: + ParquetOperations :: + BasicOperators :: + CartesianProduct :: + BroadcastNestedLoopJoin :: Nil) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3f6a0345bc17..d8b3bde60d33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ @@ -24,13 +25,63 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} + import org.apache.spark.sql.parquet._ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{SQLContext, Strategy, execution} +import org.apache.spark.sql.{catalyst, SQLContext, Strategy, execution} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { - self: SQLContext#SparkPlanner => + def sqlContext: SQLContext + def sparkContext: SparkContext + def codegenEnabled: Boolean + def unsafeEnabled: Boolean + def numPartitions: Int + + lazy val planner = sqlContext.planner + + /** + * Used to build table scan operators where complex projection and filtering are done using + * separate physical operators. This function returns the given scan operator with Project and + * Filter nodes added only when needed. For example, a Project operator is only used when the + * final desired output requires complex expressions to be evaluated or when columns can be + * further eliminated out after filtering has been done. + * + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. + * + * The required attributes for both filtering and expression evaluation are passed to the + * provided `scanBuilder` function so that it can avoid unnecessary column materialization. 
+ */ + def pruneFilterProject( + projectList: Seq[NamedExpression], + filterPredicates: Seq[Expression], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], + scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { + + val projectSet = AttributeSet(projectList.flatMap(_.references)) + val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) + val filterCondition = + prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) + + // Right now we still use a projection even if the only evaluation is applying an alias + // to a column. Since this is a no-op, it could be avoided. However, using this + // optimization with the current implementation would change the output schema. + // TODO: Decouple final output schema from expression evaluation so this copy can be + // avoided safely. + + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && + filterSet.subsetOf(projectSet)) { + // When it is possible to just use column pruning to get the right projection and + // when the columns of this projection are enough to evaluate all filter conditions, + // just do a scan followed by a filter, with no extra project. + val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) + filterCondition.map(Filter(_, scan)).getOrElse(scan) + } else { + val scan = scanBuilder((projectSet ++ filterSet).toSeq) + Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) + } + } object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -131,7 +182,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if canBeCodeGened( allAggregates(partialComputation) ++ allAggregates(rewrittenAggregateExpressions)) && - codegenEnabled => + planner.codegenEnabled => execution.GeneratedAggregate( partial = false, namedGroupingAttributes, @@ -250,7 +301,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } else { identity[Seq[Expression]] _ } - pruneFilterProject( + planner.pruneFilterProject( projectList, filters, prunePushedDownFilters, @@ -266,7 +317,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object InMemoryScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, filters, mem: InMemoryRelation) => - pruneFilterProject( + planner.pruneFilterProject( projectList, filters, identity[Seq[Expression]], // All filters still need to be evaluated. @@ -277,7 +328,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { - def numPartitions: Int = self.numPartitions + def numPartitions: Int = planner.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil @@ -355,7 +406,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") case describe @ LogicalDescribeCommand(table, isExtended) => - val resultPlan = self.sqlContext.executePlan(table).executedPlan + val resultPlan = sqlContext.executePlan(table).executedPlan ExecutedCommand( RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: Nil diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index 48ac9062af96..4d3bb31d682c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.Logging -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} +import org.apache.spark.sql.hive.{HiveContext, HiveQueryExecution, HiveMetastoreTypes} private[hive] abstract class AbstractSparkSQLDriver( val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging { @@ -36,7 +36,7 @@ private[hive] abstract class AbstractSparkSQLDriver( override def init(): Unit = { } - private def getResultSetSchema(query: context.QueryExecution): Schema = { + private def getResultSetSchema(query: HiveQueryExecution): Schema = { val analyzed = query.analyzed logDebug(s"Result Schema: ${analyzed.output}") if (analyzed.output.size == 0) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 863a5db1bf98..d7e265a5f8cc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -246,8 +246,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { super.parseSql(substitutor.substitute(hiveconf, sql)) } - override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = - new this.QueryExecution(plan) + override protected[sql] def executePlan(plan: LogicalPlan): HiveQueryExecution = + new HiveQueryExecution(this, plan) /** * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, @@ -418,8 +418,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } @transient - private val hivePlanner = new SparkPlanner with HiveStrategies { - val hiveContext = self + private val hivePlanner = new SparkPlanner(this) with HiveStrategies { + val hiveContext = sqlContext.asInstanceOf[HiveContext] override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq( DataSourceStrategy, @@ -456,43 +456,45 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient override protected[sql] val planner = hivePlanner - /** Extends QueryExecution with hive specific features. */ - protected[sql] class QueryExecution(logicalPlan: LogicalPlan) - extends super.QueryExecution(logicalPlan) { +} - /** - * Returns the result as a hive compatible sequence of strings. For native commands, the - * execution is simply passed back to Hive. - */ - def stringResult(): Seq[String] = executedPlan match { - case ExecutedCommand(desc: DescribeHiveTableCommand) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. 
- desc.run(self).map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - case command: ExecutedCommand => - command.executeCollect().map(_(0).toString) - - case other => - val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq - // We need the types so we can output struct field names - val types = analyzed.output.map(_.dataType) - // Reformat to match hive tab delimited output. - result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq - } - override def simpleString: String = - logical match { - case _: HiveNativeCommand => "" - case _: SetCommand => "" - case _ => super.simpleString +/** Extends QueryExecution with hive specific features. */ +protected[sql] class HiveQueryExecution(hiveContext: HiveContext, logicalPlan: LogicalPlan) + extends QueryExecution(hiveContext, logicalPlan) { + + /** + * Returns the result as a hive compatible sequence of strings. For native commands, the + * execution is simply passed back to Hive. + */ + def stringResult(): Seq[String] = executedPlan match { + case ExecutedCommand(desc: DescribeHiveTableCommand) => + // If it is a describe command for a Hive table, we want to have the output format + // be similar with Hive. + desc.run(sqlContext).map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, + Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") } + case command: ExecutedCommand => + command.executeCollect().map(_(0).toString) + + case other => + val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq + // We need the types so we can output struct field names + val types = analyzed.output.map(_.dataType) + // Reformat to match hive tab delimited output. + result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq } + + override def simpleString: String = + logical match { + case _: HiveNativeCommand => "" + case _: SetCommand => "" + case _ => super.simpleString + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c6b65106452b..d4c724c6d743 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -35,11 +35,8 @@ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, import org.apache.spark.sql.types.StringType -private[hive] trait HiveStrategies { - // Possibly being too clever with types here... or not clever enough. - self: SQLContext#SparkPlanner => - - val hiveContext: HiveContext +private[hive] trait HiveStrategies extends SparkStrategies { + def hiveContext: HiveContext /** * :: Experimental :: @@ -137,7 +134,7 @@ private[hive] trait HiveStrategies { val partitionLocations = partitions.map(_.getLocation) if (partitionLocations.isEmpty) { - PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil + PhysicalRDD(plan.output, hiveContext.sparkContext.emptyRDD[Row]) :: Nil } else { hiveContext .read.parquet(partitionLocations: _*) @@ -165,7 +162,7 @@ private[hive] trait HiveStrategies { // TODO: Remove this hack for Spark 1.3. 
case iae: java.lang.IllegalArgumentException if iae.getMessage.contains("Can not create a Path from an empty string") => - PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil + PhysicalRDD(plan.output, hiveContext.sparkContext.emptyRDD[Row]) :: Nil } case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 964828407481..07521ccf4017 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -101,7 +101,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { super.runSqlHive(rewritePaths(substitutor.substitute(this.hiveconf, sql))) override def executePlan(plan: LogicalPlan): this.QueryExecution = - new this.QueryExecution(plan) + new this.QueryExecution(this, plan) override protected[sql] def createSession(): SQLSession = { new this.SQLSession() @@ -165,9 +165,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** * Override QueryExecution with special debug workflow. */ - class QueryExecution(logicalPlan: LogicalPlan) - extends super.QueryExecution(logicalPlan) { - def this(sql: String) = this(parseSql(sql)) + class QueryExecution(hiveContext: HiveContext, logicalPlan: LogicalPlan) + extends HiveQueryExecution(hiveContext, logicalPlan) { + def this(sql: String) = this(self, parseSql(sql)) override lazy val analyzed = { val describedTables = logical match { case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil
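To make the shape of this refactor concrete: QueryExecution is no longer a path-dependent inner class (SQLContext#QueryExecution) but a top-level class that receives its SQLContext explicitly, which is why DataFrame and the Hive code can now name the type directly. The sketch below is illustrative only and is not part of the patch; it assumes it compiles inside the org.apache.spark.sql package (the class stays protected[sql]), and QueryExecutionSketch / explainAll are hypothetical names.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative sketch only. It relies on the new QueryExecution(sqlContext, logical)
// constructor introduced in this patch.
object QueryExecutionSketch {

  // Builds a QueryExecution directly instead of via `new ctx.QueryExecution(plan)`.
  def explainAll(ctx: SQLContext, plan: LogicalPlan): String = {
    val qe = new QueryExecution(ctx, plan)
    // Each phase (analyzed, optimizedPlan, executedPlan) is a lazy val; forcing
    // toString renders the parsed, analyzed, optimized and physical plans.
    qe.toString
  }
}

Because the class is no longer tied to one particular context instance's type, a QueryExecution can be held in DataFrame.queryExecution with a plain type, and HiveQueryExecution can extend it as an ordinary subclass.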
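The planner side follows the same pattern: SparkPlanner now takes its SQLContext as a constructor argument and SparkStrategies stands on its own, which is what lets HiveContext assemble its planner as `new SparkPlanner(this) with HiveStrategies`. The sketch below shows how an additional strategy could be slotted in the same way; DebugStrategy and DebugPlanner are hypothetical names, and like the patch's own classes the code is assumed to live under org.apache.spark.sql so it can reach the protected[sql] members.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy: returns Nil so planning falls through to the built-in
// strategies; a real one would pattern-match on logical operators.
object DebugStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Hypothetical planner subclass: prepends the custom strategy to the defaults
// declared in SparkPlanner.strategies.
class DebugPlanner(ctx: SQLContext) extends SparkPlanner(ctx) {
  override def strategies: Seq[Strategy] = DebugStrategy +: super.strategies
}

User code that cannot subclass the planner still has the hook shown in the diff itself: SparkPlanner.strategies prepends sqlContext.experimental.extraStrategies to its built-in list.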
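Finally, because QueryExecution is an ordinary class, context-specific behaviour is added by plain subclassing, exactly as HiveQueryExecution and TestHive's debug QueryExecution do above. A minimal sketch of that pattern follows; TracingQueryExecution is a hypothetical name, again assumed to sit under org.apache.spark.sql.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical subclass, mirroring the HiveQueryExecution pattern: override a
// phase or a report without touching SQLContext itself.
class TracingQueryExecution(ctx: SQLContext, plan: LogicalPlan)
  extends QueryExecution(ctx, plan) {

  // Prepend the parsed logical plan to the usual physical-plan summary.
  override def simpleString: String =
    s"== Parsed Logical Plan ==\n$logical\n" + super.simpleString
}

A context that wants this behaviour everywhere only has to override executePlan to return the subclass, which is precisely what HiveContext.executePlan and TestHiveContext.executePlan do in this patch.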