diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6d3f283fa739..49cd23851ec8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -959,6 +959,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.stringPredicate")
+      .doc("If true, enables Parquet filter push-down optimization for string predicates such " +
+        "as startsWith/endsWith/contains. This configuration only has an effect when " +
+        s"'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.")
+      .version("3.4.0")
+      .internal()
+      .fallbackConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+
   val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
     buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
       .doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
@@ -4050,8 +4059,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)
 
-  def parquetFilterPushDownStringStartWith: Boolean =
-    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+  def parquetFilterPushDownStringPredicate: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED)
 
   def parquetFilterPushDownInFilterThreshold: Int =
     getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 44dc145d36e6..de0759979d53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -251,7 +251,7 @@ class ParquetFileFormat
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
@@ -279,7 +279,7 @@ class ParquetFileFormat
         pushDownDate,
         pushDownTimestamp,
         pushDownDecimal,
-        pushDownStringStartWith,
+        pushDownStringPredicate,
         pushDownInFilterThreshold,
         isCaseSensitive,
         datetimeRebaseSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 75060cfca24e..210c40351b0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -48,7 +48,7 @@ class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-    pushDownStartWith: Boolean,
+    pushDownStringPredicate: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean,
     datetimeRebaseSpec: RebaseSpec) {
@@ -747,7 +747,7 @@ class ParquetFilters(
       }
 
       case sources.StringStartsWith(name, prefix)
-          if pushDownStartWith && canMakeFilterOn(name, prefix) =>
+          if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
           FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
             new UserDefinedPredicate[Binary] with Serializable {
@@ -778,6 +778,36 @@ class ParquetFilters(
           )
         }
 
+      case sources.StringEndsWith(name, suffix)
+          if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
+        Option(suffix).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val suffixStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && UTF8String.fromBytes(value.getBytes).endsWith(suffixStr)
+              }
+            }
+          )
+        }
+
+      case sources.StringContains(name, value)
+          if pushDownStringPredicate && canMakeFilterOn(name, value) =>
+        Option(value).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val subStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && UTF8String.fromBytes(value.getBytes).contains(subStr)
+              }
+            }
+          )
+        }
+
       case _ => None
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 9a25dd88ff4e..c9572e474c8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -79,7 +79,7 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDate = sqlConf.parquetFilterPushDownDate
   private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = options.int96RebaseModeInRead
@@ -221,7 +221,7 @@ case class ParquetPartitionReaderFactory(
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       datetimeRebaseSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 2093f4a16ef4..2e3b9b20b5dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -52,7 +52,7 @@ case class ParquetScanBuilder(
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
@@ -62,7 +62,7 @@ case class ParquetScanBuilder(
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       // The rebase mode doesn't matter here because the filters are used to determine
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 70b38db034f6..ba46c25d5ca0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3072,7 +3072,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
 
     Seq("orc", "parquet").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
         withTempPath { dir =>
           spark.range(10).map(i => (i, i.toString)).toDF("id", "s")
             .write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 2bd03b6cb758..dd2852eea780 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -242,6 +242,38 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
      }
    }
 
+    runBenchmark("Pushdown benchmark for StringEndsWith") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10'",
+            "value like '%1000'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}'"
+          ).foreach { whereExpr =>
+            val title = s"StringEndsWith filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
+    runBenchmark("Pushdown benchmark for StringContains") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10%'",
+            "value like '%1000%'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}%'"
+          ).foreach { whereExpr =>
+            val title = s"StringContains filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
     runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
       withTempPath { dir =>
         Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d8eab40c38ff..be081dadb2fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -81,7 +81,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       datetimeRebaseSpec: RebaseSpec = RebaseSpec(LegacyBehaviorPolicy.CORRECTED)
     ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
-      conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
+      conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringPredicate,
       conf.parquetFilterPushDownInFilterThreshold,
       caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
       datetimeRebaseSpec)
@@ -207,20 +207,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  // This function tests that exactly go through the `canDrop` and `inverseCanDrop`.
-  private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
+  // This function tests code paths that exercise `keep`, `canDrop` and `inverseCanDrop`.
+  private def testStringPredicate(dataFrame: DataFrame, filter: String,
+      shouldFilterOut: Boolean, enableDictionary: Boolean = true): Unit = {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      dataFrame.write.option("parquet.block.size", 512).parquet(path)
+      dataFrame.write
+        .option("parquet.block.size", 512)
+        .option(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary)
+        .parquet(path)
       Seq(true, false).foreach { pushDown =>
         withSQLConf(
-          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> pushDown.toString) {
+          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> pushDown.toString) {
           val accu = new NumRowGroupsAcc
           sparkContext.register(accu)
 
           val df = spark.read.parquet(path).filter(filter)
           df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
-          if (pushDown) {
+          if (pushDown && shouldFilterOut) {
             assert(accu.value == 0)
           } else {
             assert(accu.value > 0)
@@ -970,7 +974,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // The following tests check that one arm of AND/OR can't be pushed down,
+    // so disable string predicate pushdown here.
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     assertResult(Some(and(
       lt(intColumn("a"), 10: Integer),
       gt(doubleColumn("c"), 1.5: java.lang.Double)))
@@ -1114,7 +1123,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // The following tests check that one arm of AND/OR can't be pushed down,
+    // so disable string predicate pushdown here.
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     // Testing
     // case sources.Or(lhs, rhs) =>
     //   ...
@@ -1169,7 +1183,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // The following tests check that one arm of AND/OR can't be pushed down,
+    // so disable string predicate pushdown here.
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     assertResult(Seq(sources.And(sources.LessThan("a", 10), sources.GreaterThan("c", 1.5D)))) {
       parquetFilters.convertibleFilters(
         Seq(sources.And(
@@ -1423,65 +1442,123 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  test("filter pushdown - StringStartsWith") {
+  private def checkStringFilterPushdown(
+      stringPredicate: String => Expression,
+      sourceFilter: (String, String) => sources.Filter): Unit = {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("").asInstanceOf[Predicate],
+        stringPredicate("").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           "2str2")
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           Seq.empty[Row])
       }
 
       checkFilterPredicate(
-        !$"_1".startsWith("").asInstanceOf[Predicate],
+        !stringPredicate("").asInstanceOf[Predicate],
         classOf[Operators.Not],
         Seq().map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
          classOf[Operators.Not],
           Seq("1str1", "3str3", "4str4").map(Row(_)))
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
       }
 
       val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
       assertResult(None) {
-        createParquetFilters(schema).createFilter(sources.StringStartsWith("_1", null))
+        createParquetFilters(schema).createFilter(sourceFilter("_1", null))
      }
    }
 
     // SPARK-28371: make sure filter is null-safe.
     withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("blah").asInstanceOf[Predicate],
+        stringPredicate("blah").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq.empty[Row])
     }
+  }
+
+  test("filter pushdown - StringStartsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".startsWith(str),
+      (attr, value) => sources.StringStartsWith(attr, value))
+  }
+
+  test("filter pushdown - StringEndsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".endsWith(str),
+      (attr, value) => sources.StringEndsWith(attr, value))
+  }
+
+  test("filter pushdown - StringContains") {
+    checkStringFilterPushdown(
+      str => $"_1".contains(str),
+      (attr, value) => sources.StringContains(attr, value))
+  }
 
+  test("filter pushdown - StringPredicate") {
     import testImplicits._
-    // Test canDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
-    // Test inverseCanDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
+    // keep() should take effect on StartsWith/EndsWith/Contains
+    Seq(
+      "value like 'a%'", // StartsWith
+      "value like '%a'", // EndsWith
+      "value like '%a%'" // Contains
+    ).foreach { filter =>
+      testStringPredicate(
+        // a dictionary will be generated since there are duplicated values
+        spark.range(1000).map(t => (t % 10).toString).toDF(),
+        filter,
+        true)
+    }
+
+    // canDrop() should take effect on StartsWith,
+    // and have no effect on EndsWith/Contains
+    Seq(
+      ("value like 'a%'", true), // StartsWith
+      ("value like '%a'", false), // EndsWith
+      ("value like '%a%'", false) // Contains
+    ).foreach { case (filter, shouldFilterOut) =>
+      testStringPredicate(
+        spark.range(1024).map(_.toString).toDF(),
+        filter,
+        shouldFilterOut,
+        enableDictionary = false)
+    }
+
+    // inverseCanDrop() should take effect on StartsWith,
+    // and have no effect on EndsWith/Contains
+    Seq(
+      ("value not like '10%'", true), // StartsWith
+      ("value not like '%10'", false), // EndsWith
+      ("value not like '%10%'", false) // Contains
+    ).foreach { case (filter, shouldFilterOut) =>
+      testStringPredicate(
+        spark.range(1024).map(c => "100").toDF(),
+        filter,
+        shouldFilterOut,
+        enableDictionary = false)
+    }
   }
 
   test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
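For reference, a minimal usage sketch (not part of the patch; the object name, output path, and data are illustrative) of how the new `spark.sql.parquet.filterPushdown.stringPredicate` flag gates the EndsWith/Contains pushdown added above:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone example; assumes a Spark build that includes this patch.
object StringPredicatePushdownDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("string-predicate-pushdown-demo")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/string_predicate_demo" // illustrative output path
    spark.range(1000).map(i => (i, s"str$i")).toDF("id", "s")
      .write.mode("overwrite").parquet(path)

    // The new conf falls back to the old startsWith conf (via fallbackConf);
    // set it explicitly here for clarity.
    spark.conf.set("spark.sql.parquet.filterPushdown.stringPredicate", "true")

    // LIKE '%99' is planned as an EndsWith predicate; with the conf enabled it is
    // handed to the Parquet reader and evaluated per value via keep(), so the
    // physical plan's pushed filters should include the string predicate.
    val df = spark.read.parquet(path).where("s LIKE '%99'")
    df.explain(true)
    df.show()

    spark.stop()
  }
}
```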