Conversation

@shivaram
Contributor

@shivaram shivaram commented Jun 4, 2015

Set preferred locations for reduce tasks.
The basic design is that we maintain a map from reducerId to a list of (sizes, locations) for each shuffle. We then pick the machines with the top 5 sizes and set that as preferred locations.

Picking the top 5 needs sorting (well, it's a little cheaper than sorting, but still), so we restrict this feature to cases where we have fewer than 1000 map tasks and 1000 reduce tasks.
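For illustration, here's a minimal sketch of the top-5 selection described above (hypothetical names, not the actual DAGScheduler code): given per-location output sizes for one reducer, pick the hosts holding the five largest shares.

```scala
// Hypothetical sketch of the top-5 selection; a full sort is used here
// for clarity, which is part of why the feature is capped at fewer than
// 1000 map tasks and 1000 reduce tasks.
object TopLocations {
  // Input: (host, total bytes of this reducer's input held on that host)
  def top5(sizesByLocation: Seq[(String, Long)]): Seq[String] =
    sizesByLocation.sortBy(-_._2).take(5).map(_._1)
}
```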

Resubmission of #4576 and #1697. Hopefully the third time's the charm? cc @kayousterhout for review

This is another attempt at apache#1697 addressing some of the earlier concerns.
This adds a couple of thresholds based on the number of map and reduce tasks
beyond which we don't use preferred locations for reduce tasks.

This patch also fixes some bugs in DAGSchedulerSuite where the MapStatus
objects created didn't have the right number of reducers set.
…locations

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Contributor

Why not use the Scaladoc syntax for this?

Contributor Author

Now using Scaladoc syntax.

@SparkQA

SparkQA commented Jun 4, 2015

Test build #34210 has finished for PR 6652 at commit e7d5449.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public final class UnsafeRow extends BaseMutableRow
    • public abstract class BaseMutableRow extends BaseRow implements MutableRow
    • public abstract class BaseRow implements Row
    • protected class CodeGenContext
    • abstract class BaseMutableProjection extends MutableProjection
    • class SpecificProjection extends $
    • class BaseOrdering extends Ordering[Row]
    • class SpecificOrdering extends $
    • abstract class Predicate
    • class SpecificPredicate extends $
    • abstract class BaseProject extends Projection
    • class SpecificProjection extends $
    • final class SpecificRow extends $

@SparkQA

SparkQA commented Jun 4, 2015

Test build #34220 has finished for PR 6652 at commit 0df3180.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Is there a reason not to use "sizes.map(_._2).sum" instead here? (just slightly easier to read)

Contributor

Actually what about:

val locs = statuses.groupBy(_.location).mapValues { statusesAtLocation =>
  statusesAtLocation.map(_.getSizeForBlock(r)).sum
}

Contributor Author

I made this imperative now per Josh's comment below. I think the new version is pretty readable, but let me know if you think it's good.
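A minimal sketch of what such an imperative aggregation might look like (hypothetical names: Status stands in for Spark's MapStatus, and sizeForReducer for getSizeForBlock(reducerId)): sum up, per location, the bytes destined for one reducer.

```scala
import scala.collection.mutable

// Hypothetical sketch of an imperative per-location size aggregation,
// in the spirit of the change described above.
object LocationSizes {
  final case class Status(location: String, sizeForReducer: Long)

  def sizesByLocation(statuses: Array[Status]): mutable.HashMap[String, Long] = {
    val locs = new mutable.HashMap[String, Long]
    var i = 0
    while (i < statuses.length) { // plain loop, no intermediate collections
      val s = statuses(i)
      locs(s.location) = locs.getOrElse(s.location, 0L) + s.sizeForReducer
      i += 1
    }
    locs
  }
}
```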

@kayousterhout
Contributor

Ok this looks mostly good, subject to @JoshRosen's comments about scalability -- just a few last comments to improve readability.

@shivaram
Contributor Author

shivaram commented Jun 6, 2015

@kayousterhout I updated the flag name to be a bit more specific. Let me know if you have any further comments

@SparkQA

SparkQA commented Jun 6, 2015

Test build #34374 has finished for PR 6652 at commit 1090b58.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 6, 2015

Test build #34375 has finished for PR 6652 at commit 68bc29e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout
Contributor

@shivaram and I discussed this offline, including whether it was possible to incorporate @sryza's point that this only makes sense when there's a lot of skew in the placement of the data read by one reduce task. We came up with a simple approach that seems strictly better than the current one (it's simpler to implement and also avoids setting preferred locations when there isn't much skew in the output). The new idea is to set the preferred locations for a reducer to be any locations that hold >20% of the data to be read by that reducer. This will result in at most 5 locations being set (the current approach always chooses 5 locations), but typically fewer. 20% is totally a magic number here...but intuitively it seems unnecessary to set a preferred location and add that complexity when there will be less than a 20% perf. improvement from doing so.

Unless anyone objects, @shivaram is going to push a new version that uses that approach.
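The fraction-based scheme can be sketched like this (hypothetical names, not the actual Spark implementation): a host qualifies as a preferred location only if it holds at least the threshold fraction of the reducer's input, so with a 0.2 threshold at most 5 hosts can ever qualify.

```scala
// Hypothetical sketch of the >20% rule discussed above. Since the
// per-host fractions sum to 1, at most floor(1 / 0.2) = 5 hosts can
// each hold at least 20% of the output.
object PreferredLocations {
  def preferred(sizesByLocation: Map[String, Long],
                fractionThreshold: Double = 0.2): Seq[String] = {
    val total = sizesByLocation.values.sum.toDouble
    if (total == 0) Seq.empty
    else sizesByLocation.collect {
      case (host, bytes) if bytes / total >= fractionThreshold => host
    }.toSeq
  }
}
```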

Also removes caching of preferred locations to make the API cleaner
@shivaram
Contributor Author

shivaram commented Jun 9, 2015

Thanks @kayousterhout for taking a look and proposing this new design. I agree that it's cleaner than the top-5 approach, and we still get at most 5 locations with the fraction set to 0.2. I've updated the code to reflect this now.

BTW, I also removed caching of the preferred locations in the map output tracker. It made it messy to implement an API that accepts fractionThreshold as an argument, and I think the caching only helps in the case of speculative tasks / failures (so removing it shouldn't hurt us much).

Contributor

Out-of-date comment

Contributor Author

Fixed now

@kayousterhout
Contributor

Mostly LGTM -- just a few readability comments to help whatever future poor soul wants to update the DAGScheduler code.

@sryza does this new approach adequately address your concerns?

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34475 has finished for PR 6652 at commit 897a914.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34474 has finished for PR 6652 at commit f5be578.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34480 has finished for PR 6652 at commit 2ef2d39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34519 has finished for PR 6652 at commit 492e25e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

I think you can delete this pattern now; it shouldn't ever occur.

Incidentally, this also suggests that Dependency should be sealed. I just tried making that change, and it turned up these other warnings, which actually seem legit:

[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:136: non-variable type argument Product2[K,Any] in type pattern org.apache.spark.OneToOneDependency[Product2[K,Any]] is unchecked since it is eliminated by erasure
[warn]       case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
[warn]                                ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:135: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]     for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
[warn]                                                      ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala:109: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]       dependencies(depNum) match {
[warn]                   ^
[warn] three warnings found

It seems that code will break if it is ever given a RangeDependency.
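To illustrate the sealed suggestion (a toy hierarchy, not Spark's actual Dependency classes): once the parent trait is sealed, the compiler can check a match for exhaustiveness and warns when a subclass, like the RangeDependency analogue here, is missing a case.

```scala
// Toy stand-in for a sealed Dependency hierarchy; with `sealed`,
// deleting any case below makes the compiler emit a
// "match may not be exhaustive" warning at the pattern match.
sealed trait Dep
final case class ShuffleDep(id: Int) extends Dep
final case class OneToOneDep(id: Int) extends Dep
final case class RangeDep(id: Int) extends Dep

object SealedDemo {
  def describe(d: Dep): String = d match {
    case ShuffleDep(i)  => s"shuffle $i"
    case OneToOneDep(i) => s"one-to-one $i"
    case RangeDep(i)    => s"range $i"
  }
}
```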

@squito
Contributor

squito commented Jun 9, 2015

lgtm too. I think you could go ahead and delete the extra case _, though maybe it makes sense to defer changing Dependency to sealed to a separate PR.

@shivaram
Contributor Author

shivaram commented Jun 9, 2015

Thanks @squito for taking a look. Yeah, I think it makes sense to change Dependency to a sealed class, but we can do it in a follow-up PR. BTW, in the specific case of the scheduler it might be interesting to set preferred locations for RangeDependency based on the size of the parent splits. I haven't run into many applications that use RangeDependency though.

@sryza
Contributor

sryza commented Jun 10, 2015

@shivaram @kayousterhout this approach addresses my concerns. Thanks for updating!

@asfgit asfgit closed this in 96a7c88 Jun 10, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
Set preferred locations for reduce tasks.
The basic design is that we maintain a map from reducerId to a list of (sizes, locations) for each
shuffle. We then set the preferred locations to be any machines that have 20% or more of the output
that needs to be read by the reduce task.  This will result in at most 5 preferred locations for
each reduce task.

Selecting the preferred locations involves O(# map tasks * # reduce tasks) computation, so we
restrict this feature to cases where we have fewer than 1000 map tasks and 1000 reduce tasks.

Author: Shivaram Venkataraman <[email protected]>

Closes apache#6652 from shivaram/reduce-locations and squashes the following commits:

492e25e [Shivaram Venkataraman] Remove unused import
2ef2d39 [Shivaram Venkataraman] Address code review comments
897a914 [Shivaram Venkataraman] Remove unused hash map
f5be578 [Shivaram Venkataraman] Use fraction of map outputs to determine locations Also removes caching of preferred locations to make the API cleaner
68bc29e [Shivaram Venkataraman] Fix line length
1090b58 [Shivaram Venkataraman] Change flag name
77ce7d8 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
e5d56bd [Shivaram Venkataraman] Add flag to turn off locality for shuffle deps
6cfae98 [Shivaram Venkataraman] Filter out zero blocks, rename variables
9d5831a [Shivaram Venkataraman] Address some more comments
8e31266 [Shivaram Venkataraman] Fix style
0df3180 [Shivaram Venkataraman] Address code review comments
e7d5449 [Shivaram Venkataraman] Fix merge issues
ad7cb53 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
df14cee [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
5093aea [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
0171d3c [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
bc4dfd6 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
774751b [Shivaram Venkataraman] Fix bug introduced by line length adjustment
34d0283 [Shivaram Venkataraman] Fix style issues
3b464b7 [Shivaram Venkataraman] Set preferred locations for reduce tasks This is another attempt at apache#1697 addressing some of the earlier concerns. This adds a couple of thresholds based on number map and reduce tasks beyond which we don't use preferred locations for reduce tasks.
shivaram added a commit to shivaram/spark-1 that referenced this pull request Nov 24, 2015
Set preferred locations for reduce tasks.
The basic design is that we maintain a map from reducerId to a list of (sizes, locations) for each
shuffle. We then set the preferred locations to be any machines that have 20% or more of the output
that needs to be read by the reduce task.  This will result in at most 5 preferred locations for
each reduce task.

Selecting the preferred locations involves O(# map tasks * # reduce tasks) computation, so we
restrict this feature to cases where we have fewer than 1000 map tasks and 1000 reduce tasks.

Author: Shivaram Venkataraman <[email protected]>

Closes apache#6652 from shivaram/reduce-locations and squashes the following commits:

492e25e [Shivaram Venkataraman] Remove unused import
2ef2d39 [Shivaram Venkataraman] Address code review comments
897a914 [Shivaram Venkataraman] Remove unused hash map
f5be578 [Shivaram Venkataraman] Use fraction of map outputs to determine locations Also removes caching of preferred locations to make the API cleaner
68bc29e [Shivaram Venkataraman] Fix line length
1090b58 [Shivaram Venkataraman] Change flag name
77ce7d8 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
e5d56bd [Shivaram Venkataraman] Add flag to turn off locality for shuffle deps
6cfae98 [Shivaram Venkataraman] Filter out zero blocks, rename variables
9d5831a [Shivaram Venkataraman] Address some more comments
8e31266 [Shivaram Venkataraman] Fix style
0df3180 [Shivaram Venkataraman] Address code review comments
e7d5449 [Shivaram Venkataraman] Fix merge issues
ad7cb53 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
df14cee [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
5093aea [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
0171d3c [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
bc4dfd6 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
774751b [Shivaram Venkataraman] Fix bug introduced by line length adjustment
34d0283 [Shivaram Venkataraman] Fix style issues
3b464b7 [Shivaram Venkataraman] Set preferred locations for reduce tasks This is another attempt at apache#1697 addressing some of the earlier concerns. This adds a couple of thresholds based on number map and reduce tasks beyond which we don't use preferred locations for reduce tasks.

Conflicts:
	core/src/main/scala/org/apache/spark/MapOutputTracker.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala