Update upstream#130
Merged
GulajavaMinistudio merged 3 commits intoGulajavaMinistudio:masterfrom Aug 11, 2017
Merged
Conversation
…rmatWriter jobs ## What changes were proposed in this pull request? This patch introduces an internal interface for tracking metrics and/or statistics on data on the fly, as it is being written to disk during a `FileFormatWriter` job and partially reimplements SPARK-20703 in terms of it. The interface basically consists of 3 traits: - `WriteTaskStats`: just a tag for classes that represent statistics collected during a `WriteTask` The only constraint it adds is that the class should be `Serializable`, as instances of it will be collected on the driver from all executors at the end of the `WriteJob`. - `WriteTaskStatsTracker`: a trait for classes that can actually compute statistics based on tuples that are processed by a given `WriteTask` and eventually produce a `WriteTaskStats` instance. - `WriteJobStatsTracker`: a trait for classes that act as containers of `Serializable` state that's necessary for instantiating `WriteTaskStatsTracker` on executors and finally process the resulting collection of `WriteTaskStats`, once they're gathered back on the driver. Potential future use of this interface is e.g. CBO stats maintenance during `INSERT INTO table ... ` operations. ## How was this patch tested? Existing tests for SPARK-20703 exercise the new code: `hive/SQLMetricsSuite`, `sql/JavaDataFrameReaderWriterSuite`, etc. Author: Adrian Ionescu <adrian@databricks.com> Closes #18884 from adrian-ionescu/write-stats-tracker-api.
## What changes were proposed in this pull request? When train RF model, there are many warning messages like this: > WARN RandomForest: Tree learning is using approximately 268492800 bytes per iteration, which exceeds requested limit maxMemoryUsage=268435456. This allows splitting 2622 nodes in this iteration. This warning message is unnecessary and the data is not accurate. Actually, if all the nodes cannot split in one iteration, it will show this warning. For most of the case, all the nodes cannot split just in one iteration, so for most of the case, it will show this warning for each iteration. ## How was this patch tested? The existing UT Author: Peng Meng <peng.meng@intel.com> Closes #18868 from mpjlu/fixRFwarning.
## What changes were proposed in this pull request? This patch removes the unused SessionCatalog.getTableMetadataOption and ExternalCatalog. getTableOption. ## How was this patch tested? Removed the test case. Author: Reynold Xin <rxin@databricks.com> Closes #18912 from rxin/remove-getTableOption.
GulajavaMinistudio
pushed a commit
that referenced
this pull request
Jul 20, 2020
…or its output partitioning
### What changes were proposed in this pull request?
Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`.
For example,
```Scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))
// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))
// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))
join3.explain
```
You see that `Exchange hashpartitioning(i2#103, 200)` is introduced because there is no output partitioning info from the build side.
```
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
: +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
: :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
: : :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
: : : +- LocalTableScan [i1#7, j1#8]
: : +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
: : +- LocalTableScan [i2#18, j2#19]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
: +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
+- LocalTableScan [i4#40, j4#41]
```
This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s.
There is a new internal config `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which can limit the number of partitioning a `HashPartitioning` can expand to. It can be set to "0" to disable this feature.
### Why are the changes needed?
To remove unnecessary shuffle.
### Does this PR introduce _any_ user-facing change?
Yes, now the shuffle in the above example can be eliminated:
```
== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
: +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
: :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
: : :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(i1#86, 200), true, [id=#120]
: : : +- LocalTableScan [i1#86, j1#87]
: : +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i2#97, 200), true, [id=#121]
: : +- LocalTableScan [i2#97, j2#98]
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#126]
: +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i4#119, 200), true, [id=#130]
+- LocalTableScan [i4#119, j4#120]
```
### How was this patch tested?
Added new tests.
Closes apache#28676 from imback82/broadcast_join_output.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.