[SPARK-7826][CORE] Suppress extra calling getCacheLocs. #6352

Closed

ueshin wants to merge 9 commits into apache:master from ueshin:issues/SPARK-7826


Conversation

@ueshin
Member

@ueshin ueshin commented May 22, 2015

There are too many extra calls to the getCacheLocs method in DAGScheduler, each of which involves Akka communication.
To improve DAGScheduler performance, suppress these extra calls.

In my application with over 1200 stages, the execution time dropped from 8.5 min to 3.8 min with my patch.

@srowen
Member

srowen commented May 22, 2015

Can you explain why it's valid to proceed without the call when there is only one dependency?
Also, it looks like you're actually adding calls to getCacheLocs. I don't see an explanation, and the description isn't consistent with the change.

@SparkQA

SparkQA commented May 22, 2015

Test build #33339 has finished for PR 6352 at commit 9a80fad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented May 22, 2015

@srowen, Thank you for checking.

  1. To check whether the parent stages are missing, we only need to check the locations of RDDs that have a ShuffleDependency, because narrowly-dependent RDDs would be in the same locations, so I moved the getCacheLocs call into the case shufDep.
  2. I was afraid that the dependency graph could become large (if there are a lot of union or zipPartitions calls, etc.), so I also added a location check at the point where the RDD has more than one dependency.
    • We might not need line 389 if we don't have to consider that case.

@srowen
Member

srowen commented May 22, 2015

Is part 2 really just to be safe? It seems essential. Are you saying that only shuffle dependencies have more than one dependency? Also, this adds a new call for all dependencies; doesn't that mostly defeat the purpose? I am not an expert on this code, but I am not sure the logic is clear here.

@ueshin
Member Author

ueshin commented May 22, 2015

The call will occur not for all RDDs in the stage but only when:

  1. the RDD has a ShuffleDependency, or
  2. the RDD has more than one dependency, regardless of the dependency type.

I should also have mentioned that calling it for the same RDD is not a problem, because the location is already cached.

@JoshRosen
Contributor

(Warning: drive-by comment; I'll look at this patch in more detail later)

One high-level comment:

For any patch which modifies scheduler internals, we should err on the side of extremely liberal commenting of code, even if this means paragraph-long comments. If it's tricky enough to merit a question in a GitHub code review, then it deserves a comment. For instance, the rdd.dependencies.size < 2 check could benefit from a nearby comment that explains why this is safe.

@ueshin
Member Author

ueshin commented May 23, 2015

Oops, I found that I misunderstood what the getCacheLocs method is doing here.
I'll change the way the Akka communications are suppressed in the next push, so please check this PR again after that.

@JoshRosen, Thank you for your comment.
I'll add comments in the next push.

@ueshin changed the title from "[SPARK-7826][CORE] Suppress extra calling getCacheLocs." to "[WIP][SPARK-7826][CORE] Suppress extra calling getCacheLocs." on May 23, 2015
Contributor

As a general aside, I find getCacheLocs(rdd).contains(Nil) to be hard to understand to begin with. I think that this condition is meant to be read as "if at least one partition of this RDD is not cached anywhere...". Maybe this code would be easier to review / parse if we extracted this condition into a variable, perhaps a lazy val if we want to short-circuit, named rddHasUncachedPartitions, or !rddIsCached if we don't mind negation.
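
A minimal, self-contained sketch of that extraction (a toy model, not the real DAGScheduler code: cache locations are represented here as one Seq of host names per partition, where Nil means the partition is not cached anywhere):

```scala
object CacheLocsReadability extends App {
  // Toy stand-in for getCacheLocs(rdd): one Seq of locations per partition.
  val cacheLocsForRdd: Seq[Seq[String]] = Seq(Seq("hostA"), Nil, Seq("hostB"))

  // The suggested named condition, reading as
  // "at least one partition of this RDD is not cached anywhere".
  val rddHasUncachedPartitions = cacheLocsForRdd.contains(Nil)

  assert(rddHasUncachedPartitions) // partition 1 above has no cached location
}
```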

@JoshRosen
Contributor

Oh, one other thought: maybe a good exercise would be to attempt to write the Scaladoc comment for getMissingParentStages which describes, in prose, the basic high-level algorithm for finding missing parent stages. I can help with this tomorrow. Even if you don't end up modifying getMissingParentStages, I'd love to submit a new PR that just comments / explains the existing code in order to make this easier to understand in the future.

To help me build some intuition for understanding your optimization here:

It looks like this only saves us from performing getCacheLocs lookups in cases where we're traversing backwards through a long chain of narrow dependencies. I don't think that this is necessarily safe. Imagine that we have a lineage graph which looks something like this:

┌───┐ shuffle ┌───┐    ┌───┐          
│ A │◀ ─ ─ ─ ─│ B │◀───│ C │◀─┐       
└───┘         └───┘    └───┘  │  ┌───┐
                              ├──│ E │
                       ┌───┐  │  └───┘
                       │ D │◀─┘       
                       └───┘              

Here, E has one-to-one dependencies on C and D. C is derived from A by performing a shuffle and then a map. If we're trying to determine which ancestor stages need to be computed in order to compute E, we need to figure out whether the shuffle A -> B should be performed. If the RDD C, which has only one ancestor via a narrow dependency, is cached, then we won't need to compute A, even if it has some unavailable output partitions. The same goes for B: if B is 100% cached, then we can avoid the shuffle on A. Based on this, I don't think that we can make a local decision to skip the caching check based on the structure of the RDD graph. However, we might be able to skip / optimize this check based on RDDs' storage levels: in long chains of narrow dependencies, most RDDs probably aren't cached, so adding a simple if StorageLevel = None return Seq.fill(numPartitions)(Nil) check to getCacheLocs might be safe / sufficient.

Someone more familiar with StorageLevel / caching semantics should double-check this reasoning to make sure that I'm not overlooking any corner-cases when RDDs' storage levels change due to unpersist / cache / persist calls.
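
A minimal sketch of the short-circuit suggested above (a toy model under stated assumptions, not the real DAGScheduler internals: the Boolean markedForCaching stands in for rdd.getStorageLevel != StorageLevel.NONE, and the block manager round trip is stubbed out):

```scala
import scala.collection.mutable

object GetCacheLocsShortCircuitSketch extends App {
  // Memo of previously fetched cache locations, keyed by RDD id.
  val cacheLocs = mutable.HashMap.empty[Int, Seq[Seq[String]]]

  // Stand-in for the Akka round trip to the block manager master.
  def fetchFromBlockManager(rddId: Int, numPartitions: Int): Seq[Seq[String]] =
    Seq.fill(numPartitions)(Nil)

  def getCacheLocs(rddId: Int, numPartitions: Int, markedForCaching: Boolean): Seq[Seq[String]] = {
    if (!markedForCaching) {
      // The "StorageLevel = None" case: no partition can possibly be cached,
      // so return early and skip the block manager lookup entirely.
      return Seq.fill(numPartitions)(Nil)
    }
    cacheLocs.getOrElseUpdate(rddId, fetchFromBlockManager(rddId, numPartitions))
  }

  // An RDD that was never persisted never touches the block manager
  // (and, in this early-return form, never populates the memo either).
  getCacheLocs(rddId = 1, numPartitions = 4, markedForCaching = false)
  assert(!cacheLocs.contains(1))
}
```

Note that in this early-return form the check happens before the memo map is consulted or updated; that ordering is exactly what is discussed further down in the thread.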

@JoshRosen
Contributor

Also: if my above reasoning is right and this optimization is incorrect, then it's concerning that it didn't cause a test failure. My hunch is that we don't have unit tests for the particular combinations of RDD dependency graphs, caching states, and map output availability that would expose this issue. It would be nice to write a failing regression test which would have caught the problems in the current version of this patch, since that will help us to gain confidence that the new optimizations are safe.

@ueshin
Member Author

ueshin commented May 23, 2015

@JoshRosen Thank you for the details.
That is exactly what I noticed yesterday.
I'm modifying DAGScheduler and adding tests.
I'll push the next version as soon as possible.

@SparkQA

SparkQA commented May 23, 2015

Test build #33402 has finished for PR 6352 at commit b9c835c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 23, 2015

Test build #33403 has finished for PR 6352 at commit 6f3125c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin changed the title from "[WIP][SPARK-7826][CORE] Suppress extra calling getCacheLocs." to "[SPARK-7826][CORE] Suppress extra calling getCacheLocs." on May 23, 2015
@ueshin
Member Author

ueshin commented May 23, 2015

I pushed the new version and the tests passed.
@JoshRosen, @srowen, Could you please take a look at this PR again?
Thanks.

Contributor

As a general style note, I'd try to avoid using return in Scala code, since there are some corner-cases where using it can lead to exception-handling issues (plus it results in slightly inefficient code which uses exceptions for control flow).

@JoshRosen
Contributor

Thanks for adding that test. This patch looks like it's in pretty good shape, but before we consider merging there are one or two other minor corner cases that I'd like to explore.

In the current implementation of getCacheLocs, we first check to see whether the RDD's cache locations have been previously fetched; if so, we return the "cached" set of cache locations, and otherwise we fetch the set of locations from the block manager and store it in the cache. This patch's optimization takes place prior to checking / updating the cacheLocs map, meaning that it might slightly change behavior. Specifically, I'm wondering what would happen in the old code if we called getCacheLocs() on an RDD that wasn't cached, then cached the RDD, forced it to be computed, then called getCacheLocs() again as part of a different job. In the old code, an empty set of cache locations would have been stored in the map on the first call, so I don't think the second call would see an updated set of cache locations unless we cleared them. In the new code, non-cached RDDs won't ever cause entries to be stored in cacheLocs, so it's possible that the effects of caching might become visible sooner after this patch than they would have in the old code. This might be safe, but if we make this change then it should be deliberate / knowing. If we want to be really conservative, maybe we should move the storageLevel check inside the if (!cacheLocs.contains(rdd.id)) block in order to better preserve the old behavior.

@JoshRosen
Contributor

Actually, it looks like we end up calling clearCacheLocs() when submitting a new job, so the change described above probably doesn't make a difference. To be safe, though, and to eliminate the return, let's go ahead and move it into the cacheLocs-updating block. Once we do that, I think this will be good to go, but I'll probably pull in a scheduler maintainer for a final spot-check / review.
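
Continuing the toy sketch from above (same simplified stand-ins, not the real code), moving the storage-level check inside the memoizing "not yet looked up" branch preserves the old behavior of recording an entry in cacheLocs on the first call and also eliminates the early return:

```scala
import scala.collection.mutable

object GetCacheLocsMemoizedSketch extends App {
  val cacheLocs = mutable.HashMap.empty[Int, Seq[Seq[String]]]

  def fetchFromBlockManager(rddId: Int, numPartitions: Int): Seq[Seq[String]] =
    Seq.fill(numPartitions)(Nil)

  def getCacheLocs(rddId: Int, numPartitions: Int, markedForCaching: Boolean): Seq[Seq[String]] = {
    if (!cacheLocs.contains(rddId)) {
      // The storage-level check now lives inside the "first lookup" branch:
      // an uncached RDD still gets an (empty) entry memoized, as in the old
      // code, but without the block manager round trip.
      val locs: Seq[Seq[String]] =
        if (!markedForCaching) Seq.fill(numPartitions)(Nil)
        else fetchFromBlockManager(rddId, numPartitions)
      cacheLocs(rddId) = locs
    }
    cacheLocs(rddId)
  }

  getCacheLocs(rddId = 1, numPartitions = 4, markedForCaching = false)
  assert(cacheLocs.contains(1)) // an entry is memoized even for an uncached RDD
}
```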

Contributor

To clarify for other reviewers, I think that we need these cache() calls so these other tests don't fail due to the skipping of the cached locations lookups.

@ueshin
Member Author

ueshin commented May 24, 2015

@JoshRosen, Thank you for your comment.
I agree with you and moved the storageLevel check into the if block.

@SparkQA

SparkQA commented May 24, 2015

Test build #33422 has finished for PR 6352 at commit d858b59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

LGTM. /ping @markhamstra or @kayousterhout for final sign-off on scheduler-related changes.

@markhamstra
Contributor

LGTM

Contributor

Why isn't D a missing parent stage here?

Contributor

It looks like what happens is that the call to submit() causes the first set of missing parent stages to be submitted, so at that point, stage D is submitted. Can you add a comment describing this?

Contributor

Since there's a one-to-one dependency from D to E, won't D and E be computed in the same stage?

Contributor

Ah I see. What if we changed this test to, instead of directly calling getMissingParentStages, just directly inspect DAGScheduler.runningStages (since that's already private[scheduler]) to make sure it contains the one stage we expect? I'd find that more intuitive, since that more directly tests the underlying issue we're trying to verify.

Contributor

That's a good idea; let's do this.

Member Author

@kayousterhout, Thank you for checking this PR.
I see. Should I revert getMissingParentStages to private?

Contributor

Yes, if we're not going to use it in the test suite, then it should go back to private.

Member Author

Ah, I found that only checking whether DAGScheduler.runningStages contains one stage is not enough, because it would also contain one stage (the one including A) if C were not cached yet.
I think we should also check the size of the final stage's missing parents.

Contributor

I was thinking you could inspect the contents of the stages in runningStages to make sure the ID is correct.

Sent from my iPhone

On May 26, 2015, at 7:53 PM, Takuya UESHIN notifications@github.com wrote:

In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:

```scala
   * If the RDD C, which has only one ancestor via a narrow dependency, is cached, then we won't
   * need to compute A, even if it has some unavailable output partitions. The same goes for B:
   * if B is 100% cached, then we can avoid the shuffle on A.
   */
  test("SPARK-7826: regression test for getMissingParentStages") {
    val rddA = new MyRDD(sc, 1, Nil)
    val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null)))
    val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache()
    val rddD = new MyRDD(sc, 1, Nil)
    val rddE = new MyRDD(sc, 1,
      List(new OneToOneDependency(rddC), new OneToOneDependency(rddD)))

    cacheLocations(rddC.id -> 0) =
      Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))

    val jobId = submit(rddE, Array(0))
    val finalStage = scheduler.jobIdToActiveJob(jobId).finalStage
    assert(scheduler.getMissingParentStages(finalStage).size === 0)
```

> Ah, I found that only checking if the DAGScheduler.runningStages contains one stage is not enough because it also contains one stage including A if the C is not cached yet.
> I think we should also check the size of the final stage's missing parents.



Member Author

Ah, runningStages contains one stage and its ID is 1, right?

@ueshin
Member Author

ueshin commented May 27, 2015

I modified the unit test.
Thank you all for your guidance.

@SparkQA

SparkQA commented May 27, 2015

Test build #33559 has finished for PR 6352 at commit 10b1b22.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented May 27, 2015

Retest this please.

@kayousterhout
Contributor

Jenkins, retest this please

@SparkQA

SparkQA commented May 27, 2015

Test build #33575 has finished for PR 6352 at commit 10b1b22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Can you actually change this to:
assert(scheduler.runningStages.head.isInstanceOf[ResultStage])?

And then add a comment saying something like "Make sure that the scheduler is running the final result stage. Because C is cached, the shuffle map stage to compute A does not need to be run."

Contributor

(I think this is more intuitive; otherwise, it's hard for someone looking at this to understand why the ID should be 1. This also makes the test more agnostic to unrelated scheduler internals, like if we change the way we assign IDs to stages)
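
As a sketch, the suggested assertion and comment might look roughly like this inside the test quoted earlier (assuming the suite's scheduler handle and the ResultStage class are in scope, as in DAGSchedulerSuite):

```scala
    // Make sure that the scheduler is running the final result stage. Because C
    // is cached, the shuffle map stage to compute A does not need to be run.
    assert(scheduler.runningStages.size === 1)
    assert(scheduler.runningStages.head.isInstanceOf[ResultStage])
```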

@kayousterhout
Contributor

Just a few more comments on improving the documentation and understandability of the test. @JoshRosen has recently pointed out that the scheduler code is extremely difficult to understand and check for correctness, and I think having easily understandable and well-documented tests is a step towards making the scheduler code more friendly.

@ueshin
Member Author

ueshin commented May 28, 2015

I made the changes you mentioned.
I agree about the difficulties, so please let me know if there are other things I can do here.
Thanks.

@SparkQA

SparkQA commented May 28, 2015

Test build #33629 has finished for PR 6352 at commit 3d4d036.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented May 28, 2015

Retest this please.

@SparkQA

SparkQA commented May 28, 2015

Test build #33631 has finished for PR 6352 at commit 3d4d036.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member Author

ueshin commented May 28, 2015

Jenkins, retest this please.

@SparkQA

SparkQA commented May 28, 2015

Test build #33637 has finished for PR 6352 at commit 3d4d036.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout
Contributor

LGTM

@asfgit asfgit closed this in 9b692bf May 29, 2015
@kayousterhout
Contributor

Thanks @ueshin! I merged this since @JoshRosen and @markhamstra LGTM'ed a while ago.

@ueshin
Member Author

ueshin commented May 29, 2015

@kayousterhout, Thank you for merging this!

jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
There are too many extra calls to the `getCacheLocs` method in `DAGScheduler`, each of which involves Akka communication.
To improve `DAGScheduler` performance, suppress these extra calls.

In my application with over 1200 stages, the execution time dropped from 8.5 min to 3.8 min with my patch.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes apache#6352 from ueshin/issues/SPARK-7826 and squashes the following commits:

3d4d036 [Takuya UESHIN] Modify a test and the documentation.
10b1b22 [Takuya UESHIN] Simplify the unit test.
d858b59 [Takuya UESHIN] Move the storageLevel check inside the if (!cacheLocs.contains(rdd.id)) block.
6f3125c [Takuya UESHIN] Fix scalastyle.
b9c835c [Takuya UESHIN] Put the condition that checks if the RDD has uncached partition or not into variable for readability.
f87f2ec [Takuya UESHIN] Get cached locations from block manager only if the storage level of the RDD is not StorageLevel.NONE.
8248386 [Takuya UESHIN] Revert "Suppress extra calling getCacheLocs."
a4d944a [Takuya UESHIN] Add an unit test.
9a80fad [Takuya UESHIN] Suppress extra calling getCacheLocs.
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015