[SPARK-30667][CORE] Add allGather method to BarrierTaskContext#27395

Closed
sarthfrey wants to merge 39 commits into apache:master from sarthfrey:master

Conversation

@sarthfrey
Contributor

What changes were proposed in this pull request?

The `allGather` method is added to `BarrierTaskContext`. It provides the same synchronization as the `BarrierTaskContext.barrier` method: it blocks the task until all tasks have made the call, at which point they may continue execution. In addition, `allGather` takes an input message; upon returning from the call, each task receives the list of messages sent by all the tasks that made the `allGather` call.

Why are the changes needed?

There are many situations where it is useful for tasks to communicate in a synchronized way. One simple example: each task needs to start a server to serve requests from the others. First, each task must find a free port (which cannot be determined beforehand); then the tasks can start making requests, but to do so each must know the port chosen by every other task. An `allGather` method lets them inform one another of the port they will run on, as sketched below.
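
For illustration, a minimal sketch of this port-exchange pattern through the Python API. The socket handling is illustrative and not part of this PR, and it assumes an active `SparkContext` named `sc`:

```python
import socket

from pyspark import BarrierTaskContext

def exchange_ports(iterator):
    context = BarrierTaskContext.get()
    # Bind port 0 so the OS picks a free port; a real server would keep
    # this socket (or re-bind the port) to serve requests afterwards.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('', 0))
    port = sock.getsockname()[1]
    # Blocks until every task has called allGather, then returns the
    # messages from all tasks, so each task learns every peer's address.
    addresses = context.allGather('{}:{}'.format(socket.gethostname(), port))
    sock.close()
    yield addresses

# Usage: sc.parallelize(range(4), 4).barrier().mapPartitions(exchange_ports).collect()
```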

Does this PR introduce any user-facing change?

Yes, a `BarrierTaskContext.allGather` method will be available through the Scala, Java, and Python APIs.

How was this patch tested?

Most of the code path is already covered by the existing tests of the `barrier` method, since this PR refactors the code so that much of it is shared by the `barrier` and `allGather` methods. In addition, a test is added asserting that an `allGather` of each task's partition ID returns a list containing every partition ID.

An example through the Python API:

>>> from pyspark import BarrierTaskContext
>>>
>>> def f(iterator):
...     context = BarrierTaskContext.get()
...     return [context.allGather('{}'.format(context.partitionId()))]
...
>>> sc.parallelize(range(4), 4).barrier().mapPartitions(f).collect()[0]
[u'3', u'1', u'0', u'2']
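
Since the messages are plain strings, structured data can be round-tripped with standard serialization. A hedged sketch (the function and field names here are illustrative, not part of this PR):

```python
import json

from pyspark import BarrierTaskContext

def share_counts(iterator):
    context = BarrierTaskContext.get()
    local = {'partition': context.partitionId(), 'count': sum(1 for _ in iterator)}
    # Serialize the payload for allGather, then parse each peer's message.
    replies = context.allGather(json.dumps(local))
    yield [json.loads(r) for r in replies]

# Usage: sc.parallelize(range(8), 4).barrier().mapPartitions(share_counts).collect()
```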

@sarthfrey sarthfrey changed the title Add all Gather method to BarrierTaskContext Add allGather method to BarrierTaskContext Jan 30, 2020
@HyukjinKwon
Member

@sarthfrey, please link the JIRA ID in the PR title. See also https://spark.apache.org/contributing.html

@sarthfrey sarthfrey changed the title Add allGather method to BarrierTaskContext [SPARK-30667] Add allGather method to BarrierTaskContext Jan 30, 2020
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-30667] Add allGather method to BarrierTaskContext [SPARK-30667][CORE] Add allGather method to BarrierTaskContext Jan 31, 2020
@sarthfrey sarthfrey requested a review from jiangxb1987 February 1, 2020 02:53
@SparkQA

SparkQA commented Feb 14, 2020

Test build #118446 has finished for PR 27395 at commit 6398066.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 14, 2020

Test build #118452 has finished for PR 27395 at commit 377d8d2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 19, 2020

Test build #118655 has finished for PR 27395 at commit ff7f3dd.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 19, 2020

Test build #118659 has finished for PR 27395 at commit d2fffe1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 19, 2020

Test build #118658 has finished for PR 27395 at commit 24adef3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Contributor

mengxr commented Feb 19, 2020

retest this please

@mengxr
Contributor

mengxr commented Feb 19, 2020

The failed test seems unrelated: `org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` (It is not a test it is a sbt.testing.SuiteSelector)

@SparkQA

SparkQA commented Feb 19, 2020

Test build #118681 has finished for PR 27395 at commit d2fffe1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in af63971 Feb 19, 2020
asfgit pushed a commit that referenced this pull request Feb 19, 2020
Closes #27395 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
(cherry picked from commit 57254c9)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
@mengxr
Contributor

mengxr commented Feb 19, 2020

LGTM. Merged into both master and branch-3.0. Thanks!

@gengliangwang
Member

It seems that this PR breaks the MiMa test in the Jenkins PR builder jobs (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/118684/console):

dev/mima -Phadoop-2.7 -Phive-2.3 -Pkinesis-asl -Phive -Phive-thriftserver -Pmesos -Pkubernetes -Phadoop-cloud -Pspark-ganglia-lgpl -Pyarn

@sarthfrey
Contributor Author

It seems that this PR breaks the MiMa test in the Jenkins PR builder jobs (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/118684/console):

dev/mima -Phadoop-2.7 -Phive-2.3 -Pkinesis-asl -Phive -Phive-thriftserver -Pmesos -Pkubernetes -Phadoop-cloud -Pspark-ganglia-lgpl -Pyarn

Hmm, odd; this PR adds `ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.RequestToSync")`.

@jiangxb1987
Contributor

Reverted from both master and branch-3.0.

It seems the merge script exhibits a weird behavior: when you try to merge this PR, it automatically cherry-picks the latest commit (which had been reverted before).
The following is the output from my local environment:

Enter a branch name [branch-3.0]:       
git fetch apache branch-3.0:PR_TOOL_PICK_PR_27395_BRANCH-3.0
remote: Enumerating objects: 307, done.
remote: Counting objects: 100% (307/307), done.
remote: Compressing objects: 100% (8/8), done.
remote: Total 692 (delta 288), reused 305 (delta 288), pack-reused 385
Receiving objects: 100% (692/692), 98.68 KiB | 5.48 MiB/s, done.
Resolving deltas: 100% (336/336), completed with 79 local objects.
From https://github.com/apache/spark
 * [new branch]            branch-3.0 -> PR_TOOL_PICK_PR_27395_BRANCH-3.0
 * [new branch]            branch-3.0 -> apache/branch-3.0
git checkout PR_TOOL_PICK_PR_27395_BRANCH-3.0
Switched to branch 'PR_TOOL_PICK_PR_27395_BRANCH-3.0'
git cherry-pick -sx 57254c9719f9af9ad985596ed7fbbaafa4052002
The previous cherry-pick is now empty, possibly due to conflict resolution.

@jiangxb1987
Contributor

@sarthfrey Please open a new PR instead, and then let's try merging it again.

jiangxb1987 pushed a commit that referenced this pull request Feb 21, 2020
Fix for #27395

Closes #27640 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
jiangxb1987 pushed a commit that referenced this pull request Feb 21, 2020
Fix for #27395

Closes #27640 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
(cherry picked from commit 274b328)
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#27395 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#27395 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
(cherry picked from commit 57254c9)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Fix for apache#27395

Closes apache#27640 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>