@f-thiele
Contributor

@f-thiele f-thiele commented Sep 19, 2021

What changes were proposed in this pull request?

Delegate the potentially blocking call to mapOutputTracker.updateMapOutput, made while handling UpdateBlockInfo, from dispatcher-BlockManagerMaster to the thread pool to avoid blocking the endpoint. This code path is only taken for ShuffleIndexBlockId; other blocks are still handled on dispatcher-BlockManagerMaster itself.

Change updateBlockInfo to return Future[Boolean] instead of Boolean. Response will be sent to RPC caller upon successful completion of the future.

Introduce a unit test that forces MapOutputTracker to make a broadcast as part of MapOutputTracker.serializeOutputStatuses when running decommission tests.

Why are the changes needed?

SPARK-36782 describes a deadlock occurring if the dispatcher-BlockManagerMaster is allowed to block while waiting for write access to data structures.
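
As a rough illustration of why a blocking call on a single dispatcher thread is dangerous, the sketch below models a dispatcher that handles one message at a time. It is not Spark code: `DispatcherStallSketch`, `run`, and the latch standing in for the contended lock are all hypothetical, and the real deadlock involves the locks described in the JIRA. The only assumption carried over is the general shape: the first message blocks on something that is released only after the dispatcher handles a later message.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object DispatcherStallSketch {
  // Returns true if the blocking update completed within the timeout.
  def run(offload: Boolean): Boolean = {
    // Hypothetical stand-in for the single dispatcher thread of an RPC endpoint.
    val dispatcher = Executors.newSingleThreadExecutor()
    // Hypothetical stand-in for a separate thread pool owned by the endpoint.
    val pool = Executors.newCachedThreadPool()
    val poolEc = ExecutionContext.fromExecutor(pool)

    // Released only by a later message that the dispatcher itself must handle.
    val lockReleased = new CountDownLatch(1)
    val done = Promise[Boolean]()

    def blockingUpdate(): Boolean =
      try { lockReleased.await(); true }
      catch { case _: InterruptedException => false }

    // Message 1: the update that has to wait for the "lock".
    dispatcher.execute(() => {
      if (offload) done.completeWith(Future(blockingUpdate())(poolEc))
      else done.success(blockingUpdate()) // blocks the only dispatcher thread
    })
    // Message 2: the request whose handling releases the "lock".
    dispatcher.execute(() => lockReleased.countDown())

    val finished =
      try Await.result(done.future, 2.seconds)
      catch { case _: java.util.concurrent.TimeoutException => false }
    dispatcher.shutdownNow()
    pool.shutdownNow()
    finished
  }
}
```

With `offload = false` the second message is never processed because the dispatcher is stuck in the first one, so the call times out. With `offload = true` the dispatcher returns immediately and the update finishes on the pool.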

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test as introduced in this PR.


Ping @eejbyfeldt for notice.

Emil Ejbyfeldt and others added 2 commits September 16, 2021 17:23
This seems to happen if the MapOutputTracker uses broadcast when sending
decommission statuses.
…ateBlockInfo

Delegate task to threadpool and register callback for successful
completion. Reply to caller once future finished successfully.

To avoid java.util.ConcurrentModificationException we have to protect
the blockLocations using locks.
@github-actions github-actions bot added the CORE label Sep 19, 2021
@HyukjinKwon HyukjinKwon changed the title [SPARK-36782] Avoid blocking dispatcher-BlockManagerMaster during UpdateBlockInfo [SPARK-36782][CORE] Avoid blocking dispatcher-BlockManagerMaster during UpdateBlockInfo Sep 20, 2021
@attilapiros
Contributor

ok to test

@SparkQA

SparkQA commented Sep 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47984/

@SparkQA

SparkQA commented Sep 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47984/

@SparkQA

SparkQA commented Sep 21, 2021

Test build #143473 has finished for PR 34043 at commit 1278c41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Sep 21, 2021

This is going to make analysis/evolution of the code more complex, so I want to understand better why are we doing this ? What is the current issue/bottleneck and how much of it is mitigated by this change ?

@eejbyfeldt
Contributor

This is going to make analysis/evolution of the code more complex, so I want to understand better why are we doing this ? What is the current issue/bottleneck and how much of it is mitigated by this change ?

The goal is to fix the deadlock that is described in detail in SPARK-36782. In our testing, this deadlock is common enough that we cannot use the decommission features introduced in 3.1.1: when the deadlock happens, the driver does not seem to recover and the Spark job ends up failing. The patch should remove the possibility of this deadlock by making the BlockManagerMasterEndpoint not block when responding to calls with UpdateBlockInfo.

@mridulm
Contributor

mridulm commented Sep 21, 2021

Thanks for the details @eejbyfeldt, this makes sense ... this is an unfortunate side effect.
Will take a look at the PR later.

@mridulm
Contributor

mridulm commented Sep 21, 2021

+CC @Ngone51, @holdenk

@mridulm
Contributor

mridulm commented Sep 21, 2021

Can we add a test which surfaces this issue ?
I want to see whether alternate approaches will work or not (and use the test to validate the required functionality).
Or are the changes to BlockManagerDecommissionIntegrationSuite sufficient ?

@mridulm
Contributor

mridulm commented Sep 22, 2021

Thanks for the jira and stack trace - that was really helpful !
Looking at it more, the issue here is that we are relying on BlockManagerMasterEndpoint to update state in MapOutputTracker (which is usually done via DAGScheduler).

Given there is no state mod for shuffle blocks when UpdateBlockInfo is received, it would simply be better to move just that codepath to a different thread - instead of trying to add fine grained locking for rest of BlockManagerMasterEndpoint (the change is insufficient to do that unfortunately).

I am still testing locally, and making sure there are no issues - but the gist is:

  • updateBlockInfo returns Future[Boolean]
  • When blockId is a shuffle block - return a Future { } around the existing blockId match
  • For all other returns in that method, use Future.successful to complete immediately.
  • As with the current PR, chain the future to return response.

For shuffle blocks, it will be executed outside of the BlockManagerMasterEndpoint, while for the rest, it will get executed within the lock - and so there is no need to change the rest of the state's MT safety.

Thoughts ?
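
The gist above can be sketched roughly as follows. This is a simplified illustration, not the actual Spark code: the `BlockId` hierarchy here is a hypothetical stand-in with fewer fields, `updateMapOutput` is passed in as a plain function representing the potentially blocking tracker call, and the non-shuffle branch is reduced to a placeholder.

```scala
import scala.concurrent.{ExecutionContext, Future}

object UpdateBlockInfoSketch {
  // Hypothetical, simplified stand-ins for Spark's block id types.
  sealed trait BlockId { def isShuffle: Boolean }
  final case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long) extends BlockId {
    val isShuffle = true
  }
  final case class ShuffleDataBlockId(shuffleId: Int, mapId: Long) extends BlockId {
    val isShuffle = true
  }
  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
    val isShuffle = false
  }

  // `updateMapOutput` stands in for the potentially blocking tracker update.
  def updateBlockInfo(blockId: BlockId, updateMapOutput: (Int, Long) => Unit)(
      implicit ec: ExecutionContext): Future[Boolean] = {
    if (blockId.isShuffle) {
      // The whole match runs on the supplied execution context,
      // not on the thread that called updateBlockInfo.
      Future {
        blockId match {
          case ShuffleIndexBlockId(shuffleId, mapId) =>
            updateMapOutput(shuffleId, mapId)
            true
          case ShuffleDataBlockId(_, _) => true
          case _ => false
        }
      }
    } else {
      // Placeholder for the existing synchronous bookkeeping; the result is
      // wrapped in an already-completed future.
      Future.successful(true)
    }
  }
}
```

Because `Future.successful` is already completed when it is returned, callers that chain a reply onto it do not wait on the pool for the non-shuffle result itself.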

@Ngone51
Member

Ngone51 commented Sep 22, 2021

Looking at it more, the issue here is that we are relying on BlockManagerMasterEndpoint to update state in MapOutputTracker (which is usually done via DAGScheduler).

Good point! So I think we can delegate the UpdateBlockInfo of migrated shuffle blocks to DAGScheduler instead.

@f-thiele
Contributor Author

I am still testing locally, and making sure there are no issues - but the gist is:

  • updateBlockInfo returns Future[Boolean]
  • When blockId is a shuffle block - return a Future { } around the existing blockId match
  • For all other returns in that method, use Future.successful to complete immediately.
  • As with the current PR, chain the future to return response.

Thanks @mridulm. This was very helpful. I agree it would be good to have a different architectural approach than the extra locks presented here. I tried out your suggestions in 306fa17, and my tests at least pass. Now the change is much more localized, with no additional locking.

I don't know whether the repo standard procedure is to close this PR and open a new one with your suggestions or to adapt the existing one - I just pushed my changes here assuming the latter but let me know in case you prefer to open a new PR.

With the proposed changes I suppose we don't actually delegate it to the DAGScheduler (as suggested by @Ngone51) but to the block-manager-ask-thread-pool. If this is undesired, I believe there's a more fundamental change needed?

@SparkQA

SparkQA commented Sep 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48016/

@Ngone51
Member

Ngone51 commented Sep 22, 2021

With the proposed changes I suppose we don't actually delegate it to the DAGScheduler (as suggested by @Ngone51) but to the block-manager-ask-thread-pool. If this is undesired, I believe there's a more fundamental change needed?

@f-thiele We can get the dagScheduler by BlockManagerMasterEndpoint.listenerBus.sparkContext.dagScheduler (should expose sparkContext in listenerBus as private[spark]) and add a new function in dagScheduler to handle the specific UpdateBlockInfo of migrated shuffle blocks. (You could follow dagScheduler.executorAdded as an example.) So I actually don't expect too many fundamental changes here.

@SparkQA

SparkQA commented Sep 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48016/

@gengliangwang
Member

@f-thiele @eejbyfeldt thanks for the fix.
@mridulm @Ngone51 I would like to have this one in 3.2 if it doesn't take too much time.

@SparkQA

SparkQA commented Sep 22, 2021

Test build #143506 has finished for PR 34043 at commit 306fa17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@mridulm mridulm left a comment

When I was trying it out, I moved the entire blockId match into the Future - but this is equivalent.
Thanks for fixing it quickly @f-thiele !

@mridulm
Contributor

mridulm commented Sep 22, 2021

@Ngone51 I was initially thinking along same lines - to move the shuffle map updates to DAGScheduler as well.
But given UpdateBlockInfo rpc, the incoming rpc would come to BlockManagerMasterEndpoint.
Given there was no MT-safety reason to delegate to DAGScheduler, I was thinking along the lines of @f-thiele's change: we already have other Future invocations in BlockManagerMasterEndpoint.

Thoughts on this PR ?

@attilapiros
Contributor

I do not think we have to delegate into BlockManagerMasterEndpoint at all.
The block locations / block infos are about RDD and broadcast blocks; for the shuffle blocks we only need to update the map output tracker.

@attilapiros
Contributor

attilapiros commented Sep 22, 2021

Even the existing code shows this as when we talk about a shuffle block in the updateBlockInfo we return (in all the three branches below):

  if (blockId.isShuffle) {
    blockId match {
      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
        // We need to update this at index file because there exists the index-only block
        return Future {
          logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
          mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
          true
        }
      case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
        logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
        return Future.successful(true)
      case _ =>
        logDebug(s"Unexpected shuffle block type ${blockId}" +
          s"as ${blockId.getClass().getSimpleName()}")
        return Future.successful(false)
    }
  }

@attilapiros
Contributor

attilapiros commented Sep 22, 2021

This way (not delegating into BlockManagerMasterEndpoint), shuffle block updates avoid the dispatcher-BlockManagerMaster thread, and it stays free to handle the RPC calls, which was the root of the problem.

@SparkQA

SparkQA commented Sep 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48018/

@SparkQA

SparkQA commented Sep 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48018/

@SparkQA

SparkQA commented Sep 22, 2021

Test build #143508 has finished for PR 34043 at commit 26b3b11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Sep 22, 2021

@attilapiros I have not followed node decommissioning discussions in detail, hopefully @holdenk or @dongjoon-hyun can comment more here.

When looking at the affected codepaths, BlockManager is not currently differentiating between the block id's in tryToReportBlockStatus when it is updating driver (normal and for block migration).
Until node decommissioning, this path was used for non-shuffle blocks iirc.

IMO handling the shuffle vs non-shuffle split while processing UpdateBlockInfo at the driver is cleaner than spreading it out to the caller, unless we have other issues in doing so (for example: MT-safety, correctness, design hygiene, etc. concerns).
Thoughts ?

@Ngone51
Member

Ngone51 commented Sep 23, 2021

Given there was no MT-safety reason to delegate to DAGScheduler, I was thinking along the lines of @f-thiele's change: we already have other Future invocations in BlockManagerMasterEndpoint.

I think delegating to DAGScheduler is an alternative way to decouple BlockManagerMasterEndpoint and MapOutputTracker, which should also work in this case. The current fix looks good to me after removing the read/write lock. I'll approve it so it can make the 3.2 cut.

When looking at the affected codepaths, BlockManager is not currently differentiating between the block id's in tryToReportBlockStatus when it is updating driver (normal and for block migration).
Until node decommissioning, this path was used for non-shuffle blocks iirc.

IMO handling the shuffle vs non-shuffle split while processing UpdateBlockInfo at the driver is cleaner than spreading it out to the caller, unless we have other issues in doing so (for example: MT-safety, correctness, design hygiene, etc. concerns).
Thoughts ?

I actually think it's cleaner to spread it out at the caller. It's confusing that the shuffle blocks are handled specially compared to other blocks. Ideally, I think we should send the RPC message (e.g., a new message called UpdatedShuffleBlockLocation) to MapOutputTrackerMasterEndpoint directly.

  if (isSuccess) {
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
  }
  context.reply(isSuccess)
Member

@Ngone51 Ngone51 Sep 23, 2021

This causes the responses for non-shuffle blocks to be handled in the thread pool as well. I'm afraid this introduces unexpected overhead. Shall we do this only for the shuffle blocks and keep the existing behavior for non-shuffle blocks?

Contributor

Did not realize this - thanks for pointing it out !
So if I understood it right, the proposal is:

  def handleResult(success: Boolean): Unit = {
    if (success) {
      // post
    }
    context.reply(success)
  }

  if (blockId.isShuffle) {
    updateShuffleBlockInfo( ... ).foreach( handleResult(_))
  } else {
    handleResult(updateBlockInfo( ... ))
  }

?
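
A self-contained version of that proposal might look like the sketch below. It is only an approximation under stated assumptions: `ReplySketch`, `ReplyContext`, and `onUpdate` are hypothetical names, and the two update functions and the `post` callback are passed in as plain functions rather than being the real endpoint methods.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ReplySketch {
  // Hypothetical stand-in for an RPC call context that can send a reply.
  trait ReplyContext { def reply(success: Boolean): Unit }

  def onUpdate(
      isShuffle: Boolean,
      updateShuffle: () => Future[Boolean], // may run on a thread pool
      updateOther: () => Boolean,           // runs on the calling thread
      post: () => Unit,                     // e.g. notify listeners
      context: ReplyContext)(implicit ec: ExecutionContext): Unit = {
    def handleResult(success: Boolean): Unit = {
      if (success) post()
      context.reply(success)
    }

    if (isShuffle) {
      // The reply is sent from the execution context once the future succeeds.
      updateShuffle().foreach(handleResult)
    } else {
      // Non-shuffle blocks are updated and answered on the calling thread.
      handleResult(updateOther())
    }
  }
}
```

One thing to keep in mind with this shape: `Future.foreach` only runs its callback when the future completes successfully, so a failed shuffle update would produce no reply unless the failure is handled separately.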

Member

Yes!

Contributor

Given @gengliangwang has merged it, can you create a follow-up PR? We can merge it pretty quickly and possibly get it into the current 3.2 RC as well :)

Member

Sure!

@gengliangwang
Member

@mridulm @Ngone51 I really want to start 3.2.0 RC4 today.
So I am going to merge this one and ask @Ngone51 to create a follow-up PR so that we can start the new RC soon.

@gengliangwang
Member

gengliangwang commented Sep 23, 2021

Merging to master/3.2/3.1

gengliangwang pushed a commit that referenced this pull request Sep 23, 2021
…ng UpdateBlockInfo

### What changes were proposed in this pull request?
Delegate the potentially blocking call to `mapOutputTracker.updateMapOutput`, made while handling `UpdateBlockInfo`, from `dispatcher-BlockManagerMaster` to the thread pool to avoid blocking the endpoint. This code path is only taken for `ShuffleIndexBlockId`; other blocks are still handled on `dispatcher-BlockManagerMaster` itself.

Change `updateBlockInfo` to return `Future[Boolean]` instead of `Boolean`. Response will be sent to RPC caller upon successful completion of the future.

Introduce a unit test that forces `MapOutputTracker` to make a broadcast as part of `MapOutputTracker.serializeOutputStatuses` when running decommission tests.

### Why are the changes needed?
[SPARK-36782](https://issues.apache.org/jira/browse/SPARK-36782) describes a deadlock occurring if the `dispatcher-BlockManagerMaster` is allowed to block while waiting for write access to data structures.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test as introduced in this PR.

---

Ping eejbyfeldt for notice.

Closes #34043 from f-thiele/SPARK-36782.

Lead-authored-by: Fabian A.J. Thiele <[email protected]>
Co-authored-by: Emil Ejbyfeldt <[email protected]>
Co-authored-by: Fabian A.J. Thiele <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 4ea54e8)
Signed-off-by: Gengliang Wang <[email protected]>
@Ngone51
Member

Ngone51 commented Sep 23, 2021

FYI: followup PR: #34076

gengliangwang pushed a commit that referenced this pull request Sep 23, 2021
…thread pool

### What changes were proposed in this pull request?

This is a follow-up to #34043. This PR proposes to handle only shuffle blocks in the separate thread pool and leave the behavior for other blocks unchanged.

### Why are the changes needed?

To avoid any potential overhead.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass existing tests.

Closes #34076 from Ngone51/spark-36782-follow-up.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
gengliangwang pushed a commit that referenced this pull request Sep 23, 2021
…thread pool
(cherry picked from commit 9d8ac7c)
Signed-off-by: Gengliang Wang <[email protected]>
gengliangwang pushed a commit to gengliangwang/spark that referenced this pull request Sep 23, 2021
…ng UpdateBlockInfo
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
…ng UpdateBlockInfo
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
…thread pool