[SPARK-9952] Fix N^2 loop when DAGScheduler.getPreferredLocsInternal accesses cacheLocs#8178

Closed
JoshRosen wants to merge 7 commits into apache:master from JoshRosen:dagscheduler-perf

Conversation

@JoshRosen
Contributor

In Scala, `Seq.fill` always seems to return a List. Accessing a list by index is an O(N) operation. Thus, the following code will be really slow (~10 seconds on my machine):

```scala
val numItems = 100000
val s = Seq.fill(numItems)(1)
for (i <- 0 until numItems) s(i)
```

It turns out that we had a loop like this in DAGScheduler code, although it's a little tricky to spot. In `getPreferredLocsInternal`, there's a call to `getCacheLocs(rdd)(partition)`. The `getCacheLocs` call returns a Seq. If this Seq is a List and the RDD contains many partitions, then indexing into this list will cost O(partitions). Thus, when we loop over our tasks to compute their individual preferred locations we implicitly perform an N^2 loop, reducing scheduling throughput.

This patch fixes this by replacing `Seq` with `Array`.
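For reference, the cost difference is easy to reproduce outside of Spark. A minimal sketch (the timing helper below is illustrative, not part of the patch):

```scala
// Compare O(N)-per-lookup List indexing with O(1) Array indexing.
val numItems = 100000
val asList: Seq[Int] = Seq.fill(numItems)(1)     // Seq.fill builds a List
val asArray: Array[Int] = Array.fill(numItems)(1)

def timeMs[T](body: => T): Long = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1000000
}

// Each asList(i) walks the list from its head, so the loop is O(N^2) overall.
val listMs = timeMs { for (i <- 0 until numItems) asList(i) }
// Array indexing is constant-time, so this loop is O(N).
val arrayMs = timeMs { for (i <- 0 until numItems) asArray(i) }
println(s"List: $listMs ms, Array: $arrayMs ms")
```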

@JoshRosen
Contributor Author

The problematic Seq.fill was added in #6352, which was merged to 1.5.0, so I think that this patch needs to be merged to master and 1.5.0 in order to prevent a scheduling performance regression.

@JoshRosen
Contributor Author

I noticed this while running a very simple scheduling throughput benchmark under YourKit Java profiler with CPU tracing enabled. Here's a comparison of two trace results, clearly illustrating the slowdown:

[Image: YourKit CPU trace comparison, before vs. after the fix]

For scheduling a job with 10000 no-op tasks (`sc.makeRDD(1 to NUM_TASKS, NUM_TASKS).mapPartitions(identity).count()`), the end-to-end time in `local[*]` mode dropped from ~48 seconds to ~20 seconds as a result of this change.
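For context, the benchmark job can be sketched as a standalone driver. Only the job itself comes from the comment above; the timing wrapper and app name are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SchedulingThroughputBenchmark {
  def main(args: Array[String]): Unit = {
    val NUM_TASKS = 10000
    val conf = new SparkConf().setMaster("local[*]").setAppName("scheduling-benchmark")
    val sc = new SparkContext(conf)
    try {
      val start = System.nanoTime()
      // One element per partition and no real work, so the end-to-end time is
      // dominated by scheduling overhead rather than computation.
      sc.makeRDD(1 to NUM_TASKS, NUM_TASKS).mapPartitions(identity).count()
      val elapsed = (System.nanoTime() - start) / 1e9
      println(f"Scheduled $NUM_TASKS no-op tasks in $elapsed%.1f s")
    } finally {
      sc.stop()
    }
  }
}
```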

@JoshRosen
Contributor Author

Also, note that the actual max scheduling throughput is much higher with tracing disabled; I can schedule over 5000 tasks / second on my laptop.

@SparkQA

SparkQA commented Aug 14, 2015

Test build #40813 timed out for PR 8178 at commit fe918a9 after a configured wait of 175m.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 14, 2015

Test build #1595 timed out for PR 8178 at commit fe918a9 after a configured wait of 175m.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 14, 2015

Test build #40888 has finished for PR 8178 at commit fe918a9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Aug 14, 2015

nice find. We could also use an IndexedSeq (which I think is what everyone really wants anytime they say Seq), but it looks like there are other places that want an array anyway, so it makes sense to just use an array here. IndexedSeq would leave the door open for other optimizations -- e.g., when the storage level is None, you don't actually need to create anything; you can just return a collection which knows it's Nil in every slot ... but that would probably be premature optimization.

anyway, just some random thoughts. lgtm pending tests.
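The "knows it's Nil in every slot" idea could look roughly like this; the names are hypothetical and this is not code from the patch:

```scala
// For an RDD with no cached partitions, answer Nil for every slot in O(1)
// without allocating a backing collection per partition.
def emptyCacheLocs[T](numPartitions: Int): IndexedSeq[List[T]] =
  new IndexedSeq[List[T]] {
    override def length: Int = numPartitions
    override def apply(i: Int): List[T] = Nil
  }

val locs = emptyCacheLocs[String](100000)
assert(locs(99999).isEmpty)  // constant-time lookup, no per-slot storage
```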

@SparkQA

SparkQA commented Aug 15, 2015

Test build #1615 has finished for PR 8178 at commit fe918a9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 15, 2015

Test build #1624 has finished for PR 8178 at commit fe918a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

One potential gotcha of using arrays: we might run into problems with equality checks returning false because arrays are compared by reference identity rather than by contents. Given this, maybe it would be safer to use IndexedSeq; the alternative would be to carefully audit the existing code to check whether this change has accidentally broken any comparisons.
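The gotcha in a nutshell: Seq equality is structural, while arrays (raw JVM arrays) compare by reference:

```scala
assert(Seq(1, 2, 3) == Seq(1, 2, 3))      // true: element-by-element comparison
assert(Array(1, 2, 3) != Array(1, 2, 3))  // two distinct arrays are never ==
assert(Array(1, 2, 3) sameElements Array(1, 2, 3))  // content check must be explicit
```

So any existing code that compared cache-location sequences with `==` would silently change behavior after a switch to `Array`.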

@SparkQA

SparkQA commented Aug 15, 2015

Test build #40972 has finished for PR 8178 at commit 88710c1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 15, 2015

Test build #40974 has finished for PR 8178 at commit 44a15f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

I've gone ahead and minimized this to just the `IndexedSeq` change, which seems to address the performance issue, so I'm going to merge this to master and 1.5 pending Jenkins.

@SparkQA

SparkQA commented Aug 17, 2015

Test build #40998 timed out for PR 8178 at commit 6e5fdc2 after a configured wait of 175m.

Contributor


super nit, but any reason for all the `size` -> `length` changes? just seems like a bit of noise if we ever look in git history for these lines.

Contributor


I'm not sure how true it is with more recent versions of Scala, but there at least was a time when `Array#size` didn't perform nearly as well as `Array#length`.
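Concretely, the concern is with what the two calls compile to; this is a sketch of the historical behavior, and more recent compilers may optimize the wrapper away:

```scala
val a = Array.fill(10)(0)
a.length  // reads the JVM array's length field directly (arraylength bytecode)
a.size    // resolved via an implicit conversion to a wrapper (ArrayOps),
          // which then delegates to length; older scalac versions did not
          // always eliminate the intermediate allocation
```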

@squito
Contributor

squito commented Aug 17, 2015

thanks for updating, Josh. still lgtm pending tests from me. (left one minor comment, your discretion to update.)

@squito
Contributor

squito commented Aug 17, 2015

Jenkins, retest this please

@SparkQA

SparkQA commented Aug 17, 2015

Test build #41003 timed out for PR 8178 at commit 6e5fdc2 after a configured wait of 175m.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 17, 2015

Test build #41022 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

retest this please

@andrewor14
Contributor

LGTM, will merge once we pass tests.

@SparkQA

SparkQA commented Aug 17, 2015

Test build #41035 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 17, 2015

Test build #1633 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode
    • abstract class LocalNode extends TreeNode[LocalNode]
    • abstract class LeafLocalNode extends LocalNode
    • abstract class UnaryLocalNode extends LocalNode
    • case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode
    • case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode
    • public final class UTF8String implements Comparable<UTF8String>, Externalizable

@SparkQA

SparkQA commented Aug 17, 2015

Test build #1634 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 17, 2015

Test build #1635 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StringIndexerModel (
    • implicit class StringToColumn(val sc: StringContext)
    • case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode
    • abstract class LocalNode extends TreeNode[LocalNode]
    • abstract class LeafLocalNode extends LocalNode
    • abstract class UnaryLocalNode extends LocalNode
    • case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode
    • case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode
    • public final class UTF8String implements Comparable<UTF8String>, Externalizable

@SparkQA

SparkQA commented Aug 18, 2015

Test build #1638 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 18, 2015

Test build #1637 timed out for PR 8178 at commit 6e5fdc2 after a configured wait of 175m.

@SparkQA

SparkQA commented Aug 18, 2015

Test build #1639 timed out for PR 8178 at commit 6e5fdc2 after a configured wait of 175m.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 18, 2015

Test build #41095 has finished for PR 8178 at commit 6e5fdc2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 18, 2015

Test build #41138 timed out for PR 8178 at commit 6e5fdc2 after a configured wait of 175m.

@rxin
Contributor

rxin commented Aug 19, 2015

I'm going to merge this since the unit tests just took longer to run; the relevant tests actually passed.

asfgit pushed a commit that referenced this pull request Aug 19, 2015
…accesses cacheLocs

In Scala, `Seq.fill` always seems to return a List. Accessing a list by index is an O(N) operation. Thus, the following code will be really slow (~10 seconds on my machine):

```scala
val numItems = 100000
val s = Seq.fill(numItems)(1)
for (i <- 0 until numItems) s(i)
```

It turns out that we had a loop like this in DAGScheduler code, although it's a little tricky to spot. In `getPreferredLocsInternal`, there's a call to `getCacheLocs(rdd)(partition)`.  The `getCacheLocs` call returns a Seq. If this Seq is a List and the RDD contains many partitions, then indexing into this list will cost O(partitions). Thus, when we loop over our tasks to compute their individual preferred locations we implicitly perform an N^2 loop, reducing scheduling throughput.

This patch fixes this by replacing `Seq` with `Array`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8178 from JoshRosen/dagscheduler-perf.

(cherry picked from commit 010b03e)
Signed-off-by: Reynold Xin <rxin@databricks.com>
@asfgit asfgit closed this in 010b03e Aug 19, 2015
@SparkQA

SparkQA commented Aug 19, 2015

Test build #1663 has finished for PR 8178 at commit 6e5fdc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
