diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e6de7d0e763b9..5067cd7fa3ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -148,7 +148,7 @@ case class RowDataSourceScanExec(
       val pushedTopN =
         s"ORDER BY ${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" +
           s" LIMIT ${pushedDownOperators.limit.get}"
-      Some("pushedTopN" -> pushedTopN)
+      Some("PushedTopN" -> pushedTopN)
     } else {
       pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 31fdb022b625f..e7e9174463bbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sort}
-import org.apache.spark.sql.connector.expressions.{FieldReference, NullOrdering, SortDirection, SortValue}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.functions.{avg, count, count_distinct, lit, not, sum, udf, when}
@@ -110,13 +109,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SELECT name, id FROM h2.test.people"), Seq(Row("fred", 1), Row("mary", 2)))
   }
 
+  private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String): Unit = {
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        checkKeywordsExistsInExplain(df, expectedPlanFragment)
+    }
+  }
+
   // TABLESAMPLE ({integer_expression | decimal_expression} PERCENT) and
   // TABLESAMPLE (BUCKET integer_expression OUT OF integer_expression)
   // are tested in JDBC dialect tests because TABLESAMPLE is not supported by all the DBMS
   test("TABLESAMPLE (integer_expression ROWS) is the same as LIMIT") {
     val df = sql("SELECT NAME FROM h2.test.employee TABLESAMPLE (3 ROWS)")
     checkSchemaNames(df, Seq("NAME"))
-    checkPushedLimit(df, Some(3))
+    checkPushedInfo(df, "PushedFilters: [], PushedLimit: LIMIT 3, ")
     checkAnswer(df, Seq(Row("amy"), Row("alex"), Row("cathy")))
   }
 
@@ -130,7 +136,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   test("simple scan with LIMIT") {
     val df1 = spark.read.table("h2.test.employee")
       .where($"dept" === 1).limit(1)
-    checkPushedLimit(df1, Some(1))
+    checkPushedInfo(df1,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, ")
     checkAnswer(df1, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
 
     val df2 = spark.read
@@ -141,19 +148,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .filter($"dept" > 1)
       .limit(1)
-    checkPushedLimit(df2, Some(1))
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ")
     checkAnswer(df2, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
 
     val df3 = sql("SELECT name FROM h2.test.employee WHERE dept > 1 LIMIT 1")
     checkSchemaNames(df3, Seq("NAME"))
-    checkPushedLimit(df3, Some(1))
+    checkPushedInfo(df3,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ")
     checkAnswer(df3, Seq(Row("alex")))
 
     val df4 = spark.read
       .table("h2.test.employee")
       .groupBy("DEPT").sum("SALARY")
       .limit(1)
-    checkPushedLimit(df4, None)
+    checkPushedInfo(df4,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT], ")
     checkAnswer(df4, Seq(Row(1, 19000.00)))
 
     val name = udf { (x: String) => x.matches("cat|dav|amy") }
@@ -164,24 +174,18 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(name($"shortName"))
       .limit(1)
     // LIMIT is pushed down only if all the filters are pushed down
-    checkPushedLimit(df5, None)
+    checkPushedInfo(df5, "PushedFilters: [], ")
     checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
   }
 
-  private def checkPushedLimit(df: DataFrame, limit: Option[Int] = None,
-      sortValues: Seq[SortValue] = Nil): Unit = {
-    df.queryExecution.optimizedPlan.collect {
-      case relation: DataSourceV2ScanRelation => relation.scan match {
-        case v1: V1ScanWrapper =>
-          assert(v1.pushedDownOperators.limit === limit)
-          assert(v1.pushedDownOperators.sortValues === sortValues)
-      }
+  private def checkSortRemoved(df: DataFrame, removed: Boolean = true): Unit = {
+    val sorts = df.queryExecution.optimizedPlan.collect {
+      case s: Sort => s
     }
-    if (sortValues.nonEmpty) {
-      val sorts = df.queryExecution.optimizedPlan.collect {
-        case s: Sort => s
-      }
+    if (removed) {
       assert(sorts.isEmpty)
+    } else {
+      assert(sorts.nonEmpty)
     }
   }
 
@@ -190,12 +194,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .sort("salary")
       .limit(1)
-    checkPushedLimit(df1, Some(1), createSortValues())
+    checkSortRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [], PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
     checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
 
     val df2 = spark.read.table("h2.test.employee")
       .where($"dept" === 1).orderBy($"salary").limit(1)
-    checkPushedLimit(df2, Some(1), createSortValues())
+    checkSortRemoved(df2)
+    checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
+      "PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
     checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
 
     val df3 = spark.read
@@ -207,19 +215,23 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter($"dept" > 1)
       .orderBy($"salary".desc)
       .limit(1)
-    checkPushedLimit(
-      df3, Some(1), createSortValues(SortDirection.DESCENDING, NullOrdering.NULLS_LAST))
+    checkSortRemoved(df3)
+    checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+      "PushedTopN: ORDER BY [salary DESC NULLS LAST] LIMIT 1, ")
     checkAnswer(df3, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
 
     val df4 =
       sql("SELECT name FROM h2.test.employee WHERE dept > 1 ORDER BY salary NULLS LAST LIMIT 1")
     checkSchemaNames(df4, Seq("NAME"))
-    checkPushedLimit(df4, Some(1), createSortValues(nullOrdering = NullOrdering.NULLS_LAST))
+    checkSortRemoved(df4)
+    checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+      "PushedTopN: ORDER BY [salary ASC NULLS LAST] LIMIT 1, ")
     checkAnswer(df4, Seq(Row("david")))
 
     val df5 = spark.read.table("h2.test.employee")
       .where($"dept" === 1).orderBy($"salary")
-    checkPushedLimit(df5, None)
+    checkSortRemoved(df5, false)
+    checkPushedInfo(df5, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ")
     checkAnswer(df5,
       Seq(Row(1, "cathy", 9000.00, 1200.0, false), Row(1, "amy", 10000.00, 1000.0, true)))
 
@@ -228,7 +240,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy("DEPT").sum("SALARY")
       .orderBy("DEPT")
       .limit(1)
-    checkPushedLimit(df6)
+    checkSortRemoved(df6, false)
+    checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]," +
+      " PushedFilters: [], PushedGroupByColumns: [DEPT], ")
     checkAnswer(df6, Seq(Row(1, 19000.00)))
 
     val name = udf { (x: String) => x.matches("cat|dav|amy") }
@@ -240,145 +254,69 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .sort($"SALARY".desc)
       .limit(1)
     // LIMIT is pushed down only if all the filters are pushed down
-    checkPushedLimit(df7)
+    checkSortRemoved(df7, false)
+    checkPushedInfo(df7, "PushedFilters: [], ")
     checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy")))
 
     val df8 = spark.read
       .table("h2.test.employee")
      .sort(sub($"NAME"))
       .limit(1)
-    checkPushedLimit(df8)
+    checkSortRemoved(df8, false)
+    checkPushedInfo(df8, "PushedFilters: [], ")
     checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
   }
 
-  private def createSortValues(
-      sortDirection: SortDirection = SortDirection.ASCENDING,
-      nullOrdering: NullOrdering = NullOrdering.NULLS_FIRST): Seq[SortValue] = {
-    Seq(SortValue(FieldReference("salary"), sortDirection, nullOrdering))
-  }
-
   test("scan with filter push-down") {
     val df = spark.table("h2.test.people").filter($"id" > 1)
-    checkFiltersRemoved(df)
-
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [ID IS NOT NULL, ID > 1]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df, "PushedFilters: [ID IS NOT NULL, ID > 1], ")
     checkAnswer(df, Row("mary", 2))
 
     val df2 = spark.table("h2.test.employee").filter($"name".isin("amy", "cathy"))
-    checkFiltersRemoved(df2)
-
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [NAME IN ('amy', 'cathy')]"
-        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df2, "PushedFilters: [NAME IN ('amy', 'cathy')]")
    checkAnswer(df2, Seq(Row(1, "amy", 10000, 1000, true), Row(1, "cathy", 9000, 1200, false)))
 
     val df3 = spark.table("h2.test.employee").filter($"name".startsWith("a"))
-    checkFiltersRemoved(df3)
-
-    df3.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']"
-        checkKeywordsExistsInExplain(df3, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df3, "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']")
     checkAnswer(df3, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 12000, 1200, false)))
 
     val df4 = spark.table("h2.test.employee").filter($"is_manager")
-    checkFiltersRemoved(df4)
-
-    df4.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]"
-        checkKeywordsExistsInExplain(df4, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df4, "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]")
     checkAnswer(df4, Seq(Row(1, "amy", 10000, 1000, true),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
 
     val df5 = spark.table("h2.test.employee").filter($"is_manager".and($"salary" > 10000))
-    checkFiltersRemoved(df5)
-
-    df5.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
-            "IS_MANAGER = true, SALARY > 10000.00]"
-        checkKeywordsExistsInExplain(df5, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df5, "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
+      "IS_MANAGER = true, SALARY > 10000.00]")
     checkAnswer(df5, Seq(Row(6, "jen", 12000, 1200, true)))
 
     val df6 = spark.table("h2.test.employee").filter($"is_manager".or($"salary" > 10000))
-    checkFiltersRemoved(df6)
-
-    df6.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], "
-        checkKeywordsExistsInExplain(df6, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df6, "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], ")
     checkAnswer(df6, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 12000, 1200, false),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
 
     val df7 = spark.table("h2.test.employee").filter(not($"is_manager") === true)
-    checkFiltersRemoved(df7)
-
-    df7.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], "
-        checkKeywordsExistsInExplain(df7, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df7, "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], ")
     checkAnswer(df7, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "alex", 12000, 1200, false)))
 
     val df8 = spark.table("h2.test.employee").filter($"is_manager" === true)
-    checkFiltersRemoved(df8)
-
-    df8.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], "
-        checkKeywordsExistsInExplain(df8, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df8, "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], ")
     checkAnswer(df8, Seq(Row(1, "amy", 10000, 1000, true),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
 
     val df9 = spark.table("h2.test.employee")
       .filter(when($"dept" > 1, true).when($"is_manager", false).otherwise($"dept" > 3))
-    checkFiltersRemoved(df9)
-
-    df9.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE WHEN IS_MANAGER = true THEN FALSE" +
-            " ELSE DEPT > 3 END], "
-        checkKeywordsExistsInExplain(df9, expected_plan_fragment)
-    }
-
+    checkPushedInfo(df9, "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE " +
+      "WHEN IS_MANAGER = true THEN FALSE ELSE DEPT > 3 END], ")
     checkAnswer(df9, Seq(Row(2, "alex", 12000, 1200, false),
       Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
   }
@@ -387,19 +325,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     Seq(false, true).foreach { ansiMode =>
       withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
         val df = spark.table("h2.test.people").filter($"id" + 1 > 1)
-        checkFiltersRemoved(df, ansiMode)
-
-        df.queryExecution.optimizedPlan.collect {
-          case _: DataSourceV2ScanRelation =>
-            val expected_plan_fragment = if (ansiMode) {
-              "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
-            } else {
-              "PushedFilters: [ID IS NOT NULL]"
-            }
-            checkKeywordsExistsInExplain(df, expected_plan_fragment)
+        val expectedPlanFragment = if (ansiMode) {
+          "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
+        } else {
+          "PushedFilters: [ID IS NOT NULL]"
         }
-
+        checkPushedInfo(df, expectedPlanFragment)
         checkAnswer(df, Seq(Row("fred", 1), Row("mary", 2)))
 
         val df2 = spark.table("h2.test.people").filter($"id" + Int.MaxValue > 1)
@@ -432,18 +364,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
           |""".stripMargin)
         checkFiltersRemoved(df3, ansiMode)
-
-        df3.queryExecution.optimizedPlan.collect {
-          case _: DataSourceV2ScanRelation =>
-            val expected_plan_fragment = if (ansiMode) {
-              "PushedFilters: [(CASE WHEN SALARY > 10000.00 THEN BONUS" +
-                " ELSE BONUS + 200.0 END) > 1200.0]"
-            } else {
-              "PushedFilters: []"
-            }
-            checkKeywordsExistsInExplain(df3, expected_plan_fragment)
+        val expectedPlanFragment3 = if (ansiMode) {
+          "PushedFilters: [(CASE WHEN SALARY > 10000.00 THEN BONUS" +
+            " ELSE BONUS + 200.0 END) > 1200.0]"
+        } else {
+          "PushedFilters: []"
         }
-
+        checkPushedInfo(df3, expectedPlanFragment3)
         checkAnswer(df3,
           Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "david", 10000, 1300, true)))
       }
@@ -587,14 +514,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), AVG(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
+      "PushedGroupByColumns: [DEPT], ")
     checkAnswer(df, Seq(Row(10000, 1100.0), Row(12000, 1250.0), Row(12000, 1200.0)))
   }
 
@@ -613,14 +535,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val df = sql("select MAX(ID), AVG(ID) FROM h2.test.people where id > 0")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(ID), AVG(ID)], " +
-          "PushedFilters: [ID IS NOT NULL, ID > 0], " +
-          "PushedGroupByColumns: [], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(ID), AVG(ID)], " +
+      "PushedFilters: [ID IS NOT NULL, ID > 0], " +
+      "PushedGroupByColumns: [], ")
     checkAnswer(df, Seq(Row(2, 1.5)))
   }
 
@@ -650,42 +567,28 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
           "PushedAggregates: [MAX(SALARY)]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY)]")
     checkAnswer(df, Seq(Row(12001)))
   }
 
   test("scan with aggregate push-down: COUNT(*)") {
     val df = sql("select COUNT(*) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(*)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(*)]")
     checkAnswer(df, Seq(Row(5)))
   }
 
   test("scan with aggregate push-down: COUNT(col)") {
     val df = sql("select COUNT(DEPT) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(DEPT)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(DEPT)]")
     checkAnswer(df, Seq(Row(5)))
   }
 
   test("scan with aggregate push-down: COUNT(DISTINCT col)") {
     val df = sql("select COUNT(DISTINCT DEPT) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(DISTINCT DEPT)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(DISTINCT DEPT)]")
     checkAnswer(df, Seq(Row(3)))
   }
 
@@ -704,52 +607,30 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   test("scan with aggregate push-down: SUM without filer and group by") {
     val df = sql("SELECT SUM(SALARY) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY)]")
     checkAnswer(df, Seq(Row(53000)))
   }
 
   test("scan with aggregate push-down: DISTINCT SUM without filer and group by") {
     val df = sql("SELECT SUM(DISTINCT SALARY) FROM h2.test.employee")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(DISTINCT SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(DISTINCT SALARY)]")
     checkAnswer(df, Seq(Row(31000)))
   }
 
   test("scan with aggregate push-down: SUM with group by") {
     val df = sql("SELECT SUM(SALARY) FROM h2.test.employee GROUP BY DEPT")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY)], " +
-          "PushedFilters: [], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY)], " +
+      "PushedFilters: [], PushedGroupByColumns: [DEPT], ")
     checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
   }
 
   test("scan with aggregate push-down: DISTINCT SUM with group by") {
     val df = sql("SELECT SUM(DISTINCT SALARY) FROM h2.test.employee GROUP BY DEPT")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(DISTINCT SALARY)], " +
-          "PushedFilters: [], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(DISTINCT SALARY)], " +
+      "PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
   }
 
@@ -758,14 +639,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DEPT, NAME")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT, NAME], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
     checkAnswer(df, Seq(Row(9000, 1200), Row(12000, 1200), Row(10000, 1300),
       Row(10000, 1000), Row(12000, 1200)))
   }
 
@@ -778,14 +653,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     }
     assert(filters1.isEmpty)
     checkAggregateRemoved(df1)
-    df1.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT, NAME], "
-        checkKeywordsExistsInExplain(df1, expected_plan_fragment)
-    }
+    checkPushedInfo(df1, "PushedAggregates: [MAX(SALARY)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
     checkAnswer(df1, Seq(Row("1#amy", 10000), Row("1#cathy", 9000),
       Row("2#alex", 12000), Row("2#david", 10000), Row("6#jen", 12000)))
 
@@ -796,30 +665,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     }
     assert(filters2.isEmpty)
     checkAggregateRemoved(df2)
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT, NAME], "
-        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
-    }
+    checkPushedInfo(df2, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT, NAME]")
     checkAnswer(df2, Seq(Row("1#amy", 11000), Row("1#cathy", 10200),
       Row("2#alex", 13200), Row("2#david", 11300), Row("6#jen", 13200)))
 
     val df3 = sql("select concat_ws('#', DEPT, NAME), MAX(SALARY) + MIN(BONUS)" +
       " FROM h2.test.employee where dept > 0 group by concat_ws('#', DEPT, NAME)")
-    val filters3 = df3.queryExecution.optimizedPlan.collect {
-      case f: Filter => f
-    }
-    assert(filters3.isEmpty)
+    checkFiltersRemoved(df3)
     checkAggregateRemoved(df3, false)
-    df3.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], "
-        checkKeywordsExistsInExplain(df3, expected_plan_fragment)
-    }
+    checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], ")
     checkAnswer(df3, Seq(Row("1#amy", 11000), Row("1#cathy", 10200),
       Row("2#alex", 13200), Row("2#david", 11300), Row("6#jen", 13200)))
   }
 
@@ -827,19 +682,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   test("scan with aggregate push-down: with having clause") {
     val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
       " group by DEPT having MIN(BONUS) > 1000")
-    val filters = df.queryExecution.optimizedPlan.collect {
-      case f: Filter => f // filter over aggregate not push down
-    }
-    assert(filters.nonEmpty)
+    // filter over aggregate not push down
+    checkFiltersRemoved(df, false)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(12000, 1200), Row(12000, 1200)))
   }
 
@@ -848,14 +695,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy($"DEPT")
       .min("SALARY").as("total")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [MIN(SALARY)], " +
-          "PushedFilters: [], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [MIN(SALARY)], " +
+      "PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(1, 9000), Row(2, 10000), Row(6, 12000)))
   }
 
@@ -867,19 +708,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"SALARY").as("total"))
      .filter($"total" > 1000)
       .orderBy($"total")
-    val filters = query.queryExecution.optimizedPlan.collect {
-      case f: Filter => f
-    }
-    assert(filters.nonEmpty) // filter over aggregate not pushed down
-    checkAggregateRemoved(df)
-    query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
-    }
+    checkFiltersRemoved(query, false) // filter over aggregate not pushed down
+    checkAggregateRemoved(query)
+    checkPushedInfo(query, "PushedAggregates: [SUM(SALARY)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(query, Seq(Row(6, 12000), Row(1, 19000), Row(2, 22000)))
   }
 
@@ -888,12 +720,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val decrease = udf { (x: Double, y: Double) => x - y }
     val query = df.select(decrease(sum($"SALARY"), sum($"BONUS")).as("value"))
     checkAggregateRemoved(query)
-    query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), SUM(BONUS)]"
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
-    }
+    checkPushedInfo(query, "PushedAggregates: [SUM(SALARY), SUM(BONUS)], ")
     checkAnswer(query, Seq(Row(47100.0)))
   }
 
@@ -915,14 +742,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [VAR_POP(BONUS), VAR_SAMP(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
   }
 
@@ -931,14 +752,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " where dept > 0 group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [STDDEV_POP(BONUS), STDDEV_SAMP(BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(100d, 141.4213562373095d), Row(50d, 70.71067811865476d),
       Row(0d, null)))
   }
 
@@ -947,14 +762,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " FROM h2.test.employee where dept > 0 group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
   }
 
@@ -963,14 +772,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       " group by DePt")
     checkFiltersRemoved(df)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [CORR(BONUS, BONUS)], " +
-          "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " +
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
   }
 
@@ -1032,15 +835,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |FROM h2.test.employee GROUP BY DEPT
       """.stripMargin)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
-            " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
-          "PushedFilters: [], " +
-          "PushedGroupByColumns: [DEPT], "
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df,
+      "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" +
+        " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " +
+      "PushedFilters: [], " +
+      "PushedGroupByColumns: [DEPT], ")
     checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 12000d, 0d, 0d, 2, 0d),
       Row(2, 2, 2, 2, 2, 0d, 10000d, 0d, 10000d, 10000d, 0d, 0d, 2, 0d),
       Row(2, 2, 2, 2, 2, 0d, 12000d, 0d, 12000d, 12000d, 0d, 0d, 3, 0d)))
@@ -1051,17 +850,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
         val df = sql("SELECT SUM(2147483647 + DEPT) FROM h2.test.employee")
         checkAggregateRemoved(df, ansiMode)
-        val expected_plan_fragment = if (ansiMode) {
+        val expectedPlanFragment = if (ansiMode) {
           "PushedAggregates: [SUM(2147483647 + DEPT)], " +
             "PushedFilters: [], " +
             "PushedGroupByColumns: []"
         } else {
           "PushedFilters: []"
         }
-        df.queryExecution.optimizedPlan.collect {
-          case _: DataSourceV2ScanRelation =>
-            checkKeywordsExistsInExplain(df, expected_plan_fragment)
-        }
+        checkPushedInfo(df, expectedPlanFragment)
         if (ansiMode) {
           val e = intercept[SparkException] {
             checkAnswer(df, Seq(Row(-10737418233L)))
           }
@@ -1080,12 +876,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val decrease = udf { (x: Double, y: Double) => x - y }
     val query = df.select(sum(decrease($"SALARY", $"BONUS")).as("value"))
     checkAggregateRemoved(query, false)
-    query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedFilters: []"
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
-    }
+    checkPushedInfo(query, "PushedFilters: []")
     checkAnswer(query, Seq(Row(47100.0)))
   }
 
@@ -1121,12 +912,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SELECT `dept id` FROM h2.test.dept"), Seq(Row(1), Row(2)))
     val df = sql("SELECT COUNT(`dept id`) FROM h2.test.dept")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(`dept id`)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(`dept id`)]")
     checkAnswer(df, Seq(Row(2)))
   }
 
@@ -1135,12 +921,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(sql("SELECT `名` FROM h2.test.person"), Seq(Row(1), Row(2)))
     val df = sql("SELECT COUNT(`名`) FROM h2.test.person")
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [COUNT(`名`)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [COUNT(`名`)]")
     checkAnswer(df, Seq(Row(2)))
     // scalastyle:on
   }
 
@@ -1154,12 +935,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
      .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]")
     checkAnswer(df, Seq(Row(53000.00, 10600.000000, 5)))
 
     val df2 = spark.read
@@ -1171,12 +947,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
      .groupBy($"name")
      .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), AVG(SALARY), COUNT(SALARY)]")
     checkAnswer(df2, Seq(
       Row("alex", 12000.00, 12000.000000, 1),
       Row("amy", 10000.00, 10000.000000, 1),
@@ -1194,12 +965,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .table("h2.test.employee")
       .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"), count($"SALARY").as("count"))
     checkAggregateRemoved(df, false)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]")
     checkAnswer(df, Seq(Row(53000.00, 10600.000000, 5)))
 
     val df2 = spark.read
@@ -1211,12 +977,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .groupBy($"name")
      .agg(sum($"SALARY").as("sum"), avg($"SALARY").as("avg"),
         count($"SALARY").as("count"))
     checkAggregateRemoved(df, false)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]"
-        checkKeywordsExistsInExplain(df, expected_plan_fragment)
-    }
+    checkPushedInfo(df, "PushedAggregates: [SUM(SALARY), COUNT(SALARY)]")
     checkAnswer(df2, Seq(
       Row("alex", 12000.00, 12000.000000, 1),
      Row("amy", 10000.00, 10000.000000, 1),
@@ -1240,12 +1001,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
-        checkKeywordsExistsInExplain(df, expectedPlanFragment)
-    }
+    checkPushedInfo(df,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
 
     val df2 = spark.table("h2.test.employee")
@@ -1254,12 +1011,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df2)
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
-        checkKeywordsExistsInExplain(df2, expectedPlanFragment)
-    }
+    checkPushedInfo(df2,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]")
     checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
   }
 
@@ -1275,12 +1028,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
      .agg(sum($"mySalary").as("total"))
       .filter($"total" > 1000)
     checkAggregateRemoved(df, false)
-    df.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
-        checkKeywordsExistsInExplain(df, expectedPlanFragment)
-    }
+    checkPushedInfo(df,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]")
     checkAnswer(df, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
       Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
 
@@ -1295,12 +1044,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
      .agg(sum($"mySalary").as("total"))
      .filter($"total" > 1000)
    checkAggregateRemoved(df2, false)
-    df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expectedPlanFragment =
-          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
-        checkKeywordsExistsInExplain(df2, expectedPlanFragment)
-    }
+    checkPushedInfo(df2,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]")
     checkAnswer(df2, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
      Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
   }