From bff90dd7419e225d50e45d497dc7e0b26beaa6d5 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 18 Jun 2022 09:25:11 +0900 Subject: [PATCH 1/4] [SPARK-39496][SQL] Handle null struct in `Inline.eval` Change `Inline.eval` to return a row of null values rather than a null row in the case of a null input struct. Consider the following query: ``` set spark.sql.codegen.wholeStage=false; select inline(array(named_struct('a', 1, 'b', 2), null)); ``` This query fails with a `NullPointerException`: ``` 22/06/16 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$11(GenerateExec.scala:122) ``` (In Spark 3.1.3, you don't need to set `spark.sql.codegen.wholeStage` to false to reproduce the error, since Spark 3.1.3 has no codegen path for `Inline`). This query fails regardless of the setting of `spark.sql.codegen.wholeStage`: ``` val dfWide = (Seq((1)) .toDF("col0") .selectExpr(Seq.tabulate(99)(x => s"$x as col${x + 1}"): _*)) val df = (dfWide .selectExpr("*", "array(named_struct('a', 1, 'b', 2), null) as struct_array")) df.selectExpr("*", "inline(struct_array)").collect ``` It fails with ``` 22/06/16 15:18:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_8$(Unknown Source) ``` When `Inline.eval` returns a null row in the collection, GenerateExec gets a NullPointerException either when joining the null row with required child output, or projecting the null row. This PR avoids producing the null row and produces a row of null values instead: ``` spark-sql> set spark.sql.codegen.wholeStage=false; spark.sql.codegen.wholeStage false Time taken: 3.095 seconds, Fetched 1 row(s) spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null)); 1 2 NULL NULL Time taken: 1.214 seconds, Fetched 2 row(s) spark-sql> ``` No. New unit test. Closes #36903 from bersprockets/inline_eval_null_struct_issue. Authored-by: Bruce Robbins Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/expressions/generators.scala | 8 ++++-- .../spark/sql/GeneratorFunctionSuite.scala | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index ad6e365f76fa..08aa2502bc02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -427,13 +427,17 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene private lazy val numFields = elementSchema.fields.length + private lazy val generatorNullRow = new GenericInternalRow(elementSchema.length) + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { val inputArray = child.eval(input).asInstanceOf[ArrayData] if (inputArray == null) { Nil } else { - for (i <- 0 until inputArray.numElements()) - yield inputArray.getStruct(i, numFields) + for (i <- 0 until inputArray.numElements()) yield { + val s = inputArray.getStruct(i, numFields) + if (s == null) generatorNullRow else s + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index 5ac238368c59..a9fd59cb93b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Generator} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} @@ -363,6 +364,31 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*), Row(1, 6) :: Row(3, 6) :: Nil) } + + test("SPARK-39496: inline eval path should handle null struct") { + val df = sql( + """select * from values + |( + | 1, + | array( + | named_struct('c1', 0, 'c2', 1), + | null, + | named_struct('c1', 2, 'c2', 3), + | null + | ) + |) + |as tbl(a, b) + """.stripMargin) + df.createOrReplaceTempView("t1") + + checkAnswer( + sql("select inline(b) from t1"), + Row(0, 1) :: Row(null, null) :: Row(2, 3) :: Row(null, null) :: Nil) + + checkAnswer( + sql("select a, inline(b) from t1"), + Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil) + } } case class EmptyGenerator() extends Generator { From 1d2a074828f7a8298c8698370c5f9d4bad7a9340 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 18 Jun 2022 09:31:15 -0700 Subject: [PATCH 2/4] Some fix-ups for 3.1 --- .../org/apache/spark/sql/catalyst/expressions/generators.scala | 3 ++- .../scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 08aa2502bc02..044b50be2db5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -420,7 +420,8 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene } override def elementSchema: StructType = child.dataType match { - case ArrayType(st: StructType, _) => st + case ArrayType(st: StructType, false) => st + case ArrayType(st: StructType, true) => st.asNullable } override def collectionType: DataType = child.dataType diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index a9fd59cb93b1..1099497143df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Generator} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.functions._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} From eaca65813b7d31ea570136948b65137f269a1799 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 18 Jun 2022 13:26:18 -0700 Subject: [PATCH 3/4] Update test name --- .../scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index 1099497143df..024160b4447f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -364,7 +364,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { Row(1, 6) :: Row(3, 6) :: Nil) } - test("SPARK-39496: inline eval path should handle null struct") { + test("SPARK-39061/SPARK-39496: inline should handle null struct") { val df = sql( """select * from values |( From 0fb5a5f638125dd4e4e8a6a9ef2a0a8cb42008fc Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Tue, 21 Jun 2022 13:50:30 -0700 Subject: [PATCH 4/4] Fix up test name again --- .../scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index 024160b4447f..619e2fc8761f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -364,7 +364,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { Row(1, 6) :: Row(3, 6) :: Nil) } - test("SPARK-39061/SPARK-39496: inline should handle null struct") { + test("SPARK-39496: inline should handle null struct") { val df = sql( """select * from values |(