Skip to content

Commit 6c80ebb

Browse files
MaxGekk authored and dongjoon-hyun committed
[SPARK-31818][SQL] Fix pushing down filters with java.time.Instant values in ORC
### What changes were proposed in this pull request?

Convert `java.time.Instant` to `java.sql.Timestamp` in filters pushed down to the ORC datasource when the Java 8 time API is enabled.

### Why are the changes needed?

The changes fix the exception raised while pushing down timestamp filters when `spark.sql.datetime.java8API.enabled` is set to `true`:

```
java.lang.IllegalArgumentException: Wrong value class java.time.Instant for TIMESTAMP.EQUALS leaf
  at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
  at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)
```

### Does this PR introduce any user-facing change?

Yes

### How was this patch tested?

Added tests to `OrcFilterSuite`.

Closes #28636 from MaxGekk/orc-timestamp-filter-pushdown.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 695cb61 commit 6c80ebb

File tree

5 files changed

+77
-48
lines changed

5 files changed

+77
-48
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst
1919

2020
import java.sql.{Date, Timestamp}
21-
import java.time.LocalDate
21+
import java.time.{Instant, LocalDate}
2222

2323
import scala.language.implicitConversions
2424

@@ -152,6 +152,7 @@ package object dsl {
152152
implicit def bigDecimalToLiteral(d: java.math.BigDecimal): Literal = Literal(d)
153153
implicit def decimalToLiteral(d: Decimal): Literal = Literal(d)
154154
implicit def timestampToLiteral(t: Timestamp): Literal = Literal(t)
155+
implicit def instantToLiteral(i: Instant): Literal = Literal(i)
155156
implicit def binaryToLiteral(a: Array[Byte]): Literal = Literal(a)
156157

157158
implicit def symbolToUnresolvedAttribute(s: Symbol): analysis.UnresolvedAttribute =

sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources.orc
1919

20-
import java.time.LocalDate
20+
import java.time.{Instant, LocalDate}
2121

2222
import org.apache.orc.storage.common.`type`.HiveDecimal
2323
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
@@ -26,7 +26,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
2626
import org.apache.orc.storage.serde2.io.HiveDecimalWritable
2727

2828
import org.apache.spark.SparkException
29-
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, toJavaDate}
29+
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
3030
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
3131
import org.apache.spark.sql.sources.Filter
3232
import org.apache.spark.sql.types._
@@ -167,6 +167,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
167167
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
168168
case _: DateType if value.isInstanceOf[LocalDate] =>
169169
toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
170+
case _: TimestampType if value.isInstanceOf[Instant] =>
171+
toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
170172
case _ => value
171173
}
172174

sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -245,29 +245,41 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
245245
}
246246

247247
test("filter pushdown - timestamp") {
248-
val timeString = "2015-08-20 14:57:00"
249-
val timestamps = (1 to 4).map { i =>
250-
val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
251-
new Timestamp(milliseconds)
252-
}
253-
withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
254-
checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
248+
val input = Seq(
249+
"1000-01-01 01:02:03",
250+
"1582-10-01 00:11:22",
251+
"1900-01-01 23:59:59",
252+
"2020-05-25 10:11:12").map(Timestamp.valueOf)
255253

256-
checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
257-
checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
258-
259-
checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
260-
checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
261-
checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
262-
checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
263-
264-
checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
265-
checkFilterPredicate(Literal(timestamps(0)) <=> $"_1",
266-
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
267-
checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
268-
checkFilterPredicate(Literal(timestamps(2)) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
269-
checkFilterPredicate(Literal(timestamps(0)) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
270-
checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
254+
withOrcFile(input.map(Tuple1(_))) { path =>
255+
Seq(false, true).foreach { java8Api =>
256+
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
257+
readFile(path) { implicit df =>
258+
val timestamps = input.map(Literal(_))
259+
checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
260+
261+
checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
262+
checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
263+
264+
checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
265+
checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
266+
checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
267+
checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
268+
269+
checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
270+
checkFilterPredicate(
271+
Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
272+
checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
273+
checkFilterPredicate(
274+
Literal(timestamps(2)) < $"_1",
275+
PredicateLeaf.Operator.LESS_THAN_EQUALS)
276+
checkFilterPredicate(
277+
Literal(timestamps(0)) >= $"_1",
278+
PredicateLeaf.Operator.LESS_THAN_EQUALS)
279+
checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
280+
}
281+
}
282+
}
271283
}
272284
}
273285

sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources.orc
1919

20-
import java.time.LocalDate
20+
import java.time.{Instant, LocalDate}
2121

2222
import org.apache.hadoop.hive.common.`type`.HiveDecimal
2323
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
2626
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
2727

2828
import org.apache.spark.SparkException
29-
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, toJavaDate}
29+
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
3030
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
3131
import org.apache.spark.sql.sources.Filter
3232
import org.apache.spark.sql.types._
@@ -167,6 +167,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
167167
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
168168
case _: DateType if value.isInstanceOf[LocalDate] =>
169169
toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
170+
case _: TimestampType if value.isInstanceOf[Instant] =>
171+
toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
170172
case _ => value
171173
}
172174

sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -246,29 +246,41 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
246246
}
247247

248248
test("filter pushdown - timestamp") {
249-
val timeString = "2015-08-20 14:57:00"
250-
val timestamps = (1 to 4).map { i =>
251-
val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
252-
new Timestamp(milliseconds)
253-
}
254-
withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
255-
checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
256-
257-
checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
258-
checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
249+
val input = Seq(
250+
"1000-01-01 01:02:03",
251+
"1582-10-01 00:11:22",
252+
"1900-01-01 23:59:59",
253+
"2020-05-25 10:11:12").map(Timestamp.valueOf)
259254

260-
checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
261-
checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
262-
checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
263-
checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
255+
withOrcFile(input.map(Tuple1(_))) { path =>
256+
Seq(false, true).foreach { java8Api =>
257+
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
258+
readFile(path) { implicit df =>
259+
val timestamps = input.map(Literal(_))
260+
checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
264261

265-
checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
266-
checkFilterPredicate(
267-
Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
268-
checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
269-
checkFilterPredicate(Literal(timestamps(2)) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
270-
checkFilterPredicate(Literal(timestamps(0)) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
271-
checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
262+
checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
263+
checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
264+
265+
checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
266+
checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
267+
checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
268+
checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
269+
270+
checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
271+
checkFilterPredicate(
272+
Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
273+
checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
274+
checkFilterPredicate(
275+
Literal(timestamps(2)) < $"_1",
276+
PredicateLeaf.Operator.LESS_THAN_EQUALS)
277+
checkFilterPredicate(
278+
Literal(timestamps(0)) >= $"_1",
279+
PredicateLeaf.Operator.LESS_THAN_EQUALS)
280+
checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
281+
}
282+
}
283+
}
272284
}
273285
}
274286

0 commit comments

Comments
 (0)