Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst

import java.sql.{Date, Timestamp}
import java.time.LocalDate

import scala.language.implicitConversions

Expand Down Expand Up @@ -146,6 +147,7 @@ package object dsl {
implicit def doubleToLiteral(d: Double): Literal = Literal(d)
implicit def stringToLiteral(s: String): Literal = Literal.create(s, StringType)
implicit def dateToLiteral(d: Date): Literal = Literal(d)
implicit def localDateToLiteral(d: LocalDate): Literal = Literal(d)
implicit def bigDecimalToLiteral(d: BigDecimal): Literal = Literal(d.underlying())
implicit def bigDecimalToLiteral(d: java.math.BigDecimal): Literal = Literal(d)
implicit def decimalToLiteral(d: Decimal): Literal = Literal(d)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Timestamp}
import java.time.LocalDate
import java.util.Locale

import scala.collection.JavaConverters.asScalaBufferConverter
Expand Down Expand Up @@ -123,8 +124,9 @@ class ParquetFilters(
private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
private def dateToDays(date: Any): SQLDate = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
}

private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
Expand Down Expand Up @@ -173,7 +175,7 @@ class ParquetFilters(
case ParquetDateType if pushDownDate =>
(n: Array[String], v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
Option(v).map(date => dateToDays(date).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
Expand Down Expand Up @@ -224,7 +226,7 @@ class ParquetFilters(
case ParquetDateType if pushDownDate =>
(n: Array[String], v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
Option(v).map(date => dateToDays(date).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
Expand Down Expand Up @@ -269,7 +271,7 @@ class ParquetFilters(
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
(n: Array[String], v: Any) =>
FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
FilterApi.lt(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.lt(
longColumn(n),
Expand Down Expand Up @@ -310,7 +312,7 @@ class ParquetFilters(
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
(n: Array[String], v: Any) =>
FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
FilterApi.ltEq(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.ltEq(
longColumn(n),
Expand Down Expand Up @@ -351,7 +353,7 @@ class ParquetFilters(
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
(n: Array[String], v: Any) =>
FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
FilterApi.gt(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.gt(
longColumn(n),
Expand Down Expand Up @@ -392,7 +394,7 @@ class ParquetFilters(
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
(n: Array[String], v: Any) =>
FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
FilterApi.gtEq(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.gtEq(
longColumn(n),
Expand Down Expand Up @@ -471,7 +473,8 @@ class ParquetFilters(
case ParquetDoubleType => value.isInstanceOf[JDouble]
case ParquetStringType => value.isInstanceOf[String]
case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
case ParquetDateType => value.isInstanceOf[Date]
case ParquetDateType =>
value.isInstanceOf[Date] || value.isInstanceOf[LocalDate]
case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
value.isInstanceOf[Timestamp]
case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.LocalDate

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
Expand Down Expand Up @@ -525,52 +526,62 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
def date: Date = Date.valueOf(s)
}

val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21").map(_.date)
val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
import testImplicits._
withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) =>
withParquetDataFrame(inputDF) { implicit df =>
val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)

checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i.date))))

checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21".date))
checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21".date))

checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21".date))
checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
resultFun("2018-03-18".date))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21".date))

checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21".date))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("2018-03-18".date)), Row(resultFun("2018-03-21".date))))
Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
val df = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedDataFrame(df) { case (inputDF, colName, fun) =>
def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
}
withParquetDataFrame(inputDF) { implicit df =>
val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)

checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
}
}
}
}
}
Expand Down