docs/sql-programming-guide.md
@@ -1892,6 +1892,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see

## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the IN operator returns `false` when comparing structs with null fields; since 2.4, Spark by default returns `null` in this scenario, in compliance with the behavior of other RDBMSs (therefore NOT IN filters out such rows). The previous behavior can be restored by switching `spark.sql.legacy.inOperator.falseForNullField` to `true`.
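  For illustration, a minimal spark-shell sketch (the struct literal is a hypothetical one-off, chosen only to show the flag's effect):

      scala> sql("SELECT named_struct('a', 1, 'b', CAST(NULL AS INT)) IN (named_struct('a', 1, 'b', 2))").show()
      // 2.3 and earlier: false. 2.4 default: NULL, so a corresponding NOT IN filters the row out.
      scala> sql("SET spark.sql.legacy.inOperator.falseForNullField=true")
      scala> sql("SELECT named_struct('a', 1, 'b', CAST(NULL AS INT)) IN (named_struct('a', 1, 'b', 2))").show()
      // With the legacy flag: false, as in 2.3 and earlier.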
- In Spark version 2.3 and earlier, the second parameter to the `array_contains` function is implicitly promoted to the element type of the first, array-type parameter. This type promotion can be lossy and may cause `array_contains` to return a wrong result. This has been addressed in 2.4 by employing a safer type promotion mechanism; the resulting changes in behavior are illustrated in the table below.
<table class="table">
<tr>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -22,9 +22,11 @@ import scala.collection.immutable.TreeSet
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, GenerateSafeProjection, GenerateUnsafeProjection, Predicate => BasePredicate}
import org.apache.spark.sql.catalyst.expressions.codegen.Block
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


@@ -202,7 +204,11 @@ case class InSubquery(values: Seq[Expression], query: ListQuery)
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.",
usage = """
expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN. Otherwise, if
spark.sql.legacy.inOperator.falseForNullField is false and any of the elements or fields of
Contributor:
> any of the elements or fields ...

We should explicitly mention multi-column IN, which is different from `a IN (b, c, ...)` when `a` is a struct type.
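
For example, a sketch of the two shapes (here `m` is the test table defined later in this PR, while `t` with a struct column `s` is hypothetical):

scala> sql("SELECT * FROM m WHERE (a, b) NOT IN ((0, 1.0))")               // multi-column NOT IN
scala> sql("SELECT * FROM t WHERE s IN (named_struct('a', 0, 'b', 1.0))")  // struct-typed column in a value list
// After analysis both become the same In expression over struct values, so the
// null-field semantics described in the usage text apply to both shapes.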

the elements is null it returns null, else it returns false.
Contributor:
I vaguely remember that a multi-line string doesn't work with `ExpressionDescription`. Can you verify it with `DESCRIBE FUNCTION`?

Contributor Author:
I think the point here is that only a string literal works, so concatenation and/or interpolation don't work. This just puts the string on different lines, i.e. the output is:

scala> sql("DESCRIBE FUNCTION IN").show(false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|function_desc                                                                                                                                                                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Function: in                                                                                                                                                                                                                                                              |
|Class: org.apache.spark.sql.catalyst.expressions.In                                                                                                                                                                                                                       |
|Usage: 
    expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any valN. Otherwise, if
      spark.sql.legacy.inOperator.falseForNullField is false and any of the elements or fields of
      the elements is null it returns null, else it returns false.
  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Contributor:
I'm wondering if we should use

"""
  |xxx
""".stripMargin

Contributor Author:
that doesn't work. We cannot do stripMargin. We can just put a string.
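
A sketch of the underlying constraint (assuming `ExpressionDescription` behaves as a Java annotation, whose arguments must be compile-time constants):

import org.apache.spark.sql.catalyst.expressions.ExpressionDescription

// Compiles: a multi-line string literal is still a constant expression.
@ExpressionDescription(usage = """
  first line
  second line""")
class ConstantUsageSketch   // hypothetical carrier class, for illustration only

// Does not compile: stripMargin is a method call, so the argument is no longer a
// compile-time constant and scalac rejects it as an annotation argument.
// @ExpressionDescription(usage = """|first line""".stripMargin)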

""",
arguments = """
Arguments:
* expr1, expr2, expr3, ... - the arguments must be same type.
@@ -238,20 +244,25 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal])
private lazy val ordering = TypeUtils.getInterpretedOrdering(value.dataType)

override def nullable: Boolean = children.exists(_.nullable)
override def nullable: Boolean = value.dataType match {
case _: StructType if !SQLConf.get.inFalseForNullField =>
children.exists(_.nullable) ||
children.exists(_.dataType.asInstanceOf[StructType].exists(_.nullable))
Member:
@mgaido91 Ur, this `asInstanceOf[StructType]` looks unsafe for a non-nullable primitive type. Could you add an additional test case for the nested StructType to provide coverage for this line?

Contributor Author:
Why do you think it is unsafe? We do this only if the dataType of value matches StructType, and given what is enforced in checkInputDataTypes, all the children must be StructTypes.

Anyway, I will add a test case for the scenario you described, thanks.

case _ => children.exists(_.nullable)
}
override def foldable: Boolean = children.forall(_.foldable)

override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"

override def eval(input: InternalRow): Any = {
val evaluatedValue = value.eval(input)
if (evaluatedValue == null) {
if (checkNullEval(evaluatedValue)) {
null
} else {
var hasNull = false
list.foreach { e =>
val v = e.eval(input)
if (v == null) {
if (checkNullEval(v)) {
hasNull = true
} else if (ordering.equiv(v, evaluatedValue)) {
return true
@@ -265,6 +276,18 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
}
}

// Codegen null check: under the non-legacy semantics a struct operand counts as
// null when the operand itself is null or any of its fields is null.
@transient lazy val checkNullGenCode: (ExprCode) => Block = value.dataType match {
case _: StructType if !SQLConf.get.inFalseForNullField =>
e => code"${e.isNull} || ${e.value}.anyNull()"
case _ => e => code"${e.isNull}"
}

// Interpreted-mode counterpart of checkNullGenCode: a struct row is treated as
// null when it is null or contains a null field.
@transient lazy val checkNullEval: (Any) => Boolean = value.dataType match {
case _: StructType if !SQLConf.get.inFalseForNullField =>
input => input == null || input.asInstanceOf[InternalRow].anyNull
case _ => input => input == null
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val javaDataType = CodeGenerator.javaType(value.dataType)
val valueGen = value.genCode(ctx)
@@ -283,7 +306,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
val listCode = listGen.map(x =>
s"""
|${x.code}
|if (${x.isNull}) {
|if (${checkNullGenCode(x)}) {
| $tmpResult = $HAS_NULL; // ${ev.isNull} = true;
|} else if (${ctx.genEqual(value.dataType, valueArg, x.value)}) {
| $tmpResult = $MATCHED; // ${ev.isNull} = false; ${ev.value} = true;
@@ -316,7 +339,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
code"""
|${valueGen.code}
|byte $tmpResult = $HAS_NULL;
|if (!${valueGen.isNull}) {
|if (!(${checkNullGenCode(valueGen)})) {
| $tmpResult = $NOT_MATCHED;
| $javaDataType $valueArg = ${valueGen.value};
| do {
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1561,6 +1561,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_IN_FALSE_FOR_NULL_FIELD =
buildConf("spark.sql.legacy.inOperator.falseForNullField")
.internal()
.doc("When set to true, the IN operator returns false when comparing literal structs " +
"containing a null field. When set to false (default), it returns null, instead. This is " +
"important especially when using NOT IN as in the second case, it filters out the rows " +
"when a null is present in a filed; while in the first one, those rows are returned.")
.booleanConf
.createWithDefault(true)
Contributor:
Shall we set false as the default, to follow the SQL standard and be consistent with in-subquery?

Contributor Author:
yes, I agree, let me switch it, thanks.


val LEGACY_INTEGRALDIVIDE_RETURN_LONG = buildConf("spark.sql.legacy.integralDivide.returnBigint")
.doc("If it is set to true, the div operator returns always a bigint. This behavior was " +
"inherited from Hive. Otherwise, the return type is the data type of the operands.")
@@ -1978,6 +1988,8 @@ class SQLConf extends Serializable with Logging {

def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)

def inFalseForNullField: Boolean = getConf(SQLConf.LEGACY_IN_FALSE_FOR_NULL_FIELD)

def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

/** ********************** SQLConf functionality methods ************ */
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -22,11 +22,12 @@ import java.sql.{Date, Timestamp}
import scala.collection.immutable.HashSet

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.RandomDataGenerator
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


@@ -155,7 +156,16 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
}

test("IN with different types") {
def testWithRandomDataGeneration(dataType: DataType, nullable: Boolean): Unit = {
def testWithRandomDataGeneration(dataType: DataType,
nullable: Boolean,
legacyNullHandling: Boolean = false): Unit = {
def isNull(e: Any): Boolean = {
if (!legacyNullHandling && dataType.isInstanceOf[StructType]) {
e == null || e.asInstanceOf[Row].anyNull
} else {
e == null
}
}
val maybeDataGen = RandomDataGenerator.forType(dataType, nullable = nullable)
// Actually we won't pass in unsupported data types, this is a safety check.
val dataGen = maybeDataGen.getOrElse(
@@ -178,11 +188,11 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
val input = inputData.map(NonFoldableLiteral.create(_, dataType))
val expected = if (inputData(0) == null) {
val expected = if (isNull(inputData.head)) {
null
} else if (inputData.slice(1, 10).contains(inputData(0))) {
true
} else if (inputData.slice(1, 10).contains(null)) {
} else if (inputData.slice(1, 10).exists(isNull)) {
null
} else {
false
@@ -212,13 +222,22 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
}

// Struct types:
val atomicAndComplexTypes = atomicTypes ++ atomicTypes.map { t =>
StructType(StructField("f1", t) :: StructField("f2", t) :: Nil)
}
for (
colOneType <- atomicTypes;
colTwoType <- atomicTypes;
colOneType <- atomicAndComplexTypes;
colTwoType <- atomicAndComplexTypes;
nullable <- Seq(true, false)) {
val structType = StructType(
StructField("a", colOneType) :: StructField("b", colTwoType) :: Nil)
testWithRandomDataGeneration(structType, nullable)
if (nullable) {
Seq("true", "false").foreach { legacyNullHandling =>
withSQLConf((SQLConf.LEGACY_IN_FALSE_FOR_NULL_FIELD.key, legacyNullHandling)) {
testWithRandomDataGeneration(structType, nullable, legacyNullHandling.toBoolean)
}
}
}
}

// Map types: not supported
@@ -14,8 +14,25 @@ CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
-- Case 1 (not possible to write a literal with no rows, so we ignore it.)
-- (subquery is empty -> row is returned)

-- Cases 2, 3 and 4 are currently broken, so I have commented them out here.
-- Filed https://issues.apache.org/jira/browse/SPARK-24395 to fix and restore these test cases.
-- Case 2
-- (subquery contains a row with null in all columns -> row not returned)
Member (@dongjoon-hyun, Oct 8, 2018):
Ur, do we need the two spaces at the beginning? Never mind. I noticed that it was a convention here.

SELECT *
FROM m
WHERE (a, b) NOT IN ((CAST (null AS INT), CAST (null AS DECIMAL(2, 1))));

-- Case 3
-- (probe-side columns are all null -> row not returned)
SELECT *
FROM m
WHERE a IS NULL AND b IS NULL -- Matches only (null, null)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));

-- Case 4
-- (one column null, other column matches a row in the subquery result -> row not returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));

-- Case 5
-- (one null column with no match -> row is returned)
@@ -37,3 +54,26 @@ SELECT *
FROM m
WHERE b = 5.0 -- Matches (4, 5.0)
AND (a, b) NOT IN ((2, 3.0));


set spark.sql.legacy.inOperator.falseForNullField=true;

-- Case 2 (old behavior)
-- (subquery contains a row with null in all columns -> rows returned)
SELECT *
FROM m
WHERE (a, b) NOT IN ((CAST (null AS INT), CAST (null AS DECIMAL(2, 1))));

-- Case 3 (old behavior)
-- (probe-side columns are all null -> row returned)
SELECT *
FROM m
WHERE a IS NULL AND b IS NULL -- Matches only (null, null)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));

-- Case 4 (old behavior)
-- (one column null, other column matches a row in the subquery result -> row returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 4
-- Number of queries: 11


-- !query 0
@@ -16,39 +16,125 @@ struct<>


-- !query 1
-- Case 2
-- (subquery contains a row with null in all columns -> row not returned)
SELECT *
FROM m
WHERE (a, b) NOT IN ((CAST (null AS INT), CAST (null AS DECIMAL(2, 1))))
-- !query 1 schema
struct<a:int,b:decimal(2,1)>
-- !query 1 output



-- !query 2
-- Case 3
-- (probe-side columns are all null -> row not returned)
SELECT *
FROM m
WHERE a IS NULL AND b IS NULL -- Matches only (null, null)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))))
-- !query 2 schema
struct<a:int,b:decimal(2,1)>
-- !query 2 output



-- !query 3
-- Case 4
-- (one column null, other column matches a row in the subquery result -> row not returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))))
-- !query 3 schema
struct<a:int,b:decimal(2,1)>
-- !query 3 output



-- !query 4
-- Case 5
-- (one null column with no match -> row is returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN ((2, 3.0))
-- !query 1 schema
-- !query 4 schema
struct<a:int,b:decimal(2,1)>
-- !query 1 output
NULL 1
-- !query 4 output


-- !query 2

-- !query 5
-- Case 6
-- (no null columns with match -> row not returned)
SELECT *
FROM m
WHERE b = 3.0 -- Matches (2, 3.0)
AND (a, b) NOT IN ((2, 3.0))
-- !query 2 schema
-- !query 5 schema
struct<a:int,b:decimal(2,1)>
-- !query 2 output
-- !query 5 output



-- !query 3
-- !query 6
-- Case 7
-- (no null columns with no match -> row is returned)
SELECT *
FROM m
WHERE b = 5.0 -- Matches (4, 5.0)
AND (a, b) NOT IN ((2, 3.0))
-- !query 3 schema
-- !query 6 schema
struct<a:int,b:decimal(2,1)>
-- !query 3 output
-- !query 6 output
4 5


-- !query 7
set spark.sql.legacy.inOperator.falseForNullField=true
-- !query 7 schema
struct<key:string,value:string>
-- !query 7 output
spark.sql.legacy.inOperator.falseForNullField true


-- !query 8
-- Case 2 (old behavior)
-- (subquery contains a row with null in all columns -> rows returned)
SELECT *
FROM m
WHERE (a, b) NOT IN ((CAST (null AS INT), CAST (null AS DECIMAL(2, 1))))
-- !query 8 schema
struct<a:int,b:decimal(2,1)>
-- !query 8 output
2 3
4 5
NULL 1


-- !query 9
-- Case 3 (old behavior)
-- (probe-side columns are all null -> row returned)
SELECT *
FROM m
WHERE a IS NULL AND b IS NULL -- Matches only (null, null)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))))
-- !query 9 schema
struct<a:int,b:decimal(2,1)>
-- !query 9 output
NULL NULL


-- !query 10
-- Case 4 (old behavior)
-- (one column null, other column matches a row in the subquery result -> row returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))))
-- !query 10 schema
struct<a:int,b:decimal(2,1)>
-- !query 10 output
NULL 1