@@ -117,12 +117,15 @@ class BlockManagerMasterEndpoint(

    case _updateBlockInfo @
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
-      context.reply(isSuccess)
-      // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
-      // returns false since the block info would be updated again later.
-      if (isSuccess) {
-        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
-      }
+      val response = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+      response.foreach { isSuccess =>
+        // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
+        // returns false since the block info would be updated again later.
+        if (isSuccess) {
+          listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+        }
+        context.reply(isSuccess)
@Ngone51 (Member) commented on Sep 23, 2021:

This causes the responses of non-shuffle blocks to also be handled in the thread pool. I'm afraid this introduces unexpected overhead. Shall we do this only for shuffle blocks and leave non-shuffle blocks with the same behavior as before?

Contributor replied:

Did not realize this - thanks for pointing it out!
So if I understood it right, the proposal is:

  def handleResult(success: Boolean): Unit = {
    if (success) {
      // post
    }
    context.reply(success)
  }

  if (blockId.isShuffle) {
    updateShuffleBlockInfo( ... ).foreach(handleResult(_))
  } else {
    handleResult(updateBlockInfo( ... ))
  }

?

Member replied:

Yes!

Contributor replied:

Given @gengliangwang has merged it, can you create a follow-up PR? We can merge it pretty quickly and possibly make that into the current 3.2 RC as well :)

Member replied:

Sure!
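
For reference, a minimal sketch of the agreed follow-up, not the merged code above: it assumes a hypothetical updateShuffleBlockInfo helper returning Future[Boolean] is carved out of updateBlockInfo, that updateBlockInfo goes back to returning a plain Boolean for non-shuffle blocks, and that the endpoint's implicit ExecutionContext is in scope for the Future callback.

    case _updateBlockInfo @
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>

      def handleResult(success: Boolean): Unit = {
        // SPARK-30594: only post `SparkListenerBlockUpdated` when the update
        // succeeded, since the block info would be updated again later otherwise.
        if (success) {
          listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
        }
        context.reply(success)
      }

      if (blockId.isShuffle) {
        // Shuffle blocks reply asynchronously, so the dispatcher event loop never
        // blocks waiting on the map-output thread (the SPARK-36782 deadlock).
        // `updateShuffleBlockInfo` is the hypothetical shuffle-only helper.
        updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult)
      } else {
        // Non-shuffle blocks keep the original synchronous reply, avoiding the
        // thread-pool overhead raised in this thread.
        handleResult(
          updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
      }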

+      }

    case GetLocations(blockId) =>
@@ -573,39 +576,41 @@ class BlockManagerMasterEndpoint(
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
-      diskSize: Long): Boolean = {
+      diskSize: Long): Future[Boolean] = {
    logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}")

    if (blockId.isShuffle) {
      blockId match {
        case ShuffleIndexBlockId(shuffleId, mapId, _) =>
          // We need to update this at index file because there exists the index-only block
-          logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
-          mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
-          return true
+          return Future {
+            logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
+            mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
+            true
+          }
        case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
          logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
-          return true
+          return Future.successful(true)
        case _ =>
          logDebug(s"Unexpected shuffle block type ${blockId}" +
            s"as ${blockId.getClass().getSimpleName()}")
-          return false
+          return Future.successful(false)
      }
    }

    if (!blockManagerInfo.contains(blockManagerId)) {
      if (blockManagerId.isDriver && !isLocal) {
        // We intentionally do not register the master (except in local mode),
        // so we should not indicate failure.
-        return true
+        return Future.successful(true)
      } else {
-        return false
+        return Future.successful(false)
      }
    }

    if (blockId == null) {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      return true
+      return Future.successful(true)
    }

    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -637,7 +642,7 @@ class BlockManagerMasterEndpoint(
    if (locations.size == 0) {
      blockLocations.remove(blockId)
    }
-    true
+    Future.successful(true)
  }

  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
@@ -99,12 +99,22 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
    runDecomTest(true, true, JobEnded)
  }

+  test(s"SPARK-36782 not deadlock if MapOutput uses broadcast") {
+    runDecomTest(false, true, JobEnded, forceMapOutputBroadcast = true)
+  }
+
  private def runDecomTest(
      persist: Boolean,
      shuffle: Boolean,
-      whenToDecom: String): Unit = {
+      whenToDecom: String,
+      forceMapOutputBroadcast: Boolean = false): Unit = {
    val migrateDuring = whenToDecom != JobEnded
    val master = s"local-cluster[${numExecs}, 1, 1024]"
+    val minBroadcastSize = if (forceMapOutputBroadcast) {
+      0
+    } else {
+      config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.defaultValue.get
+    }
    val conf = new SparkConf().setAppName("test").setMaster(master)
      .set(config.DECOMMISSION_ENABLED, true)
      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
@@ -115,6 +125,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
      // Just replicate blocks quickly during testing, there isn't another
      // workload we need to worry about.
      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
+      .set(config.SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, minBroadcastSize)

    if (whenToDecom == TaskStarted) {
      // We are using accumulators below, make sure those are reported frequently.