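SPARK-19650: An action on a Command should not trigger a Spark job
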
19 changes: 7 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,14 @@ class Dataset[T] private[sql](
   }
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
-    def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
-      case _: Command |
-           _: InsertIntoTable => true
-      case _ => false
-    }
-
-    // For various commands (like DDL) and queries with side effects, we force query execution
-    // to happen right away to let these side effects take place eagerly.
     queryExecution.analyzed match {
+      // For various commands (like DDL) and queries with side effects, we force query execution
+      // to happen right away to let these side effects take place eagerly.
Review thread on the two comment lines added above:
  Contributor: remove this line
  Contributor: actually let me remove it while merging
-      case p if hasSideEffects(p) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
-      case Union(children) if children.forall(hasSideEffects) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+      case c: Command =>
+        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
       case _ =>
         queryExecution.analyzed
     }
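
This hunk is the heart of the fix. Previously every Command (and InsertIntoTable) was rewritten into a LogicalRDD backed by queryExecution.toRdd, which schedules a Spark job even for metadata-only operations such as SHOW DATABASES. Now the executed plan is collected eagerly on the driver with executeCollect() and wrapped in a LocalRelation, so subsequent actions are answered from local rows. A minimal sketch of the observable difference, assuming a running SparkSession named spark:

```scala
// Sketch, assuming a running SparkSession `spark` (not part of the patch).
val databases = spark.sql("SHOW DATABASES")
// The command already ran eagerly and its rows sit in a LocalRelation,
// so neither of these actions schedules a Spark job:
databases.head()
databases.collect()
```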
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
     case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
       command.executeCollect().map(_.getString(1))
-    case command: ExecutedCommandExec =>
-      command.executeCollect().map(_.getString(0))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
       // We need the types so we can output struct field names
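
Removing the catch-all ExecutedCommandExec case means command output other than SHOW TABLES now falls through to the generic branch, which renders every column instead of only the first string. That is exactly why the SET golden files below change: the value column now appears next to the key. A rough sketch of what the rendered result now looks like, assuming a running SparkSession `spark`:

```scala
// Sketch, assuming a running SparkSession `spark` (not part of the patch).
// SET produces a struct<key:string,value:string> result; the generic
// branch stringifies all columns rather than the key alone.
val rendered: Seq[String] = spark.sql("SET spark.sql.crossJoin.enabled=true")
  .collect()
  .map(_.toSeq.mkString("\t"))
  .toSeq
// rendered == Seq("spark.sql.crossJoin.enabled\ttrue")
```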
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._
@@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false
 -- !query 19 schema
 struct<key:string,value:string>
 -- !query 19 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive false
 
 
 -- !query 20
@@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true
 -- !query 21 schema
 struct<key:string,value:string>
 -- !query 21 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive true
 
 
 -- !query 22
@@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false
 -- !query 17 schema
 struct<key:string,value:string>
 -- !query 17 output
-spark.sql.groupByOrdinal
+spark.sql.groupByOrdinal false
 
 
 -- !query 18
@@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false
 -- !query 9 schema
 struct<key:string,value:string>
 -- !query 9 output
-spark.sql.orderByOrdinal
+spark.sql.orderByOrdinal false
 
 
 -- !query 10
@@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true
 -- !query 5 schema
 struct<key:string,value:string>
 -- !query 5 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled true
 
 
 -- !query 6
@@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false
 -- !query 7 schema
 struct<key:string,value:string>
 -- !query 7 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled false
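
All four result files change the same way: a SET command now echoes both the key and its value, matching the struct<key:string,value:string> schema the queries already declared. If memory serves, such golden files are regenerated rather than hand-edited, via SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite".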
25 changes: 25 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
 import java.io.File
 import java.math.MathContext
 import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql(badQuery), Row(1) :: Nil)
   }
 
+  test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+    // Create a listener that checks if new jobs have started.
+    val jobStarted = new AtomicBoolean(false)
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.set(true)
+      }
+    }
+
+    // Make sure no spurious job starts are pending in the listener bus.
+    sparkContext.listenerBus.waitUntilEmpty(500)
+    sparkContext.addSparkListener(listener)
+    try {
+      // Execute the command.
+      sql("show databases").head()
+
+      // Make sure we have seen all events triggered by the head() call above.
+      sparkContext.listenerBus.waitUntilEmpty(500)
+    } finally {
+      sparkContext.removeSparkListener(listener)
+    }
+    assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+  }
 }
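
The listener pattern in this test generalizes to any "no job should run" assertion. A reusable sketch under the same assumptions as the test — the helper name is hypothetical, and it must live in an org.apache.spark.* package because listenerBus is private[spark]:

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Hypothetical helper, not part of the patch: run `body` and assert
// that it started no Spark jobs.
object NoJobAssertions {
  def assertNoSparkJobs(sc: SparkContext)(body: => Unit): Unit = {
    val jobStarted = new AtomicBoolean(false)
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        jobStarted.set(true)
    }
    // Drain events already in flight so an earlier job is not misattributed.
    sc.listenerBus.waitUntilEmpty(500)
    sc.addSparkListener(listener)
    try {
      body
      // Flush any events produced by `body` before checking the flag.
      sc.listenerBus.waitUntilEmpty(500)
    } finally {
      sc.removeSparkListener(listener)
    }
    assert(!jobStarted.get(), "body should not have triggered a Spark job")
  }
}
```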