
Commit 03545ce

wangyum authored and cloud-fan committed
[SPARK-24638][SQL] StringStartsWith support push down
## What changes were proposed in this pull request?

Add push-down support for `StringStartsWith`. About 50% savings in compute time.

## How was this patch tested?

Unit tests, manual tests and a performance test:

```scala
cat <<EOF > SPARK-24638.scala
def benchmark(func: () => Unit): Long = {
  val start = System.currentTimeMillis()
  for(i <- 0 until 100) { func() }
  val end = System.currentTimeMillis()
  end - start
}
val path = "/tmp/spark/parquet/string/"
spark.range(10000000).selectExpr("concat(id, 'str', id) as id").coalesce(1).write.mode("overwrite").option("parquet.block.size", 1048576).parquet(path)
val df = spark.read.parquet(path)
spark.sql("set spark.sql.parquet.filterPushdown.string.startsWith=true")
val pushdownEnable = benchmark(() => df.where("id like '999998%'").count())
spark.sql("set spark.sql.parquet.filterPushdown.string.startsWith=false")
val pushdownDisable = benchmark(() => df.where("id like '999998%'").count())
val improvements = pushdownDisable - pushdownEnable
println(s"improvements: $improvements")
EOF

bin/spark-shell -i SPARK-24638.scala
```

result:

```scala
Loading SPARK-24638.scala...
benchmark: (func: () => Unit)Long
path: String = /tmp/spark/parquet/string/
df: org.apache.spark.sql.DataFrame = [id: string]
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
pushdownEnable: Long = 11608
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]
pushdownDisable: Long = 31981
improvements: Long = 20373
```

Author: Yuming Wang <[email protected]>

Closes #21623 from wangyum/SPARK-24638.
1 parent f71e8da commit 03545ce

File tree: 4 files changed (+130, -4 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -378,6 +378,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
+      .doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
+        "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
     .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
       "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1459,6 +1467,9 @@ class SQLConf extends Serializable with Logging {
 
   def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
 
+  def parquetFilterPushDownStringStartWith: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
```
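For reference, the new key can be toggled per session like any other SQL conf; a minimal sketch, assuming an active `spark` session (the flag is internal, defaults to true, and only matters while `spark.sql.parquet.filterPushdown` itself is enabled):

```scala
// Minimal sketch: flipping the new internal flag at session level. It defaults to
// true and is only consulted when the general Parquet filter pushdown
// (spark.sql.parquet.filterPushdown) is enabled.
spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "false")
// Equivalent SQL form, as used in the benchmark script above:
spark.sql("SET spark.sql.parquet.filterPushdown.string.startsWith=true")
```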

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -348,6 +348,7 @@ class ParquetFileFormat
       // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
       val returningBatch = supportBatch(sparkSession, resultSchema)
       val pushDownDate = sqlConf.parquetFilterPushDownDate
+      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
 
       (file: PartitionedFile) => {
         assert(file.partitionValues.numFields == partitionSchema.size)
@@ -358,7 +359,8 @@ class ParquetFileFormat
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
+          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
+            .createFilter(requiredSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None
```
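The `flatMap` in the second hunk works because `ParquetFilters.createFilter` returns an `Option`. A rough sketch of that contract, mirroring the test-suite usage further down (`ParquetFilters` is `private[parquet]`, so this is illustrative and would only compile inside that package):

```scala
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(StructField("name", StringType) :: Nil)

// With the flag on, a concrete prefix converts to Some(user-defined predicate).
val enabled = new ParquetFilters(pushDownDate = true, pushDownStartWith = true)
assert(enabled.createFilter(schema, sources.StringStartsWith("name", "ab")).isDefined)
// A null prefix cannot be converted, matching the assertResult(None) in the test suite.
assert(enabled.createFilter(schema, sources.StringStartsWith("name", null)).isEmpty)

// With the flag off, the same filter simply drops out of the flatMap above (None).
val disabled = new ParquetFilters(pushDownDate = true, pushDownStartWith = false)
assert(disabled.createFilter(schema, sources.StringStartsWith("name", "ab")).isEmpty)
```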

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 34 additions & 1 deletion
```diff
@@ -22,16 +22,18 @@ import java.sql.Date
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.PrimitiveComparator
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean) {
+private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
 
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
@@ -270,6 +272,37 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
       case sources.Not(pred) =>
         createFilter(schema, pred).map(FilterApi.not)
 
+      case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
+        Option(prefix).map { v =>
+          FilterApi.userDefined(binaryColumn(name),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
+              private val size = strToBinary.length
+
+              override def canDrop(statistics: Statistics[Binary]): Boolean = {
+                val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+                val max = statistics.getMax
+                val min = statistics.getMin
+                comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 ||
+                  comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0
+              }
+
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
+                val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+                val max = statistics.getMax
+                val min = statistics.getMin
+                comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) == 0 &&
+                  comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) == 0
+              }
+
+              override def keep(value: Binary): Boolean = {
+                UTF8String.fromBytes(value.getBytes).startsWith(
+                  UTF8String.fromBytes(strToBinary.getBytes))
+              }
+            }
+          )
+        }
+
       case _ => None
     }
   }
```
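To make the new `canDrop` rule concrete: a row group can be skipped when even the truncated maximum of its statistics sorts below the prefix, or the truncated minimum sorts above it, since every value lies between min and max and truncation preserves lexicographic order. Below is a self-contained sketch of the same idea on plain ASCII strings (`canDropRowGroup` is a hypothetical helper, not part of the patch; the real code compares bytes with Parquet's unsigned lexicographic comparator):

```scala
// Hypothetical restatement of canDrop's min/max pruning on plain strings.
// Every value v in a row group satisfies min <= v <= max, and truncating both
// sides to the prefix length cannot reverse that order, so if max's prefix sorts
// below the target (or min's prefix sorts above it) no value can start with it.
def canDropRowGroup(min: String, max: String, prefix: String): Boolean = {
  val maxPrefix = max.take(prefix.length)
  val minPrefix = min.take(prefix.length)
  maxPrefix < prefix || minPrefix > prefix
}

canDropRowGroup(min = "1str1", max = "4str4", prefix = "9")  // true: skip the group
canDropRowGroup(min = "1str1", max = "4str4", prefix = "2s") // false: must read it
```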

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 82 additions & 2 deletions
```diff
@@ -55,7 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
-  private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
+  private lazy val parquetFilters =
+    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -82,6 +83,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     withSQLConf(
       SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
       SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
       SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
       val query = df
         .select(output.map(e => Column(e)): _*)
@@ -140,6 +142,31 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
+  // This function tests that exactly go through the `canDrop` and `inverseCanDrop`.
+  private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      dataFrame.write.option("parquet.block.size", 512).parquet(path)
+      Seq(true, false).foreach { pushDown =>
+        withSQLConf(
+          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> pushDown.toString) {
+          val accu = new NumRowGroupsAcc
+          sparkContext.register(accu)
+
+          val df = spark.read.parquet(path).filter(filter)
+          df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
+          if (pushDown) {
+            assert(accu.value == 0)
+          } else {
+            assert(accu.value > 0)
+          }
+
+          AccumulatorContext.remove(accu.id)
+        }
+      }
+    }
+  }
+
   test("filter pushdown - boolean") {
     withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -574,7 +601,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
 
         val df = spark.read.parquet(path).filter("a < 100")
         df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
-        df.collect
 
         if (enablePushDown) {
           assert(accu.value == 0)
@@ -660,6 +686,60 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
       assert(df.where("col > 0").count() === 2)
     }
   }
+
+  test("filter pushdown - StringStartsWith") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
+      checkFilterPredicate(
+        '_1.startsWith("").asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]],
+        Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
+
+      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+        checkFilterPredicate(
+          '_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          "2str2")
+      }
+
+      Seq("2S", "null", "2str22").foreach { prefix =>
+        checkFilterPredicate(
+          '_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          Seq.empty[Row])
+      }
+
+      checkFilterPredicate(
+        !'_1.startsWith("").asInstanceOf[Predicate],
+        classOf[UserDefinedByInstance[_, _]],
+        Seq().map(Row(_)))
+
+      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+        checkFilterPredicate(
+          !'_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          Seq("1str1", "3str3", "4str4").map(Row(_)))
+      }
+
+      Seq("2S", "null", "2str22").foreach { prefix =>
+        checkFilterPredicate(
+          !'_1.startsWith(prefix).asInstanceOf[Predicate],
+          classOf[UserDefinedByInstance[_, _]],
+          Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
+      }
+
+      assertResult(None) {
+        parquetFilters.createFilter(
+          df.schema,
+          sources.StringStartsWith("_1", null))
+      }
+    }
+
+    import testImplicits._
+    // Test canDrop() has taken effect
+    testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
+    // Test inverseCanDrop() has taken effect
+    testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
```
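The behaviour these tests pin down can also be observed end to end; a hedged sketch in the spirit of the benchmark in the commit message (the path is hypothetical and the numbers illustrative). `LIKE 'prefix%'` is simplified to a `StartsWith` predicate and pushed to the source as `StringStartsWith`, so with the flag on the reader can discard row groups from their statistics alone:

```scala
// Illustrative only: small string-typed Parquet dataset plus a prefix filter.
val path = "/tmp/spark/parquet/startswith-demo"  // hypothetical path
spark.range(1024).selectExpr("concat(id, 'str', id) AS id")
  .write.mode("overwrite").option("parquet.block.size", 512).parquet(path)

spark.sql("SET spark.sql.parquet.filterPushdown.string.startsWith=true")
// Planned as StringStartsWith("id", "42str"); eligible row groups are pruned
// via canDrop before any rows are materialised.
val matched = spark.read.parquet(path).where("id like '42str%'").count()
```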
