
Commit 25e4ec2

wangyum authored and committed
[CARMEL-7442] Implement runtime/bloom filter (apache#311)
1 parent 89ad2d6 commit 25e4ec2

File tree: 19 files changed, +1982 -45 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala

Lines changed: 5 additions & 1 deletion
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan}
+import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, UnaryLike}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.trees.UnaryLike

 trait DynamicPruning extends Predicate

@@ -89,6 +89,10 @@ case class DynamicPruningSubquery(
     copy(pruningKey = newChild)
   }

+object DynamicPruningSubquery {
+  private[spark] val IS_PRUNING_DATA_TAG = TreeNodeTag[Boolean]("is_pruning_data")
+}
+
 /**
  * Marker for a planned [[DynamicPruning]] expression.
  * The expression is created during planning, and it defers to its child for evaluation.
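For context, a minimal sketch of how a TreeNodeTag such as IS_PRUNING_DATA_TAG is typically set and read on an expression node. The helper names and the expr parameter are illustrative, not part of this commit; setTagValue/getTagValue come from TreeNode, and a missing tag is treated as false here.

import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, Expression}

// Hypothetical helpers: mark an expression as "pruning data" and read the flag back.
def markAsPruningData(expr: Expression): Unit =
  expr.setTagValue(DynamicPruningSubquery.IS_PRUNING_DATA_TAG, true)

def isPruningData(expr: Expression): Boolean =
  expr.getTagValue(DynamicPruningSubquery.IS_PRUNING_DATA_TAG).getOrElse(false)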

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala

Lines changed: 2 additions & 0 deletions
@@ -658,6 +658,8 @@ object Murmur3HashFunction extends InterpretedHashFunction {
 case class XxHash64(children: Seq[Expression], seed: Long) extends HashExpression[Long] {
   def this(arguments: Seq[Expression]) = this(arguments, 42L)

+  def this(argument: Expression) = this(Seq(argument))
+
   override def dataType: DataType = LongType

   override def prettyName: String = "xxhash64"
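The new single-argument constructor is what later lets the pruning rule wrap one join-key expression directly, as in new XxHash64(value). A minimal sketch; the hashKey helper is illustrative:

import org.apache.spark.sql.catalyst.expressions.{Expression, XxHash64}

// Hash a single join-key expression.
// new XxHash64(key) expands to new XxHash64(Seq(key)), which in turn uses the default seed 42L.
def hashKey(key: Expression): XxHash64 = new XxHash64(key)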

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 11 additions & 1 deletion
@@ -127,7 +127,8 @@ object JoinStrategyHint {
     BROADCAST,
     SHUFFLE_MERGE,
     SHUFFLE_HASH,
-    SHUFFLE_REPLICATE_NL)
+    SHUFFLE_REPLICATE_NL,
+    BLOOM_FILTER_JOIN)
 }

 /**
@@ -197,6 +198,15 @@ case object NO_BROADCAST_AND_REPLICATION extends JoinStrategyHint {
   override def hintAliases: Set[String] = Set.empty
 }

+/**
+ * The hint for bloom filter join.
+ */
+case object BLOOM_FILTER_JOIN extends JoinStrategyHint {
+  override def displayName: String = "bloom_filter_join"
+  override def hintAliases: Set[String] = Set(
+    "BLOOM_FILTER_JOIN")
+}
+
 /**
  * The callback for implementing customized strategies of handling hint errors.
  */
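Assuming BLOOM_FILTER_JOIN is picked up by the hint resolver the same way as the existing join strategy hints, it could be requested like any other join hint. A sketch; spark, fact and dim (a registered view/DataFrame pair) are illustrative:

// SQL hint syntax; join strategy hint names are matched case-insensitively.
spark.sql(
  """SELECT /*+ BLOOM_FILTER_JOIN(fact) */ *
    |FROM fact JOIN dim ON fact.k = dim.k""".stripMargin)

// Dataset API equivalent.
fact.hint("bloom_filter_join").join(dim, "k")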

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 58 additions & 0 deletions
@@ -460,6 +460,51 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val RUNTIME_FILTER_PRUNING_ENABLED =
+    buildConf("spark.sql.optimizer.runtimeFilterPruning.enabled")
+      .doc("When true, we will generate a pruning predicate when a column is used as a join key.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val RUNTIME_FILTER_PRUNING_FILTERING_ROW_COUNT =
+    buildConf("spark.sql.optimizer.runtimeFilterPruning.filteringSideThreshold")
+      .internal()
+      .doc("We assume it has a partition pruning filter if it has no selective predicate and " +
+        "the maximum number of rows is less than this threshold.")
+      .version("3.5.0")
+      .intConf
+      .checkValue(threshold => threshold >= 0, "The maximum row count must be non-negative.")
+      .createWithDefault(0)
+
+  val RUNTIME_FILTER_PRUNING_MAX_BLOOM_FILTER_ENTRIES =
+    buildConf("spark.sql.optimizer.runtimeFilterPruning.maxBloomFilterEntries")
+      .doc("The maximum number of bloom filter entries allowed when building dynamic bloom " +
+        "filter join pruning.")
+      .version("3.5.0")
+      .longConf
+      .checkValue(_ > 0, "The value of max bloom filter entries must be greater than 0.")
+      .createWithDefault(100000000L)
+
+  val RUNTIME_FILTER_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO =
+    buildConf("spark.sql.optimizer.runtimeFilterPruning.pruningSideExtraFilterRatio")
+      .internal()
+      .doc("When the filtering side doesn't support broadcast by join type, doing DPP means " +
+        "running an extra query that may have significant overhead. This config will be used " +
+        "as the extra filter ratio for computing the data size of the pruning side after DPP, " +
+        "in order to evaluate if it is worth adding an extra subquery as the pruning filter.")
+      .version("3.5.0")
+      .doubleConf
+      .checkValue(ratio => ratio > 0.0 && ratio <= 1.0, "The ratio value must be in (0.0, 1.0].")
+      .createWithDefault(0.04)
+
+  val DYNAMIC_PRUNING_MAX_INSET_NUM =
+    buildConf("spark.sql.optimizer.dynamicPruning.maxInsetNum")
+      .doc("We will fall back to true if InSet's size exceeds this value when pruning the data.")
+      .version("3.0.0")
+      .intConf
+      .createWithDefault(1000000)
+
   val PLANNED_WRITE_ENABLED = buildConf("spark.sql.optimizer.plannedWrite.enabled")
     .internal()
     .doc("When set to true, Spark optimizer will add logical sort operators to V1 write commands " +
@@ -5227,9 +5272,22 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def runtimeFilterCreationSideThreshold: Long =
     getConf(RUNTIME_BLOOM_FILTER_CREATION_SIDE_THRESHOLD)

+  def runtimeFilterApplicationSideThreshold: Long =
+    getConf(RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD)
+
   def runtimeRowLevelOperationGroupFilterEnabled: Boolean =
     getConf(RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED)

+  def runtimeFilterPruningEnabled: Boolean = getConf(RUNTIME_FILTER_PRUNING_ENABLED)
+
+  def runtimeFilterPruningPruningSideExtraFilterRatio: Double =
+    getConf(RUNTIME_FILTER_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO)
+
+  def runtimeFilterPruningMaxBloomFilterEntries: Long =
+    getConf(RUNTIME_FILTER_PRUNING_MAX_BLOOM_FILTER_ENTRIES)
+
+  def dynamicPruningMaxInsetNum: Int = getConf(DYNAMIC_PRUNING_MAX_INSET_NUM)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)

   def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED)
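The new behavior is opt-in. A sketch of enabling it on a session; the numeric values simply restate the defaults added above and are not tuning advice:

spark.conf.set("spark.sql.optimizer.runtimeFilterPruning.enabled", "true")
// Cap the bloom filter size and the InSet fallback used when pruning the data.
spark.conf.set("spark.sql.optimizer.runtimeFilterPruning.maxBloomFilterEntries", "100000000")
spark.conf.set("spark.sql.optimizer.dynamicPruning.maxInsetNum", "1000000")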

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 12 additions & 0 deletions
@@ -583,6 +583,18 @@ object QueryExecution {
     prepareForExecution(preparationRules, sparkPlan.clone())
   }

+  /**
+   * Prepare the [[SparkPlan]] for execution using an existing adaptive execution context.
+   * This method is only called by [[PlanAdaptiveDynamicPruningFilters]].
+   */
+  def prepareExecutedPlan(
+      session: SparkSession,
+      sparkPlan: SparkPlan,
+      context: AdaptiveExecutionContext): SparkPlan = {
+    val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
+    prepareForExecution(preparationRules, sparkPlan.clone())
+  }
+
   /**
    * Converts asserts, null pointer exceptions to internal errors.
    */
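A sketch of the intended call pattern, mirroring how PlanAdaptiveDynamicPruningFilters uses this overload later in this commit (bloomFilterPlan and the surrounding variables are illustrative). Passing the existing AdaptiveExecutionContext lets the prepared subplan share the same stage and exchange reuse state as the main query:

// Prepare the bloom-filter subplan inside the caller's adaptive execution context.
val executedPlan: SparkPlan =
  QueryExecution.prepareExecutedPlan(session, bloomFilterPlan, adaptivePlan.context)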

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 19 additions & 11 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.datasources.{CleanupLazyFileIndex, PruneFileSourcePartitions, SchemaPruning, V1Writes}
 import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes}
-import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning, RowLevelOperationRuntimeGroupFiltering}
+import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning, RowLevelOperationRuntimeGroupFiltering, RuntimeFilterPruning}
 import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs, ExtractPythonUDTFs}

 class SparkOptimizer(
@@ -47,16 +47,24 @@ class SparkOptimizer(
   override def preCBORules: Seq[Rule[LogicalPlan]] =
     OptimizeMetadataOnlyDeleteFromTable :: Nil

-  override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
-    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
-    Batch("PartitionPruning", Once,
-      PartitionPruning,
-      // We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
-      // twice which may break some optimizer rules that can only be applied once. The rule below
-      // only invokes `OptimizeSubqueries` to optimize newly added subqueries.
-      new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
-    Batch("InjectRuntimeFilter", FixedPoint(1),
-      InjectRuntimeFilter) :+
+  override def defaultBatches: Seq[Batch] = ((preOptimizationBatches ++ super.defaultBatches :+
+    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog))) ++
+    {
+      if (conf.runtimeFilterPruningEnabled) {
+        Seq.empty[Batch] :+ Batch("Runtime Filter Pruning", Once,
+          RuntimeFilterPruning,
+          new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries))
+      } else {
+        Seq.empty[Batch] :+ Batch("PartitionPruning", Once,
+          PartitionPruning,
+          // We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
+          // twice which may break some optimizer rules that can only be applied once. The rule
+          // below only invokes `OptimizeSubqueries` to optimize newly added subqueries.
+          new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
+          Batch("InjectRuntimeFilter", FixedPoint(1),
+            InjectRuntimeFilter)
+      }
+    } :+
   Batch("MergeScalarSubqueries", Once,
     MergeScalarSubqueries,
     RewriteDistinctAggregates) :+

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ class AQEOptimizer(conf: SQLConf, extendedRuntimeOptimizerRules: Seq[Rule[Logica
       ConvertToLocalRelation,
       UpdateAttributeNullability),
     Batch("Dynamic Join Selection", Once, DynamicJoinSelection),
+    Batch("Optimize Bloom Filter Join", Once, OptimizeBuildBloomFilter),
     Batch("Eliminate Limits", fixedPoint, EliminateLimits),
     Batch("Optimize One Row Plan", fixedPoint, OptimizeOneRowPlan)) :+
     Batch("User Provided Runtime Optimizers", fixedPoint, extendedRuntimeOptimizerRules: _*)
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeBuildBloomFilter.scala

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Multiply}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.CoalesceExec
+import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.execution.dynamicpruning.DynamicPruningHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.sketch.BloomFilter
+
+/**
+ * This optimization rule finds the build bloom filter expression and
+ * sets expectedNumItems from the LogicalQueryStage, which is more accurate.
+ */
+object OptimizeBuildBloomFilter extends Rule[LogicalPlan] with DynamicPruningHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.runtimeFilterBloomFilterEnabled) {
+      return plan
+    }
+
+    lazy val defaultNumItems =
+      Literal(conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_EXPECTED_NUM_ITEMS))
+    lazy val defaultNumBits =
+      Literal(conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_NUM_BITS))
+
+    plan match {
+      case p @ LogicalQueryStage(a @ Aggregate(Nil, Seq(Alias(AggregateExpression(
+          bf: BloomFilterAggregate, _, _, _, _), _)), child: Repartition),
+          ha @ ObjectHashAggregateExec(_, _, _, _, _, _, _, _,
+            ShuffleExchangeExec(_, ObjectHashAggregateExec(_, _, _, _, _, _, _, _,
+              CoalesceExec(_, s: ShuffleQueryStageExec)), _, _)))
+          if s.isMaterialized && s.getRuntimeStatistics.rowCount.nonEmpty &&
+            (bf.numBitsExpression.isInstanceOf[Multiply] ||
+              (bf.numBitsExpression.semanticEquals(defaultNumBits) &&
+                bf.estimatedNumItemsExpression.semanticEquals(defaultNumItems))) =>
+        val expectedNumItems = math.max(s.getRuntimeStatistics.rowCount.get.longValue(), 1L)
+        val fpp = expectedNumItems / 3000000000L.toDouble
+        val numBits = BloomFilter.optimalNumOfBits(expectedNumItems, fpp)
+        val newBuildBloomFilter = bf.copy(
+          estimatedNumItemsExpression = Literal(expectedNumItems),
+          numBitsExpression = Literal(numBits))
+
+        val newLogicalPlan = a.transformExpressions {
+          case e: AggregateExpression if e.aggregateFunction.semanticEquals(bf) =>
+            e.copy(aggregateFunction = newBuildBloomFilter)
+        }
+
+        val newPhysicalPlan = ha.transformDown {
+          case a: ObjectHashAggregateExec =>
+            a.transformExpressions {
+              case e: AggregateExpression if e.aggregateFunction.semanticEquals(bf) =>
+                e.copy(aggregateFunction = newBuildBloomFilter)
+            }
+          case c: CoalesceExec =>
+            c.copy(numPartitions = coalesceBuildBloomFilterNum(child))
+        }
+
+        p.copy(logicalPlan = newLogicalPlan, physicalPlan = newPhysicalPlan)
+      case _ => plan
+    }
+  }
+}
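A worked example of the sizing logic above, reusing the same BloomFilter.optimalNumOfBits call as the rule; the 50 million row count is an assumed value standing in for the materialized stage's runtime statistics:

import org.apache.spark.util.sketch.BloomFilter

val expectedNumItems = math.max(50000000L, 1L)      // rowCount reported by the shuffle stage
val fpp = expectedNumItems / 3000000000L.toDouble   // ≈ 0.0167 false-positive probability
val numBits = BloomFilter.optimalNumOfBits(expectedNumItems, fpp)
// numBits ≈ -n * ln(p) / (ln 2)^2 ≈ 4.3e8 bits, i.e. on the order of 50 MB of filter bits.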

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala

Lines changed: 40 additions & 9 deletions
@@ -17,30 +17,35 @@

 package org.apache.spark.sql.execution.adaptive

-import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, Literal}
-import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, BloomFilterMightContain, DynamicPruningExpression, DynamicPruningSubquery, Literal, XxHash64}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
+import org.apache.spark.sql.execution.{ScalarSubquery => ScalarSubqueryExec}
+import org.apache.spark.sql.execution.dynamicpruning.DynamicPruningHelper
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode, HashJoin}

 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
-case class PlanAdaptiveDynamicPruningFilters(
-    rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
+case class PlanAdaptiveDynamicPruningFilters(rootPlan: AdaptiveSparkPlanExec)
+  extends Rule[SparkPlan]
+  with AdaptiveSparkPlanHelper
+  with JoinSelectionHelper
+  with DynamicPruningHelper {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }

     plan.transformAllExpressionsWithPruning(
       _.containsAllPatterns(DYNAMIC_PRUNING_EXPRESSION, IN_SUBQUERY_EXEC)) {
-      case DynamicPruningExpression(InSubqueryExec(
+      case e @ DynamicPruningExpression(InSubqueryExec(
           value, SubqueryAdaptiveBroadcastExec(name, index, onlyInBroadcast, buildPlan, buildKeys,
-          adaptivePlan: AdaptiveSparkPlanExec), exprId, _, _, _)) =>
+          adaptivePlan: AdaptiveSparkPlanExec), exprId, _, _, _, _)) =>
         val packedKeys = BindReferences.bindReferences(
           HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
         val mode = HashedRelationBroadcastMode(packedKeys)
@@ -56,16 +61,19 @@ case class PlanAdaptiveDynamicPruningFilters(
           case _ => false
         }.isDefined

+        val shouldBroadcast =
+          e.getTagValue(DynamicPruningSubquery.IS_PRUNING_DATA_TAG).getOrElse(false)
+
         if (canReuseExchange) {
           exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
           val newAdaptivePlan = adaptivePlan.copy(inputPlan = exchange)

           val broadcastValues = SubqueryBroadcastExec(
             name, index, buildKeys, newAdaptivePlan)
-          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
+          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId, shouldBroadcast))
         } else if (onlyInBroadcast) {
           DynamicPruningExpression(Literal.TrueLiteral)
-        } else {
+        } else if (canBroadcastBySize(buildPlan, conf)) {
           // we need to apply an aggregate on the buildPlan in order to be column pruned
           val alias = Alias(buildKeys(index), buildKeys(index).toString)()
           val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
@@ -77,6 +85,29 @@ case class PlanAdaptiveDynamicPruningFilters(
           val newAdaptivePlan = sparkPlan.asInstanceOf[AdaptiveSparkPlanExec]
           val values = SubqueryExec(name, newAdaptivePlan)
           DynamicPruningExpression(InSubqueryExec(value, values, exprId))
+        } else if (!conf.exchangeReuseEnabled) {
+          DynamicPruningExpression(Literal.TrueLiteral)
+        } else {
+          val childPlan = adaptivePlan.executedPlan
+          val session = adaptivePlan.context.session
+          val reusedShuffleExchange = collectFirst(rootPlan.currentPhysicalPlan) {
+            case s: ShuffleExchangeExec if s.child.sameResult(childPlan) =>
+              s
+            case s @ ShuffleExchangeExec(_, _: WholeStageCodegenExec, _, _)
+                if s.child.sameResult(QueryExecution.prepareExecutedPlan(session, childPlan)) =>
+              s.copy(child = childPlan)
+          }
+
+          val bfLogicalPlan = planBloomFilterLogicalPlan(buildPlan, buildKeys, index)
+          val bfPhysicalPlan =
+            planBloomFilterPhysicalPlan(bfLogicalPlan, reusedShuffleExchange).map { plan =>
+              val executedPlan = QueryExecution.prepareExecutedPlan(
+                session, plan, adaptivePlan.context)
+              val scalarSubquery = ScalarSubqueryExec(SubqueryExec.createForScalarSubquery(
+                s"scalar-subquery#${exprId.id}", executedPlan), exprId)
+              BloomFilterMightContain(scalarSubquery, new XxHash64(value))
+            }.getOrElse(Literal.TrueLiteral)
+          DynamicPruningExpression(bfPhysicalPlan)
         }
     }
   }
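Putting the last branch together, the pruning predicate it produces has roughly this shape. A sketch with illustrative names: bloomFilterSubquery stands for the scalar subquery that aggregates the bloom filter over the filtering side, and pruningKey for the application-side join key being pruned.

import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, DynamicPruningExpression, Expression, XxHash64}

// The pruning side evaluates: bloom filter (from the subquery) might contain xxhash64(pruningKey).
def bloomFilterPruningPredicate(
    bloomFilterSubquery: Expression,
    pruningKey: Expression): Expression =
  DynamicPruningExpression(
    BloomFilterMightContain(bloomFilterSubquery, new XxHash64(pruningKey)))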
