21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder
 
-import scala.collection.{immutable, mutable}
+import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -54,6 +54,7 @@ import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.ArrayImplicits._
 
 private[spark] class IsolatedSessionState(
     val sessionUUID: String,
@@ -749,10 +750,10 @@ private[spark] class Executor(
         logInfo(s"Executor killed $taskName, reason: ${t.reason}")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        // Here and below, put task metric peaks in a ArraySeq to expose them as a Seq
-        // without requiring a copy.
-        val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq)
+        // Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an
+        // immutable.Seq without requiring a copy.
+        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
+        val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
@@ -762,8 +763,8 @@ private[spark] class Executor(
         logInfo(s"Executor interrupted and killed $taskName, reason: $killReason")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq)
+        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
+        val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
@@ -806,18 +807,18 @@ private[spark] class Executor(
         // instead of an app issue).
         if (!ShutdownHookManager.inShutdown()) {
           val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-          val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
+          val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
 
           val (taskFailureReason, serializedTaskFailureReason) = {
             try {
               val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
-                .withMetricPeaks(metricPeaks.toSeq)
+                .withMetricPeaks(metricPeaks)
               (ef, ser.serialize(ef))
             } catch {
               case _: NotSerializableException =>
                 // t is not serializable so just send the stacktrace
                 val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
-                  .withMetricPeaks(metricPeaks.toSeq)
+                  .withMetricPeaks(metricPeaks)
                 (ef, ser.serialize(ef))
             }
           }
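For context, here is a minimal, standard-library-only sketch of the pattern this change adopts. It assumes, as the updated comment suggests, that `ArrayImplicits.toImmutableArraySeq` is a thin wrapper over Scala 2.13's `immutable.ArraySeq.unsafeWrapArray`; the `peaks` array and `WrapWithoutCopy` object are illustrative, not from the PR.

```scala
import scala.collection.immutable

object WrapWithoutCopy {
  def main(args: Array[String]): Unit = {
    val peaks: Array[Long] = Array(10L, 20L, 30L)

    // Old pattern: mutable.ArraySeq.make(peaks).toSeq -- on Scala 2.13 the
    // .toSeq call converts the mutable wrapper to an immutable Seq, which
    // can copy the elements.

    // New pattern: wrap the existing array as an immutable.ArraySeq without
    // copying. This is presumably what toImmutableArraySeq delegates to.
    val metricPeaks: immutable.Seq[Long] = immutable.ArraySeq.unsafeWrapArray(peaks)

    println(metricPeaks) // ArraySeq(10, 20, 30)
  }
}
```

The "unsafe" in `unsafeWrapArray` flags the trade-off: the wrapper shares the backing array, so immutability holds only if the caller never mutates the array afterwards. That holds here because each `getTaskMetricPeaks(taskId)` result is handed off immediately and not reused.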