From e206fffb0bb529f7cb030f6744ae97346cc0ca18 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 9 Mar 2018 03:32:51 +0000 Subject: [PATCH 1/5] initial commit --- .../apache/spark/sql/execution/BufferedRowIterator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java index 730a4ae8d5605..597d441a24dc6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -65,7 +65,7 @@ public long durationMs() { /** * Append a row to currentRows. */ - protected void append(InternalRow row) { + public void append(InternalRow row) { currentRows.add(row); } @@ -75,7 +75,7 @@ protected void append(InternalRow row) { * If it returns true, the caller should exit the loop that [[InputAdapter]] generates. * This interface is mainly used to limit the number of input rows. */ - protected boolean stopEarly() { + public boolean stopEarly() { return false; } @@ -84,14 +84,14 @@ protected boolean stopEarly() { * * If it returns true, the caller should exit the loop (return from processNext()). */ - protected boolean shouldStop() { + public boolean shouldStop() { return !currentRows.isEmpty(); } /** * Increase the peak execution memory for current task. */ - protected void incPeakExecutionMemory(long size) { + public void incPeakExecutionMemory(long size) { TaskContext.get().taskMetrics().incPeakExecutionMemory(size); } From dccae531f85a2b5982b02715e3e844d641483d10 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 9 Mar 2018 15:29:35 +0000 Subject: [PATCH 2/5] add test case --- .../execution/WholeStageCodegenSuite.scala | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index ef16292a8e75c..c4f8ab8cc95e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType} class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + test("range/filter should be combined") { val df = spark.range(10).filter("id = 1").selectExpr("id + 1") val plan = df.queryExecution.executedPlan @@ -307,4 +309,47 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { // a different query can result in codegen cache miss, that's by design } } + + test("SPARK-23598: Avoid compilation error with a lot of aggregation operations") { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + val df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name") + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) + .groupBy("name").agg(avg("age").alias("age")).limit(1) + assert(df.collect() === Array(Row("bat", 8.0))) + } + } } From 6e4579113fcd4eff7c042c6b1d14e672596bc54c Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sat, 10 Mar 2018 06:40:12 +0000 Subject: [PATCH 3/5] address review comments --- .../execution/WholeStageCodegenSuite.scala | 45 +++---------------- 1 file changed, 6 insertions(+), 39 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index c4f8ab8cc95e7..91c3ae2513879 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -310,46 +310,13 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-23598: Avoid compilation error with a lot of aggregation operations") { + test("SPARK-23598: Codegen working for lots of aggregation operations without runtime errors") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { - val df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name") - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).groupBy("name").agg(avg("age").alias("age")) - .groupBy("name").agg(avg("age").alias("age")).limit(1) - assert(df.collect() === Array(Row("bat", 8.0))) + var df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name") + for (i <- 0 until 73) { + df = df.groupBy("name").agg(avg("age").alias("age")) + } + assert(df.limit(1).collect() === Array(Row("bat", 8.0))) } } } From 603ce0fb29bfa5b5c0cfea69fb72e2a3128e772a Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sat, 10 Mar 2018 16:42:28 +0000 Subject: [PATCH 4/5] address review comment --- .../org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 91c3ae2513879..fac2ac914462e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -313,7 +313,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { test("SPARK-23598: Codegen working for lots of aggregation operations without runtime errors") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { var df = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name") - for (i <- 0 until 73) { + for (i <- 0 until 70) { df = df.groupBy("name").agg(avg("age").alias("age")) } assert(df.limit(1).collect() === Array(Row("bat", 8.0))) From 8fb5df0f76a6773594bb7e695036f3fdf0063c6a Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 13 Mar 2018 19:23:47 +0100 Subject: [PATCH 5/5] address review comment --- .../org/apache/spark/sql/execution/BufferedRowIterator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java index 597d441a24dc6..74c9c05992719 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -62,6 +62,10 @@ public long durationMs() { */ public abstract void init(int index, Iterator[] iters); + /* + * Attributes of the following four methods are public. Thus, they can be also accessed from + * methods in inner classes. See SPARK-23598 + */ /** * Append a row to currentRows. */