@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.sources.BaseRelation
@@ -607,7 +608,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
try {
val start = System.nanoTime()
// call `QueryExecution.toRDD` to trigger the execution of commands.
qe.toRdd
SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
val end = System.nanoTime()
session.listenerManager.onSuccess(name, qe, end - start)
} catch {
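For context, a rough sketch of what this change affects at the user level (assuming a local SparkSession; the table name is illustrative, not from this PR): a write issued through DataFrameWriter now executes its command inside withNewExecutionId, so its jobs show up under a single entry in the SQL tab instead of as untracked jobs.

import org.apache.spark.sql.SparkSession

object WriterTrackingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("writer-tracking").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    // The command behind this write runs through qe.toRdd above, now wrapped in
    // SQLExecution.withNewExecutionId, so its jobs are tied to one SQL execution.
    df.write.mode("overwrite").saveAsTable("writer_tracking_sketch")
    spark.stop()
  }
}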
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -180,9 +180,9 @@ class Dataset[T] private[sql](
// to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
@cloud-fan (Contributor), Apr 8, 2017:

How about LocalRelation(c.output, withAction("collect", queryExecution)(_.executeCollect()))?

Contributor:

Actually, do we need to do this? Most Commands are just local operations (talking to the metastore).

Contributor (Author):

Yeah, the check I added to ensure we get the same results in the SQL tab turned up several hundred failures that go through this path. It looks like the path is almost always spark.sql when the SQL statement is a command like CTAS.

I like your version and will update.
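To illustrate the path discussed above (a sketch only; the view and table names and the USING clause are illustrative, chosen so it runs without Hive): a CTAS statement submitted through spark.sql yields a Command, which the Dataset constructor executes eagerly, and that eager collect is what withAction("collect", queryExecution) now tracks.

import org.apache.spark.sql.SparkSession

object CtasPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ctas-path").getOrCreate()
    spark.range(10).createOrReplaceTempView("src")
    // The Dataset returned here wraps a Command; creating it executes the CTAS eagerly,
    // and that execution now carries an execution ID via withAction.
    spark.sql("CREATE TABLE ctas_target USING parquet AS SELECT id FROM src")
    spark.stop()
  }
}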

LocalRelation(c.output, withAction("collect", queryExecution)(_.executeCollect()))
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
LocalRelation(u.output, withAction("collect", queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
@@ -120,18 +120,24 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
case ExecutedCommandExec(desc: DescribeTableCommand) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
desc.run(sparkSession).map {
SQLExecution.withNewExecutionId(sparkSession, this) {
desc.run(sparkSession)
}.map {
case Row(name: String, dataType: String, comment) =>
Seq(name, dataType,
Option(comment.asInstanceOf[String]).getOrElse(""))
.map(s => String.format(s"%-20s", s))
.mkString("\t")
.map(s => String.format(s"%-20s", s))
.mkString("\t")
}
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
SQLExecution.withNewExecutionId(sparkSession, this) {
command.executeCollect()
}.map(_.getString(1))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
val result: Seq[Seq[Any]] = SQLExecution.withNewExecutionId(sparkSession, this) {
other.executeCollectPublic()
}.map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
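As a small aside, a sketch of the Hive-style row formatting used in the DescribeTableCommand branch above (the column values are made up):

object HiveStyleRowFormatSketch {
  def main(args: Array[String]): Unit = {
    // Each field is left-padded to 20 characters and fields are tab-separated,
    // matching the map/mkString chain in the diff.
    val line = Seq("id", "bigint", "")
      .map(s => String.format("%-20s", s))
      .mkString("\t")
    println(line)
  }
}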
@@ -21,11 +21,11 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
SparkListenerSQLExecutionStart}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

object SQLExecution {
object SQLExecution extends Logging {

val EXECUTION_ID_KEY = "spark.sql.execution.id"

@@ -39,6 +39,32 @@ object SQLExecution {
executionIdToQueryExecution.get(executionId)
}

private val testing = sys.props.contains("spark.testing")

private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
Contributor:

This is only called in FileFormatWriter; are there any other places we need to consider?

Contributor (Author):

To keep this PR from growing too big, I want to use it only where I've removed withNewExecutionId, to check for regressions. I'll follow up with another PR that adds more checks.

// only throw an exception during tests. a missing execution ID should not fail a job.
if (testing && sparkSession.sparkContext.getLocalProperty(EXECUTION_ID_KEY) == null) {
// Attention testers: when a test fails with this exception, it means that the action that
// started execution of a query didn't call withNewExecutionId. The execution ID should be
// set by calling withNewExecutionId in the action that begins execution, like
// Dataset.collect or DataFrameWriter.insertInto.
throw new IllegalStateException("Execution ID should be set")
}
}

private val ALLOW_NESTED_EXECUTION = "spark.sql.execution.nested"

private[sql] def nested[T](sparkSession: SparkSession)(body: => T): T = {
val sc = sparkSession.sparkContext
val allowNestedPreviousValue = sc.getLocalProperty(SQLExecution.ALLOW_NESTED_EXECUTION)
try {
sc.setLocalProperty(SQLExecution.ALLOW_NESTED_EXECUTION, "true")
body
} finally {
sc.setLocalProperty(SQLExecution.ALLOW_NESTED_EXECUTION, allowNestedPreviousValue)
}
}
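A standalone sketch of the save/set/restore local-property pattern that nested relies on (the helper and key below are illustrative, not part of this patch; SparkContext.getLocalProperty and setLocalProperty are the real APIs):

import org.apache.spark.SparkContext

object LocalPropertySketch {
  def withFlag[T](sc: SparkContext, key: String)(body: => T): T = {
    val previous = sc.getLocalProperty(key) // null if the property was never set
    try {
      sc.setLocalProperty(key, "true")      // visible to jobs submitted from this thread
      body
    } finally {
      sc.setLocalProperty(key, previous)    // restore the caller's value even if body throws
    }
  }
}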

/**
* Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
* we can connect them with an execution.
@@ -73,21 +99,35 @@ object SQLExecution {
}
r
} else {
// Don't support nested `withNewExecutionId`. This is an example of the nested
// `withNewExecutionId`:
// Nesting `withNewExecutionId` may be incorrect; log a warning.
//
// This is an example of the nested `withNewExecutionId`:
//
// class DataFrame {
// // Note: `collect` will call withNewExecutionId
// def foo: T = withNewExecutionId { something.createNewDataFrame().collect() }
// }
//
// Note: `collect` will call withNewExecutionId
// In this case, only the "executedPlan" for "collect" will be executed. The "executedPlan"
// for the outer DataFrame won't be executed. So it's meaningless to create a new Execution
// for the outer DataFrame. Even if we track it, since its "executedPlan" doesn't run,
// for the outer Dataset won't be executed. So it's meaningless to create a new Execution
// for the outer Dataset. Even if we track it, since its "executedPlan" doesn't run,
// all accumulator metrics will be 0. It will confuse people if we show them in Web UI.
//
// A real case is the `DataFrame.count` method.
throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set")
// Some operations will start nested executions. For example, CacheTableCommand uses
// Dataset#count to materialize cached records when caching is not lazy. Because there are
// legitimate reasons to nest executions in withNewExecutionId, this logs a warning but does
// not throw an exception to avoid failing at runtime. Exceptions will be thrown for tests
// to ensure that nesting is avoided.
//
// To avoid this warning, use nested { ... }
if (!Option(sc.getLocalProperty(ALLOW_NESTED_EXECUTION)).exists(_.toBoolean)) {
if (testing) {
logWarning(s"$EXECUTION_ID_KEY is already set")
Contributor:

According to the comment, shouldn't we throw the exception here and log a warning at runtime?

Contributor (Author):

Good catch. Fixed.

} else {
throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set")
}
}
body
}
}

@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution

case class CacheTableCommand(
tableIdent: TableIdentifier,
@@ -36,13 +37,17 @@

override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
SQLExecution.nested(sparkSession) {
Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
}
}
sparkSession.catalog.cacheTable(tableIdent.quotedString)

if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
SQLExecution.nested(sparkSession) {
sparkSession.table(tableIdent).count()
}
}

Seq.empty[Row]
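A minimal sketch of the behaviour these wrappers cover (assuming a local session; the view name is illustrative): a non-lazy CACHE TABLE issues a count() from inside the command, which is exactly the kind of intentional nested execution that SQLExecution.nested marks.

import org.apache.spark.sql.SparkSession

object EagerCacheSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("eager-cache").getOrCreate()
    spark.range(100).createOrReplaceTempView("numbers")
    // The command runs under the outer execution ID; the count() it triggers to materialize
    // the cache runs inside SQLExecution.nested, so no nested-execution warning is raised.
    spark.sql("CACHE TABLE numbers")
    spark.stop()
  }
}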
@@ -161,50 +161,51 @@ object FileFormatWriter extends Logging {
}
}

SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
committer.setupJob(job)

try {
val rdd = if (orderingMatched) {
queryExecution.toRdd
} else {
SortExec(
requiredOrdering.map(SortOrder(_, Ascending)),
global = false,
child = queryExecution.executedPlan).execute()
}
val ret = new Array[WriteTaskResult](rdd.partitions.length)
sparkSession.sparkContext.runJob(
rdd,
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
executeTask(
description = description,
sparkStageId = taskContext.stageId(),
sparkPartitionId = taskContext.partitionId(),
sparkAttemptNumber = taskContext.attemptNumber(),
committer,
iterator = iter)
},
0 until rdd.partitions.length,
(index, res: WriteTaskResult) => {
committer.onTaskCommit(res.commitMsg)
ret(index) = res
})

val commitMsgs = ret.map(_.commitMsg)
val updatedPartitions = ret.flatMap(_.updatedPartitions)
.distinct.map(PartitioningUtils.parsePathFragment)

committer.commitJob(job, commitMsgs)
logInfo(s"Job ${job.getJobID} committed.")
refreshFunction(updatedPartitions)
} catch { case cause: Throwable =>
logError(s"Aborting job ${job.getJobID}.", cause)
committer.abortJob(job)
throw new SparkException("Job aborted.", cause)
// During tests, make sure there is an execution ID.
SQLExecution.checkSQLExecutionId(sparkSession)
Member:

The major issue is this change. For all queries using FileFormatWriter, we won't get any metrics because of

    queryExecution = Dataset.ofRows(sparkSession, query).queryExecution

It creates a new QueryExecution and we don't track it.

Member:

To make SQL metrics work, we should always wrap the correct QueryExecution with SparkListenerSQLExecutionStart.

Contributor (Author):

@zsxwing, that and similar cases are what I was talking about earlier when I said there are two physical plans. The inner Dataset.ofRows ends up creating a completely separate plan.

Are you saying that adding SparkListenerSQLExecutionStart (and also end) events will fix the metrics problem? I think it would at least require the metrics work-around I added to SQLListener, since metrics are filtered out if they aren't reported by the physical plan.


// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
committer.setupJob(job)

try {
val rdd = if (orderingMatched) {
queryExecution.toRdd
} else {
SortExec(
requiredOrdering.map(SortOrder(_, Ascending)),
global = false,
child = queryExecution.executedPlan).execute()
}
val ret = new Array[WriteTaskResult](rdd.partitions.length)
sparkSession.sparkContext.runJob(
rdd,
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
executeTask(
description = description,
sparkStageId = taskContext.stageId(),
sparkPartitionId = taskContext.partitionId(),
sparkAttemptNumber = taskContext.attemptNumber(),
committer,
iterator = iter)
},
0 until rdd.partitions.length,
(index, res: WriteTaskResult) => {
committer.onTaskCommit(res.commitMsg)
ret(index) = res
})

val commitMsgs = ret.map(_.commitMsg)
val updatedPartitions = ret.flatMap(_.updatedPartitions)
.distinct.map(PartitioningUtils.parsePathFragment)

committer.commitJob(job, commitMsgs)
logInfo(s"Job ${job.getJobID} committed.")
refreshFunction(updatedPartitions)
} catch { case cause: Throwable =>
logError(s"Aborting job ${job.getJobID}.", cause)
committer.abortJob(job)
throw new SparkException("Job aborted.", cause)
}
}

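For reference, a sketch of the kind of query that reaches this code (local session; the output path is illustrative): any file-based write, such as a partitioned Parquet save, goes through FileFormatWriter.write, which after this change expects the calling action to have set the execution ID and, during tests, verifies that via checkSQLExecutionId.

import org.apache.spark.sql.SparkSession

object FileWritePathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("file-write-path").getOrCreate()
    import spark.implicits._
    Seq((1, "2017-04-01"), (2, "2017-04-02")).toDF("id", "day")
      .write
      .mode("overwrite")
      .partitionBy("day")                      // dynamic partitions feed updatedPartitions above
      .parquet("/tmp/file-write-path-sketch")  // reaches FileFormatWriter.write via the write command
    spark.stop()
  }
}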
@@ -33,7 +33,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
@@ -283,44 +283,57 @@ class StreamExecution(
// Unblock `awaitInitialization`
initializationLatch.countDown()

triggerExecutor.execute(() => {
Contributor:

cc @zsxwing for streaming changes

Contributor (Author):

It's hard to see from the patch, but all that's happening here is that the execution is now wrapped in withNewExecutionId. I also had to create an IncrementalExecution to pass to that method, which is like a normal one but uses currentBatchId=-1 (the default).

startTrigger()

if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
// execution hasn't started, so lastExecution isn't defined. create an IncrementalExecution
// with the logical plan for the SQL listener using the current initialized values.
val genericStreamExecution = new IncrementalExecution(
sparkSessionToRunBatches,
logicalPlan,
outputMode,
checkpointFile("state"),
currentBatchId,
offsetSeqMetadata)

SQLExecution.withNewExecutionId(sparkSessionToRunBatches, genericStreamExecution) {
Contributor:

I think we should have an execution id for each streaming batch

triggerExecutor.execute(() => {
startTrigger()

if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionToRunBatches)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionToRunBatches)
// Update committed offsets.
batchCommitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
batchCommitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
updateStatusMessage("Waiting for next trigger")
isActive
})
updateStatusMessage("Waiting for next trigger")
isActive
})
}

updateStatusMessage("Stopped")
} else {
// `stop()` is already called. Let `finally` finish the cleanup.
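A hedged sketch of a streaming query that exercises this loop (the rate source and memory sink are chosen for illustration and assumed available in this build): the trigger loop now runs under withNewExecutionId with the IncrementalExecution built above, and the memory sink's collect (next hunk) is marked as a nested execution.

import org.apache.spark.sql.SparkSession

object StreamingExecutionIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("stream-exec-id").getOrCreate()
    val counts = spark.readStream.format("rate").load().groupBy().count()
    val query = counts.writeStream
      .outputMode("complete")
      .format("memory")           // MemorySink.addBatch collects under SQLExecution.nested
      .queryName("rate_counts")
      .start()
    query.processAllAvailable()   // run at least one micro-batch inside the tracked trigger loop
    query.stop()
    spark.stop()
  }
}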
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -196,11 +197,15 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
logDebug(s"Committing batch $batchId to $this")
outputMode match {
case Append | Update =>
val rows = AddedData(batchId, data.collect())
val rows = SQLExecution.nested(data.sparkSession) {
AddedData(batchId, data.collect())
}
synchronized { batches += rows }

case Complete =>
val rows = AddedData(batchId, data.collect())
val rows = SQLExecution.nested(data.sparkSession) {
AddedData(batchId, data.collect())
}
synchronized {
batches.clear()
batches += rows