[SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned #27864
@@ -412,6 +412,21 @@ package object config {
      .intConf
      .createWithDefault(1)

+  private[spark] val STORAGE_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.storage.decommission.enabled")
+      .doc("Whether to decommission the block manager when decommissioning executor")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
+    ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
+      .doc("Maximum number of failures to tolerate for offloading " +
+        "one block in single decommission cache blocks iteration")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(3)
+
  private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
    ConfigBuilder("spark.storage.replication.topologyFile")
      .version("2.1.0")
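For reference, a minimal sketch of how an application could opt in to the two settings added in the hunk above. The keys come from the ConfigBuilder entries in this diff; the application name and values are purely illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("cache-decommission-example")  // illustrative name
  // New in this PR: allow the block manager to offload cached blocks on decommission.
  .set("spark.storage.decommission.enabled", "true")
  // New in this PR: per-block failure budget per offload iteration (3 is the default).
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")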
@@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -241,6 +242,9 @@ private[spark] class BlockManager(

  private var blockReplicationPolicy: BlockReplicationPolicy = _

+  private var blockManagerDecommissioning: Boolean = false
+  private var decommissionManager: Option[BlockManagerDecommissionManager] = None
+
  // A DownloadFileManager used to track all the files of remote blocks which are above the
  // specified memory threshold. Files will be deleted automatically based on weak reference.
  // Exposed for test
@@ -1551,30 +1555,36 @@ private[spark] class BlockManager(
  }

  /**
-   * Called for pro-active replenishment of blocks lost due to executor failures
+   * Replicates a block to peer block managers based on existingReplicas and maxReplicas
   *
   * @param blockId blockId being replicate
   * @param existingReplicas existing block managers that have a replica
   * @param maxReplicas maximum replicas needed
+   * @param maxReplicationFailures number of replication failures to tolerate before
+   *                               giving up.
+   * @return whether block was successfully replicated or not
   */
  def replicateBlock(
      blockId: BlockId,
      existingReplicas: Set[BlockManagerId],
-      maxReplicas: Int): Unit = {
+      maxReplicas: Int,
+      maxReplicationFailures: Option[Int] = None): Boolean = {
    logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
-    blockInfoManager.lockForReading(blockId).foreach { info =>
+    blockInfoManager.lockForReading(blockId).forall { info =>
      val data = doGetLocalBytes(blockId, info)
      val storageLevel = StorageLevel(
        useDisk = info.level.useDisk,
        useMemory = info.level.useMemory,
        useOffHeap = info.level.useOffHeap,
        deserialized = info.level.deserialized,
        replication = maxReplicas)
-      // we know we are called as a result of an executor removal, so we refresh peer cache
-      // this way, we won't try to replicate to a missing executor with a stale reference
+      // we know we are called as a result of an executor removal or because the current executor
+      // is getting decommissioned. so we refresh peer cache before trying replication, we won't
+      // try to replicate to a missing executor/another decommissioning executor
      getPeers(forceFetch = true)
      try {
-        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+        replicate(
+          blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
      } finally {
        logDebug(s"Releasing lock for $blockId")
        releaseLockAndDispose(blockId, data)
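One subtlety in the change from foreach to forall above: on a Scala Option, forall is vacuously true for None, so a block whose read lock can no longer be obtained (for example, a block that was already removed) is reported as successfully replicated rather than as a failure. A quick standalone illustration of that semantics (not PR code):

val present: Option[Int] = Some(3)
val missing: Option[Int] = None

assert(present.forall(_ > 0))   // predicate evaluated on the value: true
assert(!present.forall(_ > 5))  // predicate fails: false
assert(missing.forall(_ > 5))   // None: vacuously true, predicate never called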
@@ -1591,9 +1601,11 @@ private[spark] class BlockManager(
      data: BlockData,
      level: StorageLevel,
      classTag: ClassTag[_],
-      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
+      existingReplicas: Set[BlockManagerId] = Set.empty,
+      maxReplicationFailures: Option[Int] = None): Boolean = {

-    val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
+    val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
+      conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
    val tLevel = StorageLevel(
      useDisk = level.useDisk,
      useMemory = level.useMemory,
@@ -1617,7 +1629,7 @@ private[spark] class BlockManager(
      blockId,
      numPeersToReplicateTo)

-    while(numFailures <= maxReplicationFailures &&
+    while(numFailures <= maxReplicationFailureCount &&
      !peersForReplication.isEmpty &&
      peersReplicatedTo.size < numPeersToReplicateTo) {
      val peer = peersForReplication.head
@@ -1665,9 +1677,11 @@ private[spark] class BlockManager(
    if (peersReplicatedTo.size < numPeersToReplicateTo) {
      logWarning(s"Block $blockId replicated to only " +
        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
+      return false
    }

    logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
+    return true
  }

  /**
@@ -1761,6 +1775,58 @@ private[spark] class BlockManager(
    blocksToRemove.size
  }

+  def decommissionBlockManager(): Unit = {
+    if (!blockManagerDecommissioning) {
+      logInfo("Starting block manager decommissioning process")
+      blockManagerDecommissioning = true
+      decommissionManager = Some(new BlockManagerDecommissionManager)
+      decommissionManager.foreach(_.start())
+    } else {
+      logDebug(s"Block manager already in decommissioning state")
+    }
+  }
+
+  /**
+   * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
+   * Visible for testing
+   */
+  def offloadRddCacheBlocks(): Unit = {
+    val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
+
+    if (replicateBlocksInfo.nonEmpty) {
+      logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
+        s"for block manager decommissioning")
+    }
+
+    // Maximum number of storage replication failure which replicateBlock can handle
+    // before giving up for one block
+    val maxReplicationFailures = conf.get(
+      config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+    // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
+    //   so that we end up prioritize them over each other
+    val blocksFailedReplication = replicateBlocksInfo.filterNot {
+      case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
+        val replicatedSuccessfully = replicateBlock(
+          blockId,
+          existingReplicas.toSet,
+          maxReplicas,
+          maxReplicationFailures = Some(maxReplicationFailures))
+        if (replicatedSuccessfully) {
+          logInfo(s"Block $blockId offloaded successfully, Removing block now")
+          removeBlock(blockId)
+          logInfo(s"Block $blockId removed")
+        } else {
+          logWarning(s"Failed to offload block $blockId")
+        }
+        replicatedSuccessfully
+    }
+    if (blocksFailedReplication.nonEmpty) {
+      logWarning(s"Blocks failed replication in cache decommissioning " +
+        s"process: ${blocksFailedReplication.mkString(",")}")
+    }
+  }
+
  /**
   * Remove all blocks belonging to the given broadcast.
   */

Inline review thread on the replicateBlock call above:

Contributor (holdenk): I don't know if we need it, but replicateBlock is blocking, and it seems like async + futures might help us migrate more blocks? Especially if one host is under load, we might block on sending a block to that host before we move forward.

Author (prakharjain09): @holdenk I am not sure how this will behave when multiple executors on the same host machine are decommissioning, each doing it in parallel; that may cause some network congestion. I have updated the code to do replication in a thread pool of size 4. Maybe we should make this configurable? Any suggestions?

Contributor (holdenk): Network congestion is certainly a possibility. I think that for now this strikes a good balance between simple code and avoiding hanging all transfers if we have one slow target host. We can revisit this in the future if it turns out we need more control in production environments. Sound good?
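The thread-pool approach discussed in the thread above could look roughly like the sketch below. This is only an illustration of the idea under discussion, not code from this PR: replicateOne, the block IDs, and the pool size of 4 are made-up stand-ins for the real replicateBlock path.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadPoolOffloadSketch {
  // Stand-in for BlockManager.replicateBlock: returns whether the block
  // was successfully copied to some peer.
  def replicateOne(blockId: String): Boolean = {
    // ... network transfer to a peer would happen here ...
    true
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq("rdd_0_0", "rdd_0_1", "rdd_0_2", "rdd_0_3")
    // Bounded pool: at most 4 blocks are in flight at once, so one slow
    // target host does not stall every transfer, while several executors
    // decommissioning on the same host do not saturate the network.
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val results = Future.traverse(blocks)(b => Future(b -> replicateOne(b)))
    val failed = Await.result(results, 1.minute).collect { case (b, false) => b }
    if (failed.nonEmpty) println(s"Failed to offload: ${failed.mkString(", ")}")
    pool.shutdown()
  }
}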
@@ -1829,7 +1895,50 @@ private[spark] class BlockManager(
    data.dispose()
  }

+  /**
+   * Class to handle block manager decommissioning retries
+   * It creates a Thread to retry offloading all RDD cache blocks
+   */
+  private class BlockManagerDecommissionManager {
+    @volatile private var stopped = false
+    private val cacheReplicationThread = new Thread {
+      override def run(): Unit = {
+        while (blockManagerDecommissioning && !stopped) {
+          try {
+            logDebug(s"Attempting to replicate all cached RDD blocks")
+            offloadRddCacheBlocks()
+            logInfo(s"Attempt to replicate all cached blocks done")
+            val sleepInterval = if (Utils.isTesting) 100 else 30000
+            Thread.sleep(sleepInterval)
+          } catch {
+            case _: InterruptedException =>
+              // no-op
+            case NonFatal(e) =>
+              logError("Error occurred while trying to " +
+                "replicate cached RDD blocks for block manager decommissioning", e)
+          }
+        }
+      }
+    }
+    cacheReplicationThread.setDaemon(true)
+    cacheReplicationThread.setName("cache-replication-thread")
+
+    def start(): Unit = {
+      cacheReplicationThread.start()
+    }
+
+    def stop(): Unit = {
+      if (!stopped) {
+        stopped = true
+        logInfo("Stopping cache replication thread")
+        cacheReplicationThread.interrupt()
+        cacheReplicationThread.join()
+      }
+    }
+  }
+
  def stop(): Unit = {
+    decommissionManager.foreach(_.stop())
    blockTransferService.close()
    if (blockStoreClient ne blockTransferService) {
      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
Review comment:

Could you explain the meaning of "decommission" here? Say we have Spark Thrift Server; in that case, dynamic allocation wants to decommission all workers. Then, what can this PR provide for that zero-worker situation?

Author's reply:

@dongjoon-hyun Thanks for the detailed review.
I couldn't completely understand your question with respect to dynamic allocation (as per my understanding, dynamic allocation doesn't initiate decommissioning). This change works together with the SPARK-20628 PR. After SPARK-20628, the Spark scheduler starts receiving decommissionExecutor messages (because of AWS spot loss, Azure low-priority VMs, etc.). In such a case, we should offload cached data from that executor to other active executors so that we don't eventually lose those cache blocks. This is done in a best-effort way, i.e. cache blocks are moved only if space is available on active executors; if no space is available elsewhere, the cache blocks stay where they are.
The next step after this change is to initiate decommissioning from dynamic allocation. Say minExecutors=2 and maxExecutors=50. A Spark application is running at the maximum of 50 executors, and all of them hold a small amount of cached data (say 2GB out of a total capacity of 5GB each), so dynamic allocation cannot downscale any of the executors even though they are idle. In the future, dynamic allocation can leverage the changes in this PR to defragment the cached data onto a smaller set of executors so that some executors can be freed up.
Hope this answers your question.

Contributor's reply:

Since this is focused on executor-to-executor migration, supporting zero executors isn't going to be a viable option here. However, since we're using the same methods for putting blocks, assuming we add support for storing these blocks in external storage in the future, this approach should be able to be generalized to use that same mechanism if configured.