Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/** Stop the cleaner. */
def stop() {
stopped = true
cleaningThread.interrupt()
}

/** Register a RDD for cleanup when it is garbage collected. */
Expand Down Expand Up @@ -119,8 +118,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
case ie: InterruptedException =>
if (!stopped) logWarning("Cleaning thread interrupted")
case t: Throwable => logError("Error in cleaning thread", t)
}
}
Expand Down
30 changes: 17 additions & 13 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,18 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
* (driver and worker) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {

private val timeout = AkkaUtils.askTimeout(conf)

/** Set to the MapOutputTrackerActor living on the driver */
/** Set to the MapOutputTrackerActor living on the driver. */
var trackerActor: ActorRef = _

/** This HashMap needs to have different storage behavior for driver and worker */
/**
* This HashMap has different behavior for the master and the workers.
*
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]

/**
Expand All @@ -87,7 +92,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
protected var epoch: Long = 0
protected val epochLock = new AnyRef

/** Remembers which map output locations are currently being fetched on a worker */
/** Remembers which map output locations are currently being fetched on a worker. */
private val fetching = new HashSet[Int]

/**
Expand Down Expand Up @@ -173,7 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}

/** Called to get current epoch number */
/** Called to get current epoch number. */
def getEpoch: Long = {
epochLock.synchronized {
return epoch
Expand All @@ -195,16 +200,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}

/** Unregister shuffle data */
/** Unregister shuffle data. */
def unregisterShuffle(shuffleId: Int) {
mapStatuses.remove(shuffleId)
}

def stop() {
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerActor = null
}
/** Stop the tracker. */
def stop() { }
}

/**
Expand All @@ -219,7 +221,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
* so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
Expand Down Expand Up @@ -314,7 +316,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}

override def stop() {
super.stop()
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerActor = null
metadataCleaner.cancel()
cachedSerializedStatuses.clear()
}
Expand Down
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
Expand Down Expand Up @@ -227,8 +228,12 @@ class SparkContext(
@volatile private[spark] var dagScheduler = new DAGScheduler(this)
dagScheduler.start()

private[spark] val cleaner = new ContextCleaner(this)
cleaner.start()
private[spark] val cleaner: Option[ContextCleaner] =
if (conf.getBoolean("spark.cleaner.automatic", true)) {
Some(new ContextCleaner(this))
} else None

cleaner.foreach(_.start())

postEnvironmentUpdate()

Expand Down Expand Up @@ -643,9 +648,9 @@ class SparkContext(
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = {
def broadcast[T](value: T): Broadcast[T] = {
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
cleaner.registerBroadcastForCleanup(bc)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}

Expand Down Expand Up @@ -840,7 +845,7 @@ class SparkContext(
dagScheduler = null
if (dagSchedulerCopy != null) {
metadataCleaner.cancel()
cleaner.stop()
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
listenerBus.stop()
taskScheduler = null
Expand Down
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,21 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
def value: T

/**
* Remove all persisted state associated with this broadcast on the executors. The next use
* of this broadcast on the executors will trigger a remote fetch.
* Delete cached copies of this broadcast on the executors. If the broadcast is used after
* this is called, it will need to be re-sent to each executor.
*/
def unpersist()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unpersist() should get the same treatment as destroy() .. .that is ... unpersist() checks for validity and then calls doUnpersist(). All public methods of broadcast should fail fast if broadcast variable is invalidated.


/**
* Remove all persisted state associated with this broadcast on both the executors and the
* driver. Overriding implementations should set isValid to false.
* Remove all persisted state associated with this broadcast on both the executors and
* the driver.
*/
private[spark] def destroy()
private[spark] def destroy() {
_isValid = false
onDestroy()
}

protected def onDestroy()

/**
* If this broadcast is no longer valid, throw an exception.
Expand Down
12 changes: 2 additions & 10 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
HttpBroadcast.unpersist(id, removeFromDriver = false)
}

/**
* Remove all persisted state associated with this HTTP Broadcast on both the executors
* and the driver.
*/
private[spark] def destroy() {
_isValid = false
protected def onDestroy() {
HttpBroadcast.unpersist(id, removeFromDriver = true)
}

Expand Down Expand Up @@ -91,7 +86,6 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea

private[spark] object HttpBroadcast extends Logging {
private var initialized = false

private var broadcastDir: File = null
private var compress: Boolean = false
private var bufferSize: Int = 65536
Expand All @@ -101,11 +95,9 @@ private[spark] object HttpBroadcast extends Logging {

// TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
private val files = new TimeStampedHashSet[String]
private var cleaner: MetadataCleaner = null

private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt

private var compressionCodec: CompressionCodec = null
private var cleaner: MetadataCleaner = null

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
TorrentBroadcast.unpersist(id, removeFromDriver = false)
}

/**
* Remove all persisted state associated with this Torrent broadcast on both the executors
* and the driver.
*/
private[spark] def destroy() {
_isValid = false
protected def onDestroy() {
TorrentBroadcast.unpersist(id, removeFromDriver = true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
*
* THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
* threads, so it needs locks in public API methods to maintain its state. In addition, some
* SchedulerBackends sycnchronize on themselves when they want to send events here, and then
* SchedulerBackends synchronize on themselves when they want to send events here, and then
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,12 +829,12 @@ private[spark] class BlockManager(
/**
* Remove all blocks belonging to the given broadcast.
*/
def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
def removeBroadcast(broadcastId: Long, tellMaster: Boolean) {
logInfo("Removing broadcast " + broadcastId)
val blocksToRemove = blockInfo.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, removeFromDriver) }
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
future onFailure {
future.onFailure {
case e: Throwable => logError("Failed to remove RDD " + rddId, e)
}
if (blocking) {
Await.result(future, timeout)
}
}

/** Remove all blocks belonging to the given shuffle. */
/** Remove all blocks belonging to the given shuffle asynchronously. */
def removeShuffle(shuffleId: Int) {
askDriverWithReply(RemoveShuffle(shuffleId))
}

/** Remove all blocks belonging to the given broadcast. */
/** Remove all blocks belonging to the given broadcast asynchronously. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
}
Expand All @@ -142,7 +142,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/**
* Return the block's status on all block managers, if any.
* Return the block's status on all block managers, if any. This can potentially be an
* expensive operation and is used mainly for testing.
*
* If askSlaves is true, this invokes the master to query each block manager for the most
* updated block statuses. This is useful when the master is not informed of the given block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
// TODO: Consolidate usages of <driver>
val removeMsg = RemoveBroadcast(broadcastId)
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
blockManagerInfo.values
.filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
.foreach { bm => bm.slaveActor ! removeMsg }
Expand Down Expand Up @@ -255,7 +255,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}

/**
* Return the block's status for all block managers, if any.
* Return the block's status for all block managers, if any. This can potentially be an
* expensive operation and is used mainly for testing.
*
* If askSlaves is true, the master queries each block manager for the most updated block
* statuses. This is useful when the master is not informed of the given block by all block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.storage

import scala.concurrent.Future

import akka.actor.Actor

import org.apache.spark.MapOutputTracker
import org.apache.spark.{Logging, MapOutputTracker}
import org.apache.spark.storage.BlockManagerMessages._

/**
Expand All @@ -30,25 +32,40 @@ private[storage]
class BlockManagerSlaveActor(
blockManager: BlockManager,
mapOutputTracker: MapOutputTracker)
extends Actor {
extends Actor with Logging {

override def receive = {
import context.dispatcher

// Operations that involve removing blocks may be slow and should be done asynchronously
override def receive = {
case RemoveBlock(blockId) =>
blockManager.removeBlock(blockId)
val removeBlock = Future { blockManager.removeBlock(blockId) }
removeBlock.onFailure { case t: Throwable =>
logError("Error in removing block " + blockId, t)
}

case RemoveRdd(rddId) =>
val numBlocksRemoved = blockManager.removeRdd(rddId)
sender ! numBlocksRemoved
val removeRdd = Future { sender ! blockManager.removeRdd(rddId) }
removeRdd.onFailure { case t: Throwable =>
logError("Error in removing RDD " + rddId, t)
}

case RemoveShuffle(shuffleId) =>
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
val removeShuffle = Future {
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
}
removeShuffle.onFailure { case t: Throwable =>
logError("Error in removing shuffle " + shuffleId, t)
}

case RemoveBroadcast(broadcastId, removeFromDriver) =>
blockManager.removeBroadcast(broadcastId, removeFromDriver)
case RemoveBroadcast(broadcastId, tellMaster) =>
val removeBroadcast = Future { blockManager.removeBroadcast(broadcastId, tellMaster) }
removeBroadcast.onFailure { case t: Throwable =>
logError("Error in removing broadcast " + broadcastId, t)
}

case GetBlockStatus(blockId, _) =>
sender ! blockManager.getStatus(blockId)
Expand Down