[SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter #35998
Closed
Conversation
ulysses-you (Contributor, Author): cc @cloud-fan @wangyum @somani @sigmod, can you double-check if I'm missing something?
sigmod approved these changes on Mar 29, 2022.
cloud-fan (Contributor): thanks, merging to master/3.3!
cloud-fan pushed a commit that referenced this pull request on Mar 29, 2022:
[SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter

### What changes were proposed in this pull request?
Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`.

### Why are the changes needed?
It seems that if the runtime filter uses an in-subquery to do the filtering, it is not converted to a semi-join as the design doc says. This PR fixes the issue.

### Does this PR introduce _any_ user-facing change?
No, not released yet.

### How was this patch tested?
Improved the test: ensure the semi-join exists when the runtime filter takes the in-subquery code path.

Closes #35998 from ulysses-you/SPARK-32268-FOllOWUP.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c0c52dd)
Signed-off-by: Wenchen Fan <[email protected]>
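For context, this is roughly what the resulting rule ordering in `SparkOptimizer` looks like. A simplified sketch showing only the affected batch; the surrounding batches are elided, so treat it as illustrative rather than exact source:

```scala
// InjectRuntimeFilter may inject an in-subquery filter; running
// RewritePredicateSubquery right after it in the same batch rewrites
// that in-subquery into a semi-join, as the design doc intends.
Batch("InjectRuntimeFilter", FixedPoint(1),
  InjectRuntimeFilter,
  RewritePredicateSubquery)
```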
ulysses-you (Contributor, Author): thank you!
Contributor: Thank you!
songzhxlh-max pushed a commit to songzhxlh-max/spark that referenced this pull request on Oct 12, 2022:
* [SPARK-32268][SQL] Row-level Runtime Filtering

This PR proposes row-level runtime filters in Spark to reduce intermediate data volume for operators like shuffle, join and aggregate, and hence improve performance. We propose two mechanisms to do this: semi-join filters or bloom filters, and both mechanisms are proposed to co-exist side-by-side behind feature configs. [Design Doc](https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit?usp=sharing) with more details.

With Semi-Join, we see 9 queries improve for the TPC-DS 3TB benchmark, and no regressions. With Bloom Filter, we see 10 queries improve for the TPC-DS 3TB benchmark, and no regressions. No user-facing change. Added tests.

Closes apache#35789 from somani/rf.

Lead-authored-by: Abhishek Somani <[email protected]>
Co-authored-by: Abhishek Somani <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 1f4e4c8)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ANSI mode

`Test that might_contain errors out non-constant Bloom filter` in `BloomFilterAggregateQuerySuite` failed in ANSI mode because `Numeric <=> Binary` casts are [not allowed in ANSI mode](apache#30260), so the content of `exception.getMessage` differs from that of non-ANSI mode. This PR changes the test case to ensure that the error messages of ANSI and non-ANSI mode are consistent. Bug fix; no user-facing change. Verified by GA and local testing.

**Before**

```
export SPARK_ANSI_SQL_MODE=false
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```
```
Run completed in 23 seconds, 537 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
```
export SPARK_ANSI_SQL_MODE=true
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```
```
- Test that might_contain errors out non-constant Bloom filter *** FAILED ***
  "cannot resolve 'CAST(t.a AS BINARY)' due to data type mismatch: cannot cast bigint to binary with ANSI mode on.
  If you have to cast bigint to binary, you can set spark.sql.ansi.enabled as false.
  ; line 2 pos 21;
  'Project [unresolvedalias('might_contain(cast(a#2424L as binary), cast(5 as bigint)), None)]
  +- SubqueryAlias t
     +- LocalRelation [a#2424L]
  " did not contain "The Bloom filter binary input to might_contain should be either a constant value or a scalar subquery expression" (BloomFilterAggregateQuerySuite.scala:171)
```

**After**

```
export SPARK_ANSI_SQL_MODE=false
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```
```
Run completed in 26 seconds, 544 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
```
export SPARK_ANSI_SQL_MODE=true
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```
```
Run completed in 25 seconds, 289 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes apache#35953 from LuciferYang/SPARK-32268-FOLLOWUP.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 7165123)
Signed-off-by: Yuming Wang <[email protected]>

* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter

Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`. It seems that if the runtime filter uses an in-subquery to do the filtering, it is not converted to a semi-join as the design doc says. This PR fixes the issue. No user-facing change (not released yet). Improved the test: ensure the semi-join exists when the runtime filter takes the in-subquery code path.

Closes apache#35998 from ulysses-you/SPARK-32268-FOllOWUP.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c0c52dd)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter

Add `ColumnPruning` in `InjectRuntimeFilter.injectBloomFilter` to optimize the BloomFilter creation query. BloomFilter subqueries injected by `InjectRuntimeFilter` read as many columns as `filterCreationSidePlan`, which does not match the "only scan the required columns" goal stated in the design. We can check this with a simple case in `InjectRuntimeFilterSuite`:

```scala
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
    SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
  val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62"
  sql(query).explain()
}
```

The reason is that these subqueries have not been optimized by `ColumnPruning`; this PR fixes it. No user-facing change (not released yet). Improved the test by adding `columnPruningTakesEffect` to check the optimized plan of the bloom filter join.

Closes apache#36047 from Flyangz/SPARK-32268-FOllOWUP.

Authored-by: Yang Liu <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit c98725a)
Signed-off-by: Yuming Wang <[email protected]>

* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession

This PR proposes to: 1. use the function registry in the SparkSession being used; 2. move function registration into `beforeAll`. Registering the function as `builtin` without `beforeAll` can affect other tests. See also https://lists.apache.org/thread/jp0ccqv10ht716g9xldm2ohdv3mpmmz1. Test-only; unit tests fixed.

Closes apache#36576 from HyukjinKwon/SPARK-32268-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit c5351f8)
Signed-off-by: Hyukjin Kwon <[email protected]>
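To make the "ensure the semi-join exists" check concrete, here is a hedged sketch of the kind of assertion the improved test performs; `containsSemiJoin` is an illustrative helper, not the suite's actual code:

```scala
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

// True if the optimized plan contains a LeftSemi join, i.e. the runtime
// filter's injected in-subquery was rewritten by RewritePredicateSubquery.
def containsSemiJoin(plan: LogicalPlan): Boolean =
  plan.collect { case j @ Join(_, _, LeftSemi, _, _) => j }.nonEmpty
```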
songzhxlh-max added a commit to Kyligence/spark that referenced this pull request on Oct 13, 2022:
* [SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (#535)

### What changes were proposed in this pull request?
When building the V2 `In` Predicate in `V2ExpressionBuilder`, `InSet.dataType` (which is `BooleanType`) is used to build the `LiteralValue`; `InSet.child.dataType` should be used instead.

### Why are the changes needed?
Bug fix.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New test.

Closes apache#37271 from huaxingao/inset.

Authored-by: huaxingao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: huaxingao <[email protected]>

* [SPARK-32268][SQL] Row-level Runtime Filtering
* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ANSI mode
* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter
* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter
* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession

* KE-29673 add segment prune function for bloom runtime filter; fix min/max for UTF8String collection; validate the runtime filter if needed when the broadcast join is valid

* AL-6084 In `Cast`'s `canCast` method, add logic to allow casting `DecimalType` to `DoubleType` (#542)

Signed-off-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Zhixiong Chen <[email protected]>
Co-authored-by: huaxingao <[email protected]>
Co-authored-by: Bowen Song <[email protected]>
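To illustrate the SPARK-39857 fix above, a minimal sketch of building a V2 `In` predicate with the child's data type, assuming the DS V2 expression API of Spark 3.3 (`FieldReference`, `LiteralValue`, `Predicate`); the column name and values are made up:

```scala
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.types.IntegerType

// For `c IN (1, 2, 3)` on an INT column, each literal carries the child's
// data type (IntegerType) -- not InSet.dataType, which is BooleanType.
val children: Array[Expression] = FieldReference("c") +: Array[Expression](
  LiteralValue(1, IntegerType),
  LiteralValue(2, IntegerType),
  LiteralValue(3, IntegerType))
val inPredicate = new Predicate("IN", children)
```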
leejaywei pushed a commit to Kyligence/spark that referenced this pull request on Oct 18, 2022:
* [SPARK-32268][SQL] Row-level Runtime Filtering
* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ANSI mode
* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter
* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter
* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession
What changes were proposed in this pull request?
Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`.

Why are the changes needed?
It seems that if the runtime filter uses an in-subquery to do the filtering, it is not converted to a semi-join as the design doc says. This PR fixes the issue.
Does this PR introduce any user-facing change?
No, not released yet.
How was this patch tested?
Improved the test: ensure the semi-join exists when the runtime filter takes the in-subquery code path.
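For reference, a hedged sketch of how one might exercise this in-subquery (semi-join) code path in a suite like `InjectRuntimeFilterSuite`; the config constants exist in Spark 3.3's `SQLConf`, but the query and table names are illustrative:

```scala
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.internal.SQLConf

// Force the semi-join reduction path (off by default) and disable the
// bloom filter path, so InjectRuntimeFilter injects an in-subquery that
// RewritePredicateSubquery should then rewrite into a LeftSemi join.
withSQLConf(
    SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "true",
    SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
  val df = sql("select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62")
  // The optimized plan should contain the rewritten semi-join.
  assert(df.queryExecution.optimizedPlan.collect {
    case j @ Join(_, _, LeftSemi, _, _) => j
  }.nonEmpty)
}
```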