Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with filter push-down with string functions") {
val df1 = sql("select * FROM h2.test.employee where " +
val df1 = sql("SELECT * FROM h2.test.employee WHERE " +
"substr(name, 2, 1) = 'e'" +
" AND upper(name) = 'JEN' AND lower(name) = 'jen' ")
checkFiltersRemoved(df1)
Expand All @@ -689,7 +689,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkPushedInfo(df1, expectedPlanFragment1)
checkAnswer(df1, Seq(Row(6, "jen", 12000, 1200, true)))

val df2 = sql("select * FROM h2.test.employee where " +
val df2 = sql("SELECT * FROM h2.test.employee WHERE " +
"trim(name) = 'jen' AND trim('j', name) = 'en'" +
"AND translate(name, 'e', 1) = 'j1n'")
checkFiltersRemoved(df2)
Expand All @@ -699,7 +699,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkPushedInfo(df2, expectedPlanFragment2)
checkAnswer(df2, Seq(Row(6, "jen", 12000, 1200, true)))

val df3 = sql("select * FROM h2.test.employee where " +
val df3 = sql("SELECT * FROM h2.test.employee WHERE " +
"ltrim(name) = 'jen' AND ltrim('j', name) = 'en'")
checkFiltersRemoved(df3)
val expectedPlanFragment3 =
Expand All @@ -708,7 +708,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkPushedInfo(df3, expectedPlanFragment3)
checkAnswer(df3, Seq(Row(6, "jen", 12000, 1200, true)))

val df4 = sql("select * FROM h2.test.employee where " +
val df4 = sql("SELECT * FROM h2.test.employee WHERE " +
"rtrim(name) = 'jen' AND rtrim('n', name) = 'je'")
checkFiltersRemoved(df4)
val expectedPlanFragment4 =
Expand All @@ -718,7 +718,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df4, Seq(Row(6, "jen", 12000, 1200, true)))

// H2 does not support OVERLAY
val df5 = sql("select * FROM h2.test.employee where OVERLAY(NAME, '1', 2, 1) = 'j1n'")
val df5 = sql("SELECT * FROM h2.test.employee WHERE OVERLAY(NAME, '1', 2, 1) = 'j1n'")
checkFiltersRemoved(df5, false)
val expectedPlanFragment5 =
"PushedFilters: [NAME IS NOT NULL]"
Expand All @@ -727,8 +727,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: MAX AVG with filter and group by") {
val df = sql("select MAX(SaLaRY), AVG(BONUS) FROM h2.test.employee where dept > 0" +
" group by DePt")
val df = sql("SELECT MAX(SaLaRY), AVG(BONUS) FROM h2.test.employee WHERE dept > 0" +
" GROUP BY DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
Expand All @@ -749,7 +749,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: MAX AVG with filter without group by") {
val df = sql("select MAX(ID), AVG(ID) FROM h2.test.people where id > 0")
val df = sql("SELECT MAX(ID), AVG(ID) FROM h2.test.people WHERE id > 0")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [MAX(ID), AVG(ID)], " +
Expand All @@ -776,7 +776,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: aggregate + number") {
val df = sql("select MAX(SALARY) + 1 FROM h2.test.employee")
val df = sql("SELECT MAX(SALARY) + 1 FROM h2.test.employee")
checkAggregateRemoved(df)
df.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
Expand All @@ -789,14 +789,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: COUNT(*)") {
val df = sql("select COUNT(*) FROM h2.test.employee")
val df = sql("SELECT COUNT(*) FROM h2.test.employee")
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [COUNT(*)]")
checkAnswer(df, Seq(Row(5)))
}

test("scan with aggregate push-down: GROUP BY without aggregate functions") {
val df = sql("select name FROM h2.test.employee GROUP BY name")
val df = sql("SELECT name FROM h2.test.employee GROUP BY name")
checkAggregateRemoved(df)
checkPushedInfo(df,
"PushedAggregates: [], PushedFilters: [], PushedGroupByExpressions: [NAME],")
Expand Down Expand Up @@ -847,14 +847,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: COUNT(col)") {
val df = sql("select COUNT(DEPT) FROM h2.test.employee")
val df = sql("SELECT COUNT(DEPT) FROM h2.test.employee")
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [COUNT(DEPT)]")
checkAnswer(df, Seq(Row(5)))
}

test("scan with aggregate push-down: COUNT(DISTINCT col)") {
val df = sql("select COUNT(DISTINCT DEPT) FROM h2.test.employee")
val df = sql("SELECT COUNT(DISTINCT DEPT) FROM h2.test.employee")
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [COUNT(DISTINCT DEPT)]")
checkAnswer(df, Seq(Row(3)))
Expand Down Expand Up @@ -969,8 +969,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: with multiple group by columns") {
val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
" group by DEPT, NAME")
val df = sql("SELECT MAX(SALARY), MIN(BONUS) FROM h2.test.employee WHERE dept > 0" +
" GROUP BY DEPT, NAME")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
Expand All @@ -980,8 +980,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: with concat multiple group key in project") {
val df1 = sql("select concat_ws('#', DEPT, NAME), MAX(SALARY) FROM h2.test.employee" +
" where dept > 0 group by DEPT, NAME")
val df1 = sql("SELECT concat_ws('#', DEPT, NAME), MAX(SALARY) FROM h2.test.employee" +
" WHERE dept > 0 GROUP BY DEPT, NAME")
val filters1 = df1.queryExecution.optimizedPlan.collect {
case f: Filter => f
}
Expand All @@ -992,8 +992,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df1, Seq(Row("1#amy", 10000), Row("1#cathy", 9000), Row("2#alex", 12000),
Row("2#david", 10000), Row("6#jen", 12000)))

val df2 = sql("select concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
" FROM h2.test.employee where dept > 0 group by DEPT, NAME")
val df2 = sql("SELECT concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
" FROM h2.test.employee WHERE dept > 0 GROUP BY DEPT, NAME")
val filters2 = df2.queryExecution.optimizedPlan.collect {
case f: Filter => f
}
Expand All @@ -1004,8 +1004,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df2, Seq(Row("1#amy", 11000), Row("1#cathy", 10200), Row("2#alex", 13200),
Row("2#david", 11300), Row("6#jen", 13200)))

val df3 = sql("select concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
" FROM h2.test.employee where dept > 0 group by concat_ws('#', DEPT, NAME)")
val df3 = sql("SELECT concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
" FROM h2.test.employee WHERE dept > 0 GROUP BY concat_ws('#', DEPT, NAME)")
checkFiltersRemoved(df3)
checkAggregateRemoved(df3, false)
checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], ")
Expand All @@ -1014,8 +1014,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: with having clause") {
val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
" group by DEPT having MIN(BONUS) > 1000")
val df = sql("SELECT MAX(SALARY), MIN(BONUS) FROM h2.test.employee WHERE dept > 0" +
" GROUP BY DEPT having MIN(BONUS) > 1000")
// filter over aggregate not push down
checkFiltersRemoved(df, false)
checkAggregateRemoved(df)
Expand All @@ -1025,7 +1025,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: alias over aggregate") {
val df = sql("select * from h2.test.employee")
val df = sql("SELECT * FROM h2.test.employee")
.groupBy($"DEPT")
.min("SALARY").as("total")
checkAggregateRemoved(df)
Expand Down Expand Up @@ -1072,8 +1072,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: VAR_POP VAR_SAMP with filter and group by") {
val df = sql("select VAR_POP(bonus), VAR_SAMP(bonus) FROM h2.test.employee where dept > 0" +
" group by DePt")
val df = sql("SELECT VAR_POP(bonus), VAR_SAMP(bonus) FROM h2.test.employee WHERE dept > 0" +
" GROUP BY DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
Expand All @@ -1082,8 +1082,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: STDDEV_POP STDDEV_SAMP with filter and group by") {
val df = sql("select STDDEV_POP(bonus), STDDEV_SAMP(bonus) FROM h2.test.employee" +
" where dept > 0 group by DePt")
val df = sql("SELECT STDDEV_POP(bonus), STDDEV_SAMP(bonus) FROM h2.test.employee" +
" WHERE dept > 0 GROUP BY DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
Expand All @@ -1092,8 +1092,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: COVAR_POP COVAR_SAMP with filter and group by") {
val df = sql("select COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
" FROM h2.test.employee where dept > 0 group by DePt")
val df = sql("SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
" FROM h2.test.employee WHERE dept > 0 GROUP BY DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
Expand All @@ -1102,8 +1102,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}

test("scan with aggregate push-down: CORR with filter and group by") {
val df = sql("select CORR(bonus, bonus) FROM h2.test.employee where dept > 0" +
" group by DePt")
val df = sql("SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept > 0" +
" GROUP BY DePt")
checkFiltersRemoved(df)
checkAggregateRemoved(df)
checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " +
Expand All @@ -1113,7 +1113,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel

test("scan with aggregate push-down: aggregate over alias push down") {
val cols = Seq("a", "b", "c", "d", "e")
val df1 = sql("select * from h2.test.employee").toDF(cols: _*)
val df1 = sql("SELECT * FROM h2.test.employee").toDF(cols: _*)
val df2 = df1.groupBy().sum("c")
checkAggregateRemoved(df2)
df2.queryExecution.optimizedPlan.collect {
Expand Down