Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("The maximum number of values to filter push-down optimization for IN predicate. " +
"Large threshold won't necessarily provide much better performance. " +
"The experiment argued that 300 is the limit threshold. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also depends on the data types, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

Type limit threshold
string 370
int 210
long 285
double 270
float 220
decimal Will not provide better performance

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interesting finding. Thanks for the update. Maybe you do not need to mention this limit threshold in the doc?

How about post your finding in the PR description?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment #21603 (comment)

"By setting this value to 0 this feature can be disabled. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.intConf
.checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
.createWithDefault(10)

val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
Expand Down Expand Up @@ -1485,6 +1497,9 @@ class SQLConf extends Serializable with Logging {
def parquetFilterPushDownStringStartWith: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)

def parquetFilterPushDownInFilterThreshold: Int =
getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
Expand Down
96 changes: 48 additions & 48 deletions sql/core/benchmarks/FilterPushdownBenchmark-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,120 +417,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 5, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7477 / 7587 2.1 475.4 1.0X
Parquet Vectorized (Pushdown) 7862 / 8346 2.0 499.9 1.0X
Native ORC Vectorized 6447 / 7021 2.4 409.9 1.2X
Native ORC Vectorized (Pushdown) 983 / 1003 16.0 62.5 7.6X
Parquet Vectorized 7993 / 8104 2.0 508.2 1.0X
Parquet Vectorized (Pushdown) 507 / 532 31.0 32.2 15.8X
Native ORC Vectorized 6922 / 7163 2.3 440.1 1.2X
Native ORC Vectorized (Pushdown) 1017 / 1058 15.5 64.6 7.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 5, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7107 / 7290 2.2 451.9 1.0X
Parquet Vectorized (Pushdown) 7196 / 7258 2.2 457.5 1.0X
Native ORC Vectorized 6102 / 6222 2.6 388.0 1.2X
Native ORC Vectorized (Pushdown) 926 / 958 17.0 58.9 7.7X
Parquet Vectorized 7855 / 7963 2.0 499.4 1.0X
Parquet Vectorized (Pushdown) 503 / 516 31.3 32.0 15.6X
Native ORC Vectorized 6825 / 6954 2.3 433.9 1.2X
Native ORC Vectorized (Pushdown) 1019 / 1044 15.4 64.8 7.7X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 5, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7374 / 7692 2.1 468.8 1.0X
Parquet Vectorized (Pushdown) 7771 / 7848 2.0 494.1 0.9X
Native ORC Vectorized 6184 / 6356 2.5 393.2 1.2X
Native ORC Vectorized (Pushdown) 920 / 963 17.1 58.5 8.0X
Parquet Vectorized 7858 / 7928 2.0 499.6 1.0X
Parquet Vectorized (Pushdown) 490 / 519 32.1 31.1 16.0X
Native ORC Vectorized 7079 / 7966 2.2 450.1 1.1X
Native ORC Vectorized (Pushdown) 1276 / 1673 12.3 81.1 6.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 10, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7073 / 7326 2.2 449.7 1.0X
Parquet Vectorized (Pushdown) 7304 / 7647 2.2 464.4 1.0X
Native ORC Vectorized 6222 / 6579 2.5 395.6 1.1X
Native ORC Vectorized (Pushdown) 958 / 994 16.4 60.9 7.4X
Parquet Vectorized 8007 / 11155 2.0 509.0 1.0X
Parquet Vectorized (Pushdown) 519 / 540 30.3 33.0 15.4X
Native ORC Vectorized 6848 / 7072 2.3 435.4 1.2X
Native ORC Vectorized (Pushdown) 1026 / 1050 15.3 65.2 7.8X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 10, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7121 / 7501 2.2 452.7 1.0X
Parquet Vectorized (Pushdown) 7751 / 8334 2.0 492.8 0.9X
Native ORC Vectorized 6225 / 6680 2.5 395.8 1.1X
Native ORC Vectorized (Pushdown) 998 / 1020 15.8 63.5 7.1X
Parquet Vectorized 7876 / 7956 2.0 500.7 1.0X
Parquet Vectorized (Pushdown) 521 / 535 30.2 33.1 15.1X
Native ORC Vectorized 7051 / 7368 2.2 448.3 1.1X
Native ORC Vectorized (Pushdown) 1014 / 1035 15.5 64.5 7.8X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 10, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7157 / 7399 2.2 455.1 1.0X
Parquet Vectorized (Pushdown) 7806 / 7911 2.0 496.3 0.9X
Native ORC Vectorized 6548 / 6720 2.4 416.3 1.1X
Native ORC Vectorized (Pushdown) 1016 / 1050 15.5 64.6 7.0X
Parquet Vectorized 7897 / 8229 2.0 502.1 1.0X
Parquet Vectorized (Pushdown) 513 / 530 30.7 32.6 15.4X
Native ORC Vectorized 6730 / 6990 2.3 427.9 1.2X
Native ORC Vectorized (Pushdown) 1003 / 1036 15.7 63.8 7.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 50, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7662 / 7805 2.1 487.1 1.0X
Parquet Vectorized (Pushdown) 7590 / 7861 2.1 482.5 1.0X
Native ORC Vectorized 6840 / 8073 2.3 434.9 1.1X
Native ORC Vectorized (Pushdown) 1041 / 1075 15.1 66.2 7.4X
Parquet Vectorized 7967 / 8175 2.0 506.5 1.0X
Parquet Vectorized (Pushdown) 8155 / 8434 1.9 518.5 1.0X
Copy link
Member

@viirya viirya Jul 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this benchmark numbers are more meaningful if we show IN predicate is pushdown or not (or threshold).

Native ORC Vectorized 7002 / 7107 2.2 445.2 1.1X
Native ORC Vectorized (Pushdown) 1092 / 1139 14.4 69.4 7.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 50, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8230 / 9266 1.9 523.2 1.0X
Parquet Vectorized (Pushdown) 7735 / 7960 2.0 491.8 1.1X
Native ORC Vectorized 6945 / 7109 2.3 441.6 1.2X
Native ORC Vectorized (Pushdown) 1123 / 1144 14.0 71.4 7.3X
Parquet Vectorized 8032 / 8122 2.0 510.7 1.0X
Parquet Vectorized (Pushdown) 8141 / 8908 1.9 517.6 1.0X
Native ORC Vectorized 7140 / 7387 2.2 454.0 1.1X
Native ORC Vectorized (Pushdown) 1156 / 1220 13.6 73.5 6.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 50, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7656 / 8058 2.1 486.7 1.0X
Parquet Vectorized (Pushdown) 7860 / 8247 2.0 499.7 1.0X
Native ORC Vectorized 6684 / 7003 2.4 424.9 1.1X
Native ORC Vectorized (Pushdown) 1085 / 1172 14.5 69.0 7.1X
Parquet Vectorized 8088 / 8350 1.9 514.2 1.0X
Parquet Vectorized (Pushdown) 8629 / 8702 1.8 548.6 0.9X
Native ORC Vectorized 7480 / 7886 2.1 475.6 1.1X
Native ORC Vectorized (Pushdown) 1106 / 1145 14.2 70.3 7.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 100, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 7594 / 8128 2.1 482.8 1.0X
Parquet Vectorized (Pushdown) 7845 / 7923 2.0 498.8 1.0X
Native ORC Vectorized 5859 / 6421 2.7 372.5 1.3X
Native ORC Vectorized (Pushdown) 1037 / 1054 15.2 66.0 7.3X
Parquet Vectorized 8028 / 8165 2.0 510.4 1.0X
Parquet Vectorized (Pushdown) 8349 / 8674 1.9 530.8 1.0X
Native ORC Vectorized 7107 / 7354 2.2 451.8 1.1X
Native ORC Vectorized (Pushdown) 1175 / 1207 13.4 74.7 6.8X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 100, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 6762 / 6775 2.3 429.9 1.0X
Parquet Vectorized (Pushdown) 6911 / 6970 2.3 439.4 1.0X
Native ORC Vectorized 5884 / 5960 2.7 374.1 1.1X
Native ORC Vectorized (Pushdown) 1028 / 1052 15.3 65.4 6.6X
Parquet Vectorized 8041 / 8195 2.0 511.2 1.0X
Parquet Vectorized (Pushdown) 8466 / 8604 1.9 538.2 0.9X
Native ORC Vectorized 7116 / 7286 2.2 452.4 1.1X
Native ORC Vectorized (Pushdown) 1197 / 1214 13.1 76.1 6.7X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

InSet -> InFilters (values count: 100, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 6718 / 6767 2.3 427.1 1.0X
Parquet Vectorized (Pushdown) 6812 / 6909 2.3 433.1 1.0X
Native ORC Vectorized 5842 / 5883 2.7 371.4 1.1X
Native ORC Vectorized (Pushdown) 1040 / 1058 15.1 66.1 6.5X
Parquet Vectorized 7998 / 8311 2.0 508.5 1.0X
Parquet Vectorized (Pushdown) 9366 / 11257 1.7 595.5 0.9X
Native ORC Vectorized 7856 / 9273 2.0 499.5 1.0X
Native ORC Vectorized (Pushdown) 1350 / 1747 11.7 85.8 5.9X


================================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,15 @@ class ParquetFileFormat
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean =
sparkSession.sessionState.conf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sparkSession.sessionState.conf.isParquetINT96TimestampConversion
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean =
sparkSession.sessionState.conf.parquetFilterPushDown
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
Expand All @@ -368,12 +366,13 @@ class ParquetFileFormat
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
.getFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate,
pushDownStringStartWith, pushDownInFilterThreshold)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
.createFilter(parquetSchema, _))
.flatMap(parquetFilters.createFilter(parquetSchema, _))
.reduceOption(FilterApi.and)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
private[parquet] class ParquetFilters(
pushDownDate: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int) {

private case class ParquetSchemaType(
originalType: OriginalType,
Expand Down Expand Up @@ -232,6 +235,15 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
// See SPARK-20364.
def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")

// All DataTypes that support `makeEq` can provide better performance.
def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon How about remove this?
Timestamp type and Decimal type will be support soon.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us keep it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on which PR will be merged first. The corresponding PRs should update this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need to update the benchmark suite.

case ParquetBooleanType | ParquetByteType | ParquetShortType | ParquetIntegerType
| ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType
| ParquetBinaryType => true
case ParquetDateType if pushDownDate => true
case _ => false
}

// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
Expand Down Expand Up @@ -295,6 +307,12 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)

case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name)
&& values.distinct.length <= pushDownInFilterThreshold =>
values.distinct.flatMap { v =>
makeEq.lift(nameToType(name)).map(_(name, v))
}.reduceLeftOption(FilterApi.or)

case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
Option(prefix).map { v =>
FilterApi.userDefined(binaryColumn(name),
Expand Down
Loading