From 54c62984942a4afbf15264e42371ef3e00aa8ff2 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 11:44:28 -0700 Subject: [PATCH 01/20] Shutdown executor once we are done decommissioning Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads --- .../apache/spark/deploy/DeployMessage.scala | 2 - .../apache/spark/deploy/worker/Worker.scala | 2 +- .../CoarseGrainedExecutorBackend.scala | 53 +++++++++++- .../cluster/CoarseGrainedClusterMessage.scala | 3 + .../CoarseGrainedSchedulerBackend.scala | 9 ++ .../apache/spark/storage/BlockManager.scala | 8 ++ .../storage/BlockManagerDecommissioner.scala | 85 ++++++++++++++++--- ...kManagerDecommissionIntegrationSuite.scala | 7 +- .../BlockManagerDecommissionUnitSuite.scala | 12 ++- 9 files changed, 153 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c8c6e5a192a2..b7a64d75a8d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -165,8 +165,6 @@ private[deploy] object DeployMessages { case object ReregisterWithMaster // used when a worker attempts to reconnect to a master - case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future. - // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index aa8c46fc6831..862e685c2dce 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case DecommissionSelf => + case WorkerDecommission(_, _) => decommissionSelf() } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index def125bb6bfb..0072832bd1d4 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null - @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -80,6 +79,9 @@ private[spark] class CoarseGrainedExecutorBackend( */ private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] + // Track our decommissioning status internally. 
+ @volatile private var decommissioned = false + override def onStart(): Unit = { logInfo("Registering PWR handler.") SignalUtils.register("PWR", "Failed to register SIGPWR handler - " + @@ -214,6 +216,10 @@ private[spark] class CoarseGrainedExecutorBackend( case UpdateDelegationTokens(tokenBytes) => logInfo(s"Received tokens of ${tokenBytes.length} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) + + case DecommissionSelf => + logInfo("Received decommission self") + decommissionSelf() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor != null) { executor.decommission() } - logInfo("Done decommissioning self.") + // Shutdown the executor once all tasks are gone & any configured migrations completed. + // Detecting migrations completion doesn't need to be perfect and we want to minimize the + // overhead for executors that are not in decommissioning state as overall that will be + // more of the executors. For example, this will not catch a block which is already in + // the process of being put from a remote executor before migration starts. This trade-off + // is viewed as acceptable to minimize introduction of any new locking structures in critical + // code paths. + + val shutdownThread = new Thread() { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val allBlocksMigrated = env.blockManager.lastMigrationInfo() + // We can only trust allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. + if (allBlocksMigrated._2 && + (allBlocksMigrated._1 > lastTaskRunningTime)) { + logInfo("No running tasks, all blocks migrated, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } else { + logInfo("All blocks not yet migrated.") + } + } else { + logInfo("No running tasks, no block migration configured, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } + Thread.sleep(sleep_time) + } else { + logInfo("Blocked from shutdown by running task") + // If there is a running task it could store blocks, so make sure we wait for a + // migration loop to complete after the last task is done. + Thread.sleep(sleep_time) + lastTaskRunningTime = System.nanoTime() + } + } + } + logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal true } catch { case e: Exception => - logError(s"Error ${e} during attempt to decommission self") + logError("Unexpected error while decommissioning self", e) false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 91485f01bf00..49b859817fb3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages { // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not. 
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage + + // Used to ask an executor to decommission it's self. + case object DecommissionSelf extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8fbefae58af1..e05dd823c88a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -442,6 +442,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case e: Exception => logError(s"Unexpected error during decommissioning ${e.toString}", e) } + // Send decommission message to the executor (it could have originated on the executor + // but not necessarily. + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. + logWarning(s"Attempted to decommission unknown executor $executorId.") + } logInfo(s"Finished decommissioning executor $executorId.") if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47af854b6e8f..56e64ef8a765 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,6 +1822,14 @@ private[spark] class BlockManager( } } + /* + * Returns the last migration time and a boolean for if all blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[spark] def lastMigrationInfo(): (Long, Boolean) = { + decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) + } + private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = master.getReplicateInfoForRDDBlocks(blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 1cc7ef6a25f9..00cb22e56bf5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -41,6 +42,13 @@ private[storage] class BlockManagerDecommissioner( private val maxReplicationFailuresForDecommission = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + // Used for tracking if our migrations are complete. + @volatile private var lastRDDMigrationTime: Long = 0 + @volatile private var lastShuffleMigrationTime: Long = 0 + @volatile private var rddBlocksLeft: Boolean = true + @volatile private var shuffleBlocksLeft: Boolean = true + + /** * This runnable consumes any shuffle blocks in the queue for migration. 
This part of a * producer/consumer where the main migration loop updates the queue of blocks to be migrated @@ -91,10 +99,12 @@ private[storage] class BlockManagerDecommissioner( null)// class tag, we don't need for shuffle logDebug(s"Migrated sub block ${blockId}") } - logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") + logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}") } else { logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") } + logInfo(s"Migrated ${shuffleBlockInfo}") + numMigratedShuffles.incrementAndGet() } } // This catch is intentionally outside of the while running block. @@ -115,12 +125,21 @@ private[storage] class BlockManagerDecommissioner( // Shuffles which are either in queue for migrations or migrated private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() + // Shuffles which have migrated. This used to know when we are "done", being done can change + // if a new shuffle file is created by a running task. + private val numMigratedShuffles = new AtomicInteger(0) + + + // Shuffles which are queued for migration & number of retries so far. + // Visible in storage for testing. private[storage] val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]() // Set if we encounter an error attempting to migrate and stop. @volatile private var stopped = false + @volatile private var stoppedRDD = false + @volatile private var stoppedShuffle = false private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() @@ -133,22 +152,24 @@ private[storage] class BlockManagerDecommissioner( override def run(): Unit = { assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedRDD && !Thread.interrupted()) { logInfo("Iterating on migrating from the block manager.") try { + val startTime = System.nanoTime() logDebug("Attempting to replicate all cached RDD blocks") - decommissionRddCacheBlocks() + rddBlocksLeft = decommissionRddCacheBlocks() + lastRDDMigrationTime = startTime logInfo("Attempt to replicate all cached blocks done") logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => - logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + logInfo("Interrupted during RDD migration, stopping") + stoppedRDD = true case NonFatal(e) => - logError("Error occurred while trying to replicate for block manager decommissioning.", + logError("Error occurred replicating RDD for block manager decommissioning.", e) - stopped = true + stoppedRDD = true } } } @@ -162,20 +183,22 @@ private[storage] class BlockManagerDecommissioner( override def run() { assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedShuffle && !Thread.interrupted()) { try { logDebug("Attempting to replicate all shuffle blocks") - refreshOffloadingShuffleBlocks() + val startTime = System.nanoTime() + shuffleBlocksLeft = refreshOffloadingShuffleBlocks() + lastShuffleMigrationTime = startTime logInfo("Done starting workers to migrate shuffle blocks") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + stoppedShuffle = true case NonFatal(e) => logError("Error occurred while trying to replicate for block manager decommissioning.", e) - stopped 
= true + stoppedShuffle = true } } } @@ -191,8 +214,9 @@ private[storage] class BlockManagerDecommissioner( * but rather shadows them. * Requires an Indexed based shuffle resolver. * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. + * Returns true if we are not done migrating shuffle blocks. */ - private[storage] def refreshOffloadingShuffleBlocks(): Unit = { + private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { // Update the queue of shuffles to be migrated logInfo("Offloading shuffle blocks") val localShuffles = bm.migratableResolver.getStoredShuffles().toSet @@ -215,6 +239,8 @@ private[storage] class BlockManagerDecommissioner( deadPeers.foreach { peer => migrationPeers.get(peer).foreach(_.running = false) } + // If we found any new shuffles to migrate or otherwise have not migrated everything. + newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get() } /** @@ -231,8 +257,9 @@ private[storage] class BlockManagerDecommissioner( /** * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers * Visible for testing + * Returns true if we have not migrated all of our RDD blocks. */ - private[storage] def decommissionRddCacheBlocks(): Unit = { + private[storage] def decommissionRddCacheBlocks(): Boolean = { val replicateBlocksInfo = bm.getMigratableRDDBlocks() if (replicateBlocksInfo.nonEmpty) { @@ -240,7 +267,7 @@ private[storage] class BlockManagerDecommissioner( "for block manager decommissioning") } else { logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") - return + return false } // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) @@ -252,7 +279,9 @@ private[storage] class BlockManagerDecommissioner( if (blocksFailedReplication.nonEmpty) { logWarning("Blocks failed replication in cache decommissioning " + s"process: ${blocksFailedReplication.mkString(",")}") + return true } + return false } private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = { @@ -275,9 +304,13 @@ private[storage] class BlockManagerDecommissioner( logInfo("Starting block migration thread") if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable) + } else { + stoppedRDD = true } if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable) + } else { + stoppedShuffle = true } if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { @@ -327,4 +360,28 @@ private[storage] class BlockManagerDecommissioner( } logInfo("Stopped storage decommissioner") } + + /* + * Returns the last migration time and a boolean for if all blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. 
+ */ + private[storage] def lastMigrationInfo(): (Long, Boolean) = { + if (stopped || (stoppedRDD && stoppedShuffle)) { + (System.nanoTime(), true) + } else { + + val lastMigrationTime = if ( + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && + conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { + Math.min(lastRDDMigrationTime, lastShuffleMigrationTime) + } else if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { + lastShuffleMigrationTime + } else { + lastRDDMigrationTime + } + + val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) + (lastMigrationTime, blocksMigrated) + } + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 6a52f72938c6..d2036af0c65a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -71,7 +71,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle) // Just replicate blocks as fast as we can during testing, there isn't another // workload we need to worry about. - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) if (whenToDecom == TaskStarted) { // We are using accumulators below, make sure those are reported frequently. @@ -273,10 +273,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } - // Make the executor we decommissioned exit - sched.client.killExecutors(List(execToDecommission)) - - // Wait for the executor to be removed + // Wait for the executor to be removed automatically after migration. executorRemovedSem.acquire(1) // Since the RDD is cached or shuffled so further usage of same RDD should use the diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 41b68d5978d1..91632f21e333 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import scala.concurrent.duration._ import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{atLeast => least, mock, times, verify, when} import org.scalatest.concurrent.Eventually._ import org.scalatest.matchers.must.Matchers @@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { private val sparkConf = new SparkConf(false) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) + // Just replicate blocks as fast as we can during testing, there isn't another + // workload we need to worry about. 
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) private def registerShuffleBlocks( mockMigratableShuffleResolver: MigratableResolver, @@ -77,9 +80,10 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { try { bmDecomManager.start() - eventually(timeout(5.second), interval(10.milliseconds)) { + // We don't check that all blocks are migrated because out mock is always returning an RDD. + eventually(timeout(10.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) - verify(bm, times(1)).replicateBlock( + verify(bm, least(1)).replicateBlock( mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3))) verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), @@ -88,5 +92,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } finally { bmDecomManager.stop() } + + bmDecomManager.stop() } } From 8c10f9a3879d8755456c1ab88fc8d94675f61aa4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Jul 2020 14:51:33 -0700 Subject: [PATCH 02/20] Whitespace cleanup --- .../apache/spark/storage/BlockManagerDecommissioner.scala | 5 +---- .../spark/storage/BlockManagerDecommissionUnitSuite.scala | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 00cb22e56bf5..20a6317d75db 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -48,7 +48,6 @@ private[storage] class BlockManagerDecommissioner( @volatile private var rddBlocksLeft: Boolean = true @volatile private var shuffleBlocksLeft: Boolean = true - /** * This runnable consumes any shuffle blocks in the queue for migration. This part of a * producer/consumer where the main migration loop updates the queue of blocks to be migrated @@ -129,8 +128,6 @@ private[storage] class BlockManagerDecommissioner( // if a new shuffle file is created by a running task. private val numMigratedShuffles = new AtomicInteger(0) - - // Shuffles which are queued for migration & number of retries so far. // Visible in storage for testing. private[storage] val shufflesToMigrate = @@ -369,7 +366,7 @@ private[storage] class BlockManagerDecommissioner( if (stopped || (stoppedRDD && stoppedShuffle)) { (System.nanoTime(), true) } else { - + // Chose the min of the running times. 
val lastMigrationTime = if ( conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 91632f21e333..eb9f2f86c979 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -92,7 +92,6 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } finally { bmDecomManager.stop() } - bmDecomManager.stop() } } From 27a5f491c4cc671f6a9e201096efd65d0634370b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Jul 2020 18:16:56 -0700 Subject: [PATCH 03/20] Code review feedback, various cleanups --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 10 ++++------ .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- .../scala/org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/BlockManagerDecommissioner.scala | 11 ++++------- .../storage/BlockManagerDecommissionUnitSuite.scala | 3 +-- 6 files changed, 14 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 0072832bd1d4..cede4edb8acf 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -79,7 +79,6 @@ private[spark] class CoarseGrainedExecutorBackend( */ private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] - // Track our decommissioning status internally. @volatile private var decommissioned = false override def onStart(): Unit = { @@ -291,7 +290,7 @@ private[spark] class CoarseGrainedExecutorBackend( // is viewed as acceptable to minimize introduction of any new locking structures in critical // code paths. - val shutdownThread = new Thread() { + val shutdownThread = ThreadUtils.runInNewThread("wait-for-blocks-to-migrate") { var lastTaskRunningTime = System.nanoTime() val sleep_time = 1000 // 1s @@ -300,11 +299,10 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor == null || executor.numRunningTasks == 0) { if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { logInfo("No running tasks, checking migrations") - val allBlocksMigrated = env.blockManager.lastMigrationInfo() + val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() // We can only trust allBlocksMigrated boolean value if there were no tasks running // since the start of computing it. - if (allBlocksMigrated._2 && - (allBlocksMigrated._1 > lastTaskRunningTime)) { + if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { logInfo("No running tasks, all blocks migrated, stopping.") exitExecutor(0, "Finished decommissioning", notifyDriver = true) } else { @@ -316,7 +314,7 @@ private[spark] class CoarseGrainedExecutorBackend( } Thread.sleep(sleep_time) } else { - logInfo("Blocked from shutdown by running task") + logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") // If there is a running task it could store blocks, so make sure we wait for a // migration loop to complete after the last task is done. 
Thread.sleep(sleep_time) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 49b859817fb3..7242ab778606 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -137,6 +137,6 @@ private[spark] object CoarseGrainedClusterMessages { // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not. case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage - // Used to ask an executor to decommission it's self. + // Used to ask an executor to decommission itself. (Can be an internal message) case object DecommissionSelf extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e05dd823c88a..d81a617d0ed7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -442,8 +442,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case e: Exception => logError(s"Unexpected error during decommissioning ${e.toString}", e) } - // Send decommission message to the executor (it could have originated on the executor - // but not necessarily. + // Send decommission message to the executor, this may be a duplicate since the executor + // could have been the one to notify us. But it's also possible the notification came from + // elsewhere and the executor does not yet know. executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.executorEndpoint.send(DecommissionSelf) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 56e64ef8a765..6ec93df67f7d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1823,7 +1823,7 @@ private[spark] class BlockManager( } /* - * Returns the last migration time and a boolean for if all blocks have been migrated. + * Returns the last migration time and a boolean denoting if all the blocks have been migrated. * If there are any tasks running since that time the boolean may be incorrect. */ private[spark] def lastMigrationInfo(): (Long, Boolean) = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 20a6317d75db..9babde01a4e3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -102,7 +102,6 @@ private[storage] class BlockManagerDecommissioner( } else { logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") } - logInfo(s"Migrated ${shuffleBlockInfo}") numMigratedShuffles.incrementAndGet() } } @@ -135,8 +134,10 @@ private[storage] class BlockManagerDecommissioner( // Set if we encounter an error attempting to migrate and stop. 
@volatile private var stopped = false - @volatile private var stoppedRDD = false - @volatile private var stoppedShuffle = false + @volatile private var stoppedRDD = + !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) + @volatile private var stoppedShuffle = + !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() @@ -301,13 +302,9 @@ private[storage] class BlockManagerDecommissioner( logInfo("Starting block migration thread") if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable) - } else { - stoppedRDD = true } if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable) - } else { - stoppedShuffle = true } if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index eb9f2f86c979..243acc158158 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -38,7 +38,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { private val sparkConf = new SparkConf(false) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) - // Just replicate blocks as fast as we can during testing, there isn't another + // Just replicate blocks quickly during testing, as there isn't another // workload we need to worry about. .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) @@ -92,6 +92,5 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } finally { bmDecomManager.stop() } - bmDecomManager.stop() } } From 5d4b52a85143cbabdefce8d1005a08b1980aa035 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Jul 2020 18:34:53 -0700 Subject: [PATCH 04/20] Just use new thread and set daemon since runInNewThread does a join we don't want. --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index cede4edb8acf..527958f0198e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -290,7 +290,7 @@ private[spark] class CoarseGrainedExecutorBackend( // is viewed as acceptable to minimize introduction of any new locking structures in critical // code paths. 
- val shutdownThread = ThreadUtils.runInNewThread("wait-for-blocks-to-migrate") { + val shutdownThread = new Thread("wait-for-blocks-to-migrate") { var lastTaskRunningTime = System.nanoTime() val sleep_time = 1000 // 1s @@ -321,7 +321,7 @@ private[spark] class CoarseGrainedExecutorBackend( lastTaskRunningTime = System.nanoTime() } } - } + }.setDaemon(true) logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal true From cdc3336e6c3b87cb1c4da80b4c0ecf1e5b8935aa Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Jul 2020 18:42:53 -0700 Subject: [PATCH 05/20] Update the text since 10 seconds is quick but not as fast as we can --- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index d2036af0c65a..6d639ddb49f0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -69,7 +69,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS .set(config.STORAGE_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle) - // Just replicate blocks as fast as we can during testing, there isn't another + // Just replicate blocks quickly during testing, there isn't another // workload we need to worry about. .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) From a17c4429f59ca6abf48b1fc69ba86201d2368d82 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Jul 2020 19:12:51 -0700 Subject: [PATCH 06/20] rename WorkerDecommission message to DecommissionWorker. 
(CR feedback) --- .../main/scala/org/apache/spark/deploy/DeployMessage.scala | 2 +- .../main/scala/org/apache/spark/deploy/master/Master.scala | 6 +++--- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index b7a64d75a8d4..ec05496c9986 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -64,7 +64,7 @@ private[deploy] object DeployMessages { * @param id the worker id * @param worker the worker endpoint ref */ - case class WorkerDecommission( + case class DecommissionWorker( id: String, worker: RpcEndpointRef) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 220e1c963d5e..74c77468a19a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -245,7 +245,7 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) - case WorkerDecommission(id, workerRef) => + case DecommissionWorker(id, workerRef) => logInfo("Recording worker %s decommissioning".format(id)) if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) @@ -874,7 +874,7 @@ private[deploy] class Master( /** * Decommission all workers that are active on any of the given hostnames. The decommissioning is - * asynchronously done by enqueueing WorkerDecommission messages to self. No checks are done about + * asynchronously done by enqueueing DecommissionWorker messages to self. No checks are done about * the prior state of the worker. So an already decommissioned worker will match as well. * * @param hostnames: A list of hostnames without the ports. Like "localhost", "foo.bar.com" etc @@ -893,7 +893,7 @@ private[deploy] class Master( // The workers are removed async to avoid blocking the receive loop for the entire batch workersToRemove.foreach(wi => { logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}") - self.send(WorkerDecommission(wi.id, wi.endpoint)) + self.send(DecommissionWorker(wi.id, wi.endpoint)) }) // Return the count of workers actually removed diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 862e685c2dce..36bac0835ac7 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case WorkerDecommission(_, _) => + case DecommissionWorker(_, _) => decommissionSelf() } @@ -772,7 +772,7 @@ private[deploy] class Worker( if (conf.get(WORKER_DECOMMISSION_ENABLED)) { logDebug("Decommissioning self") decommissioned = true - sendToMaster(WorkerDecommission(workerId, self)) + sendToMaster(DecommissionWorker(workerId, self)) } else { logWarning("Asked to decommission self, but decommissioning not enabled") } From 2bbd5d1c78f6c05bbb9da4ab1b11e69d6fec59d6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jul 2020 11:42:08 -0700 Subject: [PATCH 07/20] Revert "rename WorkerDecommission message to DecommissionWorker. 
(CR feedback)" This reverts commit 50b9cb2495d59ae117861971d3181b077ef7afac. --- .../main/scala/org/apache/spark/deploy/DeployMessage.scala | 2 +- .../main/scala/org/apache/spark/deploy/master/Master.scala | 6 +++--- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index ec05496c9986..b7a64d75a8d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -64,7 +64,7 @@ private[deploy] object DeployMessages { * @param id the worker id * @param worker the worker endpoint ref */ - case class DecommissionWorker( + case class WorkerDecommission( id: String, worker: RpcEndpointRef) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 74c77468a19a..220e1c963d5e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -245,7 +245,7 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) - case DecommissionWorker(id, workerRef) => + case WorkerDecommission(id, workerRef) => logInfo("Recording worker %s decommissioning".format(id)) if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) @@ -874,7 +874,7 @@ private[deploy] class Master( /** * Decommission all workers that are active on any of the given hostnames. The decommissioning is - * asynchronously done by enqueueing DecommissionWorker messages to self. No checks are done about + * asynchronously done by enqueueing WorkerDecommission messages to self. No checks are done about * the prior state of the worker. So an already decommissioned worker will match as well. * * @param hostnames: A list of hostnames without the ports. 
Like "localhost", "foo.bar.com" etc @@ -893,7 +893,7 @@ private[deploy] class Master( // The workers are removed async to avoid blocking the receive loop for the entire batch workersToRemove.foreach(wi => { logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}") - self.send(DecommissionWorker(wi.id, wi.endpoint)) + self.send(WorkerDecommission(wi.id, wi.endpoint)) }) // Return the count of workers actually removed diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 36bac0835ac7..862e685c2dce 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case DecommissionWorker(_, _) => + case WorkerDecommission(_, _) => decommissionSelf() } @@ -772,7 +772,7 @@ private[deploy] class Worker( if (conf.get(WORKER_DECOMMISSION_ENABLED)) { logDebug("Decommissioning self") decommissioned = true - sendToMaster(DecommissionWorker(workerId, self)) + sendToMaster(WorkerDecommission(workerId, self)) } else { logWarning("Asked to decommission self, but decommissioning not enabled") } From 016029968490b689afc3ca45a6e69dea2da036a1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 27 Jul 2020 11:43:52 -0700 Subject: [PATCH 08/20] Fix it so we don't block the decom message --- .../CoarseGrainedExecutorBackend.scala | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 527958f0198e..646c2a13b6f6 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -291,40 +291,41 @@ private[spark] class CoarseGrainedExecutorBackend( // code paths. val shutdownThread = new Thread("wait-for-blocks-to-migrate") { - var lastTaskRunningTime = System.nanoTime() - val sleep_time = 1000 // 1s - - while (true) { - logInfo("Checking to see if we can shutdown.") - if (executor == null || executor.numRunningTasks == 0) { - if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { - logInfo("No running tasks, checking migrations") - val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() - // We can only trust allBlocksMigrated boolean value if there were no tasks running - // since the start of computing it. - if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { - logInfo("No running tasks, all blocks migrated, stopping.") - exitExecutor(0, "Finished decommissioning", notifyDriver = true) + override def run(): Unit = { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() + // We can only trust allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. 
+ if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { + logInfo("No running tasks, all blocks migrated, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } else { + logInfo("All blocks not yet migrated.") + } } else { - logInfo("All blocks not yet migrated.") + logInfo("No running tasks, no block migration configured, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) } + Thread.sleep(sleep_time) } else { - logInfo("No running tasks, no block migration configured, stopping.") - exitExecutor(0, "Finished decommissioning", notifyDriver = true) + logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") + // If there is a running task it could store blocks, so make sure we wait for a + // migration loop to complete after the last task is done. + Thread.sleep(sleep_time) + lastTaskRunningTime = System.nanoTime() } - Thread.sleep(sleep_time) - } else { - logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") - // If there is a running task it could store blocks, so make sure we wait for a - // migration loop to complete after the last task is done. - Thread.sleep(sleep_time) - lastTaskRunningTime = System.nanoTime() } - } - }.setDaemon(true) - logInfo("Will exit when finished decommissioning") - // Return true since we are handling a signal - true + }.setDaemon(true) + logInfo("Will exit when finished decommissioning") + // Return true since we are handling a signal + true } catch { case e: Exception => logError("Unexpected error while decommissioning self", e) From 284e0cd53d436589e2af184699bf9d6e712bb895 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 27 Jul 2020 11:51:00 -0700 Subject: [PATCH 09/20] Was thinking a little bit, if we move the sleep up a bit we can unify it. The only cost is we _might_ wait an extra one second in the event that storage decom is not enabled and there is no running tasks when decommissioning is first called, but that seems ok --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 646c2a13b6f6..69fea1ee49a3 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -297,6 +297,7 @@ private[spark] class CoarseGrainedExecutorBackend( while (true) { logInfo("Checking to see if we can shutdown.") + Thread.sleep(sleep_time) if (executor == null || executor.numRunningTasks == 0) { if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { logInfo("No running tasks, checking migrations") @@ -313,12 +314,10 @@ private[spark] class CoarseGrainedExecutorBackend( logInfo("No running tasks, no block migration configured, stopping.") exitExecutor(0, "Finished decommissioning", notifyDriver = true) } - Thread.sleep(sleep_time) } else { logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") // If there is a running task it could store blocks, so make sure we wait for a // migration loop to complete after the last task is done. 
- Thread.sleep(sleep_time) lastTaskRunningTime = System.nanoTime() } } From 7a157ffcc902db8e0f5bdee0b1795dfc6b730d3b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jul 2020 12:05:21 -0700 Subject: [PATCH 10/20] Fix --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 69fea1ee49a3..f29e14af1890 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -321,10 +321,13 @@ private[spark] class CoarseGrainedExecutorBackend( lastTaskRunningTime = System.nanoTime() } } - }.setDaemon(true) - logInfo("Will exit when finished decommissioning") - // Return true since we are handling a signal - true + } + }.setDaemon(true) + shutdownThread.start() + + logInfo("Will exit when finished decommissioning") + // Return true since we are handling a signal + true } catch { case e: Exception => logError("Unexpected error while decommissioning self", e) From 640b0efa69bd594d5e51d444fdf09e14f778a6aa Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jul 2020 12:06:06 -0700 Subject: [PATCH 11/20] Intentionally don't start the tread to verify the executor is being shut down as planned --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f29e14af1890..c5ab0f0ce19f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -323,7 +323,7 @@ private[spark] class CoarseGrainedExecutorBackend( } } }.setDaemon(true) - shutdownThread.start() + // shutdownThread.start() logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal From 7e7821b465ff9717f2550a5e47bedd3707991319 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jul 2020 13:37:05 -0700 Subject: [PATCH 12/20] Revert "Intentionally don't start the tread to verify the executor is being shut down as planned" This reverts commit 285600aeccc5539d1bc892851c567721df8533b1. 
--- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c5ab0f0ce19f..f29e14af1890 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -323,7 +323,7 @@ private[spark] class CoarseGrainedExecutorBackend( } } }.setDaemon(true) - // shutdownThread.start() + shutdownThread.start() logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal From dcb01fce05b6a155b220e75d4d641ccf54279e7d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jul 2020 17:50:36 -0700 Subject: [PATCH 13/20] fix thread start --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f29e14af1890..db58ab3f108b 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -322,7 +322,8 @@ private[spark] class CoarseGrainedExecutorBackend( } } } - }.setDaemon(true) + } + shutdownThread.setDaemon(true) shutdownThread.start() logInfo("Will exit when finished decommissioning") From 060a443d358617c737329e86bca3b9d4eaf77949 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 29 Jul 2020 15:56:35 -0700 Subject: [PATCH 14/20] Use tryAcquire so we don't block forever (CR feedback) --- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 6d639ddb49f0..2948e49c166e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore} +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -274,7 +274,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS } // Wait for the executor to be removed automatically after migration. - executorRemovedSem.acquire(1) + assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES)) // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum From 20d3cc00ce42dfe8b547b43d97da1309f2437920 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 30 Jul 2020 05:35:03 -0700 Subject: [PATCH 15/20] Since we shut down & then re-launch the executors now as decommissioning (unless we specify not to relaunch) we no longer expect the task to fail. 
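For reference, the expectation the updated test keeps (a sketch reusing the suite's
existing names — execs, sched and asyncCount all come from the test body):

    execs.foreach { execId =>
      sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))
    }
    // Tasks that were already running finish before the executors shut themselves
    // down, so the in-flight count still completes successfully.
    assert(ThreadUtils.awaitResult(asyncCount, 20.seconds) === 10)

The old "a task launched after decommissioning must time out" assertion is dropped
because replacement executors can now pick that work up.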
--- .../scheduler/WorkerDecommissionSuite.scala | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 3c34070e8bb9..bb0c33acc0af 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { assert(sleepyRdd.count() === 10) } - test("verify a task with all workers decommissioned succeeds") { + test("verify a running task with all workers decommissioned succeeds") { + // Wait for the executors to come up + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = 2, + timeout = 30000) // 30s + val input = sc.parallelize(1 to 10) // Listen for the job val sem = new Semaphore(0) @@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { sem.release() } }) - TestUtils.waitUntilExecutorsUp(sc = sc, - numExecutors = 2, - timeout = 30000) // 30s + val sleepyRdd = input.mapPartitions{ x => Thread.sleep(5000) // 5s x @@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) - // Try and launch task after decommissioning, this should fail - val postDecommissioned = input.map(x => x) - val postDecomAsyncCount = postDecommissioned.countAsync() - val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds) - } - assert(postDecomAsyncCount.isCompleted === false, - "After exec decommission new task could not launch") } } From 22eb4c6369c13d4be0fa32415b03b7d55fe84210 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 30 Jul 2020 14:11:34 -0700 Subject: [PATCH 16/20] The executor can also have exited completely and then there is no entry --- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 2948e49c166e..25145dac5268 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -266,7 +266,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val execIdToBlocksMapping = storageStatus.map( status => (status.blockManagerId.executorId, status.blocks)).toMap // No cached blocks should be present on executor which was decommissioned - assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), + assert( + !execIdToBlocksMapping.contains(execToDecommission) || + execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), "Cache blocks should be migrated") if (persist) { // There should still be all the RDD blocks cached From e139b9f4941d4cbbb0dd0afd61ce58f80dc1ee48 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 31 Jul 2020 12:25:16 -0700 Subject: [PATCH 17/20] Try and write some 
comments --- .../executor/CoarseGrainedExecutorBackend.scala | 3 +++ .../storage/BlockManagerDecommissioner.scala | 15 +++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index db58ab3f108b..55fb76b3572a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -318,6 +318,9 @@ private[spark] class CoarseGrainedExecutorBackend( logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") // If there is a running task it could store blocks, so make sure we wait for a // migration loop to complete after the last task is done. + // Note: this is only advanced if there is a running task, if there + // is no running task but the blocks are not done migrating this does not + // move forward. lastTaskRunningTime = System.nanoTime() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 9babde01a4e3..0902f2966b13 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -357,23 +357,26 @@ private[storage] class BlockManagerDecommissioner( /* * Returns the last migration time and a boolean for if all blocks have been migrated. - * If there are any tasks running since that time the boolean may be incorrect. + * The last migration time is calculated to be the minimum of the last migration of any + * running migration (and if there are now current running migrations it is set to current). + * This provides a timeStamp which, if there have been no tasks running since that time + * we can know that all potential blocks that can be have been migrated off. */ private[storage] def lastMigrationInfo(): (Long, Boolean) = { if (stopped || (stoppedRDD && stoppedShuffle)) { (System.nanoTime(), true) } else { - // Chose the min of the running times. - val lastMigrationTime = if ( - conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && - conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { + // Chose the min of the active times. See the function description for more information. + val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) { Math.min(lastRDDMigrationTime, lastShuffleMigrationTime) - } else if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { + } else if (!stoppedShuffle) { lastShuffleMigrationTime } else { lastRDDMigrationTime } + // Technically we could have blocks left if we encountered an error, but those blocks will + // never be migrated, so we don't care about them. val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) (lastMigrationTime, blocksMigrated) } From 5c0a54459517428a67f6fba3f80a7b01f3fdb28f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 3 Aug 2020 12:32:38 -0700 Subject: [PATCH 18/20] Add unit tests for how we handle timestamp tracking of migration times and add some more explicit cases where the migration being done remains valid and has the max value timestamp (e.g. the migrations have finished, there are no peers, etc.) 
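The reason a stopped decommissioner can safely report Long.MaxValue is the check on
the executor side (a sketch mirroring the shutdown thread added earlier in this
series; migrationTime and lastTaskRunningTime are the names used there):

    val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
    // A (Long.MaxValue, true) result can never be invalidated by a task that
    // started after the info was computed, so the executor is free to exit.
    if (allBlocksMigrated && migrationTime > lastTaskRunningTime) {
      exitExecutor(0, "Finished decommissioning", notifyDriver = true)
    }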
--- .../storage/BlockManagerDecommissioner.scala | 26 +++- .../BlockManagerDecommissionUnitSuite.scala | 116 ++++++++++++++++++ 2 files changed, 136 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 0902f2966b13..f0a8e47aa320 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -42,11 +42,11 @@ private[storage] class BlockManagerDecommissioner( private val maxReplicationFailuresForDecommission = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) - // Used for tracking if our migrations are complete. - @volatile private var lastRDDMigrationTime: Long = 0 - @volatile private var lastShuffleMigrationTime: Long = 0 - @volatile private var rddBlocksLeft: Boolean = true - @volatile private var shuffleBlocksLeft: Boolean = true + // Used for tracking if our migrations are complete. Readable for testing + @volatile private[storage] var lastRDDMigrationTime: Long = 0 + @volatile private[storage] var lastShuffleMigrationTime: Long = 0 + @volatile private[storage] var rddBlocksLeft: Boolean = true + @volatile private[storage] var shuffleBlocksLeft: Boolean = true /** * This runnable consumes any shuffle blocks in the queue for migration. This part of a @@ -152,6 +152,13 @@ private[storage] class BlockManagerDecommissioner( assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) while (!stopped && !stoppedRDD && !Thread.interrupted()) { logInfo("Iterating on migrating from the block manager.") + // Validate we have peers to migrate to. + val peers = bm.getPeers(false) + // If we have no peers give up. + if (peers.isEmpty) { + stopped = true + stoppedRDD = true + } try { val startTime = System.nanoTime() logDebug("Attempting to replicate all cached RDD blocks") @@ -237,6 +244,10 @@ private[storage] class BlockManagerDecommissioner( deadPeers.foreach { peer => migrationPeers.get(peer).foreach(_.running = false) } + // If we don't have anyone to migrate to give up + if (migrationPeers.values.find(_.running == true).isEmpty) { + stoppedShuffle = true + } // If we found any new shuffles to migrate or otherwise have not migrated everything. newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get() } @@ -259,6 +270,7 @@ private[storage] class BlockManagerDecommissioner( */ private[storage] def decommissionRddCacheBlocks(): Boolean = { val replicateBlocksInfo = bm.getMigratableRDDBlocks() + // Refresh peers and validate we have somewhere to move blocks. if (replicateBlocksInfo.nonEmpty) { logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " + @@ -364,7 +376,9 @@ private[storage] class BlockManagerDecommissioner( */ private[storage] def lastMigrationInfo(): (Long, Boolean) = { if (stopped || (stoppedRDD && stoppedShuffle)) { - (System.nanoTime(), true) + // Since we don't have anything left to migrate ever (since we don't restart once + // stopped), return that we're done with a validity timestamp that doesn't expire. + (Long.MaxValue, true) } else { // Chose the min of the active times. See the function description for more information. 
val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 243acc158158..0edaad0ac2f0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -57,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } } + /** + * Validate a given configuration with the mocks. + * The fail variable controls if we expect migration to fail, in which case we expect + * a constant Long.MaxValue timestamp. + */ + private def validateWithMocks(conf: SparkConf, bm: BlockManager, + migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = { + // Verify the decom manager handles this correctly + val bmDecomManager = new BlockManagerDecommissioner(conf, bm) + var previousTime = Long.MaxValue + try { + bmDecomManager.start() + eventually(timeout(10.second), interval(10.milliseconds)) { + val (currentTime, done) = bmDecomManager.lastMigrationInfo() + assert(done) + // Make sure the time stamp starts moving forward. + if (!fail && previousTime > currentTime) { + previousTime = currentTime + assert(false) + } else if (fail) { + assert(currentTime === Long.MaxValue) + } + } + if (!fail) { + // Wait 5 seconds and assert times keep moving forward. + Thread.sleep(5000) + val (currentTime, done) = bmDecomManager.lastMigrationInfo() + assert(done && currentTime > previousTime) + } + } finally { + bmDecomManager.stop() + } + } + + test("test that with no blocks we finish migration") { + // Set up the mocks so we return empty + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + when(migratableShuffleBlockResolver.getStoredShuffles()) + .thenReturn(Seq()) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + + // Verify the decom manager handles this correctly + validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver) + } + + test("block decom manager with no migrations configured") { + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + val badConf = new SparkConf(false) + .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, false) + .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, false) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) + // Verify the decom manager handles this correctly + validateWithMocks(badConf, bm, migratableShuffleBlockResolver, fail = true) + } + + test("block decom manager with no peers") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + 
when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq()) + + // Verify the decom manager handles this correctly + validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver, fail = true) + } + + + test("block decom manager with only shuffle files time moves forward") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + // Verify the decom manager handles this correctly + validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver) + } + test("test shuffle and cached rdd migration without any error") { val blockTransferService = mock(classOf[BlockTransferService]) val bm = mock(classOf[BlockManager]) @@ -80,6 +180,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { try { bmDecomManager.start() + var previousRDDTime = Long.MaxValue + var previousShuffleTime = Long.MaxValue + // We don't check that all blocks are migrated because out mock is always returning an RDD. eventually(timeout(10.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) @@ -88,6 +191,19 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), mc.eq(StorageLevel.DISK_ONLY), mc.isNull()) + // Since we never "finish" the RDD blocks make sure the time is always moving forward. + assert(bmDecomManager.rddBlocksLeft) + if (!(bmDecomManager.lastRDDMigrationTime > previousRDDTime)) { + previousRDDTime = bmDecomManager.lastRDDMigrationTime + assert(false) + } + // Since we do eventually finish the shuffle blocks make sure the shuffle blocks complete + // and that the time keeps moving forward. + assert(!bmDecomManager.shuffleBlocksLeft) + if (!(bmDecomManager.lastShuffleMigrationTime > previousShuffleTime)) { + previousShuffleTime = bmDecomManager.lastShuffleMigrationTime + assert(false) + } } } finally { bmDecomManager.stop() From 679fbe701fe0e925e3e03faecb1dff4c9942c5fe Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 4 Aug 2020 11:14:33 -0700 Subject: [PATCH 19/20] Code review feedback, mostly cleanup the unit tests --- .../BlockManagerDecommissionUnitSuite.scala | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 0edaad0ac2f0..51d0feef8630 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -62,21 +62,27 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { * The fail variable controls if we expect migration to fail, in which case we expect * a constant Long.MaxValue timestamp. 
*/ - private def validateWithMocks(conf: SparkConf, bm: BlockManager, + private def validateDecommissionTimestamps(conf: SparkConf, bm: BlockManager, migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = { - // Verify the decom manager handles this correctly + // Verify the decommissioning manager timestamps and status val bmDecomManager = new BlockManagerDecommissioner(conf, bm) - var previousTime = Long.MaxValue + var previousTime: Option[Long] = None try { bmDecomManager.start() - eventually(timeout(10.second), interval(10.milliseconds)) { + eventually(timeout(100.second), interval(10.milliseconds)) { val (currentTime, done) = bmDecomManager.lastMigrationInfo() assert(done) // Make sure the time stamp starts moving forward. - if (!fail && previousTime > currentTime) { - previousTime = currentTime - assert(false) - } else if (fail) { + if (!fail) { + previousTime match { + case None => + previousTime = Some(currentTime) + assert(false) + case Some(t) => + assert(t < currentTime) + } + } else { + // If we expect migration to fail we should get the max value quickly. assert(currentTime === Long.MaxValue) } } @@ -84,7 +90,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { // Wait 5 seconds and assert times keep moving forward. Thread.sleep(5000) val (currentTime, done) = bmDecomManager.lastMigrationInfo() - assert(done && currentTime > previousTime) + assert(done && currentTime > previousTime.get) } } finally { bmDecomManager.stop() @@ -103,9 +109,8 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { when(bm.getPeers(mc.any())) .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) - // Verify the decom manager handles this correctly - validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver) + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver) } test("block decom manager with no migrations configured") { @@ -123,7 +128,8 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, false) .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) // Verify the decom manager handles this correctly - validateWithMocks(badConf, bm, migratableShuffleBlockResolver, fail = true) + validateDecommissionTimestamps(badConf, bm, migratableShuffleBlockResolver, + fail = true) } test("block decom manager with no peers") { @@ -138,7 +144,8 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { .thenReturn(Seq()) // Verify the decom manager handles this correctly - validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver, fail = true) + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver, + fail = true) } @@ -154,7 +161,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) // Verify the decom manager handles this correctly - validateWithMocks(sparkConf, bm, migratableShuffleBlockResolver) + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver) } test("test shuffle and cached rdd migration without any error") { From e81c3fc2d0b8c28cd0aedf08a7ea1e86a153ae49 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 4 Aug 2020 13:57:45 -0700 Subject: [PATCH 20/20] CR feedback, replace some magic values with option in the tests to improve readability. 
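
The diff below swaps the Long.MaxValue sentinel for Option[Long], so "no sample taken yet" is stated explicitly instead of being encoded in a magic value. Distilled out of the suite, the pattern looks roughly like this sketch (a standalone illustration: the object and method names are made up, and only the ScalaTest eventually/timeout/interval calls already used by the suite are assumed).

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object MonotonicTimestampCheck {
  // Poll the given sample until it shows strict forward progress between two observations.
  def assertKeepsMovingForward(sample: () => Long): Unit = {
    var previous: Option[Long] = None
    eventually(timeout(100.seconds), interval(10.milliseconds)) {
      val current = sample()
      previous match {
        case None =>
          // First observation: remember it and fail so that eventually() retries the block.
          previous = Some(current)
          assert(false)
        case Some(t) =>
          // Later observations must be strictly newer than the remembered one.
          assert(current > t)
      }
    }
  }
}

Failing the block on the first observation is deliberate: eventually() retries it, so the second pass compares against a real earlier sample rather than a sentinel value.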
--- .../BlockManagerDecommissionUnitSuite.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 51d0feef8630..74ad8bd2bcf9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -187,29 +187,35 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { try { bmDecomManager.start() - var previousRDDTime = Long.MaxValue - var previousShuffleTime = Long.MaxValue + var previousRDDTime: Option[Long] = None + var previousShuffleTime: Option[Long] = None // We don't check that all blocks are migrated because out mock is always returning an RDD. - eventually(timeout(10.second), interval(10.milliseconds)) { + eventually(timeout(100.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) verify(bm, least(1)).replicateBlock( mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3))) verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), mc.eq(StorageLevel.DISK_ONLY), mc.isNull()) - // Since we never "finish" the RDD blocks make sure the time is always moving forward. + // Since we never "finish" the RDD blocks, make sure the time is always moving forward. assert(bmDecomManager.rddBlocksLeft) - if (!(bmDecomManager.lastRDDMigrationTime > previousRDDTime)) { - previousRDDTime = bmDecomManager.lastRDDMigrationTime - assert(false) + previousRDDTime match { + case None => + previousRDDTime = Some(bmDecomManager.lastRDDMigrationTime) + assert(false) + case Some(t) => + assert(bmDecomManager.lastRDDMigrationTime > t) } // Since we do eventually finish the shuffle blocks make sure the shuffle blocks complete // and that the time keeps moving forward. assert(!bmDecomManager.shuffleBlocksLeft) - if (!(bmDecomManager.lastShuffleMigrationTime > previousShuffleTime)) { - previousShuffleTime = bmDecomManager.lastShuffleMigrationTime - assert(false) + previousShuffleTime match { + case None => + previousShuffleTime = Some(bmDecomManager.lastShuffleMigrationTime) + assert(false) + case Some(t) => + assert(bmDecomManager.lastShuffleMigrationTime > t) } } } finally {