@@ -378,6 +378,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("The maximum number of values to filter push-down optimization for IN predicate. " +
"Large threshold won't necessarily provide much better performance. " +
"The experiment argued that 300 is the limit threshold. " +
Member: This also depends on the data types, right?

Member Author: You are right.

Type      Limit threshold
string    370
int       210
long      285
double    270
float     220
decimal   Will not provide better performance
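For context, a minimal sketch of how such a per-type threshold experiment could be run (the data volume, path, column set, and thresholds below are illustrative assumptions, not the setup that produced the numbers above):

import org.apache.spark.sql.SparkSession

object InThresholdBench {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("in-threshold-bench")
      .getOrCreate()

    // Synthetic Parquet table with one column per type of interest.
    spark.range(50000000L)
      .selectExpr("cast(id AS int) AS intCol", "id AS longCol", "cast(id AS string) AS strCol")
      .write.mode("overwrite").parquet("/tmp/in_threshold_bench")

    val df = spark.read.parquet("/tmp/in_threshold_bench")

    // Time IN predicates of increasing size against each column type.
    Seq("intCol", "longCol", "strCol").foreach { col =>
      Seq(50, 100, 200, 300, 400).foreach { n =>
        val values =
          if (col == "strCol") (0 until n).map(i => s"'$i'").mkString(",")
          else (0 until n).mkString(",")
        val start = System.nanoTime()
        df.where(s"$col IN ($values)").count()
        println(s"$col, n=$n: ${(System.nanoTime() - start) / 1e6} ms")
      }
    }
    spark.stop()
  }
}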

Member: An interesting finding. Thanks for the update. Maybe you do not need to mention this limit threshold in the doc? How about posting your findings in the PR description?

Member: See the comment #21603 (comment).

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.intConf
.checkValue(threshold => threshold > 0, "The threshold must be greater than 0.")
Member: Shall we allow, for example, -1 here to disable this?

Member Author:

case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name)
  && values.distinct.length <= pushDownInFilterThreshold =>

How about 0? values.distinct.length will never be less than 0, so a threshold of 0 effectively disables the conversion.

Member: Yup.

.createWithDefault(10)
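For illustration, a minimal sketch of setting the new threshold at runtime (the session setup and the chosen value are assumptions for the example, not part of the patch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("in-filter-threshold-example")
  .master("local[*]")
  .getOrCreate()

// Push-down of IN predicates only applies when the parent flag is enabled (true by default).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

// Allow IN predicates with up to 100 distinct values to be converted to Parquet filters.
// The key matches the config added above; checkValue currently rejects values <= 0.
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", "100")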

val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1420,6 +1431,9 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def parquetFilterPushDownInFilterThreshold: Int =
getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
@@ -335,16 +335,12 @@ class ParquetFileFormat
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean =
sparkSession.sessionState.conf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sparkSession.sessionState.conf.isParquetINT96TimestampConversion
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean =
sparkSession.sessionState.conf.parquetFilterPushDown
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -355,7 +351,7 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
.flatMap(new ParquetFilters().createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
@@ -25,13 +25,19 @@ import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

/**
* Some utility functions to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean) {
private[parquet] class ParquetFilters {

val sqlConf: SQLConf = SQLConf.get

val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
@@ -270,6 +276,12 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)

case sources.In(name, values)
if canMakeFilterOn(name) && values.distinct.length <= pushDownInFilterThreshold =>
values.distinct.flatMap { v =>
makeEq.lift(nameToType(name)).map(_(name, v))
}.reduceLeftOption(FilterApi.or)
Contributor: What about null handling? Do we get the same result as before? Anyway, can we add a test for it?


case _ => None
}
}
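For concreteness, a minimal sketch (the column name and values are illustrative) of the predicate the new In case builds for an int column: duplicate values are dropped, each remaining value becomes an eq, the eqs are folded left with or, and a null value becomes eq(col, null), which Parquet evaluates as an is-null check:

import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// Equivalent of what createFilter returns for sources.In("a", Array(10, 20, null)).
val col = intColumn("a")
val predicate =
  FilterApi.or(
    FilterApi.or(
      FilterApi.eq(col, 10: Integer),
      FilterApi.eq(col, 20: Integer)),
    FilterApi.eq(col, null: Integer))  // null value => "a is null"

This mirrors the expected trees asserted in the ParquetFilterSuite changes below.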
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}

@@ -55,7 +55,7 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
*/
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {

private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
private lazy val parquetFilters = new ParquetFilters()

override def beforeEach(): Unit = {
super.beforeEach()
@@ -660,6 +660,64 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
assert(df.where("col > 0").count() === 2)
}
}

test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))

assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(null)))
}

assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(10)))
}

// Remove duplicates
assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 10)))
}

assertResult(Some(or(
Member: I think you can remove this test because it is basically a duplicate of the one below.

FilterApi.eq(intColumn("a"), 10: Integer),
FilterApi.eq(intColumn("a"), 20: Integer)))
) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 20)))
}

assertResult(Some(or(or(
FilterApi.eq(intColumn("a"), 10: Integer),
FilterApi.eq(intColumn("a"), 20: Integer)),
FilterApi.eq(intColumn("a"), 30: Integer)))
) {
parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30)))
}

assert(parquetFilters.createFilter(schema, sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined)
assert(parquetFilters.createFilter(schema, sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty)

import testImplicits._
withTempPath { path =>
(0 to 1024).toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 to null
.coalesce(1).write.option("parquet.block.size", 512)
.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
Seq(true, false).foreach { pushEnabled =>
Member: @wangyum, does this really test that Parquet itself pushes the filter down fine? I think you should stripSparkFilter and check that rows are actually filtered out when spark.sql.parquet.filterPushdown is enabled.

Member Author: Updated to:

val actual = stripSparkFilter(df.where(filter)).collect().length
if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
  assert(actual > 1 && actual < data.length)
} else {
  assert(actual === data.length)
}

withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) {
Seq(1, 5, 10, 11, 1000).foreach { count =>
assert(df.where(s"a in(${Range(0, count).mkString(",")})").count() === count)
}
assert(df.where("a in(null)").count() === 0)
assert(df.where("a = null").count() === 0)
assert(df.where("a is null").count() === 1)
}
}
}
}
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {