Conversation

@leanken-zz
Contributor

What changes were proposed in this pull request?

This PR proposes a trade-off that also supports multi-column hash lookup on the buildSide, at the cost of extra duplicate data on the buildSide: the duplication factor is 2^numKeys - 1. For example, to support NAAJ with a 3-column join key, the buildSide would be expanded (2^3 - 1) times, i.e., 7X.

For example, given an UnsafeRow key (1,2,3): in null-aware mode it is expanded into 7 keys by adding the C(3,1) and C(3,2) combinations, duplicating the record with null padding as follows.

Original record

(1,2,3)

Extra records to be appended into the HashedRelation

(null, 2, 3) (1, null, 3) (1, 2, null)
(null, null, 3) (null, 2, null) (1, null, null)

With the expanded data we can extract a common pattern for both the single- and multi-column cases; allNull below refers to an UnsafeRow whose columns are all null (a code sketch of the expansion follows the list).

  • buildSide is empty input => return all rows
  • an all-null-column key exists in the buildSide input => reject all rows
  • if streamedSideRow.allNull is true => drop the row
  • if streamedSideRow.allNull is false & findMatch in NullAwareHashedRelation => drop the row
  • if streamedSideRow.allNull is false & notFindMatch in NullAwareHashedRelation => return the row
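
As a minimal standalone sketch of the expansion (my illustration, modeling nullable columns with Option rather than the PR's UnsafeProjection-based code):

    // Enumerate the C(n,0) .. C(n,n-1) null-padded variants of a build-side key:
    // 2^n - 1 keys in total, deliberately excluding the all-null combination.
    def expandKey[T](key: Seq[Option[T]]): Seq[Seq[Option[T]]] =
      key.indices.flatMap { n =>
        key.indices.combinations(n).map { nulled =>
          key.indices.map(i => if (nulled.contains(i)) None else key(i))
        }
      }

    // expandKey(Seq(Some(1), Some(2), Some(3))) yields the 7 keys listed above.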

Why are the changes needed?

Considering real production usage of NAAJ, numKeys should not be that big, normally 1-3 keys, so I think the trade-off is still worthwhile.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. SQLQueryTestSuite with NOT IN keyword SQL, adding a CONFIG_DIM with spark.sql.optimizeNullAwareAntiJoin on and off.
  2. Added cases in org.apache.spark.sql.JoinSuite.
  3. Added cases in org.apache.spark.sql.SubquerySuite.
  4. Added cases in org.apache.spark.sql.execution.joins.HashedRelationSuite to verify the data-expansion logic.
  5. Config combinations against e2e tests (both single- and multi-column cases) with the following (a sketch of how they might be driven follows the list):
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "true"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "true"
)
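
For reference, a sketch of how these combinations might be exercised, assuming Spark's stock test helpers withSQLConf/checkAnswer (from SQLTestUtils/QueryTest) and hypothetical `configs` (the four Maps above) and `expectedRows` values:

    configs.foreach { conf =>
      withSQLConf(conf.toSeq: _*) {
        // The same NOT IN query is evaluated under every config combination;
        // results must agree regardless of AQE and whole-stage codegen settings.
        checkAnswer(
          sql("SELECT * FROM testData WHERE (key, key + 1) NOT IN (SELECT * FROM testData2)"),
          expectedRows)
      }
    }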

@leanken-zz
Contributor Author

leanken-zz commented Jul 30, 2020

@cloud-fan @maropu @agrawaldevesh
New JIRA and PR re-created. Many thanks.

val OPTIMIZE_NULL_AWARE_ANTI_JOIN_MAX_NUM_KEYS =
buildConf("spark.sql.optimizeNullAwareAntiJoin.maxNumKeys")
.internal()
.doc("The maximum number of keys that will be supported to use NAAJ optimize.")
Member

What is the cost of increasing this maximum number? What is good and what is bad? We should state it clearly in the doc.

Contributor Author

done.

return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8);
}

public boolean allNull() {
Member

Can you add doc for the method?

Contributor Author

done.

"optimized from O(M*N) calculation into O(M) calculation " +
"using Hash lookup instead of Looping lookup." +
"Only support for singleColumn NAAJ for now.")
"using Hash lookup instead of Looping lookup.")
Member

Add a few words to link this with OPTIMIZE_NULL_AWARE_ANTI_JOIN_MAX_NUM_KEYS. E.g., "The number of keys supported for NAAJ is configured by spark.sql.optimizeNullAwareAntiJoin.maxNumKeys".

Contributor Author

done

@viirya
Member

viirya commented Jul 30, 2020

ok to test

@leanken-zz
Contributor Author

@viirya thanks for your review, code updated.

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126826 has finished for PR 29304 at commit 4b113f2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126838 has finished for PR 29304 at commit c495125.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126854 has finished for PR 29304 at commit 0c6e259.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@leanken-zz force-pushed the leanken-SPARK-32494 branch from 0c6e259 to 130606c on July 31, 2020 07:44
@leanken-zz
Contributor Author

Rebased the code because of merge conflicts.

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126872 has finished for PR 29304 at commit 130606c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper with PredicateHelper {
Member

SingleColumn -> MultiColumn

Contributor Author

How about ExtractNullAwareAntiJoinKeys?

Member

Looks ok

"it might cause Driver OOM if NAAJ numKeys increased, since it is exponential growth. " +
"It's ok to increase this configuration if buildSide is small enough and safe enough " +
"to do such exponential expansion to gain performance improvement from O(M*N) to O(M).")
.intConf
Member

Please add checkValue. I think only a positive value seems reasonable.
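
For reference, the suggested guard might look like the following sketch (the default value here is purely illustrative):

    val OPTIMIZE_NULL_AWARE_ANTI_JOIN_MAX_NUM_KEYS =
      buildConf("spark.sql.optimizeNullAwareAntiJoin.maxNumKeys")
        .internal()
        .doc("The maximum number of keys supported by the NAAJ optimization.")
        .intConf
        .checkValue(_ > 0, "The maximum number of NAAJ join keys must be positive.")
        .createWithDefault(3)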

Contributor Author

done

Comment on lines 2687 to 2688
"It's ok to increase this configuration if buildSide is small enough and safe enough " +
"to do such exponential expansion to gain performance improvement from O(M*N) to O(M).")
Member

Do we need the last two statements above? IMO the first three statements look enough.

Contributor Author

Done, removed it.

 |// generate join key for stream side
 |${keyEv.code}
-|if ($anyNull) {
+|if (${ if (isLongHashedRelation) s"$anyNull" else s"${keyEv.value}.allNull()"}) {
Member

nit: if (${if (isLongHashedRelation) anyNull else s"${keyEv.value}.allNull()"}) {

Contributor Author

done

val leftKeys = ArrayBuffer[Expression]()
val rightKeys = ArrayBuffer[Expression]()

// all predicate must match pattern condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))
Member

nit: all -> All

Contributor Author

done

val rightKeys = ArrayBuffer[Expression]()

// all predicate must match pattern condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))
val allMatch = predicates.forall {
Member

nit: format; how about this?

        val joinKeys = ArrayBuffer[(Expression, Expression)]()

        // All predicate must match pattern condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))
        val allMatch = predicates.forall {
          case Or(e @ EqualTo(leftExpr: Expression, rightExpr: Expression),
              IsNull(e2 @ EqualTo(_, _))) if e.semanticEquals(e2) =>
            if (canEvaluate(leftExpr, left) && canEvaluate(rightExpr, right)) {
              joinKeys += ((leftExpr, rightExpr))
              true
            } else if (canEvaluate(leftExpr, right) && canEvaluate(rightExpr, left)) {
              joinKeys += ((rightExpr, leftExpr))
              true
            } else {
              false
            }
          case _ =>
            false
        }

        if (allMatch) {
          Some(joinKeys.unzip)
        } else {
          None
        }

Contributor Author

updated

val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
val nullPaddingCombinations: Seq[UnsafeProjection] = if (isNullAware) {
// C(numKeys, 0), C(numKeys, 1) ... C(numKeys, numKeys - 1)
Member

I just reformatted it like this:

      // C(numKeys, 0), C(numKeys, 1) ... C(numKeys, numKeys - 1)
      // In total 2^numKeys - 1 records will be appended.
      key.indices.flatMap { n =>
        key.indices.combinations(n).map { combination =>
          // combination is Seq[Int] indicates which key should be replaced to null padding
          val exprs = key.indices.map { index =>
            if (combination.contains(index)) {
              Literal.create(null, key(index).dataType)
            } else {
              key(index)
            }
          }
          UnsafeProjection.create(exprs)
        }
      }

Contributor Author

updated

SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
// positive not in subquery case
var joinExec = assertJoin((
"select * from testData where (key, key + 1) not in (select * from testData2)",
Member

Could you please use uppercases for SQL keywords where possible? e.g., SELECT * FROM testData...

Contributor Author

updated

@leanken-zz
Contributor Author

@maropu thanks for reviewing during the weekend, code updated. ^_^

@SparkQA

SparkQA commented Aug 2, 2020

Test build #126925 has finished for PR 29304 at commit 1153513.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
* return whether an UnsafeRow is null on every column.
Member

nit: return -> Return and please add tests in UnsafeRowSuite if you add a new method here.

// TODO support multi column NULL-aware anti join in future.
// See. http://www.vldb.org/pvldb/vol2/vldb09-423.pdf Section 6
// multi-column null aware anti join is much more complicated than single column ones.
object ExtractNullAwareAntiJoinKeys extends JoinSelectionHelper with PredicateHelper {
Member

@maropu commented Aug 2, 2020

Hm, I think it's better to add some fine-grained tests for this extractor. WDYT?

Contributor Author

Agreed, I will add tests for it.

}
}

test("NullAwareHashedRelation") {
Member

Could you make the test title clearer about what this test is for?

object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper with PredicateHelper {

// TODO support multi column NULL-aware anti join in future.
// See. http://www.vldb.org/pvldb/vol2/vldb09-423.pdf Section 6
Member

How about keeping this comment for the reference to NAAJ? I think this is a good material to understand how it works. (NOTE: IMHO the link to the ACM page is better than the direct link to the PDF).

// combination is Seq[Int] indicates which key should be replaced to null padding
val exprs = key.indices.map { index =>
if (combination.contains(index)) {
Literal.create(null, key(index).dataType)
Member

Do we need to expand it like this even if key(index).nullable = false?

Contributor Author

This is a good reminder; I had not considered such a case. I will confirm and reply to you later.

Member

@maropu commented Aug 2, 2020

Since I think this works correctly, it is okay to separate the optimization from this PR.


It should be easy to fold this optimization into this PR ... All we need to do is to only create the combinations for the (truly) nullable keys. I think this is an important performance optimization since most keys are non nullable (in a well formed schema) and it would reduce the memory blow up.

Contributor Author

@leanken-zz commented Aug 3, 2020

After serious consideration, I think the key(index).nullable = false case should be expanded as well.
Consider the following case, where the streamedSide and buildSide each have only one record:
(1, null, 3) => (1, 2, 3)
This is considered a MATCH in NAAJ, right? No matter whether the buildSide column is nullable or not, NAAJ cares about the actual key value of the record. If we don't expand (1,2,3) into 7X records, we can't find an exact match for (1,null,3).

@maropu @agrawaldevesh

If we don't want an extra if-else check on null columns, such expansion is needed: since all the possibilities are already in the HashedRelation, a streamedSide record just needs a single lookup, and if you get an exact match, bingo.


Makes sense and great observation. Worth explaining this in the comments and adding a test for this.

I am wondering if you still have a test suite that compares the old-school BNLJ results versus this optimization? That would be a great test case.

 |// generate join key for stream side
 |${keyEv.code}
-|if ($anyNull) {
+|if (${if (isLongHashedRelation) s"$anyNull" else s"${keyEv.value}.allNull()"}) {
Member

@maropu commented Aug 2, 2020

Does this work correctly, e.g., in the case where we rewrite 4 short join keys (short, short, short, short) into a single long key? At least, we need tests for those cases.


Oh, you can combine multiple keys into a single long key? I believe the reason for this special case with the LongHashedRelation is that it can only take a single long key. But it would be interesting if we can get multiple real keys packed into it.

Contributor Author

I think the multiple-key rewrite in HashJoinExec is symmetric: if four shorts are rewritten into a long key, then it is counted as a single key; otherwise, it would be an UnsafeHashedRelation on the right side, right? LongHashedRelation only takes a single key as lookup input.

And the codegen logic of $anyNull should handle the multi-key rewrite. But I agree that we should have tests for those cases.

Contributor Author

Oh, you can combine multiple keys into a single long key? I believe the reason for this special case with the LongHashedRelation is that it can only take a single long key. But it would be interesting if we can get multiple real keys packed into it.

FYI, it is OK to pack multiple keys into a long, as long as all of them are of IntegralType and the sum of their default sizes is less than or equal to 8 bytes:

def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = {
    assert(keys.nonEmpty)
    // TODO: support BooleanType, DateType and TimestampType
    if (keys.exists(!_.dataType.isInstanceOf[IntegralType])
      || keys.map(_.dataType.defaultSize).sum > 8) {
      return keys
    }

    var keyExpr: Expression = if (keys.head.dataType != LongType) {
      Cast(keys.head, LongType)
    } else {
      keys.head
    }
    keys.tail.foreach { e =>
      val bits = e.dataType.defaultSize * 8
      keyExpr = BitwiseOr(ShiftLeft(keyExpr, Literal(bits)),
        BitwiseAnd(Cast(e, LongType), Literal((1L << bits) - 1)))
    }
    keyExpr :: Nil
  }
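
A worked example of that packing, with assumed values (my illustration): two 2-byte short keys a = 1 and b = 2 collapse into one long.

    val a: Short = 1
    val b: Short = 2
    val bits = 2 * 8 // defaultSize of ShortType is 2 bytes
    // keyExpr becomes BitwiseOr(ShiftLeft(Cast(a, long), 16), BitwiseAnd(Cast(b, long), 0xFFFF)):
    val packed: Long = (a.toLong << bits) | (b.toLong & ((1L << bits) - 1))
    assert(packed == 65538L) // 1 * 65536 + 2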


Thanks for digging into this. I think multi-key NAAJ would preclude the packing of multiple keys into a single long key unless all of those sub-keys are actually non-nullable?

I think it would actually be incorrect NOT to use the key explosion of HashedRelation when there are multiple logical keys that can be independently nullable. Is there a way we can 'undo' the packing into the LongHashedRelation and switch back to a regular HashedRelation?

Is there a test case for this scenario of multiple nullable integral keys?

Contributor Author

I still insist that we should not special-case it here.

Whether it combines multiple keys into a single long or not, inside the probe lookup logic I see:

Single long key -> LongHashedRelation
Multiple keys -> UnsafeHashedRelation

As long as codegen makes sure the above assumption holds, I can just go ahead with my NAAJ logic.


Okay, let's add tests here for @viirya's case of multiple short keys packing into a single long. Please ensure that at least two of those keys are nullable in your test. I am curious whether that will trigger the LongHashedRelation or the (exploding) UnsafeHashedRelation. I believe that the former is incorrect.

Basically I am claiming that LongHashedRelation cannot be used when more than one key can be null. This was moot earlier because we were dropping rows that had any null key.

Member

At least, as @agrawaldevesh suggested in #29304 (comment), I think it's better to leave some comments here about how it works, and we need to write the logic for this code part carefully.

Contributor Author

At least, as @agrawaldevesh suggested in #29304 (comment), I think it's better to leave some comments here about how it works, and we need to write the logic for this code part carefully.

agree.

@agrawaldevesh left a comment

Correct me if I am wrong here, but I don't see any special handling for null keys on the left (probe) side. The changes to HashedRelation only account for the right-side handling.

I.e., you seem to have implemented step 2 in section 6.2 of the NAAJ paper, but I don't see where step 3 is implemented? That one seems trickier, since it needs wildcard matches.

 |// generate join key for stream side
 |${keyEv.code}
-|if ($anyNull) {
+|if (${ if (isLongHashedRelation) s"$anyNull" else s"${keyEv.value}.allNull()"}) {


Perhaps a comment here?

 |// generate join key for stream side
 |${keyEv.code}
-|if ($anyNull) {
+|if (${if (isLongHashedRelation) s"$anyNull" else s"${keyEv.value}.allNull()"}) {


Oh, you can combine multiple keys into a single long key? I believe the reason for this special case with the LongHashedRelation is that it can only take a single long key. But it would be interesting if we can get multiple real keys packed into it.

// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
var numFields = 0
val nullPaddingCombinations: Seq[UnsafeProjection] = if (isNullAware) {


Can you add a small example here to illustrate what this block of code is doing? What do you think of extracting it into a method for better readability? It's a bit big to be inline. Perhaps use an example with, say, 3 keys.


Also, do we use nullPaddingCombinations when all the keys are null?

if (key.allNull()) {
return EmptyHashedRelationWithAllNullKeys
}
nullPaddingCombinations.foreach(project => append(project(row).copy(), row))


nit: we need a better name than project here. How about nullPaddedKeySubsequence or nullPaddedKeyCombination or something similar?

@leanken-zz
Contributor Author

Correct me if I am wrong here, but I don't see any special handling for null keys on the left (probe) side. The changes to HashedRelation only account for the right-side handling.
I.e., you seem to have implemented step 2 in section 6.2 of the NAAJ paper, but I don't see where step 3 is implemented? That one seems trickier, since it needs wildcard matches.

In fact, the reason I expand the data on the build side is mainly to avoid handling null values on the probe side.

Let's say there is a record (1, null, 3) on the probe side; if there is a (1,2,3) on the build side, it is counted as a MATCH in the comparison. Basically, to avoid the O(M*N) loop lookup on the build side, I have to expand (1,2,3) into new records with every null-padding combination.

The original key expands into 2^3 - 1 = 7 keys, and with such data duplication the probe-side record (1, null, 3) can do a direct hash lookup. I don't know if I have made it clear for you @agrawaldevesh; it is a bit hard for me to explain in English. ^_^

(1, 2, 3)
(null, 2, 3)
(1, null, 3)
(1, 2, null)
(null, null, 3)
(null, 2, null)
(1, null, null)

Basically, if there are any null columns in the probe-side key, it means ignoring the null key values and trying to match the remaining non-null columns against the corresponding buildSide columns. Since streamedSide rows can have nulls in any combination of positions, I cannot pre-build for any single combination, so I have to expand into all null-padding combinations.

@agrawaldevesh

@leanken ... let me chew on your response. I will get back to you.

In the meantime, I am wondering if you can rerun the performance of the original TPCH Q16 (single-key NOT IN) with this multi-key PR and without it. I have a feeling we might have regressed the performance of the (much more common) single-key case, and it may make sense to retain that as a special case. Can you double-check that, please?

@leanken-zz
Contributor Author

@leanken ... let me chew on your response. I will get back to you.
In the meantime, I am wondering if you can rerun the performance of the original TPCH Q16 (single-key NOT IN) with this multi-key PR and without it. I have a feeling we might have regressed the performance of the (much more common) single-key case, and it may make sense to retain that as a special case. Can you double-check that, please?

Sure, I will run the Q16 test again, though I am quite confident that it will not cause a perf regression: the single-key NOT IN case does not cause any data expansion, because 2^1 - 1 = 1.

@agrawaldevesh

Let's say there is a record (1, null, 3) on the probe side; if there is a (1,2,3) on the build side, it is counted as a MATCH in the comparison. Basically, to avoid the O(M*N) loop lookup on the build side, I have to expand (1,2,3) into new records with every null-padding combination.

The original key expands into 2^3 - 1 = 7 keys, and with such data duplication the probe-side record (1, null, 3) can do a direct hash lookup.

(1, 2, 3)
(null, 2, 3)
(1, null, 3)
(1, 2, null)
(null, null, 3)
(null, 2, null)
(1, null, null)

Let's consider steps 2 and 3 of section 6.2 in the NAAJ paper separately:

  • Step 2: Say there is a right (build) side row (1, null, 3). It should be counted as a match against a row on the left side (1, 2, 3). What makes this tricky is that, say, you have a build row (1, 5, 3): then (1, 5, 3) should NOT match the probe row (1, 2, 3). But if you explode (1, 5, 3) into a (1, null, 3), then it might incorrectly match (1, 2, 3). How do you handle both of these subcases?

  • Step 3: Consider a build row (1, 5, null); it should match the left row (1, null, 3). In addition, the left row should not match the build row (1, 5, 7). How do you handle these subcases? (A small reference evaluator for these expectations follows.)
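
To make the three-valued NOT IN expectations behind these subcases concrete, here is a small editorial reference evaluator (not PR code); None models null, and a None result models UNKNOWN:

    // Three-valued equality and AND, per SQL semantics.
    def eq3(l: Option[Int], r: Option[Int]): Option[Boolean] =
      for (lv <- l; rv <- r) yield lv == rv

    def and3(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
      case (Some(false), _) | (_, Some(false)) => Some(false)
      case (Some(true), Some(true))            => Some(true)
      case _                                   => None
    }

    // Under null-aware anti-join semantics, a probe row is dropped when its
    // comparison against some build row is TRUE or UNKNOWN (anything but FALSE).
    def rowCompare(build: Seq[Option[Int]], probe: Seq[Option[Int]]): Option[Boolean] =
      build.zip(probe).map { case (l, r) => eq3(l, r) }.reduce(and3)

    // build (1, null, 3) vs probe (1, 2, 3)    -> None (UNKNOWN) -> probe dropped
    // build (1, 5, 3)    vs probe (1, 2, 3)    -> Some(false)    -> probe returned
    // build (1, 5, null) vs probe (1, null, 3) -> None (UNKNOWN) -> probe dropped
    // build (1, 5, 7)    vs probe (1, null, 3) -> Some(false)    -> probe returned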

// All predicate must match pattern condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))
val allMatch = predicates.forall {
case Or(e @ EqualTo(leftExpr: Expression, rightExpr: Expression),
IsNull(e2 @ EqualTo(_, _))) if e.semanticEquals(e2) =>
Member

IIUC this pattern matching depends on the RewritePredicateSubquery code:

val nullAwareJoinConds = baseJoinConds.map(c => Or(c, IsNull(c)))

This is okay now, but I'm a little worried that it will not work well if the RewritePredicateSubquery code is updated; for example, if both attributes in a join condition are non-nullable, we might be able to remove IsNull(c) as an optimization in the RewritePredicateSubquery rule.
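
For concreteness, a sketch of the condition shape this rewrite produces for a two-key NOT IN, using hypothetical attributes a, b on the stream side and x, y on the build side:

    import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, IsNull, Or}
    import org.apache.spark.sql.types.IntegerType

    val Seq(a, b, x, y) = Seq("a", "b", "x", "y").map(n => AttributeReference(n, IntegerType)())
    // (a, b) NOT IN (SELECT x, y ...) rewrites each key pair to Or(a = x, IsNull(a = x)),
    // which is exactly the shape the extractor pattern-matches.
    val naajCond = Seq((a, x), (b, y))
      .map { case (l, r) => Or(EqualTo(l, r), IsNull(EqualTo(l, r))) }
      .reduce[Expression](And(_, _))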

Contributor Author

Yes, the case of IsNull being removed is considered; we only apply the NAAJ optimization when the Or condition still exists.

Basically, the NAAJ optimization switch is triggered in SparkStrategies, by which point the optimizer has done its job, so it's safe to put this pattern check at the physical-plan stage.

// negative hand-written left anti join
      // testData.key nullable false
      // testData2.a nullable false
      // isnull(key = a) isnull(key+1=a) will be optimized to true literal and removed
      joinExec = assertJoin((
        "SELECT * FROM testData LEFT ANTI JOIN testData3 ON (key = a OR ISNULL(key = a)) " +
          "AND (key + 1 = a OR ISNULL(key + 1 = a))",
        classOf[BroadcastHashJoinExec]))
      assert(!joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)

Contributor Author

In JoinSuite line 1209, FYI.

@leanken-zz
Contributor Author

leanken-zz commented Aug 3, 2020

Let's say there is a record (1, null, 3) on the probe side; if there is a (1,2,3) on the build side, it is counted as a MATCH in the comparison. Basically, to avoid the O(M*N) loop lookup on the build side, I have to expand (1,2,3) into new records with every null-padding combination.
The original key expands into 2^3 - 1 = 7 keys, and with such data duplication the probe-side record (1, null, 3) can do a direct hash lookup.
(1, 2, 3)
(null, 2, 3)
(1, null, 3)
(1, 2, null)
(null, null, 3)
(null, 2, null)
(1, null, null)

Let's consider steps 2 and 3 of section 6.2 in the NAAJ paper separately:

  • Step 2: Say there is a right (build) side row (1, null, 3). It should be counted as a match against a row on the left side (1, 2, 3). What makes this tricky is that, say, you have a build row (1, 5, 3): then (1, 5, 3) should NOT match the probe row (1, 2, 3). But if you explode (1, 5, 3) into a (1, null, 3), then it might incorrectly match (1, 2, 3). How do you handle both of these subcases?
  • Step 3: Consider a build row (1, 5, null); it should match the left row (1, null, 3). In addition, the left row should not match the build row (1, 5, 7). How do you handle these subcases?

After we expand the data on the buildSide, a streamedSide (1, 2, 3) and a buildSide (1, null, 3) are not counted as a match: we look for exactly the same UnsafeRow, which has the same hashCode, so we can use the streamed UnsafeRow as the key to look up in the HashedRelation.

It's a bit tricky, but to sum up: after the expansion, a match means finding, in the HashedRelation, exactly the same row as the streamedSide key, including its null columns.

See how it is probed in the code:

if (hashed == EmptyHashedRelation) {
  streamedIter
} else if (hashed == EmptyHashedRelationWithAllNullKeys) {
  Iterator.empty
} else {
  val keyGenerator = UnsafeProjection.create(
    BindReferences.bindReferences[Expression](
      leftKeys,
      AttributeSeq(left.output))
  )
  streamedIter.filter(row => {
    val lookupKey: UnsafeRow = keyGenerator(row)
    if (lookupKey.allNull()) {
      false
    } else {
      // Anti Join: drop the row on the streamed side if it is a match on the build side.
      // !!!! If hashed contains (1, null, 3) and lookupKey is (1, 2, 3), the lookup returns null.
      // !!!! Only if lookupKey is exactly (1, null, 3) does the lookup return non-null.
      hashed.get(lookupKey) == null
    }
  })
}

@agrawaldevesh

Step 2: Say there is a right (build) side row (1, null, 3). It should be counted as a match against a row on the left side (1, 2, 3). What makes this tricky is that, say, you have a build row (1, 5, 3): then (1, 5, 3) should NOT match the probe row (1, 2, 3). But if you explode (1, 5, 3) into a (1, null, 3), then it might incorrectly match (1, 2, 3). How do you handle both of these subcases?
Step 3: Consider a build row (1, 5, null); it should match the left row (1, null, 3). In addition, the left row should not match the build row (1, 5, 7). How do you handle these subcases?

Above, when I say "match", I mean that the left side matches the build row and WON'T be returned; by non-match I mean that the left side does not match the build side and thus WILL be returned. We have been using different meanings for the words 'match' and 'not-match', so please read my 'match' as 'NAAJ should not return the left row', and conversely for non-match.

I would really really really encourage you to:

  • Please reread the paper section 6.2 in its entirety many times and understand the above cases. I had to read it many times myself. It is very tricky as you pointed out.
  • Add them as test cases comparing them with the original BNLJ implementation, both the negative and positive cases.

This is really tricky and I don't think the current implementation you have of expanding the hash table with a simple lookup on the stream side would suffice. I will also try to play around with your PR locally and run them as tests to convince myself. I hope I am wrong ;-).

@leanken-zz
Contributor Author

leanken-zz commented Aug 3, 2020

Step 2: Say there is a right (build) side row (1, null, 3). It should be counted as a match against a row on the left side (1, 2, 3). What makes this tricky is that, say, you have a build row (1, 5, 3): then (1, 5, 3) should NOT match the probe row (1, 2, 3). But if you explode (1, 5, 3) into a (1, null, 3), then it might incorrectly match (1, 2, 3). How do you handle both of these subcases?
Step 3: Consider a build row (1, 5, null); it should match the left row (1, null, 3). In addition, the left row should not match the build row (1, 5, 7). How do you handle these subcases?

Above, when I say "match", I mean that the left side matches the build row and WON'T be returned; by non-match I mean that the left side does not match the build side and thus WILL be returned. We have been using different meanings for the words 'match' and 'not-match', so please read my 'match' as 'NAAJ should not return the left row', and conversely for non-match.

I would really really really encourage you to:

  • Please reread the paper section 6.2 in its entirety many times and understand the above cases. I had to read it many times myself. It is very tricky as you pointed out.
  • Add them as test cases comparing them with the original BNLJ implementation, both the negative and positive cases.

This is really tricky and I don't think the current implementation you have of expanding the hash table with a simple lookup on the stream side would suffice. I will also try to play around with your PR locally and run them as tests to convince myself. I hope I am wrong ;-).

Yes, I do understand section 6.2 of the paper. Basically, the paper describes the algorithm from the perspective of the streamedSide, while the expansion states it from the perspective of the buildSide. Let's do reverse inference on the following case.

If the buildSide contains a row (1,2,3), what streamedSide data will be evaluated as TRUE or UNKNOWN and dropped? It should be
(null, 2, 3) (1, null, 3) (1, 2, null) (null, null, 3) (null, 2, null) (1, null, null) and of course (1,2,3),
right?

Only the above combinations of streamedSide rows will be dropped, besides the all-null case, right?
Once you find exactly the same record, including null columns, in the HashedRelation, you drop the streamedSide row.

if (lookupKey.allNull()) {
  false
} else {
  // Anti Join: Drop the row on the streamed side if it is a match on the build
  hashed.get(lookupKey) == null
}

I suppose this solution works, because it passes all the NOT IN cases in SQLQueryTestSuite.

@leanken-zz
Contributor Author

leanken-zz commented Aug 3, 2020

I just found a negative case for it:
the expansion solution returns (1,2,3), but BNLJ returns nothing.
You are right about the correctness; let me rethink and come back to you later.

spark.sql(
        """
          |CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
          |  (1, 2, 3)
          |  AS m(a, b, c)
        """.stripMargin).collect()

      spark.sql(
        """
          |CREATE TEMPORARY VIEW s AS SELECT * FROM VALUES
          |  (1, null, 3)
          |  AS s(c, d, e)
        """.stripMargin).collect()

      spark.sql(
        """
          |select * from m where (a,b,c) not in (select * from s)
        """.stripMargin).collect().foreach(println)

And we would need to do something on the streamedSide too if we want this hash lookup to work correctly.

@leanken-zz
Contributor Author

leanken-zz commented Aug 3, 2020

@agrawaldevesh I finally understand the complexity of multi-column support; thanks for your reminders again and again, and sorry about my naivety. Do you think it is still worth carrying on to support multi-column? I sincerely ask for your suggestion.

@leanken-zz
Contributor Author

@agrawaldevesh I finally understand the complexity of multi-column support; thanks for your reminders again and again, and sorry about my naivety. Do you think it is still worth carrying on to support multi-column? I sincerely ask for your suggestion.

As for how to support it, I think it might be:

  1. Scan the buildSide to gather information about which columns contain nulls.
  2. Build a HashedRelation from the original input, including any-null keys.
  3. Build an extra HashedRelation containing all null-padding combinations.

When probing on the streamedSide:

  1. If the streamedSide key is all non-null, use the null information gathered on the right side to try to find a match in the original HashedRelation. For example, for (1,2,3) with buildSide columns c2 and c3 containing null values, try matching using the following keys:
    (1,2,3) (1,null,3) (1,2,null) (1,null,null)
  2. If the streamedSide key contains any null column, for example (null, 2, 3), use the key to look up in the extra HashedRelation, because it contains all possible combinations.

@agrawaldevesh

Yes, I do understand section 6.2 of the paper. Basically, the paper describes the algorithm from the perspective of the streamedSide, while the expansion states it from the perspective of the buildSide. Let's do reverse inference on the following case.

If the buildSide contains a row (1,2,3), what streamedSide data will be evaluated as TRUE or UNKNOWN and dropped? It should be
(null, 2, 3) (1, null, 3) (1, 2, null) (null, null, 3) (null, 2, null) (1, null, null) and of course (1,2,3),
right?

First let me make sure I understand the current approach in this PR: we take a row and add the possible null-padded combinations of it to the hash table. There is then almost no change to the streaming side: we look for an exact match in the hash table. The only other tiny change to the streaming side is the change from anyNull to allNull, which becomes pertinent with multiple keys. Is that right?

So with this, consider what happens when there is a build row of (1, null, 3). We expect the row (1, 2, 3) NOT to be returned (or to be 'matched' in my parlance). Let's see what would happen with this PR: we will expand (1, null, 3) into the following rows in the HT: (1, null, 3), (null, null, 3), (1, null, null), (null, null, null). (Btw, should (null, null, null) even be added here?) Unfortunately, the row (1, 2, 3) does not match and is RETURNED. Similarly, the left-side row (1, 3, null) should also match and not be RETURNED, but this PR would return it.

(Please consider adding them as tests and you will see that the BNLJ implementation passes them but this PR would fail them.)

The PR currently will pass the negative tests I mentioned above: build row (1, 5, 3) will not match probe row (1, 2, 3), nor will the build row (1, 5, null) match the probe row (1, 5, 7). That is, the PR will correctly NOT return the probe rows (1, 2, 3) nor (1, 5, 7).

As for why the PR may be passing SubquerySuite: I am not sure how strong the coverage for SubquerySuite's multi-key not-in is. The BNLJ approach didn't have a special algorithm for multiple keys and thus it may not have needed as much attention.

My guess is that we cannot just do simple HT probing on the stream side. We have to do something different. The original NAAJ paper calls for creating multiple HT's with different key combinations (depending on what's null or not).

@agrawaldevesh I finally understand the complexity of multi-column support; thanks for your reminders again and again, and sorry about my naivety. Do you think it is still worth carrying on to support multi-column? I sincerely ask for your suggestion.

Don't feel sorry. I admire you for being bold and persevering with this!! This is how engineering proceeds; we learn when we hit brick walls. I have actually tried (unsuccessfully!) to implement this optimization before, in a variety of these naive ways, but shied away from implementing the multiple-index approach because it seemed like too much work.

You have come very far with this and I would totally encourage you to please think how to support this for the common cases of just 2 and 3 keys. I think creating new additional indices "on-demand" (as mentioned in step 3) when we encounter a particular row-type on the probe side is probably not going to work (it would lead to unpredictable memory behavior).

As for whether we should do this or not ... it totally depends on the approach. If it's not too much work to support the special case of 2 keys (let's even forget three keys), we can consider it. It's a tradeoff: customers can always rewrite their query from NOT IN to NOT EXISTS if they are not happy with the BNLJ performance.

How about this approach (mirroring steps 1, 2, and 3 in the paper): step 1 is the exact match. For step 2, instead of exploding the HT (i.e., inserting the null combinations there), just search for the different null variants as mentioned in step 2 of the paper; i.e., do the different null-aware padding lookups instead of storing those lookups. Whereas if you encounter a left-side row with a null key, then we would be forced to do the BNLJ-style "scanning" of the HT to search for all matching combinations (i.e., if we decide not to build additional indices). This approach would still be more efficient than plain BNLJ, since the NLJ penalty is only paid for left rows having a null key. A sketch follows.
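
An editorial sketch of that hybrid, with a Set standing in for the HashedRelation and Option modeling nulls; the names are invented and this is an idea sketch under those assumptions, not the PR's code:

    def naajDrops(probe: Seq[Option[Int]], buildKeys: Set[Seq[Option[Int]]]): Boolean =
      if (probe.forall(_.isEmpty)) {
        true // an all-null probe key compares UNKNOWN against everything
      } else if (probe.forall(_.isDefined)) {
        // Steps 1 and 2: instead of storing padded build keys, look up every
        // null-padded variant of the probe key (subset sizes 0 .. n-1).
        probe.indices.flatMap(n => probe.indices.combinations(n)).exists { nulled =>
          buildKeys.contains(probe.indices.map(i => if (nulled.contains(i)) None else probe(i)))
        }
      } else {
        // Step 3: the probe key has null columns, so pay the NLJ-style scan
        // only here. A build row matches unless some column compares FALSE.
        buildKeys.exists(b => b.zip(probe).forall { case (l, r) =>
          l.isEmpty || r.isEmpty || l == r
        })
      }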

As a diversion, I wonder if it makes sense instead to support the single key case but for distributed scenario (shuffle hash join and like) if this multi-key stuff is really hard. I think the single-key distributed case would be more common.

Thanks for working on this !!

.doc("The maximum number of keys that will be supported to use NAAJ optimize. " +
"While with NAAJ optimize, buildSide data would be expanded to (2^numKeys - 1) times, " +
"it might cause Driver OOM if NAAJ numKeys increased, since it is exponential growth.")
.intConf
Member

add .version("3.1.0"): #29335

@leanken-zz force-pushed the leanken-SPARK-32494 branch from 1153513 to 054581a on August 4, 2020 22:55
@leanken-zz
Contributor Author

leanken-zz commented Aug 4, 2020

@agrawaldevesh I have already pushed the InvertedIndex-version POC and gathered some test results on TPCH 1TB Q16.
It does cause a performance regression for the single-column case; for the multi-column case, the perf data is as expected.
I talked with @cloud-fan offline; he suggests the regression is mainly caused by the HashMap being inefficient compared with LongToUnsafeRowMap and BytesToBytesMap, which are more CPU-cache friendly.

For the following reasons, I think I need to temporarily shelve the PR and maybe come back to it some time in the future:

  • As we discussed yesterday, multi-column is not that frequently used in production; from what I have observed, maybe less than 0.1%.
  • I can't guarantee that after supporting multi-column there will be no regression for single-column, since there is no such UnsafeRoaringBitmap now, and if I were to implement one, it might be too much for the reviewers to verify both correctness and performance.
  • Single-column support for ShuffledHashJoinExec should be more important.

But we still came up with a neat algorithm for a complicated issue, and that should count for something. Let's consider this PR a discussion and memo; maybe some day, when it is time to support multi-column, the conversation and test results in this PR will be helpful. @agrawaldevesh @viirya @maropu sorry for taking your time; as @cloud-fan suggested, I will move on to supporting single-column ShuffledHashJoinExec.

TPCH 1TB Q16 results:

| Implementation | Keys | E2E time | BHJ stage time |
| -------------- | ------------- | -------- | -------------- |
| HashedRelation | single column | 33.6s | 5.9m |
| InvertedIndex | single column | 40.5s | 16.3m |
| InvertedIndex | two columns | 59.2s | 36.2m |

@leanken-zz leanken-zz closed this Aug 4, 2020
@SparkQA

SparkQA commented Aug 5, 2020

Test build #127068 has finished for PR 29304 at commit 054581a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class NullAwareUnsafeRowHashSet(
  • class InvertedIndexHashedRelation(

@agrawaldevesh

@leanken ... this was a GREAT GREAT attempt and I certainly learned a ton from it :-P. I am curious whether you profiled it while running Q16 and have a sense of where the low-hanging fruit might be?

We can also consider the hybrid approach we discussed, where we double the memory and keep the original HashedRelation for steps 1 and 2 of the paper but use the inverted indices only for step 3. That might help with the regression the inverted index causes for the single-key case.

In any case, I am totally with @cloud-fan that supporting shuffled hash join single key is more important. (As I also noted in my previous comment):

As a diversion, I wonder if it makes sense instead to support the single key case but for distributed scenario (shuffle hash join and like) if this multi-key stuff is really hard. I think the single-key distributed case would be more common.

 if (isNullAwareAntiJoin) {
   val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
-  val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+  val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input, true)


Hi @leanken ... I am curious why we force unsafe rows for NAAJ? Is that for efficiency, or does the implementation assume UnsafeRow?

Contributor Author

Because if it's a single long key, keyEv will be codegen'd as a Java long, not an UnsafeRow, and InvertedIndexHashedRelation only takes an UnsafeRow as lookup input on the buildSide.
