diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
index 9a1cf637e5a73..f069cfa8d1428 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
@@ -63,7 +63,7 @@ case class BloomFilterMightContain(
         TypeCheckResult.TypeCheckSuccess
       case _ =>
         TypeCheckResult.TypeCheckFailure(s"The Bloom filter binary input to $prettyName " +
-          s"should be either a constant value or a scalar subquery expression")
+          "should be either a constant value or a scalar subquery expression")
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
index 86d3d62e1c643..de189c1e8e6f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
@@ -110,9 +110,10 @@ case class BloomFilterAggregate(
 
   override def third: Expression = numBitsExpression
 
-  override protected def withNewChildrenInternal(newChild: Expression,
-      newEstimatedNumItemsExpression: Expression, newNumBitsExpression: Expression)
-    : BloomFilterAggregate = {
+  override protected def withNewChildrenInternal(
+      newChild: Expression,
+      newEstimatedNumItemsExpression: Expression,
+      newNumBitsExpression: Expression): BloomFilterAggregate = {
     copy(child = newChild, estimatedNumItemsExpression = newEstimatedNumItemsExpression,
       numBitsExpression = newNumBitsExpression)
   }
@@ -176,7 +177,7 @@ object BloomFilterAggregate {
 class BloomFilterSerDe {
 
   final def serialize(obj: BloomFilter): Array[Byte] = {
-    val size = obj.bitSize()/8
+    val size = obj.bitSize() / 8
     require(size <= Integer.MAX_VALUE, s"actual number of bits is too large $size")
     val out = new ByteArrayOutputStream(size.intValue())
     obj.writeTo(out)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 2e96c20bf3c29..2c879beeed623 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.trees.TernaryLike
-import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, _}
+import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 020ca39dc3837..f8bc60457b08d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -36,7 +36,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
 
   // Wraps `expr` with a hash function if its byte size is larger than an integer.
   private def mayWrapWithHash(expr: Expression): Expression = {
-    if (expr.dataType.defaultSize >  IntegerType.defaultSize) {
+    if (expr.dataType.defaultSize > IntegerType.defaultSize) {
       new Murmur3Hash(Seq(expr))
     } else {
       expr
@@ -47,8 +47,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       filterApplicationSideExp: Expression,
       filterApplicationSidePlan: LogicalPlan,
       filterCreationSideExp: Expression,
-      filterCreationSidePlan: LogicalPlan
-  ): LogicalPlan = {
+      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
     require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
     if (conf.runtimeFilterBloomFilterEnabled) {
       injectBloomFilter(
@@ -98,8 +97,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       filterApplicationSideExp: Expression,
       filterApplicationSidePlan: LogicalPlan,
       filterCreationSideExp: Expression,
-      filterCreationSidePlan: LogicalPlan
-  ): LogicalPlan = {
+      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
     require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
     val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
     val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bf97e73ba3b91..705e14ab48352 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -344,7 +344,7 @@ object SQLConf {
   val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED =
     buildConf("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled")
       .doc("When true and if one side of a shuffle join has a selective predicate, we attempt " +
-        "to insert a semi join in the other side to reduce the amount of shuffle data")
+        "to insert a semi join in the other side to reduce the amount of shuffle data.")
       .version("3.3.0")
       .booleanConf
       .createWithDefault(false)
@@ -352,7 +352,7 @@ object SQLConf {
   val RUNTIME_FILTER_NUMBER_THRESHOLD =
     buildConf("spark.sql.optimizer.runtimeFilter.number.threshold")
       .doc("The total number of injected runtime filters (non-DPP) for a single " +
-        "query. This is to prevent driver OOMs with too many Bloom filters")
+        "query. This is to prevent driver OOMs with too many Bloom filters.")
       .version("3.3.0")
       .intConf
       .checkValue(threshold => threshold >= 0, "The threshold should be >= 0")
@@ -361,7 +361,7 @@ object SQLConf {
   lazy val RUNTIME_BLOOM_FILTER_ENABLED =
     buildConf("spark.sql.optimizer.runtime.bloomFilter.enabled")
      .doc("When true and if one side of a shuffle join has a selective predicate, we attempt " +
-        "to insert a bloom filter in the other side to reduce the amount of shuffle data")
+        "to insert a bloom filter in the other side to reduce the amount of shuffle data.")
       .version("3.3.0")
       .booleanConf
       .createWithDefault(false)
@@ -369,7 +369,7 @@ object SQLConf {
   val RUNTIME_BLOOM_FILTER_THRESHOLD =
     buildConf("spark.sql.optimizer.runtime.bloomFilter.threshold")
       .doc("Size threshold of the bloom filter creation side plan. Estimated size needs to be " +
-        "under this value to try to inject bloom filter")
+        "under this value to try to inject bloom filter.")
       .version("3.3.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("10MB")
@@ -378,7 +378,7 @@ object SQLConf {
     buildConf("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold")
       .doc("Byte size threshold of the Bloom filter application side plan's aggregated scan " +
         "size. Aggregated scan byte size of the Bloom filter application side needs to be over " +
-        "this value to inject a bloom filter")
+        "this value to inject a bloom filter.")
       .version("3.3.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("10GB")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index ab1f0e61759d5..a7abd1f3b3f0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -208,6 +208,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
       sql("DROP TABLE IF EXISTS bf3")
       sql("DROP TABLE IF EXISTS bf4")
       sql("DROP TABLE IF EXISTS bf5part")
+      sql("DROP TABLE IF EXISTS bf5filtered")
     } finally {
       super.afterAll()
     }
@@ -292,64 +293,64 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     checkWithAndWithoutFeatureEnabled(query, testSemiJoin = false, shouldReplace = false)
   }
 
-  test(s"Runtime semi join reduction: simple") {
+  test("Runtime semi join reduction: simple") {
     // Filter creation side is 3409 bytes
     // Filter application side scan is 3362 bytes
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteSemiJoin(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62")
-      assertDidNotRewriteSemiJoin(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2")
+      assertRewroteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62")
+      assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2")
     }
   }
 
-  test(s"Runtime semi join reduction: two joins") {
+  test("Runtime semi join reduction: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteSemiJoin(s"select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
-        s"and bf3.c3 = bf2.c2 where bf2.a2 = 5")
+      assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
+        "and bf3.c3 = bf2.c2 where bf2.a2 = 5")
     }
   }
 
-  test(s"Runtime semi join reduction: three joins") {
+  test("Runtime semi join reduction: three joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteSemiJoin(s"select * from bf1 join bf2 join bf3 join bf4 on " +
-        s"bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1 = 5")
+      assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 join bf4 on " +
+        "bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1 = 5")
     }
   }
 
-  test(s"Runtime semi join reduction: simple expressions only") {
+  test("Runtime semi join reduction: simple expressions only") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
       val squared = (s: Long) => {
         s * s
       }
       spark.udf.register("square", squared)
-      assertDidNotRewriteSemiJoin(s"select * from bf1 join bf2 on " +
-        s"bf1.c1 = bf2.c2 where square(bf2.a2) = 62")
-      assertDidNotRewriteSemiJoin(s"select * from bf1 join bf2 on " +
-        s"bf1.c1 = square(bf2.c2) where bf2.a2= 62")
+      assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
+        "bf1.c1 = bf2.c2 where square(bf2.a2) = 62")
+      assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
+        "bf1.c1 = square(bf2.c2) where bf2.a2= 62")
     }
   }
 
-  test(s"Runtime bloom filter join: simple") {
+  test("Runtime bloom filter join: simple") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
-        s"where bf2.a2 = 62")
-      assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2")
+      assertRewroteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
+        "where bf2.a2 = 62")
+      assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2")
     }
   }
 
-  test(s"Runtime bloom filter join: two filters single join") {
+  test("Runtime bloom filter join: two filters single join") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
       var planDisabled: LogicalPlan = null
       var planEnabled: LogicalPlan = null
       var expectedAnswer: Array[Row] = null
 
-      val query = s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
-        s"bf1.b1 = bf2.b2 where bf2.a2 = 62"
+      val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
+        "bf1.b1 = bf2.b2 where bf2.a2 = 62"
 
       withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
         SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
@@ -366,15 +367,15 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     }
   }
 
-  test(s"Runtime bloom filter join: test the number of filter threshold") {
+  test("Runtime bloom filter join: test the number of filter threshold") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
       var planDisabled: LogicalPlan = null
       var planEnabled: LogicalPlan = null
       var expectedAnswer: Array[Row] = null
 
-      val query = s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
-        s"bf1.b1 = bf2.b2 where bf2.a2 = 62"
+      val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
+        "bf1.b1 = bf2.b2 where bf2.a2 = 62"
 
      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
         SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
@@ -399,15 +400,15 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     }
   }
 
-  test(s"Runtime bloom filter join: insert one bloom filter per column") {
+  test("Runtime bloom filter join: insert one bloom filter per column") {
    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
       var planDisabled: LogicalPlan = null
       var planEnabled: LogicalPlan = null
       var expectedAnswer: Array[Row] = null
 
-      val query = s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
-        s"bf1.c1 = bf2.b2 where bf2.a2 = 62"
+      val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
+        "bf1.c1 = bf2.b2 where bf2.a2 = 62"
 
       withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
         SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
@@ -424,48 +425,48 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     }
   }
 
-  test(s"Runtime bloom filter join: do not add bloom filter if dpp filter exists " +
-    s"on the same column") {
+  test("Runtime bloom filter join: do not add bloom filter if dpp filter exists " +
+    "on the same column") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertDidNotRewriteWithBloomFilter(s"select * from bf5part join bf2 on " +
-        s"bf5part.f5 = bf2.c2 where bf2.a2 = 62")
+      assertDidNotRewriteWithBloomFilter("select * from bf5part join bf2 on " +
+        "bf5part.f5 = bf2.c2 where bf2.a2 = 62")
     }
   }
 
-  test(s"Runtime bloom filter join: add bloom filter if dpp filter exists on " +
-    s"a different column") {
+  test("Runtime bloom filter join: add bloom filter if dpp filter exists on " +
+    "a different column") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteWithBloomFilter(s"select * from bf5part join bf2 on " +
-        s"bf5part.c5 = bf2.c2 and bf5part.f5 = bf2.f2 where bf2.a2 = 62")
+      assertRewroteWithBloomFilter("select * from bf5part join bf2 on " +
+        "bf5part.c5 = bf2.c2 and bf5part.f5 = bf2.f2 where bf2.a2 = 62")
     }
   }
 
-  test(s"Runtime bloom filter join: BF rewrite triggering threshold test") {
+  test("Runtime bloom filter join: BF rewrite triggering threshold test") {
     // Filter creation side data size is 3409 bytes. On the filter application side, an individual
     // scan's byte size is 3362.
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3000",
       SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "4000"
     ) {
-      assertRewroteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
-        s"where bf2.a2 = 62")
+      assertRewroteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
+        "where bf2.a2 = 62")
     }
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
       SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "50"
     ) {
-      assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
-        s"where bf2.a2 = 62")
+      assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
+        "where bf2.a2 = 62")
     }
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "5000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3000",
       SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "4000"
     ) {
       // Rewrite should not be triggered as the Bloom filter application side scan size is small.
-      assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on bf1.c1 = bf2.c2 "
-        + s"where bf2.a2 = 62")
+      assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on bf1.c1 = bf2.c2 " +
+        "where bf2.a2 = 62")
     }
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "32",
       SQLConf.RUNTIME_BLOOM_FILTER_THRESHOLD.key -> "4000") {
@@ -473,30 +474,30 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
       // application side matters. `bf5filtered` has 14168 bytes and `bf2` has 3409 bytes.
       withSQLConf(
         SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "5000") {
-        assertRewroteWithBloomFilter(s"select * from " +
-          s"(select * from bf5filtered union all select * from bf2) t " +
-          s"join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
+        assertRewroteWithBloomFilter("select * from " +
+          "(select * from bf5filtered union all select * from bf2) t " +
+          "join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
       }
       withSQLConf(
         SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "15000") {
-        assertDidNotRewriteWithBloomFilter(s"select * from " +
-          s"(select * from bf5filtered union all select * from bf2) t " +
-          s"join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
+        assertDidNotRewriteWithBloomFilter("select * from " +
+          "(select * from bf5filtered union all select * from bf2) t " +
+          "join bf3 on t.c5 = bf3.c3 where bf3.a3 = 5")
       }
     }
   }
 
-  test(s"Runtime bloom filter join: simple expressions only") {
+  test("Runtime bloom filter join: simple expressions only") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
       val squared = (s: Long) => {
         s * s
       }
       spark.udf.register("square", squared)
-      assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on " +
-        s"bf1.c1 = bf2.c2 where square(bf2.a2) = 62" )
-      assertDidNotRewriteWithBloomFilter(s"select * from bf1 join bf2 on " +
-        s"bf1.c1 = square(bf2.c2) where bf2.a2 = 62" )
+      assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on " +
+        "bf1.c1 = bf2.c2 where square(bf2.a2) = 62" )
+      assertDidNotRewriteWithBloomFilter("select * from bf1 join bf2 on " +
+        "bf1.c1 = square(bf2.c2) where bf2.a2 = 62" )
     }
   }
 }
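For context on the feature this cleanup touches: the configs documented in the SQLConf.scala hunks gate the runtime Bloom filter join optimization that InjectRuntimeFilter implements. Below is a minimal, hypothetical end-to-end sketch (not part of this patch) of turning the flag on and inspecting the rewritten plan. The table names, data sizes, and threshold values are illustrative assumptions, loosely mirroring the suite above; only the config keys come from this diff.

```scala
import org.apache.spark.sql.SparkSession

object RuntimeBloomFilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("runtime-bloom-filter-demo")
      // Feature flag and scan-size threshold from the SQLConf.scala hunks above.
      .config("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
      .config("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold", "3000")
      // Disable broadcast joins so the optimizer plans a shuffle join,
      // which is the shape InjectRuntimeFilter rewrites.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    // Large application side (t1), small creation side (t2) that will carry
    // a selective predicate in the join query below.
    spark.range(0, 100000)
      .selectExpr("id as a1", "id % 1000 as c1")
      .write.mode("overwrite").saveAsTable("t1")
    spark.range(0, 1000)
      .selectExpr("id as a2", "id as c2")
      .write.mode("overwrite").saveAsTable("t2")

    // With the flag on and the thresholds satisfied, the optimized plan may
    // show a BloomFilterMightContain predicate on t1.c1, fed by a Bloom
    // filter aggregate built from t2.c2 under the selective filter a2 = 42.
    spark.sql("select * from t1 join t2 on t1.c1 = t2.c2 where t2.a2 = 42")
      .explain(true)

    spark.stop()
  }
}
```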