Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,38 +292,23 @@ private[spark] class CoarseGrainedExecutorBackend(

// NOTE(review): this span is unified-diff residue — the removed (old) and added (new)
// versions of run() are interleaved because the +/- markers were lost on copy; it is
// not compilable as-is. Comments below annotate both variants.
// Purpose (both variants): poll once per second until the executor is idle and block
// migration is done (or not configured), then exit the executor cleanly.
val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
override def run(): Unit = {
// OLD variant: timestamp of the last observed running task; used to discard a
// stale "all blocks migrated" answer computed while tasks were still running.
var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s

while (true) {
logInfo("Checking to see if we can shutdown.")
if (executor != null) {
// giving time to process the DecommissionExecutor for the driver
Thread.sleep(sleep_time)
if (executor == null || executor.numRunningTasks == 0) {
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
logInfo("No running tasks, checking migrations")
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
} else {
logInfo("All blocks not yet migrated.")
}
} else {
logInfo("No running tasks, no block migration configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
} else {
// NEW variant: first wait out all running tasks, then wait for migration.
while (executor.numRunningTasks != 0) {
// BUG(review): missing `s` interpolator — the literal text
// "${executor.numRunningtasks}" is logged instead of the count; also
// `numRunningtasks` is miscased (elsewhere it is `numRunningTasks`).
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
// If there is a running task it could store blocks, so make sure we wait for a
// migration loop to complete after the last task is done.
// Note: this is only advanced if there is a running task, if there
// is no running task but the blocks are not done migrating this does not
// move forward.
lastTaskRunningTime = System.nanoTime()
Thread.sleep(sleep_time)
}
logInfo("No running tasks, checking migrations")
// NEW variant: simple poll of the boolean flag replaces the old
// timestamp-comparison protocol against lastMigrationInfo().
while (!env.blockManager.migrationFinished) {
logInfo("All blocks not yet migrated.")
Thread.sleep(sleep_time)
}
}
logInfo("No running tasks, migration finished or not configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
}
shutdownThread.setDaemon(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1823,11 +1823,11 @@ private[spark] class BlockManager(
}

/*
* Returns the last migration time and a boolean denoting if all the blocks have been migrated.
* Returns a boolean indicating if all the blocks have been migrated.
* If there are any tasks running since that time the boolean may be incorrect.
*/
private[spark] def lastMigrationInfo(): (Long, Boolean) = {
decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false))
/**
 * Whether block migration has completed.
 *
 * Delegates to the decommissioner when one is active; if no decommissioner
 * exists there is nothing to migrate, so this reports true.
 */
private[spark] def migrationFinished(): Boolean = {
  // Option.forall: None => true, Some(d) => d.migrationFinished —
  // identical to map(_.migrationFinished).getOrElse(true).
  decommissioner.forall(_.migrationFinished)
}

private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ private[storage] class BlockManagerDecommissioner(
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

// Used for tracking if our migrations are complete.
@volatile private var lastRDDMigrationTime: Long = 0
@volatile private var lastShuffleMigrationTime: Long = 0
@volatile private var rddBlocksLeft: Boolean = true
@volatile private var shuffleBlocksLeft: Boolean = true

Expand Down Expand Up @@ -156,7 +154,6 @@ private[storage] class BlockManagerDecommissioner(
val startTime = System.nanoTime()
logDebug("Attempting to replicate all cached RDD blocks")
rddBlocksLeft = decommissionRddCacheBlocks()
lastRDDMigrationTime = startTime
logInfo("Attempt to replicate all cached blocks done")
logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
Thread.sleep(sleepInterval)
Expand Down Expand Up @@ -186,7 +183,6 @@ private[storage] class BlockManagerDecommissioner(
logDebug("Attempting to replicate all shuffle blocks")
val startTime = System.nanoTime()
shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
lastShuffleMigrationTime = startTime
logInfo("Done starting workers to migrate shuffle blocks")
Thread.sleep(sleepInterval)
} catch {
Expand Down Expand Up @@ -356,29 +352,9 @@ private[storage] class BlockManagerDecommissioner(
}

/*
* Returns the last migration time and a boolean for if all blocks have been migrated.
* The last migration time is calculated to be the minimum of the last migration of any
* running migration (and if there are now current running migrations it is set to current).
* This provides a timeStamp which, if there have been no tasks running since that time
* we can know that all potential blocks that can be have been migrated off.
* Returns a boolean if all blocks have been migrated
*/
private[storage] def lastMigrationInfo(): (Long, Boolean) = {
if (stopped || (stoppedRDD && stoppedShuffle)) {
(System.nanoTime(), true)
} else {
// Chose the min of the active times. See the function description for more information.
val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
} else if (!stoppedShuffle) {
lastShuffleMigrationTime
} else {
lastRDDMigrationTime
}

// Technically we could have blocks left if we encountered an error, but those blocks will
// never be migrated, so we don't care about them.
val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD)
(lastMigrationTime, blocksMigrated)
}
/*
 * Returns true once all blocks that can be migrated have been handled.
 *
 * The decommissioner is finished when it has been stopped outright, or when
 * both block types are done: a block type is done when it has no blocks left
 * to migrate or its migration thread has stopped. Blocks that failed to
 * migrate with an error will never migrate, so they are intentionally ignored.
 */
private[storage] def migrationFinished(): Boolean = {
  // Idiom fix: dropped the redundant `return` keyword (a nonlocal-return hazard
  // in Scala, removed in Scala 3) and the trailing semicolon — the last
  // expression is already the method's result. Logic is unchanged.
  stopped || ((!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD))
}
}