@@ -628,13 +628,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
longMetric("dataSize") += dataSize

// There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
// directly without setting an execution id. We should be tolerant to it.
if (executionId != null) {
sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
}

SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
rows
}
}(SubqueryExec.executionContext)
@@ -97,13 +97,7 @@ case class BroadcastExchangeExec(
val broadcasted = sparkContext.broadcast(relation)
longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000

// There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
// directly without setting an execution id. We should be tolerant to it.
if (executionId != null) {
sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
}

SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
broadcasted
} catch {
case oe: OutOfMemoryError =>
@@ -22,9 +22,15 @@ import java.util.Locale

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}


/**
* A metric used in a SQL query plan. This is implemented as an [[AccumulatorV2]]. Updates on
* the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
* on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
*/
class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
// This is a workaround for SPARK-11013.
// We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
@@ -126,4 +132,18 @@ object SQLMetrics {
s"\n$sum ($min, $med, $max)"
}
}

/**
* Updates metrics based on the driver side value. This is useful for certain metrics that
* are only updated on the driver, e.g. subquery execution time, or number of files.
*/
def postDriverMetricUpdates(
sc: SparkContext, executionId: String, metrics: Seq[SQLMetric]): Unit = {
// There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
// directly without setting an execution id. We should be tolerant to it.
if (executionId != null) {
sc.listenerBus.post(
SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
}
}
}
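For reference, a minimal sketch of how a driver-only metric could be reported through the new helper, assuming the existing SQLMetrics.createMetric and SQLExecution.EXECUTION_ID_KEY APIs; the method name reportDriverSideMetric and the metric value are illustrative and not part of this change:

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics

def reportDriverSideMetric(sc: SparkContext): Unit = {
  // Driver-side updates do not flow through the accumulator machinery on their own,
  // so the code bumps the metric locally and then posts it explicitly.
  val numFiles = SQLMetrics.createMetric(sc, "number of files")
  numFiles += 42L  // illustrative driver-only update

  // The execution id may be null when `doExecute` is called without a tracked
  // execution; `postDriverMetricUpdates` tolerates that case.
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(numFiles))
}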
@@ -47,6 +47,13 @@ case class SparkListenerSQLExecutionStart(
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent

/**
* A message used to update SQL metric value for driver-side updates (which doesn't get reflected
* automatically).
*
* @param executionId The execution id for a query, so we can find the query plan.
* @param accumUpdates Map from accumulator id to the metric value (metrics are always 64-bit ints).
*/
@DeveloperApi
case class SparkListenerDriverAccumUpdates(
executionId: Long,
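As a sketch of how these events can be consumed, a custom SparkListener could match on them in onOtherEvent; the listener name MyDriverAccumListener is illustrative and not part of this change:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

class MyDriverAccumListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      // Each pair maps an accumulator id to its current driver-side Long value.
      accumUpdates.foreach { case (accumId, value) =>
        println(s"execution $executionId: accumulator $accumId = $value")
      }
    case _ => // not a driver-side accumulator update; ignore
  }
}

// Registered with the SparkContext, e.g. sc.addSparkListener(new MyDriverAccumListener)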
@@ -477,9 +477,11 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe

override def doExecute(): RDD[InternalRow] = {
longMetric("dummy") += expectedValue
sc.listenerBus.post(SparkListenerDriverAccumUpdates(
sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong,
metrics.values.map(m => m.id -> m.value).toSeq))

SQLMetrics.postDriverMetricUpdates(
sc,
sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
metrics.values.toSeq)
sc.emptyRDD
}
}