19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1220,7 +1220,8 @@ private[spark] class BlockManager(
// Need to compute the block.
}
// Initially we hold no locks on this block.
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true,
localCompute = true) match {
case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
@@ -1310,11 +1311,17 @@ private[spark] class BlockManager(
level: StorageLevel,
classTag: ClassTag[_],
tellMaster: Boolean,
keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
keepReadLock: Boolean,
localCompute: Boolean = false)(putBody: BlockInfo => Option[T]): Option[T] = {

require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
if (isDecommissioning()) {
// Allow blocks to be put by local computation. This is only really needed in testing since
// doPut is called before the iterator (i.e. any useful work) starts being computed,
// but to make our tests reliable we want to avoid the race condition where
// the task has started, then the worker is decommissioned, and then the iterator
// starts being computed.
if (!localCompute && isDecommissioning()) {
throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
}

@@ -1398,8 +1405,10 @@ private[spark] class BlockManager(
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
keepReadLock: Boolean = false,
localCompute: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
doPut(blockId, level, classTag, tellMaster = tellMaster,
keepReadLock = keepReadLock, localCompute = localCompute) { info =>
val startTimeNs = System.nanoTime()
var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
// Size of the block in bytes
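
The new localCompute flag threads from doPutIterator down to doPut, where it lets a block produced by a computation that is already running locally be stored even while the block manager is decommissioning; only new puts are rejected. The toy class below is a minimal, self-contained sketch of that guard pattern under invented names (ToyBlockManager and BlockRejectedException are illustrative, not Spark APIs), not the actual BlockManager logic.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

final class BlockRejectedException(blockId: String)
  extends Exception(s"Block $blockId arrived while decommissioning")

final class ToyBlockManager {
  private val decommissioning = new AtomicBoolean(false)
  private val store = new ConcurrentHashMap[String, Array[Byte]]()

  def startDecommission(): Unit = decommissioning.set(true)

  // Mirrors the guard added in doPut: a locally computed block may still be stored.
  def put(blockId: String, bytes: Array[Byte], localCompute: Boolean = false): Unit = {
    if (!localCompute && decommissioning.get()) {
      throw new BlockRejectedException(blockId)
    }
    store.put(blockId, bytes)
  }
}

object ToyBlockManagerDemo {
  def main(args: Array[String]): Unit = {
    val bm = new ToyBlockManager
    bm.put("rdd_0_0", Array[Byte](1, 2, 3))                    // accepted: not decommissioning yet
    bm.startDecommission()
    bm.put("rdd_0_1", Array[Byte](4, 5), localCompute = true)  // accepted: computation already in flight
    try bm.put("rdd_0_2", Array[Byte](6)) catch {
      case e: BlockRejectedException => println(e.getMessage)  // rejected: new put after decommission
    }
  }
}
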
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,8 +17,9 @@

package org.apache.spark.storage

import java.util.concurrent.Semaphore
import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

@@ -87,31 +88,23 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
}

// Listen for the job & block updates
val taskStartSem = new Semaphore(0)
val broadcastSem = new Semaphore(0)
val executorRemovedSem = new Semaphore(0)
val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
sc.addSparkListener(new SparkListener {

def getExecutorIdOfAnySuccessfulTask(): Option[String] =
taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption

sc.addSparkListener(new SparkListener {
override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
executorRemovedSem.release()
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
taskStartSem.release()
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
taskEndEvents.add(taskEnd)
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
// Once broadcast start landing on the executors we're good to proceed.
// We don't only use task start as it can occur before the work is on the executor.
if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
broadcastSem.release()
}
blocksUpdated.append(blockUpdated)
}
})
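
Why the ConcurrentLinkedQueue: the SparkListener callbacks above are delivered on the listener-bus thread while the test body reads taskEndEvents concurrently (via getExecutorIdOfAnySuccessfulTask and the final asserts), so a shared ArrayBuffer is not thread-safe. The sketch below mirrors that pattern outside of Spark; TaskEndEvent and the exec-N ids are stand-ins invented for the example rather than Spark classes.

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

object ListenerQueueDemo {
  final case class TaskEndEvent(executorId: String, successful: Boolean)

  def main(args: Array[String]): Unit = {
    val taskEndEvents = new ConcurrentLinkedQueue[TaskEndEvent]()

    // Simulate listener callbacks arriving from several event-delivery threads.
    val producers = (1 to 4).map { i =>
      new Thread(() => { taskEndEvents.add(TaskEndEvent(s"exec-$i", successful = i % 2 == 0)); () })
    }
    producers.foreach(_.start())
    producers.foreach(_.join())

    // Same shape as getExecutorIdOfAnySuccessfulTask() in the suite.
    def executorOfAnySuccessfulTask: Option[String] =
      taskEndEvents.asScala.filter(_.successful).map(_.executorId).headOption

    println(executorOfAnySuccessfulTask)  // e.g. Some(exec-2)
  }
}
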
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()

// Wait for the job to have started.
taskStartSem.acquire(1)
// Wait for each executor + driver to have it's broadcast info delivered.
broadcastSem.acquire((numExecs + 1))

// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
// Give Spark a tiny bit to start executing after the broadcast blocks land.
// For me this works at 100, set to 300 for system variance.
Thread.sleep(300)
// Wait for one of the tasks to succeed and finish writing its blocks.
// This way we know that this executor had real data to migrate when it is subsequently
// decommissioned below.
eventually(timeout(3.seconds), interval(10.milliseconds)) {
assert(getExecutorIdOfAnySuccessfulTask().isDefined)
}
} else {
ThreadUtils.awaitResult(asyncCount, 15.seconds)
}

// Decommission one of the executors.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")

val execToDecommission = execs.head
logDebug(s"Decommissioning executor ${execToDecommission}")
// When `migrateDuring=true`, we have already waited for a successful task.
// When `migrateDuring=false`, we know the job is finished, so there
// must be a successful executor. Thus the '.get' should succeed in either case.
val execToDecommission = getExecutorIdOfAnySuccessfulTask().get
logInfo(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))

// Wait for job to finish.
@@ -157,11 +149,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
sc.listenerBus.waitUntilEmpty()
if (shuffle) {
// mappers & reducers which succeeded
assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
assert(taskEndEvents.asScala.count(_.reason == Success) === 2 * numParts,
s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
} else {
// only mappers which executed successfully
assert(taskEndEvents.count(_.reason == Success) === numParts,
assert(taskEndEvents.asScala.count(_.reason == Success) === numParts,
s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
}

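The other flakiness fix visible above: the fixed Thread.sleep(300) plus semaphore bookkeeping is replaced by an eventually(...) poll that waits until some task has actually succeeded, and that task's executor is then the one decommissioned, so it is guaranteed to hold block data worth migrating. Below is a minimal, hypothetical sketch of the same polling idea using ScalaTest's Eventually, with an AtomicReference standing in for the listener-backed getExecutorIdOfAnySuccessfulTask().

import java.util.concurrent.atomic.AtomicReference
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for "some task has succeeded on an executor".
    val successfulExecutor = new AtomicReference[Option[String]](None)

    // Background work that reports success a little later, like a Spark task finishing.
    new Thread(() => {
      Thread.sleep(200)
      successfulExecutor.set(Some("exec-1"))
    }).start()

    // Polls every 10 ms and gives up after 3 s, instead of a fixed sleep that is too short
    // on a slow machine and needlessly long on a fast one.
    val execToDecommission = eventually(timeout(3.seconds), interval(10.milliseconds)) {
      successfulExecutor.get().get  // throws until success is reported, so eventually retries
    }
    println(s"Decommission candidate: $execToDecommission")
  }
}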