Update upstream#69
Merged
GulajavaMinistudio merged 5 commits intoGulajavaMinistudio:masterfrom Jun 5, 2017
Merged
Conversation
## What changes were proposed in this pull request? This pull request fix the TaskScheulerImpl bug in some condition. Detail see: https://issues.apache.org/jira/browse/SPARK-20945 (Please fill in changes proposed in this fix) ## How was this patch tested? manual tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: liupengcheng <liupengcheng@xiaomi.com> Author: PengchengLiu <pengchengliu_bupt@163.com> Closes #18171 from liupc/Fix-tid-key-not-found-in-TaskSchedulerImpl.
… KMeans ## What changes were proposed in this pull request? Destroy broadcasted centers after computing cost ## How was this patch tested? existing tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #18152 from zhengruifeng/destroy_kmeans_model.
## What changes were proposed in this pull request? JIRA: [SPARK-19762](https://issues.apache.org/jira/browse/SPARK-19762) The larger changes in this patch are: * Adds a `DifferentiableLossAggregator` trait which is intended to be used as a common parent trait to all Spark ML aggregator classes. It factors out the common methods: `merge, gradient, loss, weight` from the aggregator subclasses. * Adds a `RDDLossFunction` which is intended to be the only implementation of Breeze's `DiffFunction` necessary in Spark ML, and can be used by all other algorithms. It takes the aggregator type as a type parameter, and maps the aggregator over an RDD. It additionally takes in a optional regularization loss function for applying the differentiable part of regularization. * Factors out the regularization from the data part of the cost function, and treats regularization as a separate independent cost function which can be evaluated and added to the data cost function. * Changes `LinearRegression` to use this new hierarchy as a proof of concept. * Adds the following new namespaces `o.a.s.ml.optim.loss` and `o.a.s.ml.optim.aggregator` Also note that none of these are public-facing changes. All of these classes are internal to Spark ML and remain that way. **NOTE: The large majority of the "lines added" and "lines deleted" are simply code moving around or unit tests.** BTW, I also converted LinearSVC to this framework as a way to prove that this new hierarchy is flexible enough for the other algorithms, but I backed those changes out because the PR is large enough as is. ## How was this patch tested? Test suites are added for the new components, and some test suites are also added to provide coverage where there wasn't any before. * DifferentiablLossAggregatorSuite * LeastSquaresAggregatorSuite * RDDLossFunctionSuite * DifferentiableRegularizationSuite Below are some performance testing numbers. Run on a 6 node virtual cluster with 44 cores and ~110G RAM, the dataset size is about 37G. These are not "large-scale" tests, but we really want to just make sure the iteration times don't increase with this patch. Notably we are doing the regularization a bit differently than before, but that should cost very little. I think there's very little risk otherwise, and these numbers don't show a difference. Of course I'm happy to add more tests as we think it's necessary, but I think the patch is ready for review now. **Note:** timings are best of 3 runs. | | numFeatures | numPoints | maxIter | regParam | elasticNetParam | SPARK-19762 (sec) | master (sec) | |----|---------------|-------------|-----------|------------|-------------------|---------------------|----------------| | 0 | 5000 | 1e+06 | 30 | 0 | 0 | 129.594 | 131.153 | | 1 | 5000 | 1e+06 | 30 | 0.1 | 0 | 135.54 | 136.327 | | 2 | 5000 | 1e+06 | 30 | 0.01 | 0.5 | 135.148 | 129.771 | | 3 | 50000 | 100000 | 30 | 0 | 0 | 145.764 | 144.096 | ## Follow ups If this design is accepted, we will convert the other ML algorithms that use this aggregator pattern to this new hierarchy in follow up PRs. Author: sethah <seth.hendrickson16@gmail.com> Author: sethah <shendrickson@cloudera.com> Closes #17094 from sethah/ml_aggregators.
…ies as equivalence of --repositories ## What changes were proposed in this pull request? In our use case of launching Spark applications via REST APIs (Livy), there's no way for user to specify command line arguments, all Spark configurations are set through configurations map. For "--repositories" because there's no equivalent Spark configuration, so we cannot specify the custom repository through configuration. So here propose to add "--repositories" equivalent configuration in Spark. ## How was this patch tested? New UT added. Author: jerryshao <sshao@hortonworks.com> Closes #18201 from jerryshao/SPARK-20981.
…rSuite listing ## What changes were proposed in this pull request? When stopping StreamingQuery, StreamExecution will set `streamDeathCause` then notify StreamingQueryManager to remove this query. So it's possible that when `q2.exception.isDefined` returns `true`, StreamingQueryManager's active list still has `q2`. This PR just puts the checks into `eventually` to fix the flaky test. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #18180 from zsxwing/SPARK-20957.
GulajavaMinistudio
pushed a commit
that referenced
this pull request
Oct 8, 2020
… more scenarios such as PartitioningCollection
### What changes were proposed in this pull request?
This PR proposes to improve `EnsureRquirement.reorderJoinKeys` to handle the following scenarios:
1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`.
2. Handle `PartitioningCollection`, which may contain `HashPartitioning`
### Why are the changes needed?
1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side.
The following will not consider the right-side `HashPartitioning`:
```
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
join.explain
== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
: +- *(1) Project [i1#26, j1#27]
: +- *(1) Filter isnotnull(i1#26)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0.
+- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]. <===== This can be removed
+- *(3) Project [i2#30, j2#31]
+- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. For the scenario 2), the current behavior does not handle `PartitioningCollection`:
```
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain
== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0. <===== This can be removed
: +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed
: +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
: :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
: : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
: : +- *(1) LocalTableScan [_1#2, _2#3]
: +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
: +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
: +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
+- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
+- *(7) LocalTableScan [_1#24, _2#25]
```
### Does this PR introduce _any_ user-facing change?
Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed:
1. Senario 1):
```
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
: +- *(1) Project [i1#26, j1#27]
: +- *(1) Filter isnotnull(i1#26)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
+- *(3) Project [i2#30, j2#31]
+- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. Scenario 2):
```
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
: :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
: : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
: : +- *(1) LocalTableScan [_1#2, _2#3]
: +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
: +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
: +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
+- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
+- *(6) LocalTableScan [_1#24, _2#25]
```
### How was this patch tested?
Added tests.
Closes apache#29074 from imback82/reorder_keys.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.