Skip to content

Conversation

@ueshin
Copy link

@ueshin ueshin commented Oct 19, 2017

Simplify _createFromPandasWithArrow() if we won't reach schema check block. (https://github.com/apache/spark/pull/19459/files#r145603645).

return None
arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
batches = [_create_batch([(c, t)
for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, looks this is only the place using zip. Not a big deal but I think we are safe to replace

from itertools import imap as map
from itertools import izip as zip, imap as map

We could change this after merging this first too.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@BryanCutler
Copy link
Owner

merged, thanks!

BryanCutler pushed a commit that referenced this pull request Sep 8, 2020
…or its output partitioning

### What changes were proposed in this pull request?

Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`.

 For example,
```Scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")

// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))

// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))

// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))

join3.explain
```
You see that `Exchange hashpartitioning(i2#103, 200)` is introduced because there is no output partitioning info from the build side.
```
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i3#29, 200), true, [id=apache#55]
:     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
:        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
:        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
:        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
:        :  :     +- LocalTableScan [i1#7, j1#8]
:        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
:        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
:        :        +- LocalTableScan [i2#18, j2#19]
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=apache#34]
:           +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#40, 200), true, [id=apache#39]
      +- LocalTableScan [i4#40, j4#41]
```
This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s.

There is a new internal config `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which can limit the number of partitioning a `HashPartitioning` can expand to. It can be set to "0" to disable this feature.

### Why are the changes needed?

To remove unnecessary shuffle.

### Does this PR introduce _any_ user-facing change?

Yes, now the shuffle in the above example can be eliminated:
```
== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
:  +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
:     :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
:     :  :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(i1#86, 200), true, [id=apache#120]
:     :  :     +- LocalTableScan [i1#86, j1#87]
:     :  +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(i2#97, 200), true, [id=apache#121]
:     :        +- LocalTableScan [i2#97, j2#98]
:     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=apache#126]
:        +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#119, 200), true, [id=apache#130]
      +- LocalTableScan [i4#119, j4#120]
```

### How was this patch tested?

Added new tests.

Closes apache#28676 from imback82/broadcast_join_output.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants