diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 2985d199b688b..2df26f3530ef4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -117,12 +117,15 @@ class BlockManagerMasterEndpoint(
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
-      context.reply(isSuccess)
-      // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
-      // returns false since the block info would be updated again later.
-      if (isSuccess) {
-        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+      val response = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+      response.foreach { isSuccess =>
+        // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
+        // returns false since the block info would be updated again later.
+        if (isSuccess) {
+          listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+        }
+        context.reply(isSuccess)
       }
 
     case GetLocations(blockId) =>
@@ -573,23 +576,25 @@ class BlockManagerMasterEndpoint(
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long): Boolean = {
+      diskSize: Long): Future[Boolean] = {
     logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}")
 
     if (blockId.isShuffle) {
       blockId match {
         case ShuffleIndexBlockId(shuffleId, mapId, _) =>
           // We need to update this at index file because there exists the index-only block
-          logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
-          mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
-          return true
+          return Future {
+            logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
+            mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+            true
+          }
         case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
           logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
-          return true
+          return Future.successful(true)
         case _ =>
           logDebug(s"Unexpected shuffle block type ${blockId}" +
             s"as ${blockId.getClass().getSimpleName()}")
-          return false
+          return Future.successful(false)
       }
     }
 
@@ -597,15 +602,15 @@ class BlockManagerMasterEndpoint(
       if (blockManagerId.isDriver && !isLocal) {
         // We intentionally do not register the master (except in local mode),
         // so we should not indicate failure.
-        return true
+        return Future.successful(true)
       } else {
-        return false
+        return Future.successful(false)
       }
     }
 
     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
-      return true
+      return Future.successful(true)
     }
 
     blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -637,7 +642,7 @@ class BlockManagerMasterEndpoint(
     if (locations.size == 0) {
       blockLocations.remove(blockId)
     }
-    true
+    Future.successful(true)
   }
 
   private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 708d71bc7aa9b..ddec2ee7b9ed7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -99,12 +99,22 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     runDecomTest(true, true, JobEnded)
   }
 
+  test(s"SPARK-36782 not deadlock if MapOutput uses broadcast") {
+    runDecomTest(false, true, JobEnded, forceMapOutputBroadcast = true)
+  }
+
   private def runDecomTest(
       persist: Boolean,
       shuffle: Boolean,
-      whenToDecom: String): Unit = {
+      whenToDecom: String,
+      forceMapOutputBroadcast: Boolean = false): Unit = {
     val migrateDuring = whenToDecom != JobEnded
     val master = s"local-cluster[${numExecs}, 1, 1024]"
+    val minBroadcastSize = if (forceMapOutputBroadcast) {
+      0
+    } else {
+      config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.defaultValue.get
+    }
     val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
@@ -115,6 +125,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       // Just replicate blocks quickly during testing, there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
+      .set(config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, minBroadcastSize)
     if (whenToDecom == TaskStarted) {
       // We are using accumulators below, make sure those are reported frequently.