@@ -353,6 +353,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
.doc("If true, enables Parquet filter push-down optimization for Date. " +
Member: Should be an internal conf, i.e., .internal()?

Contributor Author: @gatorsmile any idea?

Member: Yes, it should be an internal conf. In the Spark 3.0 release, we will revisit all the internal confs and remove the unnecessary ones.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.booleanConf
.createWithDefault(true)
Member: For this kind of thing, we had better start with false.

Member: +1
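
A sketch of what the conf definition could look like with both review suggestions applied (marked .internal() and defaulting to false); this is only an illustration of the suggestions above, not the final change:

val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
  .internal()
  .doc("If true, enables Parquet filter push-down optimization for Date. " +
    "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
  .booleanConf
  .createWithDefault(false)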


val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1319,6 +1325,8 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
@@ -21,6 +21,7 @@ import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

@@ -50,6 +51,15 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => {
FilterApi.eq(
intColumn(n),
Option(v).map { date =>
val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
days.toInt.asInstanceOf[Integer]
Member: Use DateTimeUtils.fromJavaDate here and below? (See the sketch after this block.)

}.orNull)
}
}
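
What the DateTimeUtils.fromJavaDate suggestion could look like for the eq case above (a sketch only, not the author's change): DateTimeUtils.fromJavaDate converts a java.sql.Date to Spark's internal days-since-epoch Int, so the manual millisecond division is unnecessary; an import of org.apache.spark.sql.catalyst.util.DateTimeUtils would also be needed.

case DateType if SQLConf.get.parquetFilterPushDownDate =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map { date =>
      DateTimeUtils.fromJavaDate(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
    }.orNull)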

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +82,15 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => {
Member: nit:

(n: String, v: Any) => FilterApi.notEq(
  intColumn(n),
  Option(v).map { d =>
    DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
  }.orNull)

FilterApi.notEq(
intColumn(n),
Option(v).map { date =>
These 3 lines are repeated several times in this change. Please refactor to a new method. (A sketch of such a helper follows this block.)

val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
days.toInt.asInstanceOf[Integer]
}.orNull)
}
}
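
One way the repeated conversion could be pulled into a single helper (a sketch; dateToDays is an illustrative name, not something in this PR). Each case DateType branch would then reduce to a single call such as FilterApi.notEq(intColumn(n), dateToDays(v)).

// Convert a pushed-down java.sql.Date value to the days-since-epoch Integer that the
// Parquet int column expects; null stays null so IS NULL / IS NOT NULL still work.
private def dateToDays(date: Any): Integer =
  Option(date).map { d =>
    DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
  }.orNull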

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +110,15 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => {
FilterApi.lt(
intColumn(n),
Option(v).map { date =>
val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
days.toInt.asInstanceOf[Integer]
}.orNull)
}
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +138,15 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => {
FilterApi.ltEq(
intColumn(n),
Option(v).map { date =>
val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
days.toInt.asInstanceOf[Integer]
}.orNull)
}
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +166,15 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
Contributor: We should refactor this so that adding a new data type doesn't need to touch so many places. This can be done later. (One possible direction is sketched after this block.)

(n: String, v: Any) => {
FilterApi.gt(
intColumn(n),
Option(v).map { date =>
val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
days.toInt.asInstanceOf[Integer]
}.orNull)
}
}
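
One possible direction for that follow-up refactor (purely illustrative; none of these names exist in the PR): keep the per-type value conversion in one place and let every comparison builder look it up, so supporting another int-backed type means adding one case here instead of six.

// Hypothetical shared converter table: maps a Catalyst type to the function that turns a
// pushed-down value into the Integer a Parquet int column expects.
private val toIntValue: PartialFunction[DataType, Any => Integer] = {
  case IntegerType =>
    v => v.asInstanceOf[Integer]
  case DateType if SQLConf.get.parquetFilterPushDownDate =>
    v => Option(v).map { d =>
      DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
    }.orNull
}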

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +194,15 @@ private[parquet] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case DateType if SQLConf.get.parquetFilterPushDownDate =>
(n: String, v: Any) => {
FilterApi.gtEq(
intColumn(n),
Option(v).map { date =>
val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
days.toInt.asInstanceOf[Integer]
}.orNull)
}
}

/**
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -76,7 +77,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true") {
Member: nit:

withSQLConf(
  SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
  SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
  SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
@@ -313,6 +316,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}

test("filter pushdown - date") {
implicit class IntToDate(int: Int) {
Member: Shall we pass a string here and convert it into a date?

Contributor Author: How about this way? It is from ORC's date push down.

  test("filter pushdown - date") {
    val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
      Date.valueOf(day)
    }
    withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)

      checkFilterPredicate('_1 === dates(0), PredicateLeaf.Operator.EQUALS)
      checkFilterPredicate('_1 <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)

      checkFilterPredicate('_1 < dates(1), PredicateLeaf.Operator.LESS_THAN)
      checkFilterPredicate('_1 > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate('_1 <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate('_1 >= dates(3), PredicateLeaf.Operator.LESS_THAN)

      checkFilterPredicate(Literal(dates(0)) === '_1, PredicateLeaf.Operator.EQUALS)
      checkFilterPredicate(Literal(dates(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
      checkFilterPredicate(Literal(dates(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
      checkFilterPredicate(Literal(dates(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate(Literal(dates(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate(Literal(dates(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
    }
  }

Member: Either way looks fine, but I usually stick to what the surrounding code does.

Contributor Author: Ok, do you mean this way? Looks like we need more words :).

    implicit class StringToDate(s: String) {
      def d: Date = Date.valueOf(s)
    }

    withParquetDataFrame(
      Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Tuple1(i.d))) {
        implicit df =>
          checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
          checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]],
            Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))

          checkFilterPredicate('_1 === "2017-08-18".d, classOf[Eq[_]], "2017-08-18".d)
          checkFilterPredicate('_1 =!= "2017-08-18".d, classOf[NotEq[_]],
            Seq("2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))

Member: I think "2017-08-19".d is at least better than 1.d.

Member: Yup, I think this one is better than the current one.

Contributor Author: Per @cloud-fan's suggestion, I have changed 1.d to 1.date. Which one is preferred?

1. "2017-08-18".d
2. 1.date
3. "2017-08-18".date

Contributor: The string one is more readable to me.

def d: Date = new Date(Date.valueOf("2018-03-01").getTime + 24 * 60 * 60 * 1000 * (int - 1))
}

withParquetDataFrame((1 to 4).map(i => Tuple1(i.d))) { implicit df =>
Member: These test cases only cover limited cases. We need to check the boundary values.

Contributor Author: Could you kindly give me some examples of what kinds of boundary tests you mean? I checked Parquet integer push-down and ORC date type push-down, and it seems I have covered all of their tests.

checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.d)))

checkFilterPredicate('_1 === 1.d, classOf[Eq[_]], 1.d)
Contributor: 1.d is weird; can we name it 1.date?

Contributor Author (@yucai, Mar 21, 2018): I agree 1.date is better, but binary is using "1.b"; shall we keep the same pattern?

  test("filter pushdown - binary") {
    implicit class IntToBinary(int: Int) {
      def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
    }

Contributor: b might be OK, but d is confusing as it's also a postfix for double.

Contributor Author: Got it, thanks very much for the explanation.

checkFilterPredicate('_1 <=> 1.d, classOf[Eq[_]], 1.d)
checkFilterPredicate('_1 =!= 1.d, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.d)))

checkFilterPredicate('_1 < 2.d, classOf[Lt[_]], 1.d)
checkFilterPredicate('_1 > 3.d, classOf[Gt[_]], 4.d)
checkFilterPredicate('_1 <= 1.d, classOf[LtEq[_]], 1.d)
checkFilterPredicate('_1 >= 4.d, classOf[GtEq[_]], 4.d)

checkFilterPredicate(Literal(1.d) === '_1, classOf[Eq[_]], 1.d)
checkFilterPredicate(Literal(1.d) <=> '_1, classOf[Eq[_]], 1.d)
checkFilterPredicate(Literal(2.d) > '_1, classOf[Lt[_]], 1.d)
checkFilterPredicate(Literal(3.d) < '_1, classOf[Gt[_]], 4.d)
checkFilterPredicate(Literal(1.d) >= '_1, classOf[LtEq[_]], 1.d)
checkFilterPredicate(Literal(4.d) <= '_1, classOf[GtEq[_]], 4.d)

checkFilterPredicate(!('_1 < 4.d), classOf[GtEq[_]], 4.d)
checkFilterPredicate('_1 < 2.d || '_1 > 3.d, classOf[Operators.Or], Seq(Row(1.d), Row(4.d)))
}
}

test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
