Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to add an `.enabled` suffix to this config key.

.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(true)

val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
Expand Down Expand Up @@ -1420,6 +1428,9 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

/** Whether Parquet filter push-down for the string `startsWith` function is enabled. */
def parquetFilterPushDownStringStartWith: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ class ParquetFileFormat
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
Expand All @@ -355,7 +356,8 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
.createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ import java.sql.Date
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.PrimitiveComparator

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean) {
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
Expand Down Expand Up @@ -270,6 +272,37 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)

case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
  // Push `startsWith` down as a Parquet user-defined predicate over the binary column.
  // A null prefix yields `None`, i.e. no filter is pushed down.
  Option(prefix).map { v =>
    FilterApi.userDefined(binaryColumn(name),
      new UserDefinedPredicate[Binary] with Serializable {
        // Encode the prefix explicitly as UTF-8 so its bytes agree with the UTF8String
        // comparison in `keep`, independent of the JVM's default charset.
        private val strToBinary =
          Binary.fromReusedByteArray(v.getBytes(java.nio.charset.StandardCharsets.UTF_8))
        private val size = strToBinary.length

        // The row group can be skipped when the prefix-length head of its max sorts below
        // the prefix, or the prefix-length head of its min sorts above it.
        override def canDrop(statistics: Statistics[Binary]): Boolean = {
          val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
          val max = statistics.getMax
          val min = statistics.getMin
          comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 ||
            comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0
        }

        // For the negated predicate: the row group can be skipped only when both min and
        // max start with the prefix, so every value in between starts with it as well.
        override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
          val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
          val max = statistics.getMax
          val min = statistics.getMin
          comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) == 0 &&
            comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) == 0
        }

        // Record-level check. Parquet may call this with `null` for a null column value;
        // a null never starts with the prefix, so reject it instead of dereferencing it.
        override def keep(value: Binary): Boolean = {
          value != null && UTF8String.fromBytes(value.getBytes).startsWith(
            UTF8String.fromBytes(strToBinary.getBytes))
        }
      }
    )
  }

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
*/
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {

private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
private lazy val parquetFilters =
new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith)

override def beforeEach(): Unit = {
super.beforeEach()
Expand All @@ -82,6 +83,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
Expand Down Expand Up @@ -140,6 +142,31 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
}

// Exercises the `canDrop` / `inverseCanDrop` paths of the startsWith push-down predicate.
// Writes `dataFrame` as Parquet with a 512-byte block size, then reads it back through
// `filter` once with the startsWith push-down flag enabled and once with it disabled.
// With push-down enabled the accumulator must stay at 0; with it disabled it must be > 0.
// NOTE(review): `NumRowGroupsAcc` is defined elsewhere in this file; presumably it counts
// the row groups that were actually read -- confirm against its definition.
private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    dataFrame.write.option("parquet.block.size", 512).parquet(path)

    val pushDownKey = SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key
    for (pushDown <- Seq(true, false)) {
      withSQLConf(pushDownKey -> pushDown.toString) {
        val acc = new NumRowGroupsAcc
        sparkContext.register(acc)

        // Force a full scan of the filtered data so the accumulator gets updated.
        val filtered = spark.read.parquet(path).filter(filter)
        filtered.foreachPartition((rows: Iterator[Row]) => rows.foreach(_ => acc.add(0)))

        if (pushDown) {
          assert(acc.value == 0)
        } else {
          assert(acc.value > 0)
        }

        AccumulatorContext.remove(acc.id)
      }
    }
  }
}

test("filter pushdown - boolean") {
withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
Expand Down Expand Up @@ -574,7 +601,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

val df = spark.read.parquet(path).filter("a < 100")
df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
df.collect

if (enablePushDown) {
assert(accu.value == 0)
Expand Down Expand Up @@ -660,6 +686,60 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(df.where("col > 0").count() === 2)
}
}

test("filter pushdown - StringStartsWith") {
withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that all of these tests go through the keep method instead of the canDrop and inverseCanDrop. I think those methods need to be tested. You can do that by constructing a Parquet file with row groups that have predictable statistics, but that would be difficult. An easier way to do this is to define the predicate class elsewhere and create a unit test for it that passes in different statistics values.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added `testStringStartsWith` to verify that the filter actually goes through `canDrop` and `inverseCanDrop`.

checkFilterPredicate(
'_1.startsWith("").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))

Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
checkFilterPredicate(
'_1.startsWith(prefix).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
"2str2")
}

Seq("2S", "null", "2str22").foreach { prefix =>
checkFilterPredicate(
'_1.startsWith(prefix).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
Seq.empty[Row])
}

checkFilterPredicate(
!'_1.startsWith("").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
Seq().map(Row(_)))

Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
checkFilterPredicate(
!'_1.startsWith(prefix).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
Seq("1str1", "3str3", "4str4").map(Row(_)))
}

Seq("2S", "null", "2str22").foreach { prefix =>
checkFilterPredicate(
!'_1.startsWith(prefix).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
}

assertResult(None) {
parquetFilters.createFilter(
df.schema,
sources.StringStartsWith("_1", null))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @attilapiros, `sources.StringStartsWith("_1", null)` will not match them, same as before.

}
}

import testImplicits._
// Test canDrop() has taken effect
testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
// Test inverseCanDrop() has taken effect
testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
}
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
Expand Down