From 90168a3b472e0fb2c810dea3b9fe5f1c9c3e3289 Mon Sep 17 00:00:00 2001 From: yangchuan Date: Tue, 16 May 2023 16:36:43 +0800 Subject: [PATCH 1/2] reproduce --- .../VeloxAggregateFunctionsSuite.scala | 387 +----------------- cpp/velox/compute/WholeStageResultIterator.cc | 3 + 2 files changed, 20 insertions(+), 370 deletions(-) diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala index 274125256fd8..3f5d121abe02 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala @@ -18,7 +18,7 @@ package io.glutenproject.execution import org.apache.spark.SparkConf -import org.apache.spark.sql.Row +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.functions.{avg, col} import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType} @@ -47,380 +47,27 @@ class VeloxAggregateFunctionsSuite extends WholeStageTransformerSuite { .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") .set("spark.sql.sources.useV1SourceList", "avro") + .set("spark.gluten.sql.validate.failure.logLevel", "WARN") } - test("count") { - val df = runQueryAndCompare( - "select count(*) from lineitem where l_partkey in (1552, 674, 1062)") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] } - runQueryAndCompare( - "select count(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } - - test("avg") { - val df = runQueryAndCompare( - "select avg(l_partkey) from lineitem where l_partkey < 1000") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] } - runQueryAndCompare( - "select avg(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - runQueryAndCompare( - "select avg(cast (l_quantity as DECIMAL(12, 2))), " + - "count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - runQueryAndCompare( - "select avg(cast (l_quantity as DECIMAL(22, 2))), " + - "count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } + private lazy val person3: DataFrame = Seq( + ("Luis", 1, 99), + ("Luis", 16, 99), + ("Luis", 16, 176), + ("Fernando", 32, 99), + ("Fernando", 32, 164), + ("David", 60, 99), + ("Amy", 24, 99)).toDF("name", "age", "height") - test("sum") { - runQueryAndCompare( - "select sum(l_partkey) from lineitem where l_partkey < 2000") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select sum(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - runQueryAndCompare( - "select sum(cast (l_quantity as DECIMAL(22, 2))) from lineitem") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select sum(cast (l_quantity as DECIMAL(12, 2))), " + - "count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - runQueryAndCompare( - "select sum(cast (l_quantity as DECIMAL(22, 2))), " + - "count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } - - test("min and max") { - runQueryAndCompare( - "select min(l_partkey), max(l_partkey) from lineitem where l_partkey < 2000") { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select min(l_partkey), max(l_partkey), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } + test("SPARK-34165: Add count_distinct to summary") { + val summaryDF = person3.summary("count") - test("groupby") { - val df = runQueryAndCompare( - "select l_orderkey, sum(l_partkey) as sum from lineitem " + - "where l_orderkey < 3 group by l_orderkey") { _ => } - checkLengthAndPlan(df, 2) - } - - test("group sets") { - val result = runQueryAndCompare( - "select l_orderkey, l_partkey, sum(l_suppkey) from lineitem " + - "where l_orderkey < 3 group by ROLLUP(l_orderkey, l_partkey) " + - "order by l_orderkey, l_partkey ") { _ => } - } - - test("stddev_samp") { - runQueryAndCompare( - """ - |select stddev_samp(l_quantity) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - """ - |select l_orderkey, stddev_samp(l_quantity) from lineitem - |group by l_orderkey; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select stddev_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } - - test("stddev_pop") { - runQueryAndCompare( - """ - |select stddev_pop(l_quantity) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - """ - |select l_orderkey, stddev_pop(l_quantity) from lineitem - |group by l_orderkey; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select stddev_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } - - test("var_samp") { - runQueryAndCompare( - """ - |select var_samp(l_quantity) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - """ - |select l_orderkey, var_samp(l_quantity) from lineitem - |group by l_orderkey; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select var_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } - - test("var_pop") { - runQueryAndCompare( - """ - |select var_pop(l_quantity) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - """ - |select l_orderkey, var_pop(l_quantity) from lineitem - |group by l_orderkey; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select var_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4) - }} - } - - test("bit_and bit_or bit_xor") { - val bitAggs = Seq("bit_and", "bit_or", "bit_xor") - for (func <- bitAggs) { - runQueryAndCompare( - s""" - |select ${func}(l_linenumber) from lineitem - |group by l_orderkey; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - s"select ${func}(l_linenumber), count(distinct l_partkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - } - } - - test("corr covar_pop covar_samp") { - runQueryAndCompare( - """ - |select corr(l_partkey, l_suppkey) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select corr(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - """ - |select covar_pop(l_partkey, l_suppkey) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select covar_pop(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - """ - |select covar_samp(l_partkey, l_suppkey) from lineitem; - |""".stripMargin) { - checkOperatorMatch[GlutenHashAggregateExecTransformer] - } - runQueryAndCompare( - "select covar_samp(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - } + val summaryResult = Seq( + Row("count", "7", "7", "7")) + def getSchemaAsSeq(df: DataFrame): Seq[String] = df.schema.map(_.name) - test("distinct functions") { - runQueryAndCompare("SELECT sum(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT sum(DISTINCT l_partkey), count(*), sum(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT avg(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT avg(DISTINCT l_partkey), count(*), avg(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT count(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT count(DISTINCT l_partkey), count(*), count(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT stddev_samp(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT stddev_samp(DISTINCT l_partkey), count(*), " + - "stddev_samp(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT stddev_pop(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT stddev_pop(DISTINCT l_partkey), count(*), " + - "stddev_pop(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT var_samp(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT var_samp(DISTINCT l_partkey), count(*), " + - "var_samp(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT var_pop(DISTINCT l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare( - "SELECT var_pop(DISTINCT l_partkey), count(*), " + - "var_pop(l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT corr(DISTINCT l_partkey, l_suppkey)," + - "corr(DISTINCT l_suppkey, l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT corr(DISTINCT l_partkey, l_suppkey)," + - "count(*), corr(l_suppkey, l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT covar_pop(DISTINCT l_partkey, l_suppkey)," + - "covar_pop(DISTINCT l_suppkey, l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT covar_pop(DISTINCT l_partkey, l_suppkey)," + - "count(*), covar_pop(l_suppkey, l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT covar_samp(DISTINCT l_partkey, l_suppkey)," + - "covar_samp(DISTINCT l_suppkey, l_partkey), count(*) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } - runQueryAndCompare("SELECT covar_samp(DISTINCT l_partkey, l_suppkey)," + - "count(*), covar_samp(l_suppkey, l_partkey) FROM lineitem") { df => { - assert(getExecutedPlan(df).count(plan => { - plan.isInstanceOf[GlutenHashAggregateExecTransformer] - }) == 4) - } - } + assert(getSchemaAsSeq(summaryDF) === Seq("summary", "name", "age", "height")) + checkAnswer(summaryDF, summaryResult) } } diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 51d6dbc53b23..b21c2febcc76 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -106,6 +106,9 @@ std::shared_ptr WholeStageResultIterator::next() { if (numRows == 0) { return nullptr; } + std::cout << "gluten output" << std::endl; + std::cout << vector->toString() << std::endl; + std::cout << vector->toString(0, vector->size()) << std::endl; return std::make_shared(vector); } From 344ccb1aeaf52625869d0ef42b534c00caf477ea Mon Sep 17 00:00:00 2001 From: yangchuan Date: Tue, 16 May 2023 17:32:47 +0800 Subject: [PATCH 2/2] fix ci --- .github/workflows/velox_be.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/velox_be.yml b/.github/workflows/velox_be.yml index 9139cb5dabda..e50f81b7f72b 100644 --- a/.github/workflows/velox_be.yml +++ b/.github/workflows/velox_be.yml @@ -85,7 +85,7 @@ jobs: run: | docker exec velox-backend-ubuntu2004-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -DargLine="-Dspark.test.home=/opt/spark322" -DwildcardSuites="io.glutenproject.execution.VeloxAggregateFunctionsSuite"' - name: Run CPP unit test run: | docker exec velox-backend-ubuntu2004-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/cpp/build && \