Skip to content

Commit 1dea574

Browse files
bersprockets authored and HyukjinKwon committed
[SPARK-39496][SQL] Handle null struct in Inline.eval
### What changes were proposed in this pull request?

Change `Inline.eval` to return a row of null values rather than a null row in the case of a null input struct.

### Why are the changes needed?

Consider the following query:

```
set spark.sql.codegen.wholeStage=false;
select inline(array(named_struct('a', 1, 'b', 2), null));
```

This query fails with a `NullPointerException`:

```
22/06/16 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$11(GenerateExec.scala:122)
```

(In Spark 3.1.3, you don't need to set `spark.sql.codegen.wholeStage` to false to reproduce the error, since Spark 3.1.3 has no codegen path for `Inline`).

This query fails regardless of the setting of `spark.sql.codegen.wholeStage`:

```
val dfWide = (Seq((1))
  .toDF("col0")
  .selectExpr(Seq.tabulate(99)(x => s"$x as col${x + 1}"): _*))

val df = (dfWide
  .selectExpr("*", "array(named_struct('a', 1, 'b', 2), null) as struct_array"))

df.selectExpr("*", "inline(struct_array)").collect
```

It fails with

```
22/06/16 15:18:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_8$(Unknown Source)
```

When `Inline.eval` returns a null row in the collection, `GenerateExec` gets a `NullPointerException` either when joining the null row with the required child output, or when projecting the null row.

This PR avoids producing the null row and produces a row of null values instead:

```
spark-sql> set spark.sql.codegen.wholeStage=false;
spark.sql.codegen.wholeStage    false
Time taken: 3.095 seconds, Fetched 1 row(s)
spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null));
1       2
NULL    NULL
Time taken: 1.214 seconds, Fetched 2 row(s)
spark-sql>
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes #36903 from bersprockets/inline_eval_null_struct_issue.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit c4d5390)
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent ad90195 commit 1dea574

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,13 +452,17 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene
452452

453453
private lazy val numFields = elementSchema.fields.length
454454

455+
private lazy val generatorNullRow = new GenericInternalRow(elementSchema.length)
456+
455457
override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
456458
val inputArray = child.eval(input).asInstanceOf[ArrayData]
457459
if (inputArray == null) {
458460
Nil
459461
} else {
460-
for (i <- 0 until inputArray.numElements())
461-
yield inputArray.getStruct(i, numFields)
462+
for (i <- 0 until inputArray.numElements()) yield {
463+
val s = inputArray.getStruct(i, numFields)
464+
if (s == null) generatorNullRow else s
465+
}
462466
}
463467
}
464468

sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
2323
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
2424
import org.apache.spark.sql.catalyst.trees.LeafLike
2525
import org.apache.spark.sql.functions._
26+
import org.apache.spark.sql.internal.SQLConf
2627
import org.apache.spark.sql.test.SharedSparkSession
2728
import org.apache.spark.sql.types.{IntegerType, StructType}
2829

@@ -389,7 +390,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
389390
}
390391
}
391392

392-
test("SPARK-39061: inline should handle null struct") {
393+
def testNullStruct(): Unit = {
393394
val df = sql(
394395
"""select * from values
395396
|(
@@ -413,6 +414,16 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
413414
sql("select a, inline(b) from t1"),
414415
Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil)
415416
}
417+
418+
test("SPARK-39061: inline should handle null struct") {
419+
testNullStruct
420+
}
421+
422+
test("SPARK-39496: inline eval path should handle null struct") {
423+
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
424+
testNullStruct
425+
}
426+
}
416427
}
417428

418429
case class EmptyGenerator() extends Generator with LeafLike[Expression] {

0 commit comments

Comments
 (0)