Skip to content

Conversation

@leanken-zz
Copy link
Contributor

@leanken-zz leanken-zz commented Jul 14, 2020

What changes were proposed in this pull request?

Normally, a Null aware anti join will be planed into BroadcastNestedLoopJoin which is very time consuming, for instance, in TPCH Query 16.

select
    p_brand,
    p_type,
    p_size,
    count(distinct ps_suppkey) as supplier_cnt
from
    partsupp,
    part
where
    p_partkey = ps_partkey
    and p_brand <> 'Brand#45'
    and p_type not like 'MEDIUM POLISHED%'
    and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
    and ps_suppkey not in (
        select
            s_suppkey
        from
            supplier
        where
            s_comment like '%Customer%Complaints%'
    )
group by
    p_brand,
    p_type,
    p_size
order by
    supplier_cnt desc,
    p_brand,
    p_type,
    p_size

In above query, will planed into

LeftAnti
condition Or((ps_suppkey=s_suppkey), IsNull(ps_suppkey=s_suppkey))

Inside BroadcastNestedLoopJoinExec will perform O(M*N), BUT if there is only single column in NAAJ, we can always change buildSide into a HashSet, and streamedSide just need to lookup in the HashSet, then the calculation will be optimized into O(M).

But this optimize is only targeting on null aware anti join with single column case, because multi-column support is much more complicated, we might be able to support multi-column in future.
After apply this patch, the TPCH Query 16 performance decrease from 41mins to 30s

The semantic of null-aware anti join is:

image

Why are the changes needed?

TPCH is a common benchmark for distributed compute engine, all other 21 Query works fine on Spark, except for Query 16, apply this patch will make Spark more competitive among all these popular engine. BTW, this patch has restricted rules and only apply on NAAJ Single Column case, which is safe enough.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. SQLQueryTestSuite with NOT IN keyword SQL, add CONFIG_DIM with spark.sql.optimizeNullAwareAntiJoin on and off
  2. added case in org.apache.spark.sql.JoinSuite.
  3. added case in org.apache.spark.sql.SubquerySuite.
  4. Compare performance before and after applying this patch against TPCH Query 16.
  5. config combination against e2e test with following
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "sspark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "true"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "true"
)

@leanken-zz
Copy link
Contributor Author

@cloud-fan
Could you please have a quick look at this issue, many thanks !!

notInSubquerySingleColumnOptimizeStreamedKeyIndex,
notInSubquerySingleColumnOptimizeStreamedKey.dataType
)
val notInKeyEqual = params.buildSideHashSet.contains(streamedRowNotInKey)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reuse [Unsafe|Long]HashedRelation here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done changing into HashedRelation, nice advise.

Comment on lines 254 to 261
if leftAttr.semanticEquals(tmpLeft) && rightAttr.semanticEquals(tmpRight) =>
notInSubquerySingleColumnOptimizeSetStreamedKey(leftAttr, rightAttr)
if (notInSubquerySingleColumnOptimizeStreamedKeyIndex != -1) {
true
} else {
logWarning(s"failed to find notInSubquerySingleColumnOptimizeStreamedKeyIndex," +
s" fallback to leftExistenceJoin.")
false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code block is the same with the line244-251? If so, could you merge them? How about defining an extractor object for the case?

Copy link
Contributor Author

@leanken-zz leanken-zz Jul 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I check on the source code on subquery.scala, found that
Or(EqualTo(a, b), IsNull(EqualTo(a, b))) will be the only option, there is no need to handle two Or pattern. so i remove the duplicate code.

# See. org/apache/spark/sql/catalyst/optimizer/subquery.scala
val inConditions = values.zip(sub.output).map(EqualTo.tupled)
val nullAwareJoinConds = inConditions.map(c => Or(c, IsNull(c)))

// or(a=b,isnull(a=b))
// or(isnull(a=b),a=b)
condition.get match {
case _@Or(_@EqualTo(leftAttr: AttributeReference, rightAttr: AttributeReference),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, could you follow the format in the other code? For example, we need to a space between @ and EqualTo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@maropu
Copy link
Member

maropu commented Jul 15, 2020

Could you update TPCDSQueryBenchmark-results.txt, too? Can the number of q16 get better?

q16 1658 1707 69 0.0 Infinity 1.0X

@maropu
Copy link
Member

maropu commented Jul 15, 2020

ok to test

@maropu
Copy link
Member

maropu commented Jul 15, 2020

oh, btw, thanks for the first contribution, @leanken .

@leanken-zz
Copy link
Contributor Author

oh, btw, thanks for the first contribution, @leanken .

Will reply your comments ASAP, many thanks.

@SparkQA
Copy link

SparkQA commented Jul 15, 2020

Test build #125871 has finished for PR 29104 at commit 042ca4a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class NotInSubquerySingleColumnOptimizeParams(

@dilipbiswal
Copy link
Contributor

retest this please

@leanken-zz
Copy link
Contributor Author

Could you update TPCDSQueryBenchmark-results.txt, too? Can the number of q16 get better?

q16 1658 1707 69 0.0 Infinity 1.0X

I am afraid that TPCDS sqls does not have NotInSubquery case, TPCDS sqls using Not Exists instead of Not In. What i ran before is TPCH Query 16. But i am more than happy to just write TPHC benchmark code and do benchmark after this issue closed, if needed, ^_^

@maropu
Copy link
Member

maropu commented Jul 15, 2020

Ah, I see and I missed that. You said not TPCDS but TPCH, right.

@leanken-zz
Copy link
Contributor Author

image

image

the Origin 800,000,000 * 4,898 times calculation could easily cost 50~60 mins, after apply this patch, it takes only 9s.

@SparkQA
Copy link

SparkQA commented Jul 15, 2020

Test build #125884 has finished for PR 29104 at commit 40d7174.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@leanken-zz
Copy link
Contributor Author

@maropu Any further comments?

@maropu
Copy link
Member

maropu commented Jul 16, 2020

But this optimize is only targeting on NotInSubquery with single column case.

One question; why did you apply this optimization only in the case? The optimization itself looks more general though.

@leanken-zz
Copy link
Contributor Author

But this optimize is only targeting on NotInSubquery with single column case.

One question; why did you apply this optimization only in the case? The optimization itself looks more general though.

you mean extend it to support multi columns?

@maropu
Copy link
Member

maropu commented Jul 16, 2020

yea, we cannnot handle the case?

@leanken-zz
Copy link
Contributor Author

-- Test cases for multi-column ``WHERE a NOT IN (SELECT c FROM r ...)'':
-- | # | does subquery include null? | do filter columns contain null? | a = c? | b = d? | row included in result? |
-- | 1 | empty | * | * | * | yes |
-- | 2 | 1+ row has null for all columns | * | * | * | no |
-- | 3 | no row has null for all columns | (yes, yes) | * | * | no |
-- | 4 | no row has null for all columns | (no, yes) | yes | * | no |
-- | 5 | no row has null for all columns | (no, yes) | no | * | yes |
-- | 6 | no | (no, no) | yes | yes | no |
-- | 7 | no | (no, no) | _ | _ | yes

multi column Not(IsNull) is much more complicated. i am afraid that the lookup code and if-else logic will be un-readable.

@leanken-zz
Copy link
Contributor Author

let me take some time to find out common pattern among single and multi column support.

@maropu
Copy link
Member

maropu commented Jul 16, 2020

hm, it might be okay to support the limited optimization as a first step if it has a huge impact on the performance of common caes. But, I think the method (& parameter) names should be more general and we need to leave to TODO for future work.

@leanken-zz
Copy link
Contributor Author

For example.
-- Case 4
-- (one column null, other column matches a row in the subquery result -> row not returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN (SELECT *
FROM s
WHERE c IS NOT NULL) -- Matches (0, 1.0), (2, 3.0), (4, null)
;

in this case, i can't not use InternalRow(null, 1.0) to lookup in HashedRelation. I need to exclude all null column, and try found match within the not null column, which i think HashedRelation is not a suitable structure for multi-column support. But if change into multi column and need to deal with null column, which means i can't use Hash to lookup, so it will still be a M*N, that's no gona help.

@leanken-zz
Copy link
Contributor Author

let's say in streamedSide there is a record
(null, 1, null)
and buildSide is
(1, 2, 3)
(1, 1, 3)
(null, 1, 3)

if i need to confirm a Not In, i need to extract second column values, and build HashSet; what if next time streamedSide is a
(1, null, null)

in simple words, i could not rebuild a "CUBE-LIKE" HashSet for all column combinations; or I can just rollback to just compare two records in BuildSide row by row, which is still M*N.

So I think multiple column is not suitable for these Hash Optimize because its null safe complexity

@leanken-zz
Copy link
Contributor Author

hm, it might be okay to support the limited optimization as a first step if it has a huge impact on the performance of common caes. But, I think the method (& parameter) names should be more general and we need to leave to TODO for future work.

Myself also thinks of these methodName and paramsName being too long, do you have better suggestion for me, that will be great help.

@leanken-zz
Copy link
Contributor Author

For example.
-- Case 4
-- (one column null, other column matches a row in the subquery result -> row not returned)
SELECT *
FROM m
WHERE b = 1.0 -- Matches (null, 1.0)
AND (a, b) NOT IN (SELECT *
FROM s
WHERE c IS NOT NULL) -- Matches (0, 1.0), (2, 3.0), (4, null)
;

in this case, i can't not use InternalRow(null, 1.0) to lookup in HashedRelation. I need to exclude all null column, and try found match within the not null column, which i think HashedRelation is not a suitable structure for multi-column support. But if change into multi column and need to deal with null column, which means i can't use Hash to lookup, so it will still be a M*N, that's no gona help.

ping @maropu on the multi column support conclusion.

@agrawaldevesh
Copy link

I don't see a unit test in this PR. Can you please add one. Thanks.

Copy link

@agrawaldevesh agrawaldevesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty neat and it would make Spark look pretty cool on TPCH. Thanks for taking this up.

But Please add some Unit Tests ! Another way to test this "Exhaustively" would be to have a config to force this optimization and then run it through the existing Not in test suites, which already do a fairly good job. But I think you would have to copy that test perhaps to make sure it runs fully with this "forced config" enabled.

As for the general design of this optimization, I feel a bit uncomfortable doing this check at "Runtime". An alternative design would be to do this check somehow at the optimizer / compile time and then set a flag in the regular BroadcastHashJoin that it should now be null aware. I am wondering if you considered that strategy ? It might be a better UX for the user: The explain plan and spark UI would be more faithful.

On that note ? Is it worth somehow communicating to the user that their BroadcastNLJ was "accelerated" using this approach ? Do you want to up-level that as a metric etc such that it can show up in the Spark UI. Not sure if it is worth the plumbing.

Thanks.

// BuildSide must be single column, condition must be the following pattern
// Or(EqualTo(a, b), IsNull(EqualTo(a, b)))
condition.get match {
case _ @ Or(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can write this more simply as:

Or(EqualTo(leftAttr, rightAttr), IsNull(tmpLeft, tmpRight)) if ...

I believe you don't need the dashes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

AttributeSeq(left.output))
)
streamedIter.filter(row => {
// See. not-in-unit-tests-single-column.sql for detail filter rules

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets not refer to test code for describing production code :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

buildConf("spark.sql.notInSubquery.singleColumn.optimize.enabled")
.internal()
.doc("When true, single column not in subquery execution in BroadcastNestedLoopJoinExec " +
"will be optimized from M*N calculation into M*log(N) calculation using HashMap lookup " +

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N00b/dumb question: Why M*log(N) instead of M * 1 ? Shouldn't HT probe lookup be O(1) and O(log N).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

.booleanConf
.createWithDefault(false)

val NOT_IN_SUBQUERY_SINGLE_COLUMN_OPTIMIZE_ROW_COUNT_THRESHOLD =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should be the relationship of this config vs spark.sql.autoBroadcastJoinThreshold ? Shouldn't the threshold be based on the size of the build size in bytes vs num rows ? I think it is confusing to have two configs for the similar sort of information: How big can the table be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done remove this config and use spark.sql.autoBroadcastJoinThreshold

isBuildRowsEmpty: Boolean)

private def notInSubquerySingleColumnOptimizeEnabled: Boolean = {
if (SQLConf.get.notInSubquerySingleColumnOptimizeEnabled && right.output.length == 1) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumb question: Should left.output.length be checked as well ?

I believe not everyone would know of the nuances with multi-column Null Aware anti join (see http://www.vldb.org/pvldb/vol2/vldb09-423.pdf section 6.1 and 6.2), so it would be nice to mention atleast that multi-column is not being handled because it is insanely complicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left.output.length could be more than 1.

)
streamedIter.filter(row => {
// See. not-in-unit-tests-single-column.sql for detail filter rules
if (params.isBuildRowsEmpty) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can pull this check out .. No point in going through a "filter" that is unconditionally true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

val lookupRow: UnsafeRow = keyGenerator(row)
val notInKeyEqual = params.buildSideHashedRelation.get(lookupRow) match {
case null => false
case _ => true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be simplified to params.buildSideHashedRelation.get(lookupRow) != null ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

BindReferences.bindReferences[Expression](
Seq(right.output.head), AttributeSeq(right.output)),
buildRows.length),
buildRows.exists(row => row.isNullAt(0)),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dumb question: row is guaranteed to have only a single column, right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. it is.

case _ => true
}

if (!streamedRowIsNull && !params.isNullExists && !notInKeyEqual) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for isNullExists can also be pulled out: If isNullExists, then we will unconditionally return nothing.

You have already paid the one time cost of doing a scan on the build size to check if any nulls exist (when preparing the params), you might as well exploit that check to gain some speed here :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

.notInSubquerySingleColumnOptimizeRowCountThreshold}, fallback to leftExistenceJoin.")
leftExistenceJoin(relation, false)
} else {
val params = notInSubquerySingleColumnOptimizeBuildParams(buildRows)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Null aware single column hash join is subtle and could use some comments or perhaps a reference. Perhaps you could link to section 6.1 of that paper above ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. with TODO and comment left.

@leanken-zz
Copy link
Contributor Author

@agrawaldevesh thanks for your feedback, I will first consider your suggestion about doing it in optimizer. it might take some time.

@leanken-zz
Copy link
Contributor Author

Hi. @agrawaldevesh
I am afraid that putting the optimize into BroadcastHashJoinExec is not that easy.
right now, I've got
BroadcastNestedLoopJoinExec(LeftAnti with condition Or(EqualTo(a=b), IsNull(EqualTo(a=b))))

if i want to translate into BroadcastHashJoinExec, first of all i need a join key, right?
BroadcastHashJoinExec(LeftAnti joinKey(a=b), with condition)
But the EquiJoinKeys itself already break the integrity of the origin condition Or(EqualTo(a=b), IsNull(EqualTo(a=b))

Let's see what codegenAnti is like:

s"""
         |boolean $found = false;
         |// generate join key for stream side
         |${keyEv.code}
         |// Check if the key has nulls.
         |if (!($anyNull)) {
         |  // Check if the HashedRelation exists.
         |  UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
         |  if ($matched != null) {
         |    // Evaluate the condition.
         |    $checkCondition {
         |      $found = true;
         |    }
         |  }
         |}
         |if (!$found) {
         |  $numOutput.add(1);
         |  ${consume(ctx, input)}
         |}
       """.stripMargin

antiJoin with Key will keep streamedSideRow if streamedSide key is a null, but it's totally opposite in NotInSubquery. I can certainly do some if-else check here, but it might mess up the whole BroadcastHashJoinExec Code.

Besides the streamedSide key null difference, need to go through the entire buildSide to see if there is a null key exists, that's also kind of weird.

BroadcastHashJoinExec assume that it has join key, but if i apply my NotInSubquery check here, it would like, hey, I found two key should be joined, but wait a minute, there are a tiny corner case here, so back off.

if it's up to me to choose, i won't choose to break integrity of BroadcastHashJoinExec, i would rather count NotInSubquerySingleColumn as an runtime optimize.

So, I am polling out the relative information for you guys, seeking advice till I move forward to next step.

Choose A.
Count NotInSubquerySingleColumn as runtime optimize

Choose B.
Move code into BroadcastHashJoinExec but Codegen looks tricky.

looking for your reply, many many thanks.

sub commit

1. change spark.sql.nullAwareAntiJoin.optimize.enabled => spark.sql.optimizeNullAwareAntiJoin
2. add assertion for isNullAware
3. update CONFIG_DIM with spark.sql.optimizeNullAwareAntiJoin
4. code style refined.

Change-Id: I871fe95664e233908bb39b63444e73c4a24126c0
typo.

Change-Id: Id5db52227468cb4b22bad1923eab85bd6ce6fb5d
Change-Id: Icbf28bdbee90de6b09172ab4c495383002f340a4
sub commit

1. change EmptyHashedRelation and EmptyHashedRelationWithAllNullKeys to singleton object
2. change default implementation of NullAwareHashedRelation to throw UnsupportedOperationException

Change-Id: I173ce102bb704677699b89daa1c9906f748c94aa
@leanken-zz leanken-zz force-pushed the leanken-SPARK-32290 branch from 395823d to 233eff6 Compare July 27, 2020 22:06
Comment on lines +3 to +5
--CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=true
--CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=false

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding these. It gives us more confidence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks to @cloud-fan I know of this better way to do e2e case coverage when adding a new feature.

@SparkQA
Copy link

SparkQA commented Jul 28, 2020

Test build #126672 has finished for PR 29104 at commit 233eff6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

github action passes, I'm merging it to master, thanks for your great work!

@cloud-fan cloud-fan closed this in 12b9787 Jul 28, 2020
@leanken-zz leanken-zz deleted the leanken-SPARK-32290 branch July 28, 2020 08:38
.checkValue(_ >= 0, "The value must be non-negative.")
.createWithDefault(8)

val OPTIMIZE_NULL_AWARE_ANTI_JOIN =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You forgot to add version(). Here is the follow up PR #29335

Comment on lines +962 to +963
object EmptyHashedRelationWithAllNullKeys extends NullAwareHashedRelation {
override def asReadOnlyCopy(): EmptyHashedRelationWithAllNullKeys.type = this
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This object name really confuses. EmptyHashedRelation is from empty input, and EmptyHashedRelationWithAllNullKeys is from non-empty input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed, but I can't come out with better naming, could you please help with the naming, and i will create a new PR to do code refine, since this PR is closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably just remove Empty to make it HashedRelationWithAllNullKeys?

Comment on lines +506 to +522
return s"""
|boolean $found = false;
|// generate join key for stream side
|${keyEv.code}
|if ($anyNull) {
| $found = true;
|} else {
| UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
| if ($matched != null) {
| $found = true;
| }
|}
|
|if (!$found) {
| $numOutput.add(1);
| ${consume(ctx, input)}
|}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we can get rid of found variable and move this two lines to above if/else. found looks not correct in its semantics too. anyNull is true, doesn't mean we found matched row.

Copy link
Contributor Author

@leanken-zz leanken-zz Aug 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about

s"""
           |// generate join key for stream side
           |${keyEv.code}
           |if (!$anyNull && $relationTerm.getValue(${keyEv.value}) == null) {
           |  $numOutput.add(1);
           |  ${consume(ctx, input)}
           |}
         """.stripMargin

maybe I could update these code as well with the new HashedRelation Name in next PR.

cloud-fan pushed a commit that referenced this pull request Jun 17, 2021
…join

### What changes were proposed in this pull request?

NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) detects NULL join keys during building the map for `HashedRelation`, and will immediately return `HashedRelationWithAllNullKeys` without taking care of the map built already. Before returning `HashedRelationWithAllNullKeys`, the map needs to be freed properly to save memory and keep memory accounting correctly.

### Why are the changes needed?

Save memory and keep memory accounting correctly for the join query.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests introduced in #29104 .

Closes #32939 from c21/free-null-aware.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Jun 17, 2021
…join

### What changes were proposed in this pull request?

NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) detects NULL join keys during building the map for `HashedRelation`, and will immediately return `HashedRelationWithAllNullKeys` without taking care of the map built already. Before returning `HashedRelationWithAllNullKeys`, the map needs to be freed properly to save memory and keep memory accounting correctly.

### Why are the changes needed?

Save memory and keep memory accounting correctly for the join query.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests introduced in #29104 .

Closes #32939 from c21/free-null-aware.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e0d81d9)
Signed-off-by: Wenchen Fan <[email protected]>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
…join

### What changes were proposed in this pull request?

NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) detects NULL join keys during building the map for `HashedRelation`, and will immediately return `HashedRelationWithAllNullKeys` without taking care of the map built already. Before returning `HashedRelationWithAllNullKeys`, the map needs to be freed properly to save memory and keep memory accounting correctly.

### Why are the changes needed?

Save memory and keep memory accounting correctly for the join query.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests introduced in apache#29104 .

Closes apache#32939 from c21/free-null-aware.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e0d81d9)
Signed-off-by: Wenchen Fan <[email protected]>
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
…join

### What changes were proposed in this pull request?

NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) detects NULL join keys during building the map for `HashedRelation`, and will immediately return `HashedRelationWithAllNullKeys` without taking care of the map built already. Before returning `HashedRelationWithAllNullKeys`, the map needs to be freed properly to save memory and keep memory accounting correctly.

### Why are the changes needed?

Save memory and keep memory accounting correctly for the join query.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests introduced in apache#29104 .

Closes apache#32939 from c21/free-null-aware.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e0d81d9)
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Nov 28, 2022
…degen is disabled

### What changes were proposed in this pull request?
BHJ LeftAnti does not update numOutputRows when codegen is disabled

### Why are the changes needed?
PR #29104 Only update numOutputRows when codegen is enabled, but there is no numOutputRows when codegen is disabled, and numOutputRows is equal to 0.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add UT

Closes #38489 from cxzl25/SPARK-41003.

Authored-by: sychen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
…degen is disabled

### What changes were proposed in this pull request?
BHJ LeftAnti does not update numOutputRows when codegen is disabled

### Why are the changes needed?
PR apache#29104 Only update numOutputRows when codegen is enabled, but there is no numOutputRows when codegen is disabled, and numOutputRows is equal to 0.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add UT

Closes apache#38489 from cxzl25/SPARK-41003.

Authored-by: sychen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
…degen is disabled

### What changes were proposed in this pull request?
BHJ LeftAnti does not update numOutputRows when codegen is disabled

### Why are the changes needed?
PR apache#29104 Only update numOutputRows when codegen is enabled, but there is no numOutputRows when codegen is disabled, and numOutputRows is equal to 0.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add UT

Closes apache#38489 from cxzl25/SPARK-41003.

Authored-by: sychen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
peter-toth added a commit that referenced this pull request Jan 9, 2026
### What changes were proposed in this pull request?

Run `NullPropagation` after NOT IN subquery rewrite.

### Why are the changes needed?

NOT IN subqueries like `SELECT * FROM t1 WHERE c NOT IN (SELECT c FROM t2)` are rewritten as left anti join `t1.c = t2.c` with additional `OR IsNull(t1.c = t2.c)` conditions which prevents equi join implementations to be used so those joins end up as `BroadcastNestedLoopJoin`. When we know the columns can't be null, we can either drop those additional conditions during subquery rewrite or call `NullPropagation` after the rewrite to simplify them to `false`. This PR contains the latter.

Please note that #29104 already optmized the single column NOT IN subqueries from `BroadcastNestedLoopJoin` to "null aware" `BroadcastHashJoin` very well, but when the columns are not nullable we can optimize multi column cases as well and the join don't need to be "null aware".

### Does this PR introduce _any_ user-facing change?

Yes, performance improvement.

### How was this patch tested?

A new UTs was added and some exsisting tests were adjusted to keep their validity.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53733 from peter-toth/SPARK-54972-improve-not-in-with-non-nullables.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
Yicong-Huang pushed a commit to Yicong-Huang/spark that referenced this pull request Jan 9, 2026
### What changes were proposed in this pull request?

Run `NullPropagation` after NOT IN subquery rewrite.

### Why are the changes needed?

NOT IN subqueries like `SELECT * FROM t1 WHERE c NOT IN (SELECT c FROM t2)` are rewritten as left anti join `t1.c = t2.c` with additional `OR IsNull(t1.c = t2.c)` conditions which prevents equi join implementations to be used so those joins end up as `BroadcastNestedLoopJoin`. When we know the columns can't be null, we can either drop those additional conditions during subquery rewrite or call `NullPropagation` after the rewrite to simplify them to `false`. This PR contains the latter.

Please note that apache#29104 already optmized the single column NOT IN subqueries from `BroadcastNestedLoopJoin` to "null aware" `BroadcastHashJoin` very well, but when the columns are not nullable we can optimize multi column cases as well and the join don't need to be "null aware".

### Does this PR introduce _any_ user-facing change?

Yes, performance improvement.

### How was this patch tested?

A new UTs was added and some exsisting tests were adjusted to keep their validity.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#53733 from peter-toth/SPARK-54972-improve-not-in-with-non-nullables.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants