Commit a38eaf1

Merge branch 'SPARK-14795' into SPARK-14798

2 parents: ecc9ea7 + f74b381
40 files changed: +200 -214 lines changed

core/src/main/scala/org/apache/spark/Accumulable.scala

Lines changed: 8 additions & 35 deletions
@@ -28,27 +28,20 @@ import org.apache.spark.util.Utils
 
 
 /**
- * A data type that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, i.e. has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
  * You must define how to add data, and how to merge two of these together. For some data types,
  * such as a counter, these might be the same operation. In that case, you can use the simpler
  * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
  * accumulating a set. You will add items to the set, and you will union two sets together.
  *
- * All accumulators created on the driver to be used on the executors must be registered with
- * [[Accumulators]]. This is already done automatically for accumulators created by the user.
- * Internal accumulators must be explicitly registered by the caller.
- *
  * Operations are not thread-safe.
  *
  * @param id ID of this accumulator; for internal use only.
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
  * @param name human-readable name for use in Spark's web UI
- * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
- *                 to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
- *                 thread safe so that they can be reported correctly.
  * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
  *                          for system and time metrics like serialization time or bytes spilled,
  *                          and false for things with absolute values like number of input rows.
@@ -62,49 +55,28 @@ class Accumulable[R, T] private (
     @transient private val initialValue: R,
     param: AccumulableParam[R, T],
     val name: Option[String],
-    internal: Boolean,
     private[spark] val countFailedValues: Boolean)
   extends Serializable {
 
   private[spark] def this(
       initialValue: R,
       param: AccumulableParam[R, T],
       name: Option[String],
-      internal: Boolean,
       countFailedValues: Boolean) = {
-    this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
+    this(Accumulators.newId(), initialValue, param, name, countFailedValues)
   }
 
-  private[spark] def this(
-      initialValue: R,
-      param: AccumulableParam[R, T],
-      name: Option[String],
-      internal: Boolean) = {
-    this(initialValue, param, name, internal, false /* countFailedValues */)
+  private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
+    this(initialValue, param, name, false /* countFailedValues */)
   }
 
-  def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
-    this(initialValue, param, name, false /* internal */)
-
   def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
 
   @volatile @transient private var value_ : R = initialValue // Current value on driver
   val zero = param.zero(initialValue) // Zero value to be passed to executors
   private var deserialized = false
 
-  // In many places we create internal accumulators without access to the active context cleaner,
-  // so if we register them here then we may never unregister these accumulators. To avoid memory
-  // leaks, we require the caller to explicitly register internal accumulators elsewhere.
-  if (!internal) {
-    Accumulators.register(this)
-  }
-
-  /**
-   * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
-   * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
-   * reported correctly.
-   */
-  private[spark] def isInternal: Boolean = internal
+  Accumulators.register(this)
 
   /**
    * Return a copy of this [[Accumulable]].
@@ -114,7 +86,7 @@ class Accumulable[R, T] private (
    * same mutable instance around.
    */
   private[spark] def copy(): Accumulable[R, T] = {
-    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+    new Accumulable[R, T](id, initialValue, param, name, countFailedValues)
   }
 
   /**
@@ -192,7 +164,8 @@ class Accumulable[R, T] private (
    * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
   }
 
   // Called by Java when deserializing an object
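
Editor's note (not part of the commit): the scaladoc above describes the set-accumulation case, where tasks add single elements and partial results are merged by union. A minimal sketch of such an AccumulableParam, assuming an active SparkContext `sc`; the object name `StringSetParam` and the RDD usage are illustrative only.

// Hypothetical example, not from this commit: a set-valued accumulable as described
// in the Accumulable scaladoc above.
object StringSetParam extends org.apache.spark.AccumulableParam[Set[String], String] {
  // Add one element produced by a task to the partial result.
  def addAccumulator(r: Set[String], t: String): Set[String] = r + t
  // Merge two partial results (e.g. from different partitions) by union.
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  // The "zero" value handed to executors is just the empty set.
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

// Driver-side usage sketch (illustrative):
//   val seen = sc.accumulable(Set.empty[String])(StringSetParam)
//   rdd.foreach { word => seen += word }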

core/src/main/scala/org/apache/spark/Accumulator.scala

Lines changed: 3 additions & 14 deletions
@@ -56,27 +56,16 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `T`
  * @param name human-readable name associated with this accumulator
- * @param internal whether this accumulator is used internally within Spark only
  * @param countFailedValues whether to accumulate values from failed tasks
  * @tparam T result type
  */
 class Accumulator[T] private[spark] (
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
     @transient private val initialValue: T,
     param: AccumulatorParam[T],
-    name: Option[String],
-    internal: Boolean,
-    private[spark] override val countFailedValues: Boolean = false)
-  extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
-
-  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
-    this(initialValue, param, name, false /* internal */)
-  }
-
-  def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false /* internal */)
-  }
-}
+    name: Option[String] = None,
+    countFailedValues: Boolean = false)
+  extends Accumulable[T, T](initialValue, param, name, countFailedValues)
 
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
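
Editor's note (not part of the commit): with `name` and `countFailedValues` now defaulted, the removed auxiliary constructors are no longer needed. A hedged sketch of the call shapes this enables is below; the constructor is `private[spark]`, so these lines only compile inside the org.apache.spark package (user code would normally go through SparkContext.accumulator), and the parameter values shown are illustrative.

// Hypothetical Spark-internal call sites, assuming AccumulatorParam.IntAccumulatorParam:
val unnamed = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
val named = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam, Some("records seen"))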

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 2 additions & 1 deletion
@@ -36,7 +36,8 @@ private[spark] class TaskContextImpl(
     override val taskMemoryManager: TaskMemoryManager,
     localProperties: Properties,
     @transient private val metricsSystem: MetricsSystem,
-    override val taskMetrics: TaskMetrics = new TaskMetrics)
+    // The default value is only used in tests.
+    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 49 additions & 43 deletions
@@ -79,6 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val NOT_STARTED = "<Not Started>"
 
+  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
   // Interval between safemode checks.
   private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
     "spark.history.fs.safemodeCheck.interval", "5s")
@@ -89,6 +91,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Interval between each cleaner checks for event logs to delete
   private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
 
+  // Number of threads used to replay event logs.
+  private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
+    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
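
Editor's note (not part of the commit): unless spark.history.fs.numReplayThreads is set, the default above is a quarter of the available cores, rounded up. A quick illustration of that arithmetic, with example core counts chosen for illustration only:

// ceil(cores / 4), per the NUM_PROCESSING_THREADS default above:
//   4 cores  -> Math.ceil(4 / 4f).toInt  = 1 thread
//   8 cores  -> Math.ceil(8 / 4f).toInt  = 2 threads
//   10 cores -> Math.ceil(10 / 4f).toInt = 3 threads
val defaultReplayThreads = Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt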
@@ -129,11 +135,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * An Executor to fetch and parse log files.
+   * Fixed size thread pool to fetch and parse log files.
    */
   private val replayExecutor: ExecutorService = {
     if (!conf.contains("spark.testing")) {
-      ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
+      ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
     }
@@ -297,10 +303,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     if (logInfos.nonEmpty) {
       logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
     }
-    logInfos.grouped(20)
-      .map { batch =>
+    logInfos.map { file =>
         replayExecutor.submit(new Runnable {
-          override def run(): Unit = mergeApplicationListing(batch)
+          override def run(): Unit = mergeApplicationListing(file)
         })
       }
       .foreach { task =>
@@ -385,9 +390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
   */
-  private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
-    val newAttempts = logs.flatMap { fileStatus =>
-      try {
+  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+    val newAttempts = try {
       val bus = new ReplayListenerBus()
       val res = replay(fileStatus, bus)
       res match {
@@ -403,7 +407,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           e)
         None
       }
-    }
 
     if (newAttempts.isEmpty) {
       return
@@ -413,45 +416,48 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // contains both the new app attempt, and those that were already loaded in the existing apps
     // map. If an attempt has been updated, it replaces the old attempt in the list.
     val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
-    newAttempts.foreach { attempt =>
-      val appInfo = newAppMap.get(attempt.appId)
-        .orElse(applications.get(attempt.appId))
-        .map { app =>
-          val attempts =
-            app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
-          new FsApplicationHistoryInfo(attempt.appId, attempt.name,
-            attempts.sortWith(compareAttemptInfo))
-        }
-        .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
-      newAppMap(attempt.appId) = appInfo
-    }
 
-    // Merge the new app list with the existing one, maintaining the expected ordering (descending
-    // end time). Maintaining the order is important to avoid having to sort the list every time
-    // there is a request for the log list.
-    val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
-    val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-    def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-      if (!mergedApps.contains(info.id)) {
-        mergedApps += (info.id -> info)
+    applications.synchronized {
+      newAttempts.foreach { attempt =>
+        val appInfo = newAppMap.get(attempt.appId)
+          .orElse(applications.get(attempt.appId))
+          .map { app =>
+            val attempts =
+              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
+            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+              attempts.sortWith(compareAttemptInfo))
+          }
+          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+        newAppMap(attempt.appId) = appInfo
       }
-    }
 
-    val newIterator = newApps.iterator.buffered
-    val oldIterator = applications.values.iterator.buffered
-    while (newIterator.hasNext && oldIterator.hasNext) {
-      if (newAppMap.contains(oldIterator.head.id)) {
-        oldIterator.next()
-      } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
-        addIfAbsent(newIterator.next())
-      } else {
-        addIfAbsent(oldIterator.next())
+      // Merge the new app list with the existing one, maintaining the expected ordering (descending
+      // end time). Maintaining the order is important to avoid having to sort the list every time
+      // there is a request for the log list.
+      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+        if (!mergedApps.contains(info.id)) {
+          mergedApps += (info.id -> info)
+        }
       }
-    }
-    newIterator.foreach(addIfAbsent)
-    oldIterator.foreach(addIfAbsent)
 
-    applications = mergedApps
+      val newIterator = newApps.iterator.buffered
+      val oldIterator = applications.values.iterator.buffered
+      while (newIterator.hasNext && oldIterator.hasNext) {
+        if (newAppMap.contains(oldIterator.head.id)) {
+          oldIterator.next()
+        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+          addIfAbsent(newIterator.next())
+        } else {
+          addIfAbsent(oldIterator.next())
+        }
+      }
+      newIterator.foreach(addIfAbsent)
+      oldIterator.foreach(addIfAbsent)
+
+      applications = mergedApps
+    }
   }
 
   /**
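
Editor's note (not part of the commit): since each event log is now replayed as its own task on a fixed-size pool, several mergeApplicationListing calls can complete concurrently, which is why the read/merge/publish of `applications` above is wrapped in `applications.synchronized`. A self-contained sketch of that submit-then-synchronize pattern is below; the object, map, and "parsing" placeholder are illustrative, not Spark code.

import java.util.concurrent.Executors

object ReplayPoolSketch {
  // Hypothetical stand-in for the listing state guarded by `synchronized` above.
  @volatile private var listing: Map[String, Long] = Map.empty

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2) // cf. ThreadUtils.newDaemonFixedThreadPool
    val files = Seq("app-1", "app-2", "app-3", "app-4")
    files.foreach { file =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          val parsed = file.length.toLong // placeholder for replaying the event log
          // Once tasks run in parallel, the read-modify-write of the shared map must be atomic.
          ReplayPoolSketch.synchronized {
            listing = listing + (file -> parsed)
          }
        }
      })
    }
    pool.shutdown()
  }
}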

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
@@ -496,7 +496,7 @@ private[deploy] class Worker(
 
     case KillExecutor(masterUrl, appId, execId) =>
       if (masterUrl != activeMasterUrl) {
-        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+        logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
       } else {
         val fullId = appId + "/" + execId
         executors.get(fullId) match {

core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala

Lines changed: 29 additions & 2 deletions
@@ -101,9 +101,9 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
 
   /**
    * Resets the value of the current metrics (`this`) and and merges all the independent
-   * [[ShuffleReadMetrics]] into `this`.
+   * [[TempShuffleReadMetrics]] into `this`.
    */
-  private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+  private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
     _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
     _localBlocksFetched.setValue(_localBlocksFetched.zero)
     _remoteBytesRead.setValue(_remoteBytesRead.zero)
@@ -119,5 +119,32 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
       _recordsRead.add(metric.recordsRead)
     }
   }
+}
 
+/**
+ * A temporary shuffle read metrics holder that is used to collect shuffle read metrics for each
+ * shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at
+ * last.
+ */
+private[spark] class TempShuffleReadMetrics {
+  private[this] var _remoteBlocksFetched = 0
+  private[this] var _localBlocksFetched = 0
+  private[this] var _remoteBytesRead = 0L
+  private[this] var _localBytesRead = 0L
+  private[this] var _fetchWaitTime = 0L
+  private[this] var _recordsRead = 0L
+
+  def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v
+  def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v
+  def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
+  def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
+  def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
+  def incRecordsRead(v: Long): Unit = _recordsRead += v
+
+  def remoteBlocksFetched: Int = _remoteBlocksFetched
+  def localBlocksFetched: Int = _localBlocksFetched
+  def remoteBytesRead: Long = _remoteBytesRead
+  def localBytesRead: Long = _localBytesRead
+  def fetchWaitTime: Long = _fetchWaitTime
+  def recordsRead: Long = _recordsRead
 }
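
Editor's note (not part of the commit): each shuffle dependency read by a task gets its own TempShuffleReadMetrics (plain vars, no thread-safety requirement), and setMergeValues folds all of them into the task-level ShuffleReadMetrics at the end. A minimal sketch of that aggregation step follows; TempShuffleReadMetrics is as defined in the diff above (and is private[spark], so this only compiles inside Spark), while the per-dependency wiring and the sample numbers are illustrative.

// Sketch of the merge step, mirroring what setMergeValues(metrics: Seq[TempShuffleReadMetrics]) does.
val perDependency = Seq.fill(3)(new TempShuffleReadMetrics)

// Each shuffle fetch updates only its own temp holder.
perDependency(0).incRemoteBlocksFetched(2)
perDependency(0).incRemoteBytesRead(1024L)
perDependency(1).incLocalBlocksFetched(5)
perDependency(1).incLocalBytesRead(4096L)
perDependency(2).incRecordsRead(100L)

// At the end of the task the totals are folded into the task's ShuffleReadMetrics,
// which is what setMergeValues does after resetting its own counters.
val totalRemoteBytes = perDependency.map(_.remoteBytesRead).sum    // 1024
val totalLocalBlocks = perDependency.map(_.localBlocksFetched).sum // 5
val totalRecords = perDependency.map(_.recordsRead).sum            // 100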
