diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f2a65aab1ba4..12471915cd97 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder
 
-import scala.collection.{immutable, mutable}
+import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -54,6 +54,7 @@ import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.ArrayImplicits._
 
 private[spark] class IsolatedSessionState(
     val sessionUUID: String,
@@ -749,10 +750,10 @@ private[spark] class Executor(
         logInfo(s"Executor killed $taskName, reason: ${t.reason}")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        // Here and below, put task metric peaks in a ArraySeq to expose them as a Seq
-        // without requiring a copy.
-        val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq)
+        // Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an
+        // immutable.Seq without requiring a copy.
+        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
+        val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
@@ -762,8 +763,8 @@ private[spark] class Executor(
         logInfo(s"Executor interrupted and killed $taskName, reason: $killReason")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq)
+        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
+        val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
@@ -806,18 +807,18 @@ private[spark] class Executor(
         // instead of an app issue).
         if (!ShutdownHookManager.inShutdown()) {
           val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-          val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
+          val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
 
           val (taskFailureReason, serializedTaskFailureReason) = {
             try {
               val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
-                .withMetricPeaks(metricPeaks.toSeq)
+                .withMetricPeaks(metricPeaks)
               (ef, ser.serialize(ef))
             } catch {
               case _: NotSerializableException =>
                 // t is not serializable so just send the stacktrace
                 val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
-                  .withMetricPeaks(metricPeaks.toSeq)
+                  .withMetricPeaks(metricPeaks)
                 (ef, ser.serialize(ef))
             }
           }
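
For context on the change above: the new import org.apache.spark.util.ArrayImplicits._ is what brings toImmutableArraySeq into scope. Below is a minimal sketch of the pattern such an extension typically relies on, assuming it is built on Scala 2.13's immutable.ArraySeq.unsafeWrapArray; the object and method bodies here are illustrative, not a copy of the Spark source. The point is that the array is wrapped rather than copied, so callers get an immutable Seq view at no extra cost.

// Illustrative sketch only (assumed implementation, not the actual Spark code):
// wrap an Array as an immutable.ArraySeq without copying its elements.
import scala.collection.immutable

object ArrayImplicitsSketch {
  implicit class ArrayOps[T](xs: Array[T]) {
    // Expose the array as an immutable.ArraySeq; no element copy is performed.
    def toImmutableArraySeq: immutable.ArraySeq[T] =
      if (xs == null) null else immutable.ArraySeq.unsafeWrapArray(xs)
  }
}

// Usage mirroring the patched code (hypothetical values):
//   import ArrayImplicitsSketch._
//   val metricPeaks: immutable.ArraySeq[Long] =
//     Array(1L, 2L, 3L).toImmutableArraySeq   // wraps, does not copy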