Conversation

@mengxr
Contributor

@mengxr mengxr commented Apr 23, 2014

The current implementations of machine learning algorithms rely on the driver for some computation and data broadcasting. This can create a bottleneck at the driver for both computation and communication, especially in multi-model training. An efficient AllReduce implementation can help free up the driver. This PR contains a simple butterfly AllReduce implementation. I compared it with reduce + broadcast (HTTP) on a 16-node EC2 cluster (with a slow connection) and saw a 2x speed-up on vectors of size 1k to 10m. (A sketch of the pairing pattern the butterfly uses appears after the list below.)

Possible improvements:

  1. Each executor only needs one copy.
  2. Better handling when the number of partitions is not a power of two?
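
For readers unfamiliar with the butterfly pattern, here is a minimal sketch of the pairing schedule it implies (an illustration only, not the code in this PR): with p partitions, p a power of two, round k pairs partition i with partition i XOR 2^k, so every partition holds the combined result after log2(p) rounds.

~~~scala
// Illustration of the butterfly schedule (hypothetical helper, not PR code).
// Round k pairs partition i with partition i ^ (1 << k); after log2(p) rounds
// every partition has folded in all p partial values.
def butterflyPartners(p: Int): Seq[Seq[(Int, Int)]] = {
  require(p > 0 && (p & (p - 1)) == 0, "p must be a power of two")
  (0 until Integer.numberOfTrailingZeros(p)).map { k =>
    (0 until p).map(i => (i, i ^ (1 << k)))
  }
}

// e.g. for p = 4: round 0 pairs (0,1) and (2,3); round 1 pairs (0,2) and (1,3).
~~~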

@AmplabJenkins

Merged build triggered.

@mridulm
Contributor

mridulm commented Apr 23, 2014

It might be a good idea to move this out of mllib and push it into core itself.
The utility of this PR seems more fundamental than just ML (assuming it does something analogous to allreduce in MPI; note, I have yet to go through this in detail :-) ).

@AmplabJenkins

Merged build started.

@mengxr
Contributor Author

mengxr commented Apr 23, 2014

I'm a little worried about misuse: calling AllReduce with many small partitions can be very slow, at least with this implementation. Even within MLlib, this is package-private.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14371/

@rxin
Contributor

rxin commented Apr 23, 2014

@mridulm this is not a very general solution yet, and it can have bad consequences (e.g., when data are not cached in memory). If we want a more reliable allReduce, we should probably look into some sort of shuffle dependency that is not all-to-all. The main problem I see with modeling this as a shuffle is having to send a bunch of 0s back to the driver for shuffle block size estimation; we might be able to use run-length encoding to make that transmission cheap.
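
A tiny sketch of the run-length-encoding idea (hypothetical helper, not an existing Spark API): compress a mostly-zero array of shuffle block sizes into (value, run length) pairs so that reporting them to the driver stays cheap.

~~~scala
// Hypothetical sketch: run-length encode an array of block sizes that is mostly 0s.
def runLengthEncode(sizes: Array[Long]): Seq[(Long, Int)] = {
  val runs = scala.collection.mutable.ArrayBuffer.empty[(Long, Int)]
  for (s <- sizes) {
    if (runs.nonEmpty && runs.last._1 == s) {
      val (v, n) = runs.last
      runs(runs.length - 1) = (v, n + 1)   // extend the current run
    } else {
      runs += ((s, 1))                     // start a new run
    }
  }
  runs.toSeq
}

// runLengthEncode(Array(0L, 0L, 0L, 4096L, 0L, 0L)) == Seq((0L, 3), (4096L, 1), (0L, 2))
~~~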

@mridulm
Contributor

mridulm commented Apr 23, 2014

Agreed, I was not suggesting that this specific change per se make it into core.
Just that there are a lot of applications for all-reduce support in Spark: if it were available out of the box, it would make porting a lot of algorithms quite trivial!

In that context, all-reduce support should go into core.
If we feel this specific PR is not what we want due to limitations or design choices, that is fine by me.

@mengxr mengxr closed this Apr 23, 2014
@mengxr mengxr reopened this Apr 23, 2014
@mengxr
Contributor Author

mengxr commented Apr 23, 2014

@mridulm This implementation is experimental, and I'm looking for comments and suggestions to make it better. @etrain @shivaram?

Since it already outperforms reduce + broadcast, it would be interesting to see how far we can go.

Contributor

Isn't this the same as PartitionPruningRDD?

Contributor Author

Yes, I didn't know there was one.

@shivaram
Contributor

@mengxr This is really cool and the performance wins look awesome. Apart from the inline comments, I have just one more idea: instead of using cache + RDD re-partitioning in each step, how expensive is it to do a reduceByKey at each iteration and adjust the keys appropriately? I think some serialization + deserialization overheads might add up, but it would simplify the cleanup/caching, etc.
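
A rough sketch of what that could look like (hypothetical helper names, not code from this PR), assuming the partition count is a power of two: each round, every partition keeps its partial value and also keys a copy with its partner's index, and a reduceByKey onto the same number of partitions performs the exchange.

~~~scala
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x style)
import org.apache.spark.rdd.RDD

// Butterfly allreduce expressed as one reduceByKey per round (illustration only).
// Returns an RDD with one element per partition, each equal to the combined value.
def butterflyAllReduce[T: ClassTag](data: RDD[T], combine: (T, T) => T): RDD[T] = {
  val p = data.partitions.length
  require(p > 0 && (p & (p - 1)) == 0, "requires a power-of-two number of partitions")
  val partitioner = new HashPartitioner(p)   // key i stays in partition i for 0 <= i < p
  // One partial value per partition, keyed by the partition index.
  var keyed = data.mapPartitionsWithIndex { (i, iter) =>
    iter.reduceOption(combine).map(v => (i, v)).iterator
  }
  var step = 1
  while (step < p) {
    val s = step   // stable copy for the closure (step mutates before the job runs)
    keyed = keyed
      .flatMap { case (i, v) => Seq((i, v), (i ^ s, v)) }   // keep own value, send a copy to the partner
      .reduceByKey(partitioner, combine)
    step <<= 1
  }
  keyed.values
}
~~~

Whether the per-round serialization cost beats the cache + re-partition approach in the PR would need measurement, but the lineage is simpler and nothing needs explicit cleanup.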

pwendell pushed a commit to pwendell/spark that referenced this pull request May 12, 2014
…#506.

SPARK-1062 Add rdd.intersection(otherRdd) method

Author: Andrew Ash <[email protected]>

== Merge branch commits ==

commit 5d9982b171b9572649e9828f37ef0b43f0242912
Author: Andrew Ash <[email protected]>
Date:   Thu Feb 6 18:11:45 2014 -0800

    Minor fixes

    - style: (v,null) => (v, null)
    - mention the shuffle in Javadoc

commit b86d02f14e810902719cef893cf6bfa18ff9acb0
Author: Andrew Ash <[email protected]>
Date:   Sun Feb 2 13:17:40 2014 -0800

    Overload .intersection() for numPartitions and custom Partitioner

commit bcaa34911fcc6bb5bc5e4f9fe46d1df73cb71c09
Author: Andrew Ash <[email protected]>
Date:   Sun Feb 2 13:05:40 2014 -0800

    Better naming of parameters in intersection's filter

commit b10a6af2d793ec6e9a06c798007fac3f6b860d89
Author: Andrew Ash <[email protected]>
Date:   Sat Jan 25 23:06:26 2014 -0800

    Follow spark code format conventions of tab => 2 spaces

commit 965256e4304cca514bb36a1a36087711dec535ec
Author: Andrew Ash <[email protected]>
Date:   Fri Jan 24 00:28:01 2014 -0800

    Add rdd.intersection(otherRdd) method
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14943/

@mengxr mengxr changed the title [SPARK-1485][MLLIB] Implement Butterfly AllReduce [WIP][SPARK-1485][MLLIB] Implement Butterfly AllReduce May 13, 2014
@mengxr
Contributor Author

mengxr commented Jun 17, 2014

Thanks all for reviewing this PR! I found that the butterfly pattern introduces complex dependencies that slow down the computation. In my tests, a better approach for Spark is tree reduce + BT broadcast, so I'm closing this one in favor of #1110.
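
For context, a rough sketch of the tree pattern (hypothetical helper, not the code in #1110): combine within partitions, then repeatedly shuffle the partials into a geometrically smaller number of partitions, so the driver only merges a handful of values at the end.

~~~scala
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x style)
import org.apache.spark.rdd.RDD

// Sketch of tree reduce: with depth = 2 and p partitions this is roughly
// p -> sqrt(p) -> driver, instead of the driver merging p values directly.
def treeReduceSketch[T: ClassTag](data: RDD[T], combine: (T, T) => T, depth: Int = 2): T = {
  require(depth >= 1)
  var partials = data.mapPartitions(it => it.reduceOption(combine).iterator)
  var numPartitions = partials.partitions.length
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  while (numPartitions > scale) {
    numPartitions = math.max(numPartitions / scale, 1)
    val curNumPartitions = numPartitions   // stable copy for the closure
    partials = partials
      .mapPartitionsWithIndex { (i, it) => it.map(v => (i % curNumPartitions, v)) }
      .reduceByKey(new HashPartitioner(curNumPartitions), combine)
      .values
  }
  partials.reduce(combine)   // only a few partitions reach the driver-side merge
}
~~~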

@mengxr mengxr closed this Jun 17, 2014
gzm55 pushed a commit to MediaV/spark that referenced this pull request Jul 17, 2014
asfgit pushed a commit that referenced this pull request Jul 29, 2014
In `reduce` and `aggregate`, the driver node spends time linear in the number of partitions. This becomes a bottleneck when there are many partitions and the data from each partition is big.

SPARK-1485 (#506) tracks the progress of implementing AllReduce on Spark. I did several implementations, including butterfly, reduce + broadcast, and treeReduce + broadcast. treeReduce + BT broadcast seems to be the right way to go for Spark. Using a binary tree may introduce some communication overhead, because the driver still needs to coordinate the data shuffling. In my experiments, n -> sqrt(n) -> 1 gives the best performance in general, which is why I set `depth = 2` in MLlib algorithms. But it certainly needs more testing.

I left `treeReduce` and `treeAggregate` public for easy testing. Some numbers from a test on a 32-node m3.2xlarge cluster.

code:

~~~
import breeze.linalg._
import org.apache.log4j._

Logger.getRootLogger.setLevel(Level.OFF)

for (n <- Seq(1, 10, 100, 1000, 10000, 100000, 1000000)) {
  val vv = sc.parallelize(0 until 1024, 1024).map(i => DenseVector.zeros[Double](n))
  var start = System.nanoTime(); vv.treeReduce(_ + _, 2); println((System.nanoTime() - start) / 1e9)
  start = System.nanoTime(); vv.reduce(_ + _); println((System.nanoTime() - start) / 1e9)
}
~~~

out:

| n | treeReduce(_ + _, 2) time (s) | reduce time (s) |
|---|---------------------|-----------|
| 10 | 0.215538731 | 0.204206899 |
| 100 | 0.278405907 | 0.205732582 |
| 1000 | 0.208972182 | 0.214298272 |
| 10000 | 0.194792071 | 0.349353687 |
| 100000 | 0.347683285 | 6.086671892 |
| 1000000 | 2.589350682 | 66.572906702 |

CC: @pwendell

This is clearly more scalable than the default implementation. My question is whether we should use this implementation in `reduce` and `aggregate` or expose it as separate methods. The concern is that users may call `reduce` and `aggregate` the way they would `collect`, where having multiple stages doesn't reduce the data size; in that case, `collect` is more appropriate anyway.

Author: Xiangrui Meng <[email protected]>

Closes #1110 from mengxr/tree and squashes the following commits:

c6cd267 [Xiangrui Meng] make depth default to 2
b04b96a [Xiangrui Meng] address comments
9bcc5d3 [Xiangrui Meng] add depth for readability
7495681 [Xiangrui Meng] fix compile error
142a857 [Xiangrui Meng] merge master
d58a087 [Xiangrui Meng] move treeReduce and treeAggregate to mllib
8a2a59c [Xiangrui Meng] Merge branch 'master' into tree
be6a88a [Xiangrui Meng] use treeAggregate in mllib
0f94490 [Xiangrui Meng] add docs
eb71c33 [Xiangrui Meng] add treeReduce
fe42a5e [Xiangrui Meng] add treeAggregate
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
@sidps

sidps commented Mar 17, 2018

I've been curious about the underlying implementations of such operations. Has the ring all-reduce technique been considered? http://research.baidu.com/bringing-hpc-techniques-deep-learning/
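
For readers following the link, here is a tiny in-memory simulation of the ring allreduce schedule (an illustration only; nothing like this is implemented in Spark): p nodes in a ring, the data split into p chunks, a reduce-scatter phase followed by an allgather phase, so each node transfers O(data size) bytes in total regardless of p.

~~~scala
// data(i)(j) is node i's copy of chunk j; on return, every node's chunks hold
// the element-wise sum across all nodes. Hypothetical simulation, no networking.
def ringAllReduce(data: Array[Array[Array[Double]]]): Unit = {
  val p = data.length
  def addInto(dst: Array[Double], src: Array[Double]): Unit =
    for (k <- dst.indices) dst(k) += src(k)
  // Reduce-scatter: at step s, node i sends chunk (i - s) mod p to node (i + 1) mod p,
  // which accumulates it. After p - 1 steps, node i fully owns chunk (i + 1) mod p.
  for (s <- 0 until p - 1; i <- 0 until p) {
    val c = ((i - s) % p + p) % p
    addInto(data((i + 1) % p)(c), data(i)(c))
  }
  // Allgather: at step s, node i forwards the reduced chunk (i + 1 - s) mod p to
  // node (i + 1) mod p, which overwrites its copy. After another p - 1 steps,
  // every node holds every reduced chunk.
  for (s <- 0 until p - 1; i <- 0 until p) {
    val c = ((i + 1 - s) % p + p) % p
    data((i + 1) % p)(c) = data(i)(c).clone()
  }
}
~~~

The appeal over butterfly or tree patterns is the bounded, p-independent per-node bandwidth; mapping the ring schedule onto Spark's stage-based shuffle model would likely be the hard part.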

yifeih added a commit to yifeih/spark that referenced this pull request Mar 5, 2019
yifeih added a commit to yifeih/spark that referenced this pull request Mar 5, 2019
yifeih added a commit to yifeih/spark that referenced this pull request May 8, 2019
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Aug 15, 2022
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Aug 15, 2022
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025