SQLConf.scala
@@ -378,6 +378,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("The maximum number of values for which the IN predicate filter push-down optimization is applied. " +
"Large threshold will not provide much better performance. " +
Member: Large threshold won't necessarily provide much better performance.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.intConf
.checkValue(threshold => threshold > 0 && threshold <= 300,
Contributor: Why are we enforcing that the threshold must be <= 300? Even though this might be a reasonable value, I think the user should be able to set whatever value they want; if the value they set is a bad one, it is their problem to pick a proper one. We could eventually mention in the description that, from the experiments, 300 seems to be a limit threshold, in order to give the user a hint.

Member: I also think setting a maximum value of 300 is a bit weird.

"The threshold must be greater than 0 and less than or equal to 300.")
.createWithDefault(10)
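
Picking up the two review comments above about the hard <= 300 cap: a hedged sketch of what the suggested alternative could look like, keeping the same config builder DSL but dropping the upper bound and only hinting at 300 in the doc text. The wording below is an assumption, not taken from the PR.

// Sketch only: no hard upper bound; 300 is mentioned purely as guidance.
val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
  buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
    .doc("The maximum number of distinct values in an IN predicate for which the " +
      "Parquet filter push-down optimization is applied. In experiments, thresholds " +
      "above roughly 300 did not improve performance further. This configuration " +
      "only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
    .internal()
    .intConf
    .checkValue(_ > 0, "The threshold must be positive.")
    .createWithDefault(10)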

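For context, a minimal usage sketch (not part of the diff) showing how the new threshold could be exercised from a session. The path, column name, and values are made up, and since the config is internal this is mostly useful for experiments.

// Assumes a running SparkSession named `spark`; path and predicate are illustrative only.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", "20")
val df = spark.read.parquet("/tmp/parquet_table").where("id IN (1, 2, 3, 4, 5)")
// explain() should list the IN predicate under PushedFilters; whether Parquet actually
// evaluates it additionally requires the number of distinct values to be <= the threshold.
df.explain()
df.show()
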
val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1420,6 +1431,9 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def parquetFilterPushDownInFilterThreshold: Int =
getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
ParquetFileFormat.scala
@@ -335,16 +335,14 @@ class ParquetFileFormat
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean =
sparkSession.sessionState.conf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sparkSession.sessionState.conf.isParquetINT96TimestampConversion
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean =
sparkSession.sessionState.conf.parquetFilterPushDown
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val inThreshold = sqlConf.parquetFilterPushDownInFilterThreshold

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -355,7 +353,7 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
.flatMap(new ParquetFilters(pushDownDate, inThreshold).createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
ParquetFilters.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean) {
private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold: Int) {
Contributor: nit: not a big deal, but since SQLConf is now also available on the executor side, can we get its value inside the class rather than passing it in from outside? If we add more configurations this constructor might explode...


private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
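
On the constructor-parameter concern in the comment above, a hedged sketch of the alternative: read the threshold from the active SQLConf inside ParquetFilters instead of adding another constructor parameter. This assumes SQLConf.get resolves to the right conf wherever the filters are built, which is what the comment implies.

import org.apache.spark.sql.internal.SQLConf

// Sketch only: pull per-query settings from SQLConf.get rather than the constructor.
private[parquet] class ParquetFilters(pushDownDate: Boolean) {
  private val inFilterThreshold: Int =
    SQLConf.get.parquetFilterPushDownInFilterThreshold
  // ... the rest of the class stays unchanged and uses inFilterThreshold as before
}
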
@@ -270,6 +270,12 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)

case sources.In(name, values)
if canMakeFilterOn(name) && values.distinct.length <= inFilterThreshold =>
values.distinct.flatMap { v =>
makeEq.lift(nameToType(name)).map(_(name, v))
}.reduceLeftOption(FilterApi.or)
Contributor: What about null handling? Do we get the same result as before? Anyway, can we add a test for it?


case _ => None
}
}
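
On the null-handling question above, a hedged test sketch (not in the PR) that could sit in ParquetFilterSuite; it assumes the suite's testImplicits, withTempPath, and withSQLConf helpers, and the column name and values are made up.

test("IN predicate with nulls returns the same rows with and without push-down (sketch)") {
  import testImplicits._
  withTempPath { path =>
    // Rows: a = null for even i, otherwise the string value of i.
    (0 to 10).map(i => Tuple1(if (i % 2 == 0) null else i.toString)).toDF("a")
      .write.parquet(path.getAbsolutePath)
    val df = spark.read.parquet(path.getAbsolutePath)
    Seq("true", "false").foreach { pushEnabled =>
      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled) {
        // Under SQL semantics, IN (null) never evaluates to true, push-down or not.
        assert(df.where("a in (null)").count() === 0)
        assert(df.where("a in ('1', null)").count() === 1)
      }
    }
  }
}
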
ParquetFilterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}

@@ -55,7 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
*/
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {

private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
private lazy val parquetFilters =
new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownInFilterThreshold)

override def beforeEach(): Unit = {
super.beforeEach()
@@ -660,6 +661,62 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
assert(df.where("col > 0").count() === 2)
}
}

test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))

assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(null)))
}

assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(10)))
}

// Remove duplicates
assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 10)))
}

assertResult(Some(or(
Member: I think you can remove this test because it is basically a duplicate of the one below.

FilterApi.eq(intColumn("a"), 10: Integer),
FilterApi.eq(intColumn("a"), 20: Integer)))
) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 20)))
}

assertResult(Some(or(or(
FilterApi.eq(intColumn("a"), 10: Integer),
FilterApi.eq(intColumn("a"), 20: Integer)),
FilterApi.eq(intColumn("a"), 30: Integer)))
) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30)))
}

assert(parquetFilters.createFilter(schema, sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined)
assert(parquetFilters.createFilter(schema, sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty)

import testImplicits._
withTempPath { path =>
(0 to 1024).toDF("a").coalesce(1)
Contributor: Can we add a null here?

.write.option("parquet.block.size", 512)
.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
Seq(true, false).foreach { pushEnabled =>
Member: @wangyum, does this really test that Parquet itself pushes down fine? I think you should stripSparkFilter and check whether the rows are actually filtered out when spark.sql.parquet.filterPushdown is enabled.

Member (author): Updated to:

val actual = stripSparkFilter(df.where(filter)).collect().length
if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
  assert(actual > 1 && actual < data.length)
} else {
  assert(actual === data.length)
}

withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) {
Seq(1, 5, 10, 11, 1000).foreach { count =>
assert(df.where(s"a in(${Range(0, count).mkString(",")})").count() === count)
}
assert(df.where(s"a in(null)").count() === 0)
}
}
}
}
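
For readability, a hedged sketch of how the author's reply above could be folded into a self-contained test. The setup mirrors the test above and only the assertion shape comes from the reply; everything else (test name, the extra 1000 count) is assumed.

test("IN push-down prunes Parquet row groups (sketch of the updated check)") {
  import testImplicits._
  withTempPath { path =>
    val data = (0 to 1024)
    data.toDF("a").coalesce(1)
      .write.option("parquet.block.size", 512)
      .parquet(path.getAbsolutePath)
    val df = spark.read.parquet(path.getAbsolutePath)
    Seq(true, false).foreach { pushEnabled =>
      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) {
        Seq(1, 5, 10, 11, 1000).foreach { count =>
          val filter = s"a in(${Range(0, count).mkString(",")})"
          // stripSparkFilter removes Spark's own Filter node, so any reduction in the
          // returned row count must come from Parquet-side row-group filtering.
          val actual = stripSparkFilter(df.where(filter)).collect().length
          if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
            assert(actual > 1 && actual < data.length)
          } else {
            assert(actual === data.length)
          }
        }
      }
    }
  }
}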
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {