diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala index f820401da2fc..505b3206d4d5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala @@ -31,14 +31,15 @@ private[spark] object JavaUtils { } // Workaround for SPARK-3926 / SI-8911 - def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]): SerializableMapWrapper[A, B] - = new SerializableMapWrapper(underlying) + def mapAsSerializableJavaMap[A, B](underlying: scala.collection.Map[A, B]): + SerializableMapWrapper[A, B] + = new SerializableMapWrapper(underlying) // Implementation is copied from scala.collection.convert.Wrappers.MapWrapper, // but implements java.io.Serializable. It can't just be subclassed to make it // Serializable since the MapWrapper class has no no-arg constructor. This class // doesn't need a no-arg constructor though. - class SerializableMapWrapper[A, B](underlying: collection.Map[A, B]) + class SerializableMapWrapper[A, B](underlying: scala.collection.Map[A, B]) extends ju.AbstractMap[A, B] with java.io.Serializable { self => override def size: Int = underlying.size diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 2822eb5d6002..0d24bdae858b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -461,7 +461,7 @@ private[spark] object PythonRDD extends Logging { JavaRDD[Array[Byte]] = { val file = new DataInputStream(new FileInputStream(filename)) try { - val objs = new collection.mutable.ArrayBuffer[Array[Byte]] + val objs = new scala.collection.mutable.ArrayBuffer[Array[Byte]] try { while (true) { val length = file.readInt() diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala index 807835105ec3..0d22f6706834 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -114,7 +114,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") }.getOrElse(Seq[Node]()) } - private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { + private def propertiesRow(properties: scala.collection.Map[String, String]): Seq[Node] = { properties.map { case (k, v) => {k}{v} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 1175fa347ea3..e30839c49c04 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -69,9 +69,6 @@ private[spark] class CoarseGrainedExecutorBackend( }(ThreadUtils.sameThread) } - protected def registerExecutor: Executor = - new Executor(executorId, hostname, env, userClassPath, isLocal = false) - def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) @@ -82,7 +79,7 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor => logInfo("Successfully registered with driver") try { - executor = registerExecutor + executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) } catch { case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1e3c6505d535..74eb784e7422 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -34,7 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rpc.RpcTimeout -import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task} +import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util._ @@ -60,7 +60,7 @@ private[spark] class Executor( // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() - protected val currentJars: HashMap[String, Long] = new HashMap[String, Long]() + private val currentJars: HashMap[String, Long] = new HashMap[String, Long]() private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) @@ -95,7 +95,7 @@ private[spark] class Executor( // Create our ClassLoader // do this after SparkEnv creation so can access the SecurityManager - private val urlClassLoader = createClassLoader() + protected val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) // Set the classloader for serializer @@ -420,7 +420,7 @@ private[spark] class Executor( * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes * created by the interpreter to the search path */ - protected def createClassLoader(): MutableURLClassLoader = { + private def createClassLoader(): MutableURLClassLoader = { // Bootstrap the list of jars with the user class path. val now = System.currentTimeMillis() userClassPath.foreach { url => @@ -471,7 +471,7 @@ private[spark] class Executor( * Download any missing dependencies if we receive a new set of files and JARs from the * SparkContext. Also adds any new JARs we fetched to the class loader. */ - private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { + protected def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala index bc1431835e25..c18c45c3b93b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import collection.mutable.ArrayBuffer +import scala.collection.mutable.ArrayBuffer import org.apache.spark.annotation.DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 8fa12150114d..b002fae2d4b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -509,7 +509,7 @@ private[spark] class BlockManagerInfo( def blocks: JHashMap[BlockId, BlockStatus] = _blocks // This does not include broadcast blocks. - def cachedBlocks: collection.Set[BlockId] = _cachedBlocks + def cachedBlocks: scala.collection.Set[BlockId] = _cachedBlocks override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem