From 3bdd86910c22a14dc81fb973f6c601342b448918 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 28 Sep 2022 15:19:01 +0800
Subject: [PATCH 1/2] fix

---
 .../spark/sql/execution/ExpandExec.scala      | 22 +++++++++++--------
 .../sql/execution/python/PythonUDFSuite.scala |  2 +-
 .../continuous/ContinuousSuite.scala          |  2 +-
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index c087fdf5f962..119466df9dc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -234,16 +234,20 @@ case class ExpandExec(
     val i = ctx.freshName("i")
     // these column have to declared before the loop.
     val evaluate = evaluateVariables(outputColumns)
-    s"""
-       |$evaluate
-       |for (int $i = 0; $i < ${projections.length}; $i ++) {
-       |  switch ($i) {
-       |    ${cases.mkString("\n").trim}
-       |  }
-       |  $numOutput.add(1);
-       |  ${consume(ctx, outputColumns)}
-       |}
+    val margin =
+      s"""
+         |$evaluate
+         |for (int $i = 0; $i < ${projections.length}; $i ++) {
+         |  switch ($i) {
+         |    ${cases.mkString("\n").trim}
+         |  }
+         |  $numOutput.add(1);
+         |  ${consume(ctx, outputColumns)}
+         |}
      """.stripMargin
+    println("fuck:")
+    println(margin)
+    margin
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): ExpandExec =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 42e4b1accde7..70784c20a8eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -73,7 +73,7 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") {
-    assume(shouldTestPythonUDFs)
+    assume(shouldTestPandasUDFs)
 
     val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b")
     val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index e6a0af48190a..fc6b51dce790 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -279,7 +279,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
   Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).foreach { udf =>
     test(s"continuous mode with various UDFs - ${udf.prettyName}") {
       assume(
-        shouldTestPythonUDFs && udf.isInstanceOf[TestScalarPandasUDF] ||
+        shouldTestPandasUDFs && udf.isInstanceOf[TestScalarPandasUDF] ||
           shouldTestPythonUDFs && udf.isInstanceOf[TestPythonUDF] ||
           udf.isInstanceOf[TestScalaUDF])
 

From 1ef1dae811df1f91d55c18eef26eccc017013354 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 28 Sep 2022 15:52:06 +0800
Subject: [PATCH 2/2] revert

---
 .../spark/sql/execution/ExpandExec.scala | 22 ++++++++-----------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index 119466df9dc0..c087fdf5f962 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -234,20 +234,16 @@ case class ExpandExec(
     val i = ctx.freshName("i")
     // these column have to declared before the loop.
     val evaluate = evaluateVariables(outputColumns)
-    val margin =
-      s"""
-         |$evaluate
-         |for (int $i = 0; $i < ${projections.length}; $i ++) {
-         |  switch ($i) {
-         |    ${cases.mkString("\n").trim}
-         |  }
-         |  $numOutput.add(1);
-         |  ${consume(ctx, outputColumns)}
-         |}
+    s"""
+       |$evaluate
+       |for (int $i = 0; $i < ${projections.length}; $i ++) {
+       |  switch ($i) {
+       |    ${cases.mkString("\n").trim}
+       |  }
+       |  $numOutput.add(1);
+       |  ${consume(ctx, outputColumns)}
+       |}
      """.stripMargin
-    println("fuck:")
-    println(margin)
-    margin
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): ExpandExec =