@JoshRosen
Contributor

@JoshRosen JoshRosen commented Sep 9, 2016

This patch makes three minor refactorings to the BlockManager:

  • Move the `if (info.tellMaster)` check out of `reportBlockStatus`; this fixes an issue where a debug logging message would incorrectly claim to have reported a block status to the master even though no message had been sent (in case `info.tellMaster == false`). This also makes it easier to write code which unconditionally sends block statuses to the master (which is necessary in another patch of mine).
  • Split `removeBlock()` into two methods, the existing method and an internal `removeBlockInternal()` method which is designed to be called by internal code that already holds a write lock on the block. This is also needed by a followup patch.
  • Instead of calling `getCurrentBlockStatus()` in `removeBlock()`, just pass `BlockStatus.empty`; the block status should always be empty following complete removal of a block.

These changes were originally authored as part of a bug fix patch which is targeted at branch-2.0 and master; I've split them out here into their own separate PR in order to make them easier to review and so that the behavior-changing parts of my other patch can be isolated to their own PR.
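
As a rough illustration of the refactorings listed above, the following self-contained sketch models them with simplified stand-ins. `BlockInfo`, `BlockStatus`, the string block ids, the `sentToMaster` and `debugLog` buffers, and the `writeLocked` set are all assumptions made for this example; they do not mirror the real BlockManager signatures.

```scala
import scala.collection.mutable

object ReportStatusSketch {
  // Simplified stand-ins (assumptions): the real classes carry far more state.
  final case class BlockInfo(tellMaster: Boolean)
  final case class BlockStatus(memSize: Long, diskSize: Long)
  object BlockStatus { val empty: BlockStatus = BlockStatus(0L, 0L) }

  val sentToMaster = mutable.Buffer.empty[(String, BlockStatus)] // fake RPC channel
  val debugLog = mutable.Buffer.empty[String]                    // fake debug log
  private val writeLocked = mutable.Set.empty[String]            // fake lock table

  // Before: the flag is checked inside, yet the log line is written either way.
  def reportBlockStatusOld(blockId: String, info: BlockInfo, status: BlockStatus): Unit = {
    if (info.tellMaster) sentToMaster += ((blockId, status))
    debugLog += s"Told master about block $blockId"
  }

  // After: the method always sends, so its log line is always accurate.
  def reportBlockStatus(blockId: String, status: BlockStatus): Unit = {
    sentToMaster += ((blockId, status))
    debugLog += s"Told master about block $blockId"
  }

  // Internal variant: assumes the caller already holds the write lock.
  def removeBlockInternal(blockId: String, tellMaster: Boolean): Unit = {
    require(writeLocked(blockId), s"write lock on $blockId must be held")
    // (removal from the memory and disk stores would happen here)
    if (tellMaster) reportBlockStatus(blockId, BlockStatus.empty)
  }

  // Public variant: takes the lock, then delegates to the internal method.
  def removeBlock(blockId: String, info: BlockInfo, tellMaster: Boolean = true): Unit = {
    writeLocked += blockId
    try removeBlockInternal(blockId, tellMaster && info.tellMaster)
    finally writeLocked -= blockId
  }
}
```

In this model, removing a block whose `info.tellMaster` is false leaves both `sentToMaster` and `debugLog` untouched, whereas `reportBlockStatusOld` would still have logged a report that was never sent.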

  info.size = size
- if (tellMaster) {
-   reportBlockStatus(blockId, info, putBlockStatus)
+ if (tellMaster && info.tellMaster) {

Contributor

This is a bit confusing. What is the difference between tellMaster and info.tellMaster? For instance, why isn't this tellMaster || info.tellMaster? (I understand that && is the current behavior, but I'm not sure why.)

Contributor Author

In BlockInfo, which tracks metadata about an individual block (such as the desired storage level that the block should be stored at), the tellMaster field tracks whether the master should be informed of state changes to this block. This appears to be false only for blocks which are deserialized copies of TorrentBroadcasts (see the putSingle calls in TorrentBroadcast.scala).

The tellMaster parameter, on the other hand, controls whether this particular block-status-changing operation should send a metadata update to the master. The only place where this seems to be false is in the removeRdd code path, which is used for bulk-removal of an RDD's cached blocks. In this path, the master first performs a bulk deletion of block statuses in its own metadata table and then asynchronously deletes the blocks from block managers. I think the goal here is to avoid sending one status update per deleted block since that might result in a huge flood of RPC traffic at the master and could cause bad message queueing (since the block manager metadata-handling endpoint is single-threaded).
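
To make the distinction concrete, here is a minimal sketch of how the two flags combine. `BlockInfo` is reduced to a single field, and `shouldReport`, `ordinaryBlock`, and `broadcastCopy` are hypothetical names introduced only for this illustration.

```scala
object TellMasterFlags {
  // Hypothetical stand-in for the per-block metadata.
  final case class BlockInfo(tellMaster: Boolean)

  // A status update is sent only when both the block itself (info.tellMaster)
  // and the current operation (the tellMaster parameter) allow it.
  def shouldReport(tellMaster: Boolean, info: BlockInfo): Boolean =
    tellMaster && info.tellMaster

  val ordinaryBlock = BlockInfo(tellMaster = true)  // e.g. a cached RDD block
  val broadcastCopy = BlockInfo(tellMaster = false) // e.g. a deserialized broadcast copy
}
```

With `||` instead of `&&`, the bulk-removal path (parameter `false`, ordinary block) would still send one update per block, and an ordinary operation on a broadcast copy (parameter `true`) would report a block that the master is not supposed to track.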

If we go way back, I think that one original rationale of this may have been to avoid sending status updates for map outputs, which at one time may have been persisted on disk via the BlockManager rather than bypassing it.

Contributor Author

Yep, looks like my memory was correct:

Take a look at ShuffleMapTask in Spark v0.6.0:

val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
  val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
  // Get a Scala iterator from Java map
  val iter: Iterator[(Any, Any)] = bucketIterators(i)
  val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
  compressedSizes(i) = MapOutputTracker.compressSize(size)
}

Here, false is the tellMaster parameter.

This functionality in the BlockManager dates back to the original Spark Streaming engine improvement patch: 63051dd

Contributor

Ok, sounds like this is not trivial to fix.

Contributor

Could we rename the flag to removedByMaster or similar? Might also consider renaming tellMaster to trackedByMaster but that's probably a bigger change.

Contributor Author

I think that a big clarifying change would be to internally refactor this so that the tellMaster parameter is omitted from all calls and so that the tellMaster = false case for cached and reassembled broadcast blocks is inferred automatically from the block id. This would clean up a lot of the function signatures and would make the weird "don't tell the master" exception much more readily apparent.
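
A minimal sketch of what such an inference could look like, purely as an illustration of the proposal: the block id types are simplified stand-ins, `trackedByMaster` is a hypothetical name, and the rule that a broadcast id with an empty `field` denotes the reassembled copy is an assumption made for the example.

```scala
object InferTellMaster {
  // Simplified stand-ins for block ids (assumptions, not the real classes).
  sealed trait BlockId
  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
  final case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId

  // Hypothetical rule: a broadcast id with no field is treated as the locally
  // reassembled copy, which the master does not track; everything else is tracked.
  def trackedByMaster(id: BlockId): Boolean = id match {
    case BroadcastBlockId(_, "") => false
    case _                       => true
  }
}
```

With a function like this, callers would no longer pass a flag at all; the "don't tell the master" exception would live in one place.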

Contributor Author

If we're going to rename / clarify this, I'd prefer to just do that bigger change separately in a patch which isn't intended for branch-2.0.

@SparkQA

SparkQA commented Sep 9, 2016

Test build #65170 has finished for PR 15036 at commit dca6a8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen JoshRosen changed the title from "[SPARK-17483] Minor refactoring in BlockManager status reporting and block removal" to "[SPARK-17483] Refactoring in BlockManager status reporting and block removal" Sep 9, 2016

logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
}
blockInfoManager.removeBlock(blockId)
if (tellMaster && info.tellMaster) {

Contributor

Could we just pass tellMaster && info.tellMaster in as one parameter and get rid of the info: BlockInfo parameter?

Contributor Author

That's a good idea; let me go ahead and make that change.

@srinathshankar
Contributor

One minor additional comment, but the rest LGTM.

@ericl
Contributor

ericl commented Sep 10, 2016

One suggestion but also LGTM

}

private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
Option(TaskContext.get()).foreach { c =>

Contributor Author

One new change: I decided to move this duplicated logic into this helper function. See c3cc277. This is going to make it easier to skip the block status update in certain places in my next patch.
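
For readers unfamiliar with the idiom in the snippet above, the following sketch shows why wrapping the lookup in `Option(...)` makes the helper safe to call outside of a task. The `TaskContext` here is a toy thread-local stand-in written for this example, and the `updatedBlockStatuses` buffer and string-typed arguments are assumptions rather than the real metrics API.

```scala
import scala.collection.mutable

object TaskMetricsSketch {
  // Toy stand-in: a per-thread context that is null when no task is running.
  final class TaskContext {
    val updatedBlockStatuses = mutable.Buffer.empty[(String, String)]
  }
  object TaskContext {
    private val current = new ThreadLocal[TaskContext]
    def get(): TaskContext = current.get() // returns null outside of a task
    def set(c: TaskContext): Unit = current.set(c)
    def unset(): Unit = current.remove()
  }

  // Option(null) is None, so the body is skipped when there is no active task.
  def addUpdatedBlockStatusToTaskMetrics(blockId: String, status: String): Unit =
    Option(TaskContext.get()).foreach { c =>
      c.updatedBlockStatuses += ((blockId, status))
    }
}
```

Centralizing the null check in one helper means each call site either records the update or silently does nothing, and a later change that needs to skip the update only has to avoid calling the helper.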

Contributor

looks good

@SparkQA

SparkQA commented Sep 10, 2016

Test build #65177 has finished for PR 15036 at commit 1406048.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2016

Test build #65181 has finished for PR 15036 at commit c3cc277.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Thanks for the reviews. I'm going to merge this into master.

@asfgit asfgit closed this in 3d40896 Sep 12, 2016
@JoshRosen JoshRosen deleted the cache-failure-race-conditions-refactorings-only branch September 13, 2016 20:01
@JoshRosen
Contributor Author

I'm also going to backport this to branch-2.0 so that I can merge #15085 there.

asfgit pushed a commit that referenced this pull request Sep 15, 2016
…removal

Author: Josh Rosen <[email protected]>

Closes #15036 from JoshRosen/cache-failure-race-conditions-refactorings-only.

(cherry picked from commit 3d40896)
Signed-off-by: Josh Rosen <[email protected]>
asfgit pushed a commit that referenced this pull request Sep 15, 2016
…er put() exceptions

## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified of a block's availability then incomplete cleanup logic in a `finally` block would never send a second block status message to inform the master of the block's unavailability. This, in turn, leads to fetch failures and used to be capable of causing complete job failures before #15037 was fixed.

This patch addresses this issue via multiple small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up from a failed `put()`; in addition to removing the `BlockInfo` entry (which was _all_ that the old cleanup logic did), this code (redundantly) tries to remove the block from the memory and disk stores (as an added layer of defense against bugs lower down in the stack) and optionally notifies the master of block removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have it will now notify the master to update its block locations. This ensures that bad metadata pointing to non-existent blocks will eventually be fixed. Note that I could have implemented this logic in the block manager client (rather than in the remote server), but that would introduce the problem of distinguishing between transient and permanent failures; on the server, however, we know definitively that the block isn't present.
- Catch `NonFatal` instead of `Exception` to avoid swallowing `InterruptedException`s thrown from synchronous block replication calls.

This patch depends upon the refactorings in #15036, so that other patch will also have to be backported when backporting this fix.
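
The last bullet above relies on a property of `scala.util.control.NonFatal`: its extractor does not match `InterruptedException`, whereas `case _: Exception` does. A small sketch of the difference follows; the two helper names are invented for this illustration.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Returns true if the handler caught (and therefore swallowed) the throwable.
  def swallowedByException(body: => Unit): Boolean =
    try { body; false } catch { case _: Exception => true }

  // NonFatal does not match InterruptedException, so an interrupt
  // propagates to the caller instead of being handled here.
  def swallowedByNonFatal(body: => Unit): Boolean =
    try { body; false } catch { case NonFatal(_) => true }
}
```

Calling `swallowedByException(throw new InterruptedException("stop"))` returns `true`, while the same call through `swallowedByNonFatal` lets the `InterruptedException` escape; an ordinary `IllegalStateException` is caught by both.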

For more background on this issue, including example logs from a real production failure, see [SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen <[email protected]>

Closes #15085 from JoshRosen/SPARK-17484.

(cherry picked from commit 1202075)
Signed-off-by: Josh Rosen <[email protected]>