14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -620,11 +620,15 @@ private[spark] class BlockManager(
* Note that this method must be called without any BlockInfo locks held.
*/
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
SparkContext.getActive.map { context =>
Ngone51 (Member) commented:
@wankunde The problem here is that this is a race condition: the re-registration request could be sent before the executor is stopped by the driver, so this kind of protection can't resolve the issue thoroughly.

(BTW, I think we only have SparkContext on the driver side.)

The problem with #34536 now is that we can't handle the case you mentioned there. The reason that fix can't handle it is that HeartbeatReceiver doesn't know about the existence of the BlockManager in that case. So I think we can let HeartbeatReceiver implement onBlockManagerAdded to listen for the registration of the BlockManager, so that HeartbeatReceiver also knows about the BlockManager in that case.
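A rough sketch of that idea (not part of this PR): an onBlockManagerAdded override living inside HeartbeatReceiver. The addExecutor helper and the exact wiring are assumptions about HeartbeatReceiver's internals, not verified code.

```scala
// Hypothetical sketch: let HeartbeatReceiver learn about executors from
// BlockManager registrations as well, so it also tracks executors whose
// BlockManager registered without a corresponding ExecutorAdded event.
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
  val executorId = event.blockManagerId.executorId
  // Ignore the driver's own BlockManager registration.
  if (executorId != SparkContext.DRIVER_IDENTIFIER) {
    // Assumed helper that starts heartbeat tracking for this executor.
    addExecutor(executorId)
  }
}
```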

wankunde (Contributor, Author) replied:
@Ngone51 Thanks for your review. It's wrong to judge whether the BlockManager is stopping this way; maybe we can use SparkEnv instead.
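For illustration only, a minimal sketch of gating re-registration on SparkEnv instead; it assumes SparkEnv exposes a package-visible isStopped flag, which should be verified against the actual SparkEnv API.

```scala
// Hypothetical alternative: SparkEnv exists on executors, unlike SparkContext,
// so it could be used to decide whether re-registration should be skipped.
def reregister(): Unit = {
  Option(SparkEnv.get) match {
    case Some(env) if !env.isStopped =>  // assumed flag on SparkEnv
      logInfo(s"BlockManager $blockManagerId re-registering with master")
      master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
        maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
      reportAllBlocks()
    case _ =>
      logInfo(s"Skipping re-registration of BlockManager $blockManagerId " +
        "because the executor environment is shutting down")
  }
}
```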

if (!context.stopped.get()) {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
reportAllBlocks()
}
}
}

/**
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.File
import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -663,6 +664,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}

test("reregistration on block update") {
when(sc.stopped).thenReturn(new AtomicBoolean(false))
SparkContext.setActiveContext(sc)
val store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -678,6 +681,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
SparkContext.clearActiveContext()
}

test("reregistration doesn't dead lock") {