@@ -193,6 +193,10 @@ object DateTimeUtils {
millisToDays(date.getTime)
}

def fromJavaDate(date: Date, timeZone: TimeZone): SQLDate = {
millisToDays(date.getTime, timeZone)
}

/**
* Returns a java.sql.Date from number of days since epoch.
*/
@@ -353,6 +353,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
.doc("If true, enables Parquet filter push-down optimization for Date. " +
Member: Should be an internal conf, i.e., .internal()?

Contributor Author: @gatorsmile any idea?

Member: Yes, it should be an internal conf. In the Spark 3.0 release, we will revisit all the internal confs and remove the unnecessary ones.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(false)
Contributor: An internal by-default-false conf usually means it's not available for users...

Contributor Author: @dongjoon-hyun @HyukjinKwon suggested such a configuration might be better to start with "false", but ORC actually does support Date push-down by default. Any idea?

Member: I am fine either way.

Member: @yucai. The reason is that spark.sql.orc.filterPushdown is still false in Spark 2.3 while spark.sql.parquet.filterPushdown is true. We don't know whether this is safe or not.

Anyway, we have 6 or more months for Apache Spark 2.4. We may enable this in the master branch temporarily for testing purposes only, and we can still disable it at the last moment of the 2.4 release, as we did with the ORC conf, if some issue turns up.

BTW, did you use this a lot in production at your company?

Contributor: I think it's common to turn on a new feature by default if there is no known regression, and to turn it off if we find a regression later.

Member: If you think so, +1. :)

BTW, based on the Apache Spark way, I assume that this will not land on branch-2.3 with spark.sql.parquet.filterPushdown.date=true.

Member: This is not a bug fix. We will not backport it to branch-2.3.

Member: Great! Thank you for confirmation.

Contributor Author: @dongjoon-hyun, we are investigating using this feature in some of eBay's queries; if its performance is good, it will benefit a lot.
As per our discussion, I will turn it on by default. Thanks very much!
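For reference, toggling the new flag in a session would look roughly like the sketch below (a usage sketch only, not code from this PR; it assumes a spark-shell style "spark" session, and the key names are the ones defined in this patch). Note that the Date flag only takes effect when the general Parquet pushdown flag is also enabled.

// Usage sketch: per-session toggle of the two related flags.
// The Date-specific flag matters only when the general pushdown flag is on.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")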


val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1319,6 +1326,8 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
@@ -17,10 +17,15 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.sql.Date

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
*/
private[parquet] object ParquetFilters {

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date, DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone))
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,10 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
Contributor: Sorry, I was wrong. I took a look at how these dates get created, in DataSourceStrategy.translateFilter. They are actually created via DateTimeUtils.toJavaDate without a time zone, which means we should not use a time zone here either.
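As a rough sketch of that suggestion (relying on the imports already added to ParquetFilters in this diff; an illustration, not the merged code), the helper could simply fall back to the existing overload of fromJavaDate that takes no time zone:

// Sketch of the reviewer's suggestion: since the Date values produced by
// DataSourceStrategy.translateFilter come from DateTimeUtils.toJavaDate, which
// takes no time zone, the reverse conversion can use the time-zone-free overload too.
private def dateToDays(date: Date): SQLDate = DateTimeUtils.fromJavaDate(date)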

}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +85,10 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +108,10 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +131,10 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +154,10 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
Contributor: We should refactor this so that adding a new data type doesn't require touching so many places. This can be done later.
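One hypothetical shape for such a refactor, shown only to illustrate the idea (none of these names exist in this PR):

// Hypothetical sketch, not part of this PR: centralize the per-type value conversion so
// that makeEq, makeNotEq, makeLt, makeLtEq, makeGt and makeGtEq can share it instead of
// repeating the casts for every supported type.
import java.sql.Date
import org.apache.spark.sql.types._

object ParquetFilterValues {
  // Converts a pushed-down filter value to the form the Parquet column expects.
  // Supporting a new data type would then only require adding a case here.
  def toParquetValue(dataType: DataType, value: Any): Any = (dataType, value) match {
    case (_, null)           => null
    case (IntegerType, v)    => v.asInstanceOf[java.lang.Integer]
    case (LongType, v)       => v.asInstanceOf[java.lang.Long]
    case (DateType, v: Date) => Int.box(dateToDays(v)) // days since epoch, as in this patch
    case (_, v)              => v
  }

  // Simplified stand-in for ParquetFilters.dateToDays; the real helper in this patch
  // goes through DateTimeUtils and the session-local time zone.
  private def dateToDays(date: Date): Int = (date.getTime / (24L * 60 * 60 * 1000)).toInt
}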

(n: String, v: Any) => FilterApi.gt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
Expand All @@ -148,6 +177,10 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}

/**
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,8 +77,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -102,7 +105,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
}
}
}

@@ -313,6 +315,48 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

test("filter pushdown - date") {
implicit class StringToDate(s: String) {
def date: Date = Date.valueOf(s)
}

val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")

withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))

checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)

checkFilterPredicate(
Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate(
Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)

checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
checkFilterPredicate(
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
}
Member: nit. Indentation.

Contributor Author: Could you kindly show me how to improve? Thanks!

HyukjinKwon (Member, Mar 22, 2018):
  test("filter pushdown - date") {
    implicit class StringToDate(s: String) {
      def date: Date = Date.valueOf(s)
    }

    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")

    withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))

      checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
        Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

      checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
      checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
      checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)

      checkFilterPredicate(
        Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
      checkFilterPredicate(
        Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)

      checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
      checkFilterPredicate(
        '_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
        classOf[Operators.Or],
        Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
    }
  }

Oops, I edited it.

Member: Thanks, @HyukjinKwon! That's better than what I thought.

Member: This is different from what @HyukjinKwon gave you. :)

}

test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
