[SPARK-32268][SQL] Row-level Runtime Filtering #35789
Conversation
I have a question: why do we need Semi-Join if we have Bloom Filter?

I guess it is a trade-off between benefits and costs. A Bloom filter has a false-positive issue, and it gets worse as the data set grows. So if the creation side (from the design doc) is small enough to be broadcast, we can use a semi-join to get more benefit at less cost, since it is exact. It is similar to what DPP does.

When the Bloom filter is enabled and kicks in, the semi-join is not needed.
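For context on the false-positive trade-off mentioned above, here is a minimal, self-contained sketch using the spark-sketch `BloomFilter` API (the sizing numbers are arbitrary):

```scala
import org.apache.spark.util.sketch.BloomFilter

object BloomFilterFalsePositiveDemo {
  def main(args: Array[String]): Unit = {
    // Size the filter for one million items and insert exactly those keys.
    val bf = BloomFilter.create(1000000L)
    (0L until 1000000L).foreach(bf.putLong)

    // Keys that were never inserted can still pass the filter: these are false positives.
    // A semi-join against a (broadcastable) creation side would be exact instead.
    val falsePositives = (1000000L until 2000000L).count(bf.mightContainLong)
    println(s"False positives out of 1,000,000 probes: $falsePositives")
  }
}
```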
cc @cloud-fan
@sigmod did some quick tests on Q1 and it's working. 👍
```scala
        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
          Literal(rowCount.get.longValue))
```

Could we use column statistics if they exist?
IIRC, as we discussed previously offline, rowCount is more conservative and safer?
Feel free to tune the sizing heuristics after this PR, if you saw benefits in your environments.
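A rough sketch of the sizing decision being discussed, assuming the optimizer can fall back to a configured default when the creation side has no row-count statistics (the helper name and the default parameter are illustrative, not the PR's actual API):

```scala
// Hypothetical helper: prefer the creation side's rowCount estimate when available,
// otherwise fall back to a configured default expected item count.
def expectedNumItems(filterCreationSidePlan: LogicalPlan, defaultExpectedItems: Long): Long =
  filterCreationSidePlan.stats.rowCount.map(_.longValue).getOrElse(defaultExpectedItems)
```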
wangyum
left a comment
Looks generally fine to me.
The injected filter predicate is just a
```scala
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan
  ): LogicalPlan = {
    require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
```

The `require` may be unnecessary, because these configs have already been checked in `tryInjectRuntimeFilter`.

> because they have been checked before that in tryInjectRuntimeFilter.

Call sites of this function can evolve after this PR. `require` (like other requires in the codebase) is there to prevent undesirable (potentially buggy) call site changes in the future.
```scala
class BloomFilterSerDe {

  final def serialize(obj: BloomFilter): Array[Byte] = {
    val size = obj.bitSize()/8
```

nit: add a space around the `/`.
gatorsmile
left a comment
LGTM
Great work!!!
```scala
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan
  ): LogicalPlan = {
```

Could we format the code?
```diff
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -70,8 +70,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       filterApplicationSideExp: Expression,
       filterApplicationSidePlan: LogicalPlan,
       filterCreationSideExp: Expression,
-      filterCreationSidePlan: LogicalPlan
-  ): LogicalPlan = {
+      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
     // Skip if the filter creation side is too big
     if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterBloomFilterThreshold) {
       return filterApplicationSidePlan
```
Done!
```scala
   * do not add a subquery that might have an expensive computation
   */
  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
    plan.expressions
```

Remove this line?
Yes, thanks!
c21
left a comment
Thanks @somani for the work! I have some comments & questions.
```java
   * @return the number of set bits in this {@link BloomFilter}.
   */
  public long cardinality() {
    throw new UnsupportedOperationException("Not implemented");
```

nit: why do we need to provide a default implementation here, rather than defining this as an abstract method like the others?
yea, this is not a public API and we don't need to worry about backward compatibility.
Makes sense, will change.
Turns out BloomFilter is public, and removing this caused backward compatibility tests to fail.
```
[error] spark-sketch: Failed binary compatibility check against org.apache.spark:spark-sketch_2.12:3.2.0! Found 1 potential problems (filtered 1)
[error]  * abstract method cardinality()Long in class org.apache.spark.util.sketch.BloomFilter is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.BloomFilter.cardinality")
```
So added this back again.
hmm @somani is it failing as part of the maven build? Or some other unit test? We should exclude BloomFilter.java from the backward compatibility check, right? cc @cloud-fan.
Yes regular sbt builds failed with
```
[error] spark-sketch: Failed binary compatibility check against org.apache.spark:spark-sketch_2.12:3.2.0! Found 1 potential problems (filtered 1)
[error]  * abstract method cardinality()Long in class org.apache.spark.util.sketch.BloomFilter is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.BloomFilter.cardinality")
```
https://github.com/somani/spark/runs/5569255844
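If the method were kept abstract instead, the MiMa failure above suggests the exclusion would look roughly like this (presumably in project/MimaExcludes.scala; the filter string is taken verbatim from the error output):

```scala
// Exclude the new abstract method from the binary-compatibility check.
ProblemFilters.exclude[ReversedMissingMethodProblem](
  "org.apache.spark.util.sketch.BloomFilter.cardinality")
```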
```scala
  }

  private def canFilterLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
```

it should work for LEFT SEMI join, right?
Yes, let me raise a follow up for this.
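A sketch of what that follow-up could look like, based on the snippet above (not part of this PR):

```scala
private def canFilterLeft(joinType: JoinType): Boolean = joinType match {
  // For LEFT SEMI, left rows without a match on the right are dropped,
  // so a runtime filter on the left side is also safe.
  case Inner | RightOuter | LeftSemi => true
  case _ => false
}
```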
```scala
    // 1. The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the
    //    expression references originate from a single leaf node)
    // 2. The filter creation side has a selective predicate
    // 3. The current join is a shuffle join or a broadcast join that has a shuffle or aggregate
```

Do we mean "or a broadcast join that has a shuffle join or aggregate ..." based on the implementation of `probablyHasShuffle()`?
Changed to:

> The current join is a shuffle join or a broadcast join that has a shuffle below it
```scala
      filterApplicationSide).isDefined && isSelectiveFilterOverScan(filterCreationSide) &&
      (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint) ||
        probablyHasShuffle(filterApplicationSide)) &&
      satisfyByteSizeRequirement(filterApplicationSide)
```

`satisfyByteSizeRequirement` only checks the size of the filter application side; that seems out of sync with the comment above:

> 4. The filterApplicationSide is larger than the filterCreationSide by a configurable threshold

Shouldn't we also check here that the filterCreationSide is small enough?
Filter creation side has its own threshold. Maybe I should change the comment to:

> The max filterApplicationSide scan size is greater than a configurable threshold
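A sketch of the two independent size checks being described; the method and config accessor names below are illustrative (they follow the spirit of the snippets in this thread, not necessarily the final names):

```scala
// Creation side must be small enough that building the Bloom filter is cheap;
// application side must be large enough (by its max scan size) for the filter to pay off.
// `maxScanByteSize` and both conf accessors are placeholders for this sketch.
def sizeChecksPass(creation: LogicalPlan, application: LogicalPlan): Boolean = {
  creation.stats.sizeInBytes <= conf.runtimeFilterCreationSideThreshold &&
    maxScanByteSize(application) > conf.runtimeFilterApplicationSideScanSizeThreshold
}
```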
```scala
      filterCreationSide: LogicalPlan,
      filterApplicationSideExp: Expression,
      hint: JoinHint): Boolean = {
    // Check that:
```

nit: better to move this comment to be a javadoc top-level comment (/* ... */) before this method.
Done
```scala
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val thisObj = ctx.addReferenceObj("thisObj", this)
    nullSafeCodeGen(ctx, ev, (bloomFilterBytes, value) => {
      s"\n${ev.value} = (Boolean) $thisObj.nullSafeEval($bloomFilterBytes, $value);\n"
```

It looks like we are just calling non-codegen code inside the codegen code path. Why can't we use `CodegenFallback` to start with? Or just provide a codegen implementation here?
Done!
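For reference, a rough sketch of what a direct codegen path could look like, assuming the Bloom filter can be deserialized once on the driver and registered as a reference object (the `bloomFilter` value is an assumption; this is not necessarily the code that was merged):

```scala
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  // `bloomFilter` is assumed to be the filter already deserialized from the
  // constant/subquery bytes, so the generated code probes it directly.
  val bf = ctx.addReferenceObj("bloomFilter", bloomFilter, classOf[BloomFilter].getName)
  nullSafeCodeGen(ctx, ev, (_, value) => s"${ev.value} = $bf.mightContainLong($value);")
}
```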
```scala
    .booleanConf
    .createWithDefault(true)

  val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED =
```

nit: given the feature is experimental and disabled by default now, it would be better to mark these configs as `.internal()`.
Umm, I'll leave it for others to decide, but I think `internal` might be for configs that are only used from within the code, not for features that are experimental and open for people to play with.
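For what it's worth, marking a config internal with the SQLConf builder would look roughly like this (the key and doc text are illustrative):

```scala
val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED =
  buildConf("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled")
    .internal() // hidden from user-facing documentation, still settable
    .doc("When true, insert a semi-join based runtime filter on one side of a shuffle join.")
    .booleanConf
    .createWithDefault(false)
```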
```scala
    .checkValue(threshold => threshold >= 0, "The threshold should be >= 0")
    .createWithDefault(10)

  lazy val RUNTIME_BLOOM_FILTER_ENABLED =
```

why does this config need to be a lazy val?
It doesn't, changed. Thanks!
| Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ | ||
| Batch("PartitionPruning", Once, | ||
| PartitionPruning) :+ | ||
| Batch("InjectRuntimeFilter", FixedPoint(1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious why it's FixedPoint(1), not Once?
I saw some idempotency check fail when I tested. I can try to do this in a follow up
```scala
    !canBroadcastBySize(left, conf) && !canBroadcastBySize(right, conf)
  }

  private def probablyHasShuffle(plan: LogicalPlan): Boolean = {
```

I think it's ok to start with this heuristic, but it can be fragile for some queries. Cases like joining two bucketed tables would regress, as the query plan normally has a join operator but no shuffle. It also does not play very well with the ongoing Storage Partitioned Join work, where the shuffle can be removed when joining on a subset of the join keys. But I don't think we have a good way to detect whether a query plan has a shuffle at the logical plan phase, so this is something to think about in the future.
Yea, a possible idea is to always generate the bloom filter, and then remove non-beneficial ones at the physical phase.
```scala
        if (conf.runtimeFilterBloomFilterEnabled) {
          hasBloomFilter(left, right, leftKey, rightKey)
        } else {
          hasInSubquery(left, right, leftKey, rightKey)
        }
```

[question] What would be the behaviour when both confs are enabled? I believe we should then check for the presence of both runtime filters, WDYT?
```scala
object BloomFilterAggregate {
  final def serialize(obj: BloomFilter): Array[Byte] = {
    val size = obj.bitSize() / 8
```

`BloomFilterImpl.writeTo` needs to write the version number and `numHashFunctions` before the bits, so this size is not enough as an `initialSize` hint.
Makes sense, added 8 bytes to the size
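A self-contained sketch of that serialization path, assuming the extra 8 bytes are meant to cover the version and numHashFunctions header fields that `writeTo` emits before the bit array:

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.util.sketch.BloomFilter

def serialize(obj: BloomFilter): Array[Byte] = {
  // Initial buffer size: bit array length in bytes plus 8 bytes for the header
  // (assumption: version number and numHashFunctions are 4 bytes each).
  val initialSize = (obj.bitSize() / 8 + 8).toInt
  val out = new ByteArrayOutputStream(initialSize)
  obj.writeTo(out)
  out.toByteArray
}
```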
```scala
        }
      }
    })
    Join(newLeft, newRight, joinType, join.condition, hint)
```

[minor] Since we are just changing the join's children, should we do `join.withNewChildren(Seq(newLeft, newRight))`? If there are tags added to the node, we want them to be copied over to the new node, WDYT?
Makes sense, done!
```scala
  private def isSimpleExpression(e: Expression): Boolean = {
    !e.containsAnyPattern(PYTHON_UDF, SCALA_UDF, INVOKE, JSON_TO_STRUCT, LIKE_FAMLIY,
      REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
```

is it better to have a whitelist to define simple expressions?
Do you mean configurable or hardcoded?
> is it better to have a whitelist to define simple expressions?

Yes. However,

- if we use concrete expression classes for patterns, the whitelist will probably be too large;
- if we use abstract expression classes for patterns, it may be unfriendly to future expressions, like the negation approach.

(A rough allow-list sketch is shown below.)
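A rough allow-list variant for comparison (hypothetical; the PR keeps the deny-list shown above):

```scala
// Accept only expressions built from attributes, literals and a few cheap operators.
private def isSimpleExpression(e: Expression): Boolean = e match {
  case _: Attribute | _: Literal => true
  case c: Cast => isSimpleExpression(c.child)
  case b: BinaryComparison => isSimpleExpression(b.left) && isSimpleExpression(b.right)
  case _ => false
}
```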
```scala
   * - The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the
   * - expression references originate from a single leaf node)
```

Suggested change:

```diff
-   * - The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the
-   * - expression references originate from a single leaf node)
+   * - The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the
+   *   expression references originate from a single leaf node)
```
This is not a new item but part of the item above.
Maybe we can remove the method doc as it just duplicates the code comment below
Oops I meant to remove the code comment below... removed it now.
```scala
    // 3. The current join is a shuffle join or a broadcast join that has a shuffle or aggregate
    //    in the filter application side
    // 4. The max filterApplicationSide scan size is greater than a configurable threshold
    findExpressionAndTrackLineageDown(filterApplicationSideExp,
```

where is this method defined?
```scala
  val RUNTIME_BLOOM_FILTER_NUM_BITS =
    buildConf("spark.sql.optimizer.runtime.bloomFilter.numBits")
```
hmm, doesn't the num bits depend on the num items?

Yes, this will be used if the number of items is not provided.
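For reference, the standard Bloom filter sizing relationship (independent of Spark's exact defaults), which is why numBits normally follows from the expected item count, with the numBits config acting as the fallback when that count is unknown:

```scala
// For n expected items and target false-positive probability p,
// the optimal bit count is m = -n * ln(p) / (ln 2)^2.
def optimalNumBits(expectedNumItems: Long, fpp: Double): Long =
  math.ceil(-expectedNumItems * math.log(fpp) / (math.log(2) * math.log(2))).toLong
```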
```scala
  import testImplicits._

  // Register 'bloom_filter_agg' to builtin.
  FunctionRegistry.builtin.registerFunction(new FunctionIdentifier("bloom_filter_agg"),
```

shall we call dropFunction in afterAll to unregister the functions at the end of this test suite?
Done!
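A sketch of the cleanup referred to here, assuming the registration from the snippet above stays as-is:

```scala
override def afterAll(): Unit = {
  // Unregister the test-only aggregate so it does not leak into other suites.
  FunctionRegistry.builtin.dropFunction(new FunctionIdentifier("bloom_filter_agg"))
  super.afterAll()
}
```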
…mizer/InjectRuntimeFilter.scala Co-authored-by: Wenchen Fan <[email protected]>
…mizer/InjectRuntimeFilter.scala Co-authored-by: Wenchen Fan <[email protected]>
…mizer/InjectRuntimeFilter.scala Co-authored-by: Wenchen Fan <[email protected]>
thanks, merging to master/3.3!

### What changes were proposed in this pull request?
This PR proposes row-level runtime filters in Spark to reduce intermediate data volume for operators like shuffle, join and aggregate, and hence improve performance. We propose two mechanisms to do this: semi-join filters or bloom filters, and both mechanisms are proposed to co-exist side-by-side behind feature configs. [Design Doc](https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit?usp=sharing) with more details.

### Why are the changes needed?
With Semi-Join, we see 9 queries improve for the TPC DS 3TB benchmark, and no regressions. With Bloom Filter, we see 10 queries improve for the TPC DS 3TB benchmark, and no regressions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added tests

Closes #35789 from somani/rf.

Lead-authored-by: Abhishek Somani <[email protected]>
Co-authored-by: Abhishek Somani <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 1f4e4c8)
Signed-off-by: Wenchen Fan <[email protected]>
| "Input to function might_contain should have been binary followed by a value with bigint")) | ||
| } | ||
|
|
||
| test("Test that might_contain errors out non-constant Bloom filter") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test seems to be failing with ANSI mode on:

```
org.scalatest.exceptions.TestFailedException: "cannot resolve 'CAST(t.a AS BINARY)' due to data type mismatch:
cannot cast bigint to binary with ANSI mode on.
If you have to cast bigint to binary, you can set spark.sql.ansi.enabled as false.
; line 2 pos 21;
'Project [unresolvedalias('might_contain(cast(a#785367L as binary), cast(5 as bigint)), None)]
+- SubqueryAlias t
   +- LocalRelation [a#785367L]
" did not contain "The Bloom filter binary input to might_contain should be either a constant value or a scalar subquery expression"
```
https://github.com/apache/spark/runs/5658074902
`Numeric <=> Binary` is not allowed in ANSI mode, so the content of `exception.getMessage` is different from that of non-ANSI mode.
| test("Test that might_contain errors out non-constant Bloom filter") { | ||
| val exception1 = intercept[AnalysisException] { | ||
| spark.sql(""" | ||
| |SELECT might_contain(cast(a as binary), cast(5 as long)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to avoid ansi failure, we can do cast(cast(a as string) as binary)
```scala
    super.afterAll()
  }

  test("Test bloom_filter_agg and might_contain") {
```

During Apache Spark 3.3.0 RC5 testing, I hit a failure of this test case and filed a JIRA:

```
[info] !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info] !struct<>                   struct<positive_membership_test:boolean,negative_membership_test:boolean>
[info] ![true,false]               [true,true] (QueryTest.scala:244)
```

Just FYI, cc @MaxGekk
```scala
    }
    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
      ListQuery(aggregate, childOutputs = aggregate.output))
    Filter(filter, filterApplicationSidePlan)
```

As we know, the DPP filter always tries to reuse the exchange/subquery from the other side first. It seems the semi-join filter constructs the `InSubquery` filter directly. Why not reuse the exchange/subquery?
> As we know, the DPP filter always tries to reuse the exchange/subquery from the other side first. It seems the semi-join filter constructs the InSubquery filter directly. Why not reuse the exchange/subquery?

@beliefer do you see performance issues in your production? The filter creation side criteria in this rule is much narrower than DPP/DFP.
Since we are going to use semi-join, can you tell me the performance problems in detail?
I think it's driver-side bloom filter building (with a reused broadcast/shuffle) vs. a distributed bloom filter building job. Ideally, a job is more reliable.
Thank you.
* [SPARK-32268][SQL] Row-level Runtime Filtering — the change described above, cherry-picked from commit 1f4e4c8 (Closes apache#35789 from somani/rf; lead-authored by Abhishek Somani, co-authored by Yuming Wang, signed off by Wenchen Fan).
* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ansi mode — `Test that might_contain errors out non-constant Bloom filter` failed with ANSI on because `Numeric <=> Binary` casts are rejected there, so `exception.getMessage` differs from non-ANSI mode; the test was changed so the error messages are consistent in both modes (Closes apache#35953 from LuciferYang/SPARK-32268-FOLLOWUP; authored by yangjie01, signed off by Yuming Wang, cherry-picked from commit 7165123).
* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter — without it, a runtime filter injected as an in-subquery is not converted to a semi-join as the design intended; the test now ensures the semi-join exists on the in-subquery code path (Closes apache#35998 from ulysses-you/SPARK-32268-FOllOWUP; authored by ulysses-you, signed off by Wenchen Fan, cherry-picked from commit c0c52dd).
* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter — Bloom filter subqueries injected by `InjectRuntimeFilter` read as many columns as `filterCreationSidePlan`, contradicting the design's "only scan the required columns"; running `ColumnPruning` on the creation query fixes this, verified by `columnPruningTakesEffect` in `InjectRuntimeFilterSuite` (Closes apache#36047 from Flyangz/SPARK-32268-FOllOWUP; authored by Yang Liu, signed off by Yuming Wang, cherry-picked from commit c98725a).
* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession — use the function registry of the SparkSession under test and move registration into `beforeAll`, since registering on `builtin` without it can affect other tests (Closes apache#36576 from HyukjinKwon/SPARK-32268-followup; authored and signed off by Hyukjin Kwon, cherry-picked from commit c5351f8).
Summary: This function is used in Spark Runtime Filters: apache/spark#35789 (design doc: https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit#heading=h.4v65wq7vzy4q). The BloomFilter implementation in Velox is different from Spark's, hence the serialized BloomFilter is different. Velox has a memory limit for contiguous memory buffers, so BloomFilter capacity is smaller than in Spark when numBits is large (see #4713 (comment)). Spark allows changing the defaults while Velox does not (see also #3342). Fixes #3694.

Pull Request resolved: #4028
Reviewed By: Yuhta
Differential Revision: D46352733
Pulled By: mbasmanova
fbshipit-source-id: 1c8a0b489a736e627ba2c0869688fc0cf46279bb
### What changes were proposed in this pull request?
This PR proposes row-level runtime filters in Spark to reduce intermediate data volume for operators like shuffle, join and aggregate, and hence improve performance. We propose two mechanisms to do this: semi-join filters or bloom filters, and both mechanisms are proposed to co-exist side-by-side behind feature configs.
[Design Doc](https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit?usp=sharing) with more details.

### Why are the changes needed?
With Semi-Join, we see 9 queries improve for the TPC DS 3TB benchmark, and no regressions.
With Bloom Filter, we see 10 queries improve for the TPC DS 3TB benchmark, and no regressions.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added tests
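For anyone who wants to try the feature out, a minimal sketch follows (the config keys reflect the naming introduced by this PR but should be verified against your Spark build; `fact`/`dim` are placeholder tables registered on an active SparkSession `spark`):

```scala
// Enable the Bloom filter variant (the semi-join variant has its own flag).
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
// spark.conf.set("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "true")

// A selective predicate on the dimension side makes the join a runtime-filter candidate.
spark.sql(
  """
    |SELECT f.*
    |FROM fact f JOIN dim d ON f.k = d.k
    |WHERE d.p = 'x'
  """.stripMargin).explain(true)
```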