@@ -353,6 +353,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
.doc("If true, enables Parquet filter push-down optimization for Date. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(true)

Member: Should this be an internal conf, i.e., .internal()?

Contributor Author: @gatorsmile any idea?

Member: Yes, it should be an internal conf. In the Spark 3.0 release, we will revisit all the internal confs and remove all the unnecessary ones.
Member: For this kind of thing, we had better start with false.

Member: +1

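A usage sketch for the new flag (the session setup, table path, and column name below are illustrative and not part of this patch; the conf keys are the ones defined above):

```scala
import java.sql.Date
import org.apache.spark.sql.SparkSession

// Minimal sketch: both keys must be enabled for DATE predicates to be pushed
// to the Parquet reader; the parent filterPushdown flag gates the new one.
// The app name, path, and column name are illustrative only.
val spark = SparkSession.builder().appName("date-pushdown-demo").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")

spark.read.parquet("/tmp/events")                       // hypothetical path
  .filter($"event_date" >= Date.valueOf("2018-03-18"))  // candidate for push-down
  .show()
```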
val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1319,6 +1326,8 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
@@ -17,10 +17,15 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.sql.Date

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
*/
private[parquet] object ParquetFilters {

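// Converts a java.sql.Date back to the days-since-epoch SQLDate that Parquet
// stores for DATE columns.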
private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,10 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

Contributor: Sorry, I was wrong. I took a look at how these dates get created, in DataSourceStrategy.translateFilter: they are actually created via DateTimeUtils.toJavaDate without a timezone, which means we should not use a timezone here either.
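A minimal sketch of what this DateType branch builds, using only helpers already imported in this file; the column name and date value are illustrative:

```scala
import java.sql.Date
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.intColumn
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// The source filter carries a java.sql.Date produced by DateTimeUtils.toJavaDate,
// so fromJavaDate recovers the original days-since-epoch Int with no timezone
// parameter, and the predicate compares directly against the INT32 Parquet column.
val date = Date.valueOf("2018-03-18")            // illustrative value
val days = DateTimeUtils.fromJavaDate(date)      // SQLDate, i.e. Int
val predicate =
  FilterApi.eq(intColumn("event_date"),          // hypothetical column name
    days.asInstanceOf[Integer])
```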

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +85,10 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +108,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +131,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +154,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.gt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

Contributor: We should refactor this so that adding a new data type doesn't require touching so many places. This can be done later.
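As a sketch of the refactor suggested in the comment above (purely hypothetical; none of these names are in the patch, and it reuses dateToDays and the imports already present in this file):

```scala
// Hypothetical helper: builds the (name, value) => predicate closure once,
// parameterized by the FilterApi comparison to apply, so each of the six
// PartialFunctions would only name the comparison instead of repeating the
// Date-to-days conversion.
private def makeDatePredicate(
    compare: (Operators.IntColumn, Integer) => FilterPredicate)
  : (String, Any) => FilterPredicate = {
  (n: String, v: Any) => compare(
    intColumn(n),
    Option(v).map(d => dateToDays(d.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

// Each case would then shrink to, e.g.:
//   case DateType if SQLConf.get.parquetFilterPushDownDate =>
//     makeDatePredicate(FilterApi.eq(_, _))
```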

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +177,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

/**
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,8 +77,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -102,7 +105,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
}
}
}

@@ -313,6 +315,48 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

test("filter pushdown - date") {
implicit class StringToDate(s: String) {
def date: Date = Date.valueOf(s)
}

val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")

withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))

checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)

checkFilterPredicate(
Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate(
Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)

checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
checkFilterPredicate(
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
}
}

test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
