Skip to content

Commit 50e9671

Browse files
committed
Throw memory leak warning even in case of error; add warning about code duplication
1 parent 70a39e4 commit 50e9671

File tree

2 files changed

+7
-9
lines changed

2 files changed

+7
-9
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,13 @@ private[spark] class Executor(
210210

211211
// Run the actual task and measure its runtime.
212212
taskStart = System.currentTimeMillis()
213-
var succeeded: Boolean = false
214213
val value = try {
215-
val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
216-
succeeded = true
217-
value
214+
task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
218215
} finally {
219-
// Release managed memory used by this task
216+
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
217+
// when changing this, make sure to update both copies.
220218
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
221-
if (succeeded && freedMemory > 0) {
219+
if (freedMemory > 0) {
222220
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
223221
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
224222
throw new SparkException(errMsg)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -654,16 +654,16 @@ class DAGScheduler(
654654
taskMemoryManager = taskMemoryManager,
655655
runningLocally = true)
656656
TaskContext.setTaskContext(taskContext)
657-
var succeeded: Boolean = false
658657
try {
659658
val result = job.func(taskContext, rdd.iterator(split, taskContext))
660-
succeeded = true
661659
job.listener.taskSucceeded(0, result)
662660
} finally {
663661
taskContext.markTaskCompleted()
664662
TaskContext.unset()
663+
// Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
664+
// make sure to update both copies.
665665
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
666-
if (succeeded && freedMemory > 0) {
666+
if (freedMemory > 0) {
667667
if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
668668
throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
669669
} else {

0 commit comments

Comments
 (0)