@@ -193,6 +193,10 @@ object DateTimeUtils {
millisToDays(date.getTime)
}

def fromJavaDate(date: Date, timeZone: TimeZone): SQLDate = {
millisToDays(date.getTime, timeZone)
}

/**
* Returns a java.sql.Date from number of days since epoch.
*/
@@ -353,6 +353,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
.doc("If true, enables Parquet filter push-down optimization for Date. " +
Member:
Should be an internal conf, i.e., .internal()?

Contributor Author:
@gatorsmile any idea?

Member:
Yes, it should be an internal conf. In the Spark 3.0 release, we will revisit all the internal confs and remove the unnecessary ones.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(true)
Member:
For this kind of thing, we had better start with false.

Member:
+1
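
Whatever default this lands on, here is a rough usage sketch of how the flag would be toggled at runtime (the parquet path and the column name "d" are illustrative, not from this PR; the conf keys are the ones defined above):

    // Both the general Parquet pushdown flag and the new date-specific flag
    // must be true for Date predicates to be pushed down to the Parquet reader.
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")
    spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")
    spark.read.parquet("/tmp/dates")          // illustrative path
      .where("d >= DATE '2018-03-20'")        // "d" is an assumed DateType column
      .explain()                              // pushed filters appear under PushedFilters in the scan node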


val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1319,6 +1326,8 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
@@ -17,10 +17,15 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.sql.Date

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
*/
private[parquet] object ParquetFilters {

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date, DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone))
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,11 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
dongjoon-hyun (Member), Mar 22, 2018:
Since we imported Date at line 20, we can remove the java.sql. prefix. There are 6 occurrences.

- Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
-   .orNull)
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

.orNull)
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +86,11 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
.orNull)
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +110,11 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
.orNull)
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +134,11 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
.orNull)
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +158,11 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
Contributor:
We should refactor this so that adding a new data type doesn't require touching so many places. This can be done later.
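
One possible direction, sketched here only to illustrate the idea (none of these helpers exist in this PR; dateToDays, intColumn, FilterApi and the conf accessor are the ones already used above):

    // Hedged sketch, not part of this change: register each int-backed type's value
    // conversion once, so supporting a new type means one new case here instead of
    // edits in makeEq/makeNotEq/makeLt/makeLtEq/makeGt/makeGtEq.
    private val intValueConverters: PartialFunction[DataType, Any => Integer] = {
      case IntegerType => v => v.asInstanceOf[Integer]
      case DateType if SQLConf.get.parquetFilterPushDownDate =>
        v => dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]
    }

    // Each builder could then collapse its int-backed cases into one, e.g. in makeEq:
    //   case dt if intValueConverters.isDefinedAt(dt) =>
    //     (n: String, v: Any) =>
    //       FilterApi.eq(intColumn(n), Option(v).map(intValueConverters(dt)).orNull)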

(n: String, v: Any) => FilterApi.gt(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
.orNull)
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
Expand All @@ -148,6 +182,11 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
.orNull)
}

/**
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,8 +77,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -102,7 +105,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
}
}
}

@@ -313,6 +315,49 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}

test("filter pushdown - date") {
implicit class StringToDate(s: String) {
def date: Date = Date.valueOf(s)
}

withParquetDataFrame(
Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21").map(i => Tuple1(i.date))) {
implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]],
Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)

checkFilterPredicate(
Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate(
Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)

checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
checkFilterPredicate(
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
}
Member:
nit. Indentation.

Contributor Author:
Could you kindly show me how to improve? Thanks!

HyukjinKwon (Member), Mar 22, 2018:
  test("filter pushdown - date") {
    implicit class StringToDate(s: String) {
      def date: Date = Date.valueOf(s)
    }

    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")

    withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))

      checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
        Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

      checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
      checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
      checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)

      checkFilterPredicate(
        Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
      checkFilterPredicate(
        Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)

      checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
      checkFilterPredicate(
        '_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
        classOf[Operators.Or],
        Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
    }
  }

Oops, I edited it.

Member:
Thanks, @HyukjinKwon! That's better than what I thought.

Member:
This is different from what @HyukjinKwon gave you. :)

}

test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
