Skip to content

Commit 280b619

Browse files
allenma authored and GitHub Enterprise committed
[CARMEL-6283] DAGScheduler may get stuck when there are too many result tasks and the result is spilled (#1085)
1 parent d809219 commit 280b619

File tree

1 file changed

+18
-4
lines changed

1 file changed

+18
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/IterableJobWaiter.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ private[spark] class IterableJobWaiter[U: ClassTag, R](
4949
private var spilledResultData: Option[Array[SpilledPartitionResult]] = None
5050
private val resultData: Array[U] = new Array[U](totalTasks)
5151

52+
// Indicates whether the in-memory result data has been cleaned after
53+
// the result data was spilled to disk
54+
@volatile private var inMemoryResultDataCleaned = false
55+
5256
// True once the job's promise has completed (success or failure).
def jobFinished: Boolean = jobPromise.isCompleted
5357

5458
// Future callers can await on; it completes when the whole job does.
def completionFuture: Future[Unit] = jobPromise.future
@@ -67,12 +71,16 @@ private[spark] class IterableJobWaiter[U: ClassTag, R](
6771
case spilledPartitionResult: Array[SpilledPartitionResult] =>
6872
spilledResultData = Some(spilledPartitionResult)
6973
// if spill is triggered for this task set, clean the cached in memory results
70-
(0 until totalTasks).foreach { i =>
71-
resultData(i) = null.asInstanceOf[U]
74+
if (!inMemoryResultDataCleaned) {
75+
cleanInMemResultData
76+
inMemoryResultDataCleaned = true
7277
}
7378
case data: U =>
74-
resultData(index) = data
75-
resultStatCollector(index, result.asInstanceOf[U])
79+
if (spilledResultData.isEmpty) {
80+
// if the spilledResultData has been set, no need to save result to memory
81+
resultData(index) = data
82+
resultStatCollector(index, result.asInstanceOf[U])
83+
}
7684

7785
// case stats: MapOutputStatistics => // ignore
7886
case _ =>
@@ -85,6 +93,12 @@ private[spark] class IterableJobWaiter[U: ClassTag, R](
8593
}
8694
}
8795

96+
/**
 * Releases the cached in-memory task results by overwriting every slot of
 * `resultData` with the default value of `U` (null for reference types), so
 * the spilled-on-disk copy becomes the only holder of the results and the
 * heap memory can be reclaimed by GC.
 */
def cleanInMemResultData: Unit = {
  var slot = 0
  while (slot < totalTasks) {
    resultData(slot) = null.asInstanceOf[U]
    slot += 1
  }
}
101+
88102
override def jobFailed(exception: Exception): Unit = {
89103
if (!jobPromise.tryFailure(exception)) {
90104
logWarning("Ignore failure", exception)

0 commit comments

Comments
 (0)