[SPARK-30667][CORE] Add all gather method to BarrierTaskContext #27640

Closed
sarthfrey wants to merge 7 commits into apache:master from sarthfrey:master

@sarthfrey sarthfrey commented Feb 20, 2020

Fix for #27395

What changes were proposed in this pull request?

This PR adds an allGather method to BarrierTaskContext. Like BarrierTaskContext.barrier, it blocks the calling task until every task in the barrier stage has made the call, at which point all of them continue. In addition, allGather takes an input message, and on return each task receives a list of the messages sent by all tasks that made the allGather call.
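The semantics described above can be illustrated without a Spark cluster. The following is a minimal single-process sketch, assuming threads stand in for barrier tasks; `SimulatedAllGather` and `run_tasks` are hypothetical names used only for illustration, and none of this is Spark's actual implementation.

```python
import threading


class SimulatedAllGather:
    """Toy stand-in for the allGather semantics (not Spark code).

    Supports a single round of gathering only.
    """

    def __init__(self, num_tasks):
        self._messages = [None] * num_tasks
        # Every task must arrive before any task is released.
        self._barrier = threading.Barrier(num_tasks)

    def all_gather(self, task_id, message):
        self._messages[task_id] = message
        self._barrier.wait()         # block until all tasks have made the call
        return list(self._messages)  # each task gets its own copy of all messages


def run_tasks(num_tasks):
    """Run num_tasks threads that each gather every task's id as a string."""
    gatherer = SimulatedAllGather(num_tasks)
    results = [None] * num_tasks

    def task(task_id):
        results[task_id] = gatherer.all_gather(task_id, str(task_id))

    threads = [threading.Thread(target=task, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

In this sketch every thread ends up with the same list, one entry per task, which mirrors the "block, then receive everyone's message" behavior the description attributes to allGather.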

Why are the changes needed?

Tasks often need to communicate in a synchronized way. A simple example: each task starts a server that handles requests from the other tasks. Each task must first find a free port (which cannot be known in advance), and it can only start making requests once it knows the ports chosen by the other tasks. An allGather method lets the tasks tell each other which port they will run on.
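The port exchange described above could be sketched as follows. This is an illustration under assumptions, not code from the PR: `reserve_free_port` and `exchange_ports` are hypothetical helpers, and `context` is assumed to be any object exposing `allGather(message)` as described in this PR (inside a barrier stage that would be the object returned by `BarrierTaskContext.get()`).

```python
import socket


def reserve_free_port():
    """Bind to port 0 so the OS picks a free port; return the socket and its port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("", 0))
    return sock, sock.getsockname()[1]


def exchange_ports(context):
    """Pick a free port, share it through allGather, and return every task's port.

    `context` is assumed to provide allGather(str) returning a list of strings.
    """
    sock, port = reserve_free_port()
    # allGather takes a string message, so the port is sent as text
    # and converted back to an integer on receipt.
    all_ports = [int(p) for p in context.allGather(str(port))]
    return sock, port, all_ports
```

Returning the bound socket lets the caller keep holding the port until its server takes over, rather than releasing it and hoping it is still free later.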

Does this PR introduce any user-facing change?

Yes, a BarrierTaskContext.allGather method will be available through the Scala, Java, and Python APIs.

How was this patch tested?

Most of the code path is already covered by the tests for the barrier method, since this PR includes a refactor so that the barrier and allGather methods share much of their code. In addition, a test is added to assert that an allGather on each task's partition ID returns a list of every partition ID.

An example through the Python API:

>>> from pyspark import BarrierTaskContext
>>>
>>> def f(iterator):
...     context = BarrierTaskContext.get()
...     return [context.allGather('{}'.format(context.partitionId()))]
...
>>> sc.parallelize(range(4), 4).barrier().mapPartitions(f).collect()[0]
[u'3', u'1', u'0', u'2']
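The added test is described as checking that gathering each task's partition ID yields every partition ID. A minimal sketch of that kind of check, assuming the gathered messages are strings as in the example above; `gathered_all_partitions` is a hypothetical helper and not the test from the PR:

```python
def gathered_all_partitions(gathered, num_partitions):
    """Return True if `gathered` holds each partition ID 0..num_partitions-1 exactly once."""
    expected = [str(i) for i in range(num_partitions)]
    # Compare sorted copies so the check does not depend on message order.
    return sorted(gathered) == sorted(expected)
```

Applied to the example output, `gathered_all_partitions([u'3', u'1', u'0', u'2'], 4)` holds even though the messages are not in partition order.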

@sarthfrey sarthfrey changed the title from "Add all gather method to BarrierTaskContext" to "[SPARK-30667][CORE] Add all gather method to BarrierTaskContext" on Feb 20, 2020
change method to allGather

fix docstring

fix test

test2

test3

test4

test5

Change API to send and receive bytes rather than strings

doc fix

doc fix 2

fix test

fix test 2

fix test 3

fix test 4

fix test 5

fix test 6

fix test 7

fix test final

add python test

fix test final 2

address review round 1

Change allGather API to accept string over bytes

addressed review feedback round 2

comments

rm trailing whitespace

address review feedback round 2

address review round 3

address review feedback round 4

address review round 5

address review round 6

fix test

retrigger build

retrigger build

add mima exclusion rule

fix semicolon

fix tests

fix python unit test

fix python unit test final

temp
@jiangxb1987

OK to test

mengxr commented Feb 20, 2020

"""
/home/runner/work/spark/spark/python/pyspark/taskcontext.py:docstring of pyspark.BarrierTaskContext.getTaskInfos:2:Explicit markup ends without a blank line; unexpected unindent.
"""
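Sphinx typically reports this warning when an explicit markup block, such as a `.. note::` directive, is followed directly by a less-indented line with no blank line in between. A sketch of a docstring layout that avoids it, assuming the directive involved is `.. note::`; the function below is a hypothetical stand-in, not the actual `getTaskInfos` source:

```python
def get_task_infos_example():
    """
    .. note:: Experimental

    Returns information about the tasks in this barrier stage.
    """
    # The blank line after the directive ends the explicit markup block
    # cleanly, so the following paragraph is parsed as ordinary text.
    return []
```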

mengxr commented Feb 20, 2020

jenkins, add to whitelist

mengxr commented Feb 20, 2020

jenkins, test this please

SparkQA commented Feb 20, 2020

Test build #118733 has finished for PR 27640 at commit c1d1b0e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 21, 2020

Test build #118737 has finished for PR 27640 at commit 3f1f709.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 21, 2020

Test build #118738 has finished for PR 27640 at commit 7c259ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 jiangxb1987 left a comment

LGTM

jiangxb1987 pushed a commit that referenced this pull request Feb 21, 2020
Closes #27640 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
(cherry picked from commit 274b328)
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
@jiangxb1987
Thanks, merged to master/3.0!

*/
@Experimental
@Since("3.0.0")
def allGather(message: String): ArrayBuffer[String] = {
Contributor

Just out of curiosity, why return an ArrayBuffer[String] instead of an Array[String] here?

Contributor

friendly ping @jiangxb1987 @sarthfrey

Member

Fair point; why not just Seq?

Contributor Author

I didn't have a particular reason for choosing ArrayBuffer[String] over Array[String]. @zhengruifeng, do you think the latter is preferable here, and if so, why? The returned collection is indexed and sorted by partition ID, so I preferred those types over Seq, which is vague about whether it is naturally indexed or linear.

Member

OK, sure, IndexedSeq or Array is fine. Just something immutable.

Contributor Author

Gotcha, will submit a PR.

HyukjinKwon pushed a commit that referenced this pull request Mar 19, 2020
… type

This PR proposes that we change the return type of the `BarrierTaskContext.allGather` method to `Array[String]` instead of `ArrayBuffer[String]` since it is immutable. Based on discussion in #27640. cc zhengruifeng srowen

Closes #27951 from sarthfrey/all-gather-api.

Authored-by: sarthfrey-db <sarth.frey@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Mar 19, 2020
(cherry picked from commit 6fd3138)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#27640 from sarthfrey/master.

Lead-authored-by: sarthfrey-db <sarth.frey@databricks.com>
Co-authored-by: sarthfrey <sarth.frey@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#27951 from sarthfrey/all-gather-api.

Authored-by: sarthfrey-db <sarth.frey@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>