12 changes: 12 additions & 0 deletions sql/core/benchmarks/MetricsAggregationBenchmark-jdk11-results.txt
@@ -0,0 +1,12 @@
OpenJDK 64-Bit Server VM 11.0.4+11 on Linux 4.15.0-66-generic
Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
metrics aggregation (50 metrics, 100000 tasks per stage): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
1 stage(s) 672 841 179 0.0 671888474.0 1.0X
2 stage(s) 1700 1842 201 0.0 1699591662.0 0.4X
3 stage(s) 2601 2776 247 0.0 2601465786.0 0.3X

Stage Count Stage Proc. Time Aggreg. Time
1 436 164
2 537 354
3 480 602
12 changes: 12 additions & 0 deletions sql/core/benchmarks/MetricsAggregationBenchmark-results.txt
@@ -0,0 +1,12 @@
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Linux 4.15.0-66-generic
Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
metrics aggregation (50 metrics, 100000 tasks per stage): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
1 stage(s) 740 883 147 0.0 740089816.0 1.0X
2 stage(s) 1661 1943 399 0.0 1660649192.0 0.4X
3 stage(s) 2711 2967 362 0.0 2711110178.0 0.3X

Stage Count Stage Proc. Time Aggreg. Time
1 405 179
2 375 414
3 364 644
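
For reference, the "Relative" column in both tables is each case's best time measured against the 1-stage baseline, which is why the roughly linear growth of aggregation time with stage count shows up as 1.0X / 0.4X / 0.3X. A minimal Scala sketch of that arithmetic, using the JDK 8 numbers above:

```scala
// Best Time(ms) per stage count, copied from the JDK 8 table above.
val bestTimesMs = Seq(1 -> 740L, 2 -> 1661L, 3 -> 2711L)
val baselineMs = bestTimesMs.head._2.toDouble

bestTimesMs.foreach { case (stages, bestMs) =>
  // Relative = baseline best time / this case's best time.
  println(f"$stages stage(s): ${baselineMs / bestMs}%.1fX") // 1.0X, 0.4X, 0.3X
}
```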
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.metric

import java.text.NumberFormat
import java.util.Locale
import java.util.{Arrays, Locale}

import scala.concurrent.duration._

@@ -150,7 +150,7 @@ object SQLMetrics {
* A function that defines how we aggregate the final accumulator results among all tasks,
 * and represent it as a string for a SQL physical operator.
*/
def stringValue(metricsType: String, values: Seq[Long]): String = {
def stringValue(metricsType: String, values: Array[Long]): String = {
if (metricsType == SUM_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
numberFormat.format(values.sum)
@@ -162,8 +162,9 @@
val metric = if (validValues.isEmpty) {
Seq.fill(3)(0L)
} else {
val sorted = validValues.sorted
Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
Arrays.sort(validValues)
Seq(validValues(0), validValues(validValues.length / 2),
validValues(validValues.length - 1))
}
metric.map(v => numberFormat.format(v.toDouble / baseForAvgMetric))
}
@@ -184,8 +185,9 @@
val metric = if (validValues.isEmpty) {
Seq.fill(4)(0L)
} else {
val sorted = validValues.sorted
Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
Arrays.sort(validValues)
Seq(validValues.sum, validValues(0), validValues(validValues.length / 2),
validValues(validValues.length - 1))
}
metric.map(strFormat)
}
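
The change above swaps `Seq[Long]` for a primitive `Array[Long]` so the percentile lookup can sort in place. A minimal sketch of the pattern, assuming only that the caller no longer needs the original ordering (`Arrays.sort` mutates its argument):

```scala
import java.util.Arrays

// Sorting a primitive Array[Long] in place avoids the per-element boxing and
// the extra sorted copy that Seq[Long]#sorted would allocate; this matters
// when a stage contributes one value per task (100000 tasks in the benchmark).
val validValues: Array[Long] = Array(42L, 7L, 99L, 13L)
Arrays.sort(validValues) // in-place sort of primitive longs, no boxing

val min = validValues(0)
val median = validValues(validValues.length / 2)
val max = validValues(validValues.length - 1)
println(s"min=$min median=$median max=$max") // min=7 median=42 max=99
```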
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -16,10 +16,11 @@
*/
package org.apache.spark.sql.execution.ui

import java.util.{Date, NoSuchElementException}
import java.util.{Arrays, Date, NoSuchElementException}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.internal.Logging
@@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
import org.apache.spark.util.collection.OpenHashMap

class SQLAppStatusListener(
conf: SparkConf,
@@ -103,8 +105,10 @@ class SQLAppStatusListener(
// Record the accumulator IDs for the stages of this job, so that the code that keeps
// track of the metrics knows which accumulators to look at.
val accumIds = exec.metrics.map(_.accumulatorId).toSet
event.stageIds.foreach { id =>
stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds, new ConcurrentHashMap()))
if (accumIds.nonEmpty) {
event.stageInfos.foreach { stage =>
stageMetrics.put(stage.stageId, new LiveStageMetrics(0, stage.numTasks, accumIds))
}
}

exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
@@ -118,9 +122,11 @@
}

// Reset the metrics tracking object for the new attempt.
Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
metrics.taskMetrics.clear()
metrics.attemptId = event.stageInfo.attemptNumber
Option(stageMetrics.get(event.stageInfo.stageId)).foreach { stage =>
if (stage.attemptId != event.stageInfo.attemptNumber) {
stageMetrics.put(event.stageInfo.stageId,
new LiveStageMetrics(event.stageInfo.attemptNumber, stage.numTasks, stage.accumulatorIds))
}
}
}

@@ -140,7 +146,16 @@

override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
updateStageMetrics(stageId, attemptId, taskId, SQLAppStatusListener.UNKNOWN_INDEX,
accumUpdates, false)
}
}

override def onTaskStart(event: SparkListenerTaskStart): Unit = {
Option(stageMetrics.get(event.stageId)).foreach { stage =>
if (stage.attemptId == event.stageAttemptId) {
stage.registerTask(event.taskInfo.taskId, event.taskInfo.index)
}
}
}
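
The `onTaskStart` handler above exists so that metric updates can be keyed by task *index* rather than task ID: heartbeat-driven updates arrive without an index (`UNKNOWN_INDEX`), and re-attempts of an index that already succeeded must not be counted twice. A simplified, self-contained sketch of that bookkeeping (class and method names here are illustrative, not Spark's):

```scala
import scala.collection.mutable

class TaskIndexTracker(numTasks: Int) {
  private val taskIndices = mutable.HashMap[Long, Int]() // taskId -> task index
  private val completedIndices = mutable.BitSet()
  val values = new Array[Long](numTasks)                 // one slot per index

  def registerTask(taskId: Long, taskIdx: Int): Unit =
    taskIndices(taskId) = taskIdx

  def update(taskId: Long, value: Long, finished: Boolean): Unit =
    taskIndices.get(taskId).foreach { idx => // update without a start event: ignored
      if (!completedIndices.contains(idx)) { // index already succeeded: ignored
        values(idx) = value                  // overwrite by index, never accumulate
        if (finished) completedIndices += idx
      }
    }
}

// A speculative duplicate of task index 0 finishing late is a no-op:
val tracker = new TaskIndexTracker(numTasks = 2)
tracker.registerTask(taskId = 10L, taskIdx = 0)
tracker.registerTask(taskId = 11L, taskIdx = 0)  // speculative attempt, same index
tracker.update(10L, value = 5L, finished = true)
tracker.update(11L, value = 9L, finished = true) // dropped
assert(tracker.values(0) == 5L)
```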

@@ -165,7 +180,7 @@
} else {
info.accumulables
}
updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, info.index, accums,
info.successful)
}

@@ -181,17 +196,40 @@

private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
val metrics = exec.stages.toSeq

val taskMetrics = exec.stages.toSeq
.flatMap { stageId => Option(stageMetrics.get(stageId)) }
.flatMap(_.taskMetrics.values().asScala)
.flatMap { metrics => metrics.ids.zip(metrics.values) }

val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
.filter { case (id, _) => metricTypes.contains(id) }
.groupBy(_._1)
.map { case (id, values) =>
id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
.flatMap(_.metricValues())

val allMetrics = new mutable.HashMap[Long, Array[Long]]()

taskMetrics.foreach { case (id, values) =>
val prev = allMetrics.getOrElse(id, null)
val updated = if (prev != null) {
prev ++ values
} else {
values
}
allMetrics(id) = updated
}

exec.driverAccumUpdates.foreach { case (id, value) =>
if (metricTypes.contains(id)) {
val prev = allMetrics.getOrElse(id, null)
val updated = if (prev != null) {
val _copy = Arrays.copyOf(prev, prev.length + 1)
_copy(prev.length) = value
_copy
} else {
Array(value)
}
allMetrics(id) = updated
}
}

val aggregatedMetrics = allMetrics.map { case (id, values) =>
id -> SQLMetrics.stringValue(metricTypes(id), values)
}.toMap

// Check the execution again for whether the aggregated metrics data has been calculated.
// This can happen if the UI is requesting this data, and the onExecutionEnd handler is
@@ -208,43 +246,13 @@
stageId: Int,
attemptId: Int,
taskId: Long,
taskIdx: Int,
accumUpdates: Seq[AccumulableInfo],
succeeded: Boolean): Unit = {
Option(stageMetrics.get(stageId)).foreach { metrics =>
if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
return
}

val oldTaskMetrics = metrics.taskMetrics.get(taskId)
if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
return
if (metrics.attemptId == attemptId) {
metrics.updateTaskMetrics(taskId, taskIdx, succeeded, accumUpdates)
}

val updates = accumUpdates
.filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
.sortBy(_.id)

if (updates.isEmpty) {
return
}

val ids = new Array[Long](updates.size)
val values = new Array[Long](updates.size)
updates.zipWithIndex.foreach { case (acc, idx) =>
ids(idx) = acc.id
// In a live application, accumulators have Long values, but when reading from event
// logs, they have String values. For now, assume all accumulators are Long and convert
// accordingly.
values(idx) = acc.update.get match {
case s: String => s.toLong
case l: Long => l
case o => throw new IllegalArgumentException(s"Unexpected: $o")
}
}

// TODO: storing metrics by task ID can cause metrics for the same task index to be
// counted multiple times, for example due to speculation or re-attempts.
metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
}
}

@@ -425,12 +433,76 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
}

private class LiveStageMetrics(
val stageId: Int,
var attemptId: Int,
val accumulatorIds: Set[Long],
val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])

private class LiveTaskMetrics(
val ids: Array[Long],
val values: Array[Long],
val succeeded: Boolean)
val attemptId: Int,
val numTasks: Int,
val accumulatorIds: Set[Long]) {

/**
* Mapping of task IDs to their respective index. Note this may contain more elements than the
* stage's number of tasks, if speculative execution is on.
*/
private val taskIndices = new OpenHashMap[Long, Int]()

/** Bit set tracking which indices have been successfully computed. */
private val completedIndices = new mutable.BitSet()

/**
* Task metrics values for the stage. Maps the metric ID to the metric values for each
* index. For each metric ID, there will be the same number of values as the number
* of indices. This relies on `SQLMetrics.stringValue` treating 0 as a neutral value,
* independent of the actual metric type.
*/
private val taskMetrics = new ConcurrentHashMap[Long, Array[Long]]()

def registerTask(taskId: Long, taskIdx: Int): Unit = {
taskIndices.update(taskId, taskIdx)
}

def updateTaskMetrics(
taskId: Long,
eventIdx: Int,
finished: Boolean,
accumUpdates: Seq[AccumulableInfo]): Unit = {
val taskIdx = if (eventIdx == SQLAppStatusListener.UNKNOWN_INDEX) {
if (!taskIndices.contains(taskId)) {
// We probably missed the start event for the task, just ignore it.
return
}
taskIndices(taskId)
} else {
// Here we can recover from a missing task start event. Just register the task again.
registerTask(taskId, eventIdx)
eventIdx
}

if (completedIndices.contains(taskIdx)) {
return
}

accumUpdates
.filter { acc => acc.update.isDefined && accumulatorIds.contains(acc.id) }
.foreach { acc =>
// In a live application, accumulators have Long values, but when reading from event
// logs, they have String values. For now, assume all accumulators are Long and convert
// accordingly.
val value = acc.update.get match {
case s: String => s.toLong
case l: Long => l
case o => throw new IllegalArgumentException(s"Unexpected: $o")
}

val metricValues = taskMetrics.computeIfAbsent(acc.id, _ => new Array(numTasks))
metricValues(taskIdx) = value
}

if (finished) {
completedIndices += taskIdx
}
}

def metricValues(): Seq[(Long, Array[Long])] = taskMetrics.asScala.toSeq
}

private object SQLAppStatusListener {
val UNKNOWN_INDEX = -1
}
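
With the per-index arrays in place, `aggregateMetrics` only has to merge one `Array[Long]` per metric ID across stages and then append the driver-side updates. A hedged, self-contained sketch of that merge (the helper names are illustrative):

```scala
import java.util.Arrays
import scala.collection.mutable

val allMetrics = new mutable.HashMap[Long, Array[Long]]()

// Concatenate the per-task value arrays contributed by each stage.
def addTaskValues(id: Long, values: Array[Long]): Unit =
  allMetrics(id) = allMetrics.get(id).map(_ ++ values).getOrElse(values)

// Driver-side accumulator updates arrive one value at a time, so grow the
// array by a single slot, as the Arrays.copyOf code above does.
def addDriverValue(id: Long, value: Long): Unit =
  allMetrics(id) = allMetrics.get(id) match {
    case Some(prev) =>
      val copy = Arrays.copyOf(prev, prev.length + 1)
      copy(prev.length) = value
      copy
    case None => Array(value)
  }

addTaskValues(7L, Array(3L, 1L, 4L)) // stage A
addTaskValues(7L, Array(1L, 5L))     // stage B
addDriverValue(7L, 9L)
// allMetrics(7L) == Array(3, 1, 4, 1, 5, 9), ready to hand to
// SQLMetrics.stringValue(metricType, values).
```

Keeping everything as primitive arrays end to end means the aggregation path never boxes a metric value until formatting.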
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -232,7 +232,8 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId)
assert(expectedNodeName === actualNodeName)
for ((metricName, metricPredicate) <- expectedMetricsPredicatesMap) {
assert(metricPredicate(actualMetricsMap(metricName)))
assert(metricPredicate(actualMetricsMap(metricName)),
s"$nodeId / '$metricName' (= ${actualMetricsMap(metricName)}) did not match predicate.")
}
}
}
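
A quick illustration of what the strengthened assertion reports on failure (the node ID, metric name, and value below are made up for the example):

```scala
val nodeId = 3
val metricName = "number of output rows"
val actualValue = "0"
println(s"$nodeId / '$metricName' (= $actualValue) did not match predicate.")
// prints: 3 / 'number of output rows' (= 0) did not match predicate.
```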