Conversation

@JoshRosen JoshRosen commented May 11, 2017

What changes were proposed in this pull request?

This PR refactors ShuffleMapStage and MapOutputTracker in order to simplify the management of MapStatuses, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs.

Background

In Spark there are currently two places where MapStatuses are tracked:

  • The MapOutputTracker maintains an Array[MapStatus] storing a single location for each map output. This mapping is used by the DAGScheduler for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks.
  • Each ShuffleMapStage also contains a mapping of Array[List[MapStatus]] which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing TaskSets for the stage.

This duplication adds complexity and creates the potential for certain types of correctness bugs. Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the MapOutputTracker is missing locations for a map output but ShuffleMapStage believes that locations are available then tasks will fail with MetadataFetchFailedException but ShuffleMapStage will not be updated to reflect the missing map outputs, leading to situations where the stage will be reattempted (because downstream stages experienced fetch failures) but no task sets will be launched (because ShuffleMapStage thinks all maps are available).

I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state.
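To make the duplication concrete, here is a minimal, self-contained sketch of the two views described above. The classes and fields are illustrative stand-ins (including the placeholder MapStatus), not Spark's actual implementations:

```scala
import scala.collection.mutable

// Placeholder for org.apache.spark.scheduler.MapStatus: just "where does this map output live?".
case class MapStatus(location: String)

// View 1: the MapOutputTracker keeps a single location per map partition for each shuffle.
val trackerState = new mutable.HashMap[Int, Array[MapStatus]]()   // shuffleId -> one status per map partition

// View 2: each ShuffleMapStage keeps every known location per map partition.
class StageView(numPartitions: Int) {
  val outputLocs: Array[List[MapStatus]] = Array.fill(numPartitions)(Nil)

  // TaskSets are built from this copy. If it claims a partition is available while the
  // tracker's copy is missing it, the stage gets resubmitted but no tasks are launched.
  def missingPartitions: Seq[Int] = outputLocs.indices.filter(outputLocs(_).isEmpty)
}
```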

Why we only need to track a single location for each map output

I think that storing an Array[List[MapStatus]] in ShuffleMapStage is unnecessary.

First, note that this adds memory/object bloat to the driver: we need one extra List per task. If you have millions of tasks across all stages then this can add up to a significant amount of resources.

Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen:

  • In normal operation (no task failures) we'll only run each task once and thus will have at most one output.
  • If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will kill duplicate attempts of a task after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs.
  • There is a comment in TaskSetManager which suggests that running tasks are not killed if a task set becomes a zombie. However:
    • If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs.
    • If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream map stage) then I believe that the "failedEpoch" will be updated, which may cause map outputs from still-running tasks to be ignored. (I'm not 100% sure on this point, though).
  • Even if you do manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a FetchFailure exception. The most likely cause of a FetchFailure exception is a lost executor, which will most likely have caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from the state in the ShuffleMapStage only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages.

Given this, this patch does away with tracking multiple locations for map outputs and instead stores only a single location. This change removes the main distinction between the ShuffleMapStage's and the MapOutputTracker's copies of this state, paving the way for storing it only in the MapOutputTracker.

Overview of other changes

  • Significantly simplified the cache / lock management inside of the MapOutputTrackerMaster:
    • The old code had several parallel HashMaps which had to be guarded by maps of Objects which were used as locks. This code was somewhat complicated to follow.
    • The new code uses a new ShuffleStatus class to group together all of the state associated with a particular shuffle, including cached serialized map statuses, significantly simplifying the logic (a simplified sketch follows this list).
  • Moved more code out of the shared MapOutputTracker abstract base class and into the MapOutputTrackerMaster and MapOutputTrackerWorker subclasses. This makes it easier to reason about which functionality needs to be supported only on the driver or executor.
  • Removed a bunch of code from the DAGScheduler which was used to synchronize information from the MapOutputTracker to ShuffleMapStage.
  • Added comments to clarify the role of MapOutputTrackerMaster's epoch in invalidating executor-side shuffle map output caches.
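A simplified sketch of the ShuffleStatus idea referenced in the list above. This illustrates the design, not the actual class added by this PR; the method names and types are approximations:

```scala
// Everything the driver tracks for one shuffle lives behind a single lock, instead of being
// spread across several parallel HashMaps guarded by per-shuffle lock objects.
class ToyShuffleStatus(numPartitions: Int) {
  // One location per map partition (null = missing), i.e. the MapOutputTracker's view.
  private val mapStatuses = new Array[String](numPartitions)
  // Cached serialized statuses so that repeated executor requests don't re-serialize.
  private var cachedSerializedStatuses: Array[Byte] = _

  def addMapOutput(partition: Int, location: String): Unit = synchronized {
    mapStatuses(partition) = location
    invalidateCache()
  }

  def removeMapOutput(partition: Int): Unit = synchronized {
    mapStatuses(partition) = null
    invalidateCache()
  }

  def serializedMapStatuses(serialize: Array[String] => Array[Byte]): Array[Byte] = synchronized {
    if (cachedSerializedStatuses == null) {
      cachedSerializedStatuses = serialize(mapStatuses)
    }
    cachedSerializedStatuses
  }

  private def invalidateCache(): Unit = {
    cachedSerializedStatuses = null
  }
}
```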

I will comment on these changes via inline GitHub review comments.

/cc @hvanhovell and @rxin (whom I discussed this with offline), @tgravescs (who recently worked on caching of serialized MapOutputStatuses), and @kayousterhout and @markhamstra (for scheduler changes).

How was this patch tested?

Existing tests. I purposely avoided making interface / API changes which would require significant updates or modifications to test code.

shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
Contributor Author

This logic is no longer necessary because ShuffleMapTask queries the MapOutputTracker.

// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
// epoch incremented to refetch them.
// TODO: Only increment the epoch number if this is not the first time
Contributor Author

It might be possible to resolve this TODO but I haven't thought about it too much yet.

// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
shuffleStage.addOutputLoc(smt.partitionId, status)
Contributor Author

In the old code we'd incrementally register map outputs in ShuffleMapStage and then write the entire set of completed map outputs into the MapOutputTracker in one shot upon stage completion (see line 1242 in the old code). Now we write this incrementally.
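To illustrate the difference, here is a toy sketch (not Spark's actual MapOutputTrackerMaster API) contrasting per-task registration with the old bulk write at stage completion:

```scala
// ToyTracker stands in for MapOutputTrackerMaster; it is not the real API.
class ToyTracker(numPartitions: Int) {
  private val statuses = new Array[String](numPartitions)   // one location per map partition

  // New behavior: register each map output as soon as its task completes.
  def registerMapOutput(partition: Int, location: String): Unit = synchronized {
    statuses(partition) = location
  }

  // Old behavior: the DAGScheduler pushed the whole set in one shot at stage completion.
  def registerMapOutputs(all: Array[String]): Unit = synchronized {
    Array.copy(all, 0, statuses, 0, numPartitions)
  }
}
```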

stage.removeOutputsOnExecutor(execId)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
Contributor Author

This was a potential inefficiency in the old code. If you lost a single executor then we'd end up overwriting the MapOutputTracker's state for every ShuffleMapStage. We'd have to iterate through each task once in stage.removeOutputsOnExecutor and then a second time in stage.outputLocInMapOutputTrackerFormat() and in both iterations we'd have to iterate on a List for each task too.

While the new code still needs to scan every task, it doesn't have to scan a separate list per task and it only needs a single scan, not two.
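A minimal sketch of the single-pass cleanup described above, assuming one MapStatus slot per map partition (names are illustrative; a null slot means the output is missing):

```scala
// One MapStatus slot per map partition; null means "missing, needs recomputation".
case class ToyMapStatus(execId: String)

def removeOutputsOnExecutor(statuses: Array[ToyMapStatus], execId: String): Boolean = {
  var removedAny = false
  var i = 0
  while (i < statuses.length) {            // a single scan, with no per-task List to walk
    if (statuses(i) != null && statuses(i).execId == execId) {
      statuses(i) = null                    // forget outputs on the lost executor
      removedAny = true
    }
    i += 1
  }
  removedAny                                // the caller bumps the epoch only if something was removed
}
```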

changeEpoch = true)
}
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
Contributor Author

I think the idea behind this branch was to ensure that we always increment the mapOutputTracker epoch upon executor loss. Thus I put an incrementEpoch() into the removeOutputsOnExecutor() method itself inside MapOutputTrackerMaster.

@SparkQA

SparkQA commented May 11, 2017

Test build #76836 has finished for PR 17955 at commit 7d59bbe.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 12, 2017

Test build #76838 has finished for PR 17955 at commit e3da298.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

The MapOutputTrackerSuite remote fetch test case failed as of that last commit because I didn't faithfully replicate the behavior of clearEpoch() / incrementEpoch().

In the old code it seems like bumping the epoch would effectively invalidate the entire driver-side cache of map output statuses, whereas my code does this at a finer granularity by tracking cache validity on a per-shuffle basis: if an executor-loss event actually ends up removing map output statuses for a shuffle then we invalidate only that shuffle's cache; other caches aren't affected.

} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
mapStatuses.clear()
Contributor Author

The idea here is to locally clear the mapStatuses cache. In the old code the cache would be indirectly cleared after the FetchFailure is handled by the DAGScheduler and the epoch is incremented in the MapOutputTrackerMaster.

The cache clearing still happens when the master sends us a higher epoch, but now the driver-side epoch is only bumped after outputs are actually lost (or missing outputs become available), not after every fetch failure.
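A schematic, self-contained sketch of the executor-side behavior described above (stand-in types, not Spark's MapOutputTrackerWorker): the local cache is cleared when a metadata fetch turns up missing locations, and also whenever the driver reports a newer epoch:

```scala
import scala.collection.mutable

// Stand-in for org.apache.spark.shuffle.MetadataFetchFailedException.
class MetadataFetchFailedException(msg: String) extends Exception(msg)

class WorkerSideStatusCache(fetchFromDriver: Int => Array[String]) {
  private val mapStatuses = new mutable.HashMap[Int, Array[String]]()   // shuffleId -> locations
  private var epoch = 0L

  // Driver-driven invalidation: a newer epoch (sent along with tasks) clears the whole cache.
  def updateEpoch(newEpoch: Long): Unit = synchronized {
    if (newEpoch > epoch) { epoch = newEpoch; mapStatuses.clear() }
  }

  def getStatuses(shuffleId: Int): Array[String] = synchronized {
    val statuses = mapStatuses.getOrElseUpdate(shuffleId, fetchFromDriver(shuffleId))
    if (statuses.contains(null)) {
      // Our cached view is stale: drop it locally and let the scheduler sort out the failure.
      mapStatuses.clear()
      throw new MetadataFetchFailedException(s"Missing an output location for shuffle $shuffleId")
    }
    statuses
  }
}
```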

@SparkQA

SparkQA commented May 13, 2017

Test build #76886 has finished for PR 17955 at commit a8069a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra
Contributor

@JoshRosen The hard coding of interruptThread = true within TaskSetManager's handleSuccessfulTask to effect the killing of duplicate, speculative attempts of a task is potentially an issue -- not a new issue with this PR, but one that hasn't been fully analyzed and addressed AFAIK. https://issues.apache.org/jira/browse/SPARK-17064

@markhamstra
Contributor

I've looked at only the DAGScheduler changes so far. They LGTM.

@JoshRosen
Contributor Author

@markhamstra, I think that the interruptThread = true hardcoding may be orthogonal to this PR's proposed changes: interruptThread affects how we carry out task cancellation, not whether we attempt to cancel, so even if you set interruptThread = false you could still effectively cancel redundant task attempts, since those tasks would check the interrupted flag via TaskContext.isInterrupted or InterruptibleIterator.
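To illustrate the cooperative-cancellation point, here is a self-contained sketch with stand-in classes (not Spark's TaskContext or InterruptibleIterator): a task that checks an interrupted flag between records stops early even when its thread is never interrupted:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Stand-in for the relevant slice of TaskContext: just an "interrupted" flag.
final class ToyTaskContext {
  private val interrupted = new AtomicBoolean(false)
  def markInterrupted(): Unit = interrupted.set(true)   // what the scheduler does when it "kills" the task
  def isInterrupted: Boolean = interrupted.get()
}

// Stand-in for InterruptibleIterator: checks the flag between records, so the task stops
// early even if its thread was never interrupted (interruptThread = false).
final class ToyInterruptibleIterator[T](ctx: ToyTaskContext, underlying: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    if (ctx.isInterrupted) throw new RuntimeException("task killed (cooperative cancellation)")
    underlying.hasNext
  }
  override def next(): T = underlying.next()
}
```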

@markhamstra
Contributor

@JoshRosen Yes, I agree that it is orthogonal -- at least for now. I'm mostly just offering a heads up that if we get around to addressing interruptThread, then there may also need to be some changes related to mapOutput tracking.

@sameeragarwal
Member

cc @jiangxb1987 for review

@tgravescs
Contributor

At a high level this definitely makes sense. I need to look at it in more detail; I'll try to do that in the next day or two.

I am wondering what testing you have done on this. Have you done manual testing on large jobs with different types of failures, like fetch failures and such?

@JoshRosen
Contributor Author

I've been running some local tests of scheduler throughput to make sure that this doesn't adversely affect performance in the processing of task completion events (I discovered the perf. hotspot fixed in #18008 as part of this testing).

I'm planning to run more comprehensive end-to-end tests on real clusters (with injected failures) in the next couple of days.

Contributor

@jiangxb1987 jiangxb1987 left a comment

This looks really good, I have only two minor comments.

mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
Contributor (jiangxb1987)

Is it safer if we increment the epoch number here?

Contributor Author

I need to think about this carefully and maybe make a matrix of possible cases to be sure. My original thought process was something like this:

  • The old code comment says "TODO: Only increment the epoch number if this is not the first time we registered these map outputs", which implies that at least some of the epoch increments here were unnecessary.
  • If we assume that a new, never-before-computed map output won't be requested by executors before it is complete then we don't need to worry about executors caching incomplete map outputs.
  • I believe that any FetchFailure should end up incrementing the epoch.

That said, the increment here is only occurring once per stage completion. It probably doesn't hurt to bump the epoch here because in a single-stage-at-a-time case we'd only be invalidating map outputs which we'll never fetch again anyway. Even if we were unnecessarily invalidating the map output statuses of other concurrent stages, I think that the impact of this is going to be relatively small (if we did find that this had an impact then a sane approach would be to implement an e-tag-like mechanism where bumping the epoch doesn't purge the executor-side caches, but, instead, has them verify a per-stage epoch / counter). Finally, the existing code might be giving us nice eager cleanup of map statuses after stages complete (vs. the cleanup which occurs later when stages or shuffles are fully cleaned up).

I think you're right that this change carries unnecessary / not-fully-understood risks for now, so let me go ahead and put in an explicit increment here (with an updated comment / ref. to this discussion) in my next push to this PR.
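Regarding the e-tag-like idea floated above: a purely hypothetical sketch (nothing in this PR implements it, and all names are made up) of what per-shuffle cache validation on the executor side could look like:

```scala
import scala.collection.mutable

// fetch returns (the driver's per-shuffle counter, the map output locations).
class PerShuffleValidatedCache(fetch: Int => (Long, Array[String])) {
  private val cache = new mutable.HashMap[Int, (Long, Array[String])]()

  def get(shuffleId: Int, driverCounterForShuffle: Long): Array[String] = synchronized {
    cache.get(shuffleId) match {
      case Some((cachedCounter, statuses)) if cachedCounter == driverCounterForShuffle =>
        statuses                              // still valid: no re-fetch, and no global purge
      case _ =>
        val fresh = fetch(shuffleId)          // only this shuffle is re-validated / re-fetched
        cache(shuffleId) = fresh
        fresh._2
    }
  }
}
```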

firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _])
val shuffleDep: ShuffleDependency[_, _, _],
Contributor (jiangxb1987)

Seems we can pass the shuffleId, instead of ShuffleDependency here.

Contributor Author

Good catch. I agree, but with the caveat that we can only clean this up if this isn't functioning as the last strong reference which keeps the dependency from being garbage-collected.

Contributor Author

I'm actually going to leave this as-is for now since I think the reference might actually be serving a purpose and I want to minimize scope of changes for now.

@JoshRosen
Contributor Author

@jiangxb1987, sorry for the super long delay in addressing that latest review comment. I've made that one change you suggested at #17955 (comment) so now I think this should be good to go. Any final comments?

@jiangxb1987
Contributor

LGTM, also cc @cloud-fan

@SparkQA

SparkQA commented Jun 5, 2017

Test build #77759 has finished for PR 17955 at commit 4550f61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77850 has finished for PR 17955 at commit 4550f61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

I've merged this to master (2.3.0).

Thanks to everyone who helped to review. If there is additional feedback at this point then I'll address it in a quick followup.

@asfgit asfgit closed this in 3476390 Jun 12, 2017
@JoshRosen JoshRosen deleted the map-output-tracker-rewrite branch June 12, 2017 01:50
dataknocker pushed a commit to dataknocker/spark that referenced this pull request Jun 16, 2017
…MapStage

asfgit pushed a commit that referenced this pull request Jul 18, 2017
…Tracker

## What changes were proposed in this pull request?

In SPARK-21444, sitalkedia reported an issue where the `Broadcast.destroy()` call in `MapOutputTracker`'s `ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an `IOException`, causing the DAGScheduler to crash and bring down the entire driver.

This is a bug introduced by #17955. In the old code, we removed a broadcast variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but the new code simply calls `Broadcast.destroy()`, which can fail with an IOException if certain blocking RPCs time out.

The fix implemented here is to replace this with a call to `destroy(blocking = false)` and to wrap the entire operation in `Utils.tryLogNonFatalError`.
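A self-contained sketch of the defensive pattern the fix describes, using stand-ins for `Broadcast` and `Utils.tryLogNonFatalError` (the real change lives inside `ShuffleStatus.invalidateSerializedMapOutputStatusCache()`):

```scala
import scala.util.control.NonFatal

// Stand-in for the relevant slice of Broadcast[_].
trait DestroyableBroadcast { def destroy(blocking: Boolean): Unit }

// Stand-in for Utils.tryLogNonFatalError: never let a non-fatal cleanup error escape.
def tryLogNonFatalError(block: => Unit): Unit =
  try block catch { case NonFatal(e) => println(s"Ignoring non-fatal error: $e") }

def invalidateCachedBroadcast(cached: DestroyableBroadcast): Unit =
  tryLogNonFatalError {
    // Non-blocking destroy: an RPC timeout (IOException) can no longer crash the DAGScheduler.
    cached.destroy(blocking = false)
  }
```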

## How was this patch tested?

I haven't written regression tests for this because it's really hard to inject mocks to simulate RPC failures here. Instead, this class of issue is probably best uncovered with more generalized error injection / network unreliability / fuzz testing tools.

Author: Josh Rosen <[email protected]>

Closes #18662 from JoshRosen/SPARK-21444.
bersprockets pushed a commit to bersprockets/spark that referenced this pull request Sep 7, 2018
…MapStage

asfgit pushed a commit that referenced this pull request Sep 11, 2018
…ectness issue

## What changes were proposed in this pull request?

Backport of #22354 and #17955 to 2.2 (#22354 depends on methods introduced by #17955).

-------

An alternative fix for #21698

When Spark reruns tasks for an RDD, there are 3 different behaviors:
1. determinate. Always returns the same result in the same order when rerun.
2. unordered. Returns the same data set in a random order when rerun.
3. indeterminate. Returns a different result when rerun.

Normally Spark doesn't need to care about this. Spark runs stages one by one; when a task fails, it just reruns it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's the same as determinate, because the shuffle partitioner is always deterministic (the round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

`repartition` applies a stateful map function to generate a round-robin id, which is order-sensitive and makes the RDD's output indeterminate. When a stage containing `repartition` reruns, we must also rerun all the tasks of all the succeeding stages (the sketch below illustrates the order sensitivity).
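An illustrative, self-contained sketch (not Spark's actual repartition code) of why round-robin id assignment is order-sensitive:

```scala
// A round-robin id depends on each record's position in the iterator, not on its value.
def roundRobinIds[T](records: Iterator[T], numPartitions: Int, startPosition: Int): Iterator[(Int, T)] = {
  var position = startPosition
  records.map { record =>
    position += 1
    (position % numPartitions, record)
  }
}

// If a rerun of the upstream task produces the same records in a different order,
// the same record can land in a different partition:
val runA = roundRobinIds(Iterator("a", "b", "c"), numPartitions = 2, startPosition = 0).toList
val runB = roundRobinIds(Iterator("c", "a", "b"), numPartitions = 2, startPosition = 0).toList
// runA == List((1,"a"), (0,"b"), (1,"c"))
// runB == List((1,"c"), (0,"a"), (1,"b"))  -- record "a" moved from partition 1 to partition 0
```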

**future improvement:**
1. Currently we can't roll back and rerun a shuffle map stage; we just fail. We should fix this later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't roll back and rerun a result stage; we just fail. We should fix this later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide a public API to allow users to tag the randomness level of the RDD's computing function.

## How was this patch tested?

a new test case

Closes #22382 from bersprockets/SPARK-23243-2.2.

Lead-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 25, 2019
…ectness issue

Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 26, 2019
…ectness issue

Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 27, 2019
…ectness issue
