Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ case class BloomFilterMightContain(
TypeCheckResult.TypeCheckSuccess
case _ =>
TypeCheckResult.TypeCheckFailure(s"The Bloom filter binary input to $prettyName " +
s"should be either a constant value or a scalar subquery expression")
"should be either a constant value or a scalar subquery expression")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ case class BloomFilterAggregate(

override def third: Expression = numBitsExpression

override protected def withNewChildrenInternal(newChild: Expression,
newEstimatedNumItemsExpression: Expression, newNumBitsExpression: Expression)
: BloomFilterAggregate = {
override protected def withNewChildrenInternal(
newChild: Expression,
newEstimatedNumItemsExpression: Expression,
newNumBitsExpression: Expression): BloomFilterAggregate = {
copy(child = newChild, estimatedNumItemsExpression = newEstimatedNumItemsExpression,
numBitsExpression = newNumBitsExpression)
}
Expand Down Expand Up @@ -176,7 +177,7 @@ object BloomFilterAggregate {
class BloomFilterSerDe {

final def serialize(obj: BloomFilter): Array[Byte] = {
val size = obj.bitSize()/8
val size = obj.bitSize() / 8
require(size <= Integer.MAX_VALUE, s"actual number of bits is too large $size")
val out = new ByteArrayOutputStream(size.intValue())
obj.writeTo(out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TernaryLike
import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, _}
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J

// Wraps `expr` with a hash function if its byte size is larger than an integer.
private def mayWrapWithHash(expr: Expression): Expression = {
if (expr.dataType.defaultSize > IntegerType.defaultSize) {
if (expr.dataType.defaultSize > IntegerType.defaultSize) {
new Murmur3Hash(Seq(expr))
} else {
expr
Expand All @@ -47,8 +47,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan
): LogicalPlan = {
filterCreationSidePlan: LogicalPlan): LogicalPlan = {
require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
if (conf.runtimeFilterBloomFilterEnabled) {
injectBloomFilter(
Expand Down Expand Up @@ -98,8 +97,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan
): LogicalPlan = {
filterCreationSidePlan: LogicalPlan): LogicalPlan = {
require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,15 @@ object SQLConf {
val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED =
buildConf("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled")
.doc("When true and if one side of a shuffle join has a selective predicate, we attempt " +
"to insert a semi join in the other side to reduce the amount of shuffle data")
"to insert a semi join in the other side to reduce the amount of shuffle data.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

val RUNTIME_FILTER_NUMBER_THRESHOLD =
buildConf("spark.sql.optimizer.runtimeFilter.number.threshold")
.doc("The total number of injected runtime filters (non-DPP) for a single " +
"query. This is to prevent driver OOMs with too many Bloom filters")
"query. This is to prevent driver OOMs with too many Bloom filters.")
.version("3.3.0")
.intConf
.checkValue(threshold => threshold >= 0, "The threshold should be >= 0")
Expand All @@ -361,15 +361,15 @@ object SQLConf {
lazy val RUNTIME_BLOOM_FILTER_ENABLED =
buildConf("spark.sql.optimizer.runtime.bloomFilter.enabled")
.doc("When true and if one side of a shuffle join has a selective predicate, we attempt " +
"to insert a bloom filter in the other side to reduce the amount of shuffle data")
"to insert a bloom filter in the other side to reduce the amount of shuffle data.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

val RUNTIME_BLOOM_FILTER_THRESHOLD =
buildConf("spark.sql.optimizer.runtime.bloomFilter.threshold")
.doc("Size threshold of the bloom filter creation side plan. Estimated size needs to be " +
"under this value to try to inject bloom filter")
"under this value to try to inject bloom filter.")
.version("3.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("10MB")
Expand All @@ -378,7 +378,7 @@ object SQLConf {
buildConf("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold")
.doc("Byte size threshold of the Bloom filter application side plan's aggregated scan " +
"size. Aggregated scan byte size of the Bloom filter application side needs to be over " +
"this value to inject a bloom filter")
"this value to inject a bloom filter.")
.version("3.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("10GB")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
sql("DROP TABLE IF EXISTS bf3")
sql("DROP TABLE IF EXISTS bf4")
sql("DROP TABLE IF EXISTS bf5part")
sql("DROP TABLE IF EXISTS bf5filtered")
} finally {
super.afterAll()
}
Expand Down Expand Up @@ -292,64 +293,64 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
checkWithAndWithoutFeatureEnabled(query, testSemiJoin = false, shouldReplace = false)
}

test(s"Runtime semi join reduction: simple") {
test("Runtime semi join reduction: simple") {
// Filter creation side is 3409 bytes
// Filter application side scan is 3362 bytes
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
assertRewroteSemiJoin(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62")
assertDidNotRewriteSemiJoin(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2")
assertRewroteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62")
assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2")
}
}

test(s"Runtime semi join reduction: two joins") {
test("Runtime semi join reduction: two joins") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
assertRewroteSemiJoin(s"select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
s"and bf3.c3 = bf2.c2 where bf2.a2 = 5")
assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
"and bf3.c3 = bf2.c2 where bf2.a2 = 5")
}
}

test(s"Runtime semi join reduction: three joins") {
test("Runtime semi join reduction: three joins") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
assertRewroteSemiJoin(s"select * from bf1 join bf2 join bf3 join bf4 on " +
s"bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1 = 5")
assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 join bf4 on " +
"bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1 = 5")
}
}

test(s"Runtime semi join reduction: simple expressions only") {
test("Runtime semi join reduction: simple expressions only") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
assertDidNotRewriteSemiJoin(s"select * from bf1 join bf2 on " +
s"bf1.c1 = bf2.c2 where square(bf2.a2) = 62")
assertDidNotRewriteSemiJoin(s"select * from bf1 join bf2 on " +
s"bf1.c1 = square(bf2.c2) where bf2.a2= 62")
assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
"bf1.c1 = bf2.c2 where square(bf2.a2) = 62")
assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
"bf1.c1 = square(bf2.c2) where bf2.a2= 62")
}
}

test(s"Runtime bloom filter join: simple") {
test("Runtime bloom filter join: simple") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
assertRewroteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
s"where bf2.a2 = 62")
assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2")
assertRewroteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
"where bf2.a2 = 62")
assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2")
}
}

test(s"Runtime bloom filter join: two filters single join") {
test("Runtime bloom filter join: two filters single join") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
var planDisabled: LogicalPlan = null
var planEnabled: LogicalPlan = null
var expectedAnswer: Array[Row] = null

val query = s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
s"bf1.b1 = bf2.b2 where bf2.a2 = 62"
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
"bf1.b1 = bf2.b2 where bf2.a2 = 62"

withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
Expand All @@ -366,15 +367,15 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
}
}

test(s"Runtime bloom filter join: test the number of filter threshold") {
test("Runtime bloom filter join: test the number of filter threshold") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
var planDisabled: LogicalPlan = null
var planEnabled: LogicalPlan = null
var expectedAnswer: Array[Row] = null

val query = s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
s"bf1.b1 = bf2.b2 where bf2.a2 = 62"
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
"bf1.b1 = bf2.b2 where bf2.a2 = 62"

withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
Expand All @@ -399,15 +400,15 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
}
}

test(s"Runtime bloom filter join: insert one bloom filter per column") {
test("Runtime bloom filter join: insert one bloom filter per column") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
var planDisabled: LogicalPlan = null
var planEnabled: LogicalPlan = null
var expectedAnswer: Array[Row] = null

val query = s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
s"bf1.c1 = bf2.b2 where bf2.a2 = 62"
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
"bf1.c1 = bf2.b2 where bf2.a2 = 62"

withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
Expand All @@ -424,79 +425,79 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
}
}

test(s"Runtime bloom filter join: do not add bloom filter if dpp filter exists " +
s"on the same column") {
test("Runtime bloom filter join: do not add bloom filter if dpp filter exists " +
"on the same column") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
assertDidNotRewriteWithBloomFilter(s"select * from bf5part join bf2 on " +
s"bf5part.f5 = bf2.c2 where bf2.a2 = 62")
assertDidNotRewriteWithBloomFilter("select * from bf5part join bf2 on " +
"bf5part.f5 = bf2.c2 where bf2.a2 = 62")
}
}

test(s"Runtime bloom filter join: add bloom filter if dpp filter exists on " +
s"a different column") {
test("Runtime bloom filter join: add bloom filter if dpp filter exists on " +
"a different column") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
assertRewroteWithBloomFilter(s"select * from bf5part join bf2 on " +
s"bf5part.c5 = bf2.c2 and bf5part.f5 = bf2.f2 where bf2.a2 = 62")
assertRewroteWithBloomFilter("select * from bf5part join bf2 on " +
"bf5part.c5 = bf2.c2 and bf5part.f5 = bf2.f2 where bf2.a2 = 62")
}
}

test(s"Runtime bloom filter join: BF rewrite triggering threshold test") {
test("Runtime bloom filter join: BF rewrite triggering threshold test") {
// Filter creation side data size is 3409 bytes. On the filter application side, an individual
// scan's byte size is 3362.
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3000",
SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "4000"
) {
assertRewroteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
s"where bf2.a2 = 62")
assertRewroteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
"where bf2.a2 = 62")
}
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "50"
) {
assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
s"where bf2.a2 = 62")
assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
"where bf2.a2 = 62")
}
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "5000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3000",
SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "4000"
) {
// Rewrite should not be triggered as the Bloom filter application side scan size is small.
assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 "
+ s"where bf2.a2 = 62")
assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 "
+ "where bf2.a2 = 62")
}
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "32",
SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "4000") {
// Test that the max scan size rather than an individual scan size on the filter
// application side matters. `bf5filtered` has 14168 bytes and `bf2` has 3409 bytes.
withSQLConf(
SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "5000") {
assertRewroteWithBloomFilter(s"select * from " +
s"(select * from bf5filtered union all select * from bf2) t " +
s"join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
assertRewroteWithBloomFilter("select * from " +
"(select * from bf5filtered union all select * from bf2) t " +
"join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
}
withSQLConf(
SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "15000") {
assertDidNotRewriteWithBloomFilter(s"select * from " +
s"(select * from bf5filtered union all select * from bf2) t " +
s"join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
assertDidNotRewriteWithBloomFilter("select * from " +
"(select * from bf5filtered union all select * from bf2) t " +
"join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
}
}
}

test(s"Runtime bloom filter join: simple expressions only") {
test("Runtime bloom filter join: simple expressions only") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on " +
s"bf1.c1 = bf2.c2 where square(bf2.a2) = 62" )
assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on " +
s"bf1.c1 = square(bf2.c2) where bf2.a2 = 62" )
assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on " +
"bf1.c1 = bf2.c2 where square(bf2.a2) = 62" )
assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on " +
"bf1.c1 = square(bf2.c2) where bf2.a2 = 62" )
}
}
}