Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
CheckCartesianProducts),
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
NullPropagation,
PushPredicateThroughJoin,
LimitPushDown,
ColumnPruning,
Expand Down
21 changes: 19 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1236,13 +1236,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
// positive not in subquery case
var joinExec = assertJoin((
"select * from testData where key not in (select a from testData2)",
"select * from testData where key not in (select b from testData3)",
Copy link
Contributor Author

@peter-toth peter-toth Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The columns of testData2 are not nullable, so this query needs a nullable column instead (hence `b` from testData3); otherwise the existing assertion `assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)` would no longer be valid.

classOf[BroadcastHashJoinExec]))
assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)

// negative not in subquery case since multi-column is not supported
assertJoin((
"select * from testData where (key, key + 1) not in (select * from testData2)",
"select * from testData where (key, key + 1) not in (select b, b + 1 from testData3)",
classOf[BroadcastNestedLoopJoinExec]))

// positive hand-written left anti join
Expand Down Expand Up @@ -1271,6 +1271,23 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
}

test("SPARK-54972: Improve not in subqueries with non-nullable columns") {
  // Every column involved is non-nullable: testData.key on the outer side and all
  // columns of testData2 on the subquery side. Both the single-column and the
  // multi-column NOT IN forms are expected to plan a broadcast hash join that is
  // NOT flagged as a null-aware anti join.
  val notInQueries = Seq(
    "select * from testData where key not in (select a from testData2)",
    "select * from testData where (key, key + 1) not in (select * from testData2)")

  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
    notInQueries.foreach { query =>
      // assertJoin verifies the planned join operator class and hands the operator back.
      val plannedJoin = assertJoin((query, classOf[BroadcastHashJoinExec]))
      assert(!plannedJoin.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
    }
  }
}

test("SPARK-32399: Full outer shuffled hash join") {
val inputDFs = Seq(
// Test unique join key
Expand Down