@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.metric

import java.text.NumberFormat
import java.util.Locale
+ import java.util.concurrent.atomic.LongAdder

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
@@ -32,40 +33,45 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
* on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
*/
class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {

// This is a workaround for SPARK-11013.
// We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
// update it at the end of task and the value will be at least 0. Then we can filter out the -1
// values before calculate max, min, etc.
- private[this] var _value = initValue
- private var _zeroValue = initValue
+ private[this] val _value = new LongAdder
+ private val _zeroValue = initValue
+ _value.add(initValue)

override def copy(): SQLMetric = {
- val newAcc = new SQLMetric(metricType, _value)
- newAcc._zeroValue = initValue
+ val newAcc = new SQLMetric(metricType, initValue)
+ newAcc.add(_value.sum())
newAcc
}

- override def reset(): Unit = _value = _zeroValue
+ override def reset(): Unit = this.set(_zeroValue)

override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
- case o: SQLMetric => _value += o.value
+ case o: SQLMetric => _value.add(o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

- override def isZero(): Boolean = _value == _zeroValue
+ override def isZero(): Boolean = _value.sum() == _zeroValue

- override def add(v: Long): Unit = _value += v
+ override def add(v: Long): Unit = _value.add(v)

// We can set a double value to `SQLMetric` which stores only long value, if it is
// average metrics.
def set(v: Double): Unit = SQLMetrics.setDoubleForAverageMetrics(this, v)

- def set(v: Long): Unit = _value = v
+ def set(v: Long): Unit = {
+   _value.reset()
+   _value.add(v)
+ }

- def +=(v: Long): Unit = _value += v
+ def +=(v: Long): Unit = _value.add(v)

- override def value: Long = _value
+ override def value: Long = _value.sum()

// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
@@ -153,7 +159,7 @@ object SQLMetrics {
Seq.fill(3)(0L)
} else {
val sorted = validValues.sorted
- Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+ Seq(sorted.head, sorted(validValues.length / 2), sorted(validValues.length - 1))
}
metric.map(v => numberFormat.format(v.toDouble / baseForAvgMetric))
}
@@ -173,7 +179,8 @@ object SQLMetrics {
Seq.fill(4)(0L)
} else {
val sorted = validValues.sorted
- Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+ Seq(sorted.sum, sorted.head, sorted(validValues.length / 2),
+   sorted(validValues.length - 1))
}
metric.map(strFormat)
}
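
The change above replaces the plain var that backed SQLMetric with a java.util.concurrent.atomic.LongAdder, so concurrent add calls no longer race on an unsynchronized read-modify-write. The following is a minimal standalone sketch of that difference, not Spark code; the object name, thread count, and iteration count are made up for illustration.

import java.util.concurrent.atomic.LongAdder

// Standalone sketch: lost updates on a plain var versus a LongAdder under
// concurrent increments. Object name, thread count and iteration count are
// illustrative only.
object LongAdderSketch {
  def main(args: Array[String]): Unit = {
    var plain = 0L             // unsynchronized read-modify-write
    val adder = new LongAdder  // lock-free, designed for write contention

    val threads = (1 to 8).map { _ =>
      new Thread(() => {
        var i = 0
        while (i < 100000) {
          plain += 1    // races with other threads; increments can be lost
          adder.add(1L) // always counted
          i += 1
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    println(s"plain var: $plain")          // typically well below 800000
    println(s"LongAdder: ${adder.sum()}")  // always 800000
  }
}

Because LongAdder stripes its count across internal cells, reading it requires sum(), which is why value and isZero() in the diff now go through _value.sum().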
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.metric

import java.io.File

+ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.Random

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.ui.SQLAppStatusStore
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -504,4 +504,38 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}

test("writing metrics from single thread") {
val nAdds = 10
val acc = new SQLMetric("test", -10)
assert(acc.isZero())
acc.set(0)
for (i <- 1 to nAdds) acc.add(1)
assert(!acc.isZero())
assert(nAdds === acc.value)
acc.reset()
assert(acc.isZero())
}

test("writing metrics from multiple threads") {
Contributor:
Really dumb question, does this fail without the fix?

Contributor Author:
Good question. Checking in a separate branch:
org.scalatest.exceptions.TestFailedException: 100000 did not equal 56544

Contributor:
Thanks!

Member:
FYI, I think this is not a valid test. Spark assumes there is only one writer at a time.

Member:
To be clear, we just need AccumulatorV2 to be readable in the heartbeat thread. That's the only place that needs to think about concurrency.

Contributor:
Do you mean it's a one-writer, multi-reader scene?

Contributor Author:
This was originally raised because an implementation of broadcast join did have multiple writers. Unfortunately, we recently determined that the LongAdder is causing a performance regression, and we are going to revert this.
@cloud-fan or @hvanhovell, can one of you send the rollback PR?

Member:
> Do you mean it's a one-writer, multi-reader scene?

Yes.
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
val nFutures = 1000
val nAdds = 100
val acc = new SQLMetric("test", -10)
assert(acc.isZero() === true)
acc.set(0)
val l = for ( i <- 1 to nFutures ) yield {
Future {
for (j <- 1 to nAdds) acc.add(1)
i
}
}
for (futures <- Future.sequence(l)) {
assert(nFutures === futures.length)
assert(!acc.isZero())
assert(nFutures * nAdds === acc.value)
acc.reset()
assert(acc.isZero())
}
}
}
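
The review discussion above converges on the point that a metric normally has exactly one writer (the task) and only needs to be safely readable from the heartbeat thread, which is why the LongAdder change was judged unnecessary overhead and slated for revert. A rough standalone sketch of that one-writer, multi-reader scene, assuming a volatile field for visibility (illustrative only, not Spark's actual metric or heartbeat code):

// Standalone sketch of a single writer updating a counter while a separate
// "heartbeat" thread only reads it. With exactly one writer, a volatile long
// is enough for visibility; no LongAdder or locking is required.
// All names here are illustrative, not Spark classes.
object OneWriterManyReadersSketch {
  @volatile private var metric: Long = 0L

  def main(args: Array[String]): Unit = {
    val writer = new Thread(() => {
      var i = 0
      while (i < 1000000) {
        metric += 1 // safe only because this is the sole writer
        i += 1
      }
    })
    val reader = new Thread(() => {
      (1 to 5).foreach { _ =>
        // The reader may observe a slightly stale value, which is acceptable
        // for progress reporting.
        println(s"heartbeat read: $metric")
        Thread.sleep(5)
      }
    })
    writer.start()
    reader.start()
    writer.join()
    reader.join()
    println(s"final: $metric") // 1000000, since only one thread ever wrote
  }
}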