7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -31,14 +31,15 @@ private[spark] object JavaUtils {
}

// Workaround for SPARK-3926 / SI-8911
- def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]): SerializableMapWrapper[A, B]
-   = new SerializableMapWrapper(underlying)
+ def mapAsSerializableJavaMap[A, B](underlying: scala.collection.Map[A, B]):
+   SerializableMapWrapper[A, B]
+   = new SerializableMapWrapper(underlying)

// Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
// but implements java.io.Serializable. It can't just be subclassed to make it
// Serializable since the MapWrapper class has no no-arg constructor. This class
// doesn't need a no-arg constructor though.
- class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
+ class SerializableMapWrapper[A, B](underlying: scala.collection.Map[A, B])
extends ju.AbstractMap[A, B] with java.io.Serializable { self =>

override def size: Int = underlying.size
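For context on the SPARK-3926 / SI-8911 workaround this hunk touches: the stock scala.collection.convert.Wrappers.MapWrapper returned by mapAsJavaMap is not java.io.Serializable, which is why JavaUtils carries its own SerializableMapWrapper. The sketch below is illustrative only and not part of the patch; it assumes compilation against spark-core from a sub-package of org.apache.spark (JavaUtils is private[spark]), and the package and object names are invented here.

package org.apache.spark.sketch

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.api.java.JavaUtils

// Round-trips the wrapper through Java serialization; this is the operation that fails
// with the stock MapWrapper (SPARK-3926 / SI-8911) and succeeds with SerializableMapWrapper.
object SerializableMapWrapperSketch {
  private def roundTrip[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)).readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // The underlying Scala map is itself serializable, so only the wrapper matters here.
    val wrapped = JavaUtils.mapAsSerializableJavaMap(Map("a" -> 1, "b" -> 2))
    val copy = roundTrip(wrapped)   // succeeds because the wrapper implements java.io.Serializable
    assert(copy.get("a") == 1 && copy.size == 2)
  }
}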
@@ -461,7 +461,7 @@ private[spark] object PythonRDD extends Logging {
JavaRDD[Array[Byte]] = {
val file = new DataInputStream(new FileInputStream(filename))
try {
- val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
+ val objs = new scala.collection.mutable.ArrayBuffer[Array[Byte]]
try {
while (true) {
val length = file.readInt()
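The PythonRDD hunk above sits inside a loop that reads length-prefixed records from a DataInputStream: a 4-byte length from readInt, then that many bytes via readFully, repeated until the stream is exhausted. A minimal, self-contained sketch of that framing follows; it is illustrative only, and the names LengthPrefixedFramingSketch, writeAll, and readAll are invented here, not Spark APIs.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

import scala.collection.mutable.ArrayBuffer

object LengthPrefixedFramingSketch {
  // Writer side: each record is a 4-byte big-endian length followed by the payload bytes.
  def writeAll(records: Seq[Array[Byte]]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    records.foreach { rec =>
      out.writeInt(rec.length)
      out.write(rec)
    }
    out.close()
    buffer.toByteArray
  }

  // Reader side: mirrors the loop in the hunk above -- readInt, readFully, repeat until EOF.
  def readAll(data: Array[Byte]): Seq[Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val objs = new ArrayBuffer[Array[Byte]]()
    try {
      while (true) {
        val length = in.readInt()
        val obj = new Array[Byte](length)
        in.readFully(obj)
        objs += obj
      }
    } catch {
      case _: EOFException => // end of stream: every record has been read
    }
    objs.toSeq
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("record-1".getBytes("UTF-8"), "record-2".getBytes("UTF-8"))
    val decoded = readAll(writeAll(records))
    assert(decoded.map(new String(_, "UTF-8")) == Seq("record-1", "record-2"))
  }
}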
@@ -114,7 +114,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
}.getOrElse(Seq[Node]())
}

- private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
+ private def propertiesRow(properties: scala.collection.Map[String, String]): Seq[Node] = {
properties.map { case (k, v) =>
<tr>
<td>{k}</td><td>{v}</td>
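propertiesRow in the Mesos DriverPage renders a String-to-String map as HTML table rows using Scala XML literals. A standalone sketch of the same pattern is below; it is illustrative only (the surrounding DriverPage plumbing is omitted) and assumes the scala-xml module is on the classpath.

import scala.xml.Node

object PropertiesRowSketch {
  // One <tr> per map entry, with the key and value in separate cells.
  def propertiesRow(properties: scala.collection.Map[String, String]): Seq[Node] = {
    properties.map { case (k, v) =>
      <tr><td>{k}</td><td>{v}</td></tr>
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val rows = propertiesRow(Map("spark.master" -> "mesos://host:5050"))
    println(rows.mkString("\n"))   // prints one <tr> row per property
  }
}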
@@ -69,9 +69,6 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread)
}

- protected def registerExecutor: Executor =
-   new Executor(executorId, hostname, env, userClassPath, isLocal = false)

def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
@@ -82,7 +79,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
- executor = registerExecutor
+ executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
- import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
+ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
@@ -60,7 +60,7 @@ private[spark] class Executor(
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
- protected val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+ private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()

private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))

@@ -95,7 +95,7 @@ private[spark] class Executor(

// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
- private val urlClassLoader = createClassLoader()
+ protected val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

// Set the classloader for serializer
@@ -420,7 +420,7 @@ private[spark] class Executor(
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
* created by the interpreter to the search path
*/
- protected def createClassLoader(): MutableURLClassLoader = {
+ private def createClassLoader(): MutableURLClassLoader = {
// Bootstrap the list of jars with the user class path.
val now = System.currentTimeMillis()
userClassPath.foreach { url =>
@@ -471,7 +471,7 @@ private[spark] class Executor(
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
- private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+ protected def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
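The last Executor.scala hunk changes the visibility of updateDependencies, whose job (per its scaladoc) is to fetch files and JARs the executor has not seen yet and add new JARs to the class loader. The sketch below shows the underlying timestamp-comparison pattern in isolation; it is a simplification, not Spark code, and the names DependencyRefreshSketch and updateJars are invented for illustration.

import scala.collection.mutable.{ArrayBuffer, HashMap}

object DependencyRefreshSketch {
  // jar path -> timestamp of the version we last fetched (mirrors Executor.currentJars)
  private val currentJars = new HashMap[String, Long]()

  // `fetch` stands in for downloading the jar and adding its URL to the task class loader.
  def updateJars(newJars: Map[String, Long])(fetch: String => Unit): Unit = synchronized {
    for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
      fetch(name)                    // only missing or out-of-date entries are fetched
      currentJars(name) = timestamp  // remember which version we now have
    }
  }

  def main(args: Array[String]): Unit = {
    val fetched = new ArrayBuffer[String]()
    updateJars(Map("app.jar" -> 1L))(jar => fetched += jar)
    updateJars(Map("app.jar" -> 1L))(jar => fetched += jar)   // same timestamp: nothing to do
    updateJars(Map("app.jar" -> 2L))(jar => fetched += jar)   // newer timestamp: fetched again
    assert(fetched.toList == List("app.jar", "app.jar"))
  }
}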
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler

- import collection.mutable.ArrayBuffer
+ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi

@@ -509,7 +509,7 @@ private[spark] class BlockManagerInfo(
def blocks: JHashMap[BlockId, BlockStatus] = _blocks

// This does not include broadcast blocks.
- def cachedBlocks: collection.Set[BlockId] = _cachedBlocks
+ def cachedBlocks: scala.collection.Set[BlockId] = _cachedBlocks

override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
