-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by ColumnPruning #14327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
9521a5a
6d1616d
31a6f6f
5c4e7ff
20e9436
dc70f1d
2186b7e
3e134f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer | |
| import scala.reflect.runtime.universe.TypeTag | ||
|
|
||
| import org.apache.spark.sql.catalyst.analysis | ||
| import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases | ||
| import org.apache.spark.sql.catalyst.dsl.expressions._ | ||
| import org.apache.spark.sql.catalyst.dsl.plans._ | ||
| import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder | ||
|
|
@@ -346,5 +347,35 @@ class ColumnPruningSuite extends PlanTest { | |
| comparePlans(Optimize.execute(plan1.analyze), correctAnswer1) | ||
| } | ||
|
|
||
| test("push project down into sample") { | ||
| val testRelation = LocalRelation('a.int, 'b.int, 'c.int) | ||
| val x = testRelation.subquery('x) | ||
| val originalQuery = | ||
| Sample(0.0, 0.6, false, 11L, x)().select('a) | ||
|
|
||
| val originalQueryAnalyzed = | ||
| EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery)) | ||
|
||
|
|
||
| val optimized = Optimize.execute(originalQueryAnalyzed) | ||
|
|
||
| val correctAnswer = | ||
| Sample(0.0, 0.6, false, 11L, x.select('a))() | ||
|
|
||
| comparePlans(optimized, correctAnswer.analyze) | ||
|
|
||
| val originalQuery2 = | ||
| Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa) | ||
|
|
||
| val originalQueryAnalyzed2 = | ||
| EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery2)) | ||
|
|
||
| val optimized2 = Optimize.execute(originalQueryAnalyzed2) | ||
|
|
||
| val correctAnswer2 = | ||
| Sample(0.0, 0.6, false, 11L, x.select('a))().select('a as 'aa) | ||
|
|
||
| comparePlans(optimized2, correctAnswer2.analyze) | ||
| } | ||
|
|
||
| // todo: add more tests for column pruning | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -422,6 +422,31 @@ class DatasetSuite extends QueryTest with SharedSQLContext { | |
| 3, 17, 27, 58, 62) | ||
| } | ||
|
|
||
| test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") { | ||
| val udfOne = udf((n: Int) => { | ||
|
||
| require(n != 1, "udfOne shouldn't see swid=1!") | ||
| 1 | ||
| }) | ||
|
|
||
| val df = Seq( | ||
| (0, "string0"), | ||
| (1, "string1"), | ||
| (2, "string2"), | ||
| (3, "string3"), | ||
| (4, "string4"), | ||
| (5, "string5"), | ||
| (6, "string6"), | ||
| (7, "string7"), | ||
| (8, "string8"), | ||
| (9, "string9") | ||
| ).toDF("swid", "stringData") | ||
|
||
| val sampleDF = df.sample(false, 0.7, 50) | ||
| // After sampling, sampleDF doesn't contain swid=1. | ||
| assert(!sampleDF.select("swid").collect.contains(1)) | ||
| // udfOne should not encounter swid=1. | ||
| checkAnswer(sampleDF.select(udfOne($"swid")), List.fill(sampleDF.count.toInt)(Row(1))) | ||
| } | ||
|
|
||
| test("SPARK-11436: we should rebind right encoder when join 2 datasets") { | ||
| val ds1 = Seq("1", "2").toDS().as("a") | ||
| val ds2 = Seq(2, 3).toDS().as("b") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the style should be:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.