@@ -50,6 +50,15 @@ private[parquet] object ParquetFilters {
      (n: String, v: Any) => FilterApi.eq(
        binaryColumn(n),
        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    case DateType =>
      (n: String, v: Any) => {
        FilterApi.eq(
          intColumn(n),
          Option(v).map{ date =>

[Review — Member] nit: p{ -> p {

            val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
            days.toInt.asInstanceOf[Integer]

[Review — Member] Use DateTimeUtils.fromJavaDate here and below?

          }.orNull)
      }
  }
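
For reference, a minimal sketch of what the reviewer's DateTimeUtils.fromJavaDate suggestion could look like for the equality case; this is an illustration only, not the merged code, and it assumes DateTimeUtils is imported from org.apache.spark.sql.catalyst.util:

  import org.apache.spark.sql.catalyst.util.DateTimeUtils

  case DateType =>
    (n: String, v: Any) => FilterApi.eq(
      intColumn(n),
      Option(v).map { date =>
        // Convert java.sql.Date to days since the epoch through Spark's own
        // helper instead of dividing raw milliseconds by a fixed day length.
        DateTimeUtils.fromJavaDate(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
      }.orNull)

The division by 24 * 60 * 60 * 1000 in the PR assumes every day is exactly 24 hours from the epoch, which is precisely the kind of subtle date conversion DateTimeUtils.fromJavaDate is meant to centralize.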

  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -72,6 +81,15 @@ private[parquet] object ParquetFilters {
      (n: String, v: Any) => FilterApi.notEq(
        binaryColumn(n),
        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    case DateType =>
      (n: String, v: Any) => {

[Review — Member] nit:

  (n: String, v: Any) => FilterApi.notEq(
    intColumn(n),
    Option(v).map { d =>
      DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
    }.orNull)

        FilterApi.notEq(
          intColumn(n),
          Option(v).map{ date =>
            val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
            days.toInt.asInstanceOf[Integer]
          }.orNull)
      }
  }

  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -91,6 +109,15 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType =>
      (n: String, v: Any) => {
        FilterApi.lt(
          intColumn(n),
          Option(v).map{ date =>
            val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
            days.toInt.asInstanceOf[Integer]
          }.orNull)
      }
  }

  private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -110,6 +137,15 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType =>
      (n: String, v: Any) => {
        FilterApi.ltEq(
          intColumn(n),
          Option(v).map{ date =>
            val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
            days.toInt.asInstanceOf[Integer]
          }.orNull)
      }
  }

  private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -129,6 +165,15 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType =>
      (n: String, v: Any) => {
        FilterApi.gt(
          intColumn(n),
          Option(v).map{ date =>
            val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
            days.toInt.asInstanceOf[Integer]
          }.orNull)
      }
  }

  private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +193,15 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType =>

[Review — Member @gatorsmile, Mar 18, 2018] Add a new internal SQLConf to make it configurable. Users can turn it off if they hit a bug introduced in this PR.

[Reply — Author] Have added, kindly help review.

      (n: String, v: Any) => {
        FilterApi.gtEq(
          intColumn(n),
          Option(v).map{ date =>
            val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
            days.toInt.asInstanceOf[Integer]
          }.orNull)
      }
  }
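
The internal SQLConf the reviewer asks for might look roughly like the sketch below; the key name, doc text, and default are assumptions for illustration, not the merged definition:

  // Hypothetical sketch: an internal flag so users can disable date push-down
  // if the new conversion turns out to have a bug.
  val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED =
    buildConf("spark.sql.parquet.filterPushdown.date")
      .internal()
      .doc("If true, enables Parquet filter push-down optimization for Date. " +
        "Takes effect only when spark.sql.parquet.filterPushdown is also enabled.")
      .booleanConf
      .createWithDefault(true)

ParquetFilters would then consult the flag before matching on DateType, so turning it off falls back to the pre-PR behavior.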

/**
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.nio.charset.StandardCharsets
import java.sql.Date

import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -313,6 +314,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }

test("filter pushdown - date") {
implicit class IntToDate(int: Int) {

[Review — Member] Shall we pass a string here and convert it into a date?

[Reply — Author] How about this way? It is from ORC's date push-down:

  test("filter pushdown - date") {
    val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
      Date.valueOf(day)
    }
    withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)

      checkFilterPredicate('_1 === dates(0), PredicateLeaf.Operator.EQUALS)
      checkFilterPredicate('_1 <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)

      checkFilterPredicate('_1 < dates(1), PredicateLeaf.Operator.LESS_THAN)
      checkFilterPredicate('_1 > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate('_1 <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate('_1 >= dates(3), PredicateLeaf.Operator.LESS_THAN)

      checkFilterPredicate(Literal(dates(0)) === '_1, PredicateLeaf.Operator.EQUALS)
      checkFilterPredicate(Literal(dates(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
      checkFilterPredicate(Literal(dates(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
      checkFilterPredicate(Literal(dates(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate(Literal(dates(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate(Literal(dates(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
    }
  }

[Review — Member] Either way looks fine, but I usually stick to what the other code around the fix does.

[Reply — Author] OK, do you mean this way? Looks like we need more words :).

    implicit class StringToDate(s: String) {
      def d: Date = Date.valueOf(s)
    }

    withParquetDataFrame(
      Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Tuple1(i.d))) {
        implicit df =>
          checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
          checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]],
            Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))

          checkFilterPredicate('_1 === "2017-08-18".d, classOf[Eq[_]], "2017-08-18".d)
          checkFilterPredicate('_1 =!= "2017-08-18".d, classOf[NotEq[_]],
            Seq("2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))

[Review — Member] I think "2017-08-19".d is at least better than 1.d.

[Review — Member] Yup, I think this one is better than the current one.

[Reply — Author] As per @cloud-fan's suggestion, I have changed 1.d to 1.date. Which one is preferred?

  1. "2017-08-18".d
  2. 1.date
  3. "2017-08-18".date

[Review — Contributor] The string one is more readable to me.

      // Maps i to the i-th day of March 2018, e.g. 2.d is 2018-03-02.
      def d: Date = new Date(Date.valueOf("2018-03-01").getTime + 24 * 60 * 60 * 1000 * (int - 1))
    }

    withParquetDataFrame((1 to 4).map(i => Tuple1(i.d))) { implicit df =>

[Review — Member] These test cases only cover limited cases. We need to check the boundary values.

[Reply — Author] Could you kindly give me some examples of the kind of boundary tests you mean? I checked Parquet integer push-down and ORC date-type push-down, and this seems to cover all of their tests.

[See the boundary-case sketch after this test.]

      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.d)))

      checkFilterPredicate('_1 === 1.d, classOf[Eq[_]], 1.d)

[Review — Contributor] 1.d is weird, can we name it 1.date?

[Reply — Author @yucai, Mar 21, 2018] I agree 1.date is better, but binary is using 1.b; shall we keep the same pattern as it?

  test("filter pushdown - binary") {
    implicit class IntToBinary(int: Int) {
      def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
    }

[Review — Contributor] b might be OK, but d is confusing as it's also a postfix for double.

[Reply — Author] Got it, thanks very much for the explanation.

      checkFilterPredicate('_1 <=> 1.d, classOf[Eq[_]], 1.d)
      checkFilterPredicate('_1 =!= 1.d, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.d)))

      checkFilterPredicate('_1 < 2.d, classOf[Lt[_]], 1.d)
      checkFilterPredicate('_1 > 3.d, classOf[Gt[_]], 4.d)
      checkFilterPredicate('_1 <= 1.d, classOf[LtEq[_]], 1.d)
      checkFilterPredicate('_1 >= 4.d, classOf[GtEq[_]], 4.d)

      checkFilterPredicate(Literal(1.d) === '_1, classOf[Eq[_]], 1.d)
      checkFilterPredicate(Literal(1.d) <=> '_1, classOf[Eq[_]], 1.d)
      checkFilterPredicate(Literal(2.d) > '_1, classOf[Lt[_]], 1.d)
      checkFilterPredicate(Literal(3.d) < '_1, classOf[Gt[_]], 4.d)
      checkFilterPredicate(Literal(1.d) >= '_1, classOf[LtEq[_]], 1.d)
      checkFilterPredicate(Literal(4.d) <= '_1, classOf[GtEq[_]], 4.d)

      checkFilterPredicate(!('_1 < 4.d), classOf[GtEq[_]], 4.d)
      checkFilterPredicate('_1 < 2.d || '_1 > 3.d, classOf[Operators.Or], Seq(Row(1.d), Row(4.d)))
    }
  }
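
Following up the boundary-values comment above, a minimal sketch of extra coverage, reusing this suite's helpers; the specific dates (the Unix epoch and the day before it) are assumptions, not part of the PR:

  test("filter pushdown - date boundary values (sketch)") {
    // The epoch itself and the day before it are where a raw
    // milliseconds-per-day division is most likely to go wrong.
    val dates = Seq("1969-12-31", "1970-01-01").map(Date.valueOf)
    withParquetDataFrame(dates.map(Tuple1(_))) { implicit df =>
      checkFilterPredicate('_1 === dates(0), classOf[Eq[_]], dates(0))
      checkFilterPredicate('_1 < dates(1), classOf[Lt[_]], dates(0))
      checkFilterPredicate('_1 >= dates(1), classOf[GtEq[_]], dates(1))
    }
  }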

test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._
