@@ -959,6 +959,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.stringPredicate")
[Review comment] @jaceklaskowski (Contributor), Apr 23, 2022:
  Since spark.sql.parquet.filterPushdown.string.startsWith is internal, why not replace it?

[Author reply]:
  I'm concerned about existing users who already use it.

.doc("If true, enables Parquet filter push-down optimization for string predicate such " +
"as startsWith/endsWith/contains function. This configuration only has an effect when " +
"'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.")
.version("3.3.0")
[Review comment] (Member):
  3.4.0?

[Author reply]:
  updated

.internal()
.fallbackConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)

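A quick usage sketch (illustrative only, not part of this diff; assumes an existing SparkSession named spark). Because the new entry is declared via fallbackConf, the single stringPredicate key governs startsWith/endsWith/contains pushdown, and when it is unset it resolves to the legacy startsWith key, so jobs that only set the old key keep their behavior:

  // Illustrative only: the new key controls startsWith/endsWith/contains pushdown together.
  spark.conf.set("spark.sql.parquet.filterPushdown.stringPredicate", "false")

  // Because of fallbackConf, reading the new conf falls back to the legacy key
  // when the new key is unset, so existing jobs that set only the old key still work.
  spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "false")
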
val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
@@ -4050,8 +4059,8 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)

def parquetFilterPushDownStringStartWith: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
def parquetFilterPushDownStringPredicate: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED)

def parquetFilterPushDownInFilterThreshold: Int =
getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
@@ -251,7 +251,7 @@ class ParquetFileFormat
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
Expand Down Expand Up @@ -279,7 +279,7 @@ class ParquetFileFormat
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownStringPredicate,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
@@ -48,7 +48,7 @@ class ParquetFilters(
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownStringPredicate: Boolean,
pushDownInFilterThreshold: Int,
caseSensitive: Boolean,
datetimeRebaseSpec: RebaseSpec) {
@@ -747,7 +747,7 @@ class ParquetFilters(
}

case sources.StringStartsWith(name, prefix)
if pushDownStartWith && canMakeFilterOn(name, prefix) =>
if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
Option(prefix).map { v =>
FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
new UserDefinedPredicate[Binary] with Serializable {
@@ -778,6 +778,38 @@ class ParquetFilters(
)
}

case sources.StringEndsWith(name, prefix)
if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
[Review comment] @viirya (Member), Apr 24, 2022:
  nit: indent, following L750.

Option(prefix).map { v =>
FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
new UserDefinedPredicate[Binary] with Serializable {
private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
[Review comment] (Member):
  You can just keep UTF8String.fromBytes(strToBinary.getBytes) instead of doing it every time in keep.

[Author reply]:
  good catch. updated

override def canDrop(statistics: Statistics[Binary]): Boolean = false
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
override def keep(value: Binary): Boolean = {
value != null && UTF8String.fromBytes(value.getBytes).endsWith(
UTF8String.fromBytes(strToBinary.getBytes))
}
}
)
}
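
A minimal sketch of the caching the reviewer suggests above (illustrative; the committed follow-up may differ). Within this same match case, v is the suffix bound by Option(prefix).map, so the UTF8String form can be built once per predicate instead of converting strToBinary on every keep() call:

  new UserDefinedPredicate[Binary] with Serializable {
    // Computed once when the predicate is built, not on every keep() invocation.
    private val suffix = UTF8String.fromString(v)
    override def canDrop(statistics: Statistics[Binary]): Boolean = false
    override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
    override def keep(value: Binary): Boolean =
      value != null && UTF8String.fromBytes(value.getBytes).endsWith(suffix)
  }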

case sources.StringContains(name, value)
if pushDownStringPredicate && canMakeFilterOn(name, value) =>
[Review comment] (Member):
  ditto

[Author reply]:
  updated

Option(value).map { v =>
FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
new UserDefinedPredicate[Binary] with Serializable {
private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
override def canDrop(statistics: Statistics[Binary]): Boolean = false
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
override def keep(value: Binary): Boolean = {
value != null && UTF8String.fromBytes(value.getBytes).contains(
UTF8String.fromBytes(strToBinary.getBytes))
}
}
)
}

case _ => None
}
}
@@ -79,7 +79,7 @@ case class ParquetPartitionReaderFactory(
private val pushDownDate = sqlConf.parquetFilterPushDownDate
private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
private val int96RebaseModeInRead = options.int96RebaseModeInRead
@@ -221,7 +221,7 @@ case class ParquetPartitionReaderFactory(
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownStringPredicate,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
@@ -52,7 +52,7 @@ case class ParquetScanBuilder(
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetSchema =
@@ -62,7 +62,7 @@
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownStringPredicate,
pushDownInFilterThreshold,
isCaseSensitive,
// The rebase mode doesn't matter here because the filters are used to determine
@@ -242,6 +242,38 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
}
}

runBenchmark("Pushdown benchmark for StringEndsWith") {
withTempPath { dir =>
withTempTable("orcTable", "parquetTable") {
prepareStringDictTable(dir, numRows, 200, width)
Seq(
"value like '%10'",
"value like '%1000'",
s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}'"
).foreach { whereExpr =>
val title = s"StringEndsWith filter: ($whereExpr)"
filterPushDownBenchmark(numRows, title, whereExpr)
}
}
}
}

runBenchmark("Pushdown benchmark for StringContains") {
withTempPath { dir =>
withTempTable("orcTable", "parquetTable") {
prepareStringDictTable(dir, numRows, 200, width)
Seq(
"value like '%10%'",
"value like '%1000%'",
s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}%'"
).foreach { whereExpr =>
val title = s"StringContains filter: ($whereExpr)"
filterPushDownBenchmark(numRows, title, whereExpr)
}
}
}
}

runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
withTempPath { dir =>
Seq(
@@ -81,7 +81,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
datetimeRebaseSpec: RebaseSpec = RebaseSpec(LegacyBehaviorPolicy.CORRECTED)
): ParquetFilters =
new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringPredicate,
conf.parquetFilterPushDownInFilterThreshold,
caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
datetimeRebaseSpec)
@@ -1934,6 +1934,43 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
checkAnswer(notIn, Seq())
}
}

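// What this helper checks (per the code below): the DataFrame is written to Parquet
// both with and without dictionary encoding, then read back with the given filter
// while NumRowGroupsAcc records how many row groups the Parquet reader actually reads.
// EndsWith/Contains predicates return false from canDrop/inverseCanDrop, so row groups
// can only be skipped via Parquet's dictionary-based row-group filtering; the
// accumulator is therefore expected to be 0 only when both dictionary encoding and
// string-predicate pushdown are enabled.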
private def testStringPredicateWithDictionaryFilter(
dataFrame: DataFrame, filter: String): Unit = {
Seq(true, false).foreach { enableDictionary =>
withTempPath { dir =>
val path = dir.getCanonicalPath
dataFrame.write
.option(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary)
.parquet(path)
Seq(true, false).foreach { pushDown =>
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> pushDown.toString) {
val accu = new NumRowGroupsAcc
sparkContext.register(accu)

val df = spark.read.parquet(path).filter(filter)
df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
if (enableDictionary && pushDown) {
assert(accu.value == 0)
} else {
assert(accu.value > 0)
}

AccumulatorContext.remove(accu.id)
}
}
}
}
}

test("filter pushdown - StringEndsWith/Contains") {
[Review comment] (Member):
  Don't we need to test StringStartsWith here?

[Author reply]:
  StringStartsWith push down is an existing feature and has been tested at L1426.

[Review comment] (Member):
  Oh, I mean this test specifically tests with testStringPredicateWithDictionaryFilter, but I don't see StringStartsWith included here. Don't we need to do testStringPredicateWithDictionaryFilter for it as well?

[Review comment] (Contributor):
  Can we merge the existing startsWith test into the new one?

[Author reply]:
  updated

import testImplicits._
testStringPredicateWithDictionaryFilter(
spark.range(1024).map(t => (t % 10).toString).toDF(), "value like '%a'")
testStringPredicateWithDictionaryFilter(
spark.range(1024).map(t => (t % 10).toString).toDF(), "value like '%a%'")
}
}

@ExtendedSQLTest