@@ -378,6 +378,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.timestamp")
.doc("If true, enables Parquet filter push-down optimization for Timestamp. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is " +
"enabled and the Timestamp is stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
Member: Shall we note INT64 here?

Member Author: I think end users have a better understanding of TIMESTAMP_MICROS and TIMESTAMP_MILLIS.

Member (@HyukjinKwon, Jul 13, 2018): ... I don't think ordinary users will understand any of them ..

Member: You need to explain how to use spark.sql.parquet.outputTimestampType to control the Parquet timestamp type Spark uses to write Parquet files.
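
For illustration, a hedged sketch of that interaction (spark-shell style; the config keys are the ones discussed in this PR, while the path and data are made up):

// Choose the physical Parquet timestamp type at write time.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
spark.range(10).selectExpr("CAST(id AS timestamp) AS ts")
  .write.mode("overwrite").parquet("/tmp/ts_micros")

// With the new flag on (it defaults to true), the timestamp predicate
// should appear under PushedFilters in the physical plan.
spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "true")
spark.read.parquet("/tmp/ts_micros")
  .where("ts > CAST(5 AS timestamp)")
  .explain()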

Contributor: I would just note that push-down doesn't work for INT96 timestamps in the file. It should work for the others.

.internal()
.booleanConf
.createWithDefault(true)
Member Author: Maybe the default should be false, because PARQUET_OUTPUT_TIMESTAMP_TYPE defaults to INT96.

Contributor: Because we're using the file schema, it doesn't matter what the write configuration is. It only matters what it was when the file was written. If the file has an INT96 timestamp, this should just not push anything down.
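
To make the file-schema point concrete, the same hedged sketch as above but with an INT96 file: the read-side flag is on, yet nothing is pushed because the file stores INT96 timestamps.

spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
spark.range(10).selectExpr("CAST(id AS timestamp) AS ts")
  .write.mode("overwrite").parquet("/tmp/ts_int96")

spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "true")
spark.read.parquet("/tmp/ts_int96")
  .where("ts > CAST(5 AS timestamp)")
  .explain()  // expect PushedFilters: [] for the timestamp predicate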


val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
@@ -1494,6 +1503,8 @@ class SQLConf extends Serializable with Logging {

def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

def parquetFilterPushDownTimestamp: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED)

def parquetFilterPushDownStringStartWith: Boolean =
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)

sql/core/benchmarks/FilterPushdownBenchmark-results.txt (124 additions, 0 deletions)
@@ -578,3 +578,127 @@ Native ORC Vectorized 11622 / 12196 1.4 7
Native ORC Vectorized (Pushdown) 11377 / 11654 1.4 723.3 1.0X


================================================================================================
Pushdown benchmark for Timestamp
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as INT96 row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
Contributor: Shall we add a new line after the benchmark name? e.g.

Select 1 timestamp stored as INT96 row (value = CAST(7864320 AS timestamp)):
Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
...

We can send a follow-up PR to fix this entire file.

Member Author: OK. I'll send a follow-up PR.

------------------------------------------------------------------------------------------------
Parquet Vectorized 4784 / 4956 3.3 304.2 1.0X
Parquet Vectorized (Pushdown) 4838 / 4917 3.3 307.6 1.0X
Native ORC Vectorized 3923 / 4173 4.0 249.4 1.2X
Native ORC Vectorized (Pushdown) 894 / 943 17.6 56.8 5.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 10% timestamp stored as INT96 rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 5686 / 5901 2.8 361.5 1.0X
Parquet Vectorized (Pushdown) 5555 / 5895 2.8 353.2 1.0X
Native ORC Vectorized 4844 / 4957 3.2 308.0 1.2X
Native ORC Vectorized (Pushdown) 2141 / 2230 7.3 136.1 2.7X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 50% timestamp stored as INT96 rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 9100 / 9421 1.7 578.6 1.0X
Parquet Vectorized (Pushdown) 9122 / 9496 1.7 580.0 1.0X
Native ORC Vectorized 8365 / 8874 1.9 531.9 1.1X
Native ORC Vectorized (Pushdown) 7128 / 7376 2.2 453.2 1.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 90% timestamp stored as INT96 rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 12764 / 13120 1.2 811.5 1.0X
Parquet Vectorized (Pushdown) 12656 / 13003 1.2 804.7 1.0X
Native ORC Vectorized 13096 / 13233 1.2 832.6 1.0X
Native ORC Vectorized (Pushdown) 12710 / 15611 1.2 808.1 1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as TIMESTAMP_MICROS row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 4381 / 4796 3.6 278.5 1.0X
Parquet Vectorized (Pushdown) 122 / 137 129.3 7.7 36.0X
Native ORC Vectorized 3913 / 3988 4.0 248.8 1.1X
Native ORC Vectorized (Pushdown) 905 / 945 17.4 57.6 4.8X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 10% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 5145 / 5184 3.1 327.1 1.0X
Parquet Vectorized (Pushdown) 1426 / 1519 11.0 90.7 3.6X
Native ORC Vectorized 4827 / 4901 3.3 306.9 1.1X
Native ORC Vectorized (Pushdown) 2133 / 2210 7.4 135.6 2.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 50% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 9234 / 9516 1.7 587.1 1.0X
Parquet Vectorized (Pushdown) 6752 / 7046 2.3 429.3 1.4X
Native ORC Vectorized 8418 / 8998 1.9 535.2 1.1X
Native ORC Vectorized (Pushdown) 7199 / 7314 2.2 457.7 1.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 90% timestamp stored as TIMESTAMP_MICROS rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 12414 / 12458 1.3 789.2 1.0X
Parquet Vectorized (Pushdown) 12094 / 12249 1.3 768.9 1.0X
Native ORC Vectorized 12198 / 13755 1.3 775.5 1.0X
Native ORC Vectorized (Pushdown) 12205 / 12431 1.3 776.0 1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 1 timestamp stored as TIMESTAMP_MILLIS row (value = CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 4369 / 4515 3.6 277.8 1.0X
Parquet Vectorized (Pushdown) 116 / 125 136.2 7.3 37.8X
Native ORC Vectorized 3965 / 4703 4.0 252.1 1.1X
Native ORC Vectorized (Pushdown) 892 / 1162 17.6 56.7 4.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 10% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(1572864 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 5211 / 5409 3.0 331.3 1.0X
Parquet Vectorized (Pushdown) 1427 / 1438 11.0 90.7 3.7X
Native ORC Vectorized 4719 / 4883 3.3 300.1 1.1X
Native ORC Vectorized (Pushdown) 2191 / 2228 7.2 139.3 2.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 50% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(7864320 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8716 / 8953 1.8 554.2 1.0X
Parquet Vectorized (Pushdown) 6632 / 6968 2.4 421.7 1.3X
Native ORC Vectorized 8376 / 9118 1.9 532.5 1.0X
Native ORC Vectorized (Pushdown) 7218 / 7609 2.2 458.9 1.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Select 90% timestamp stored as TIMESTAMP_MILLIS rows (value < CAST(14155776 AS timestamp)): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 12264 / 12452 1.3 779.7 1.0X
Parquet Vectorized (Pushdown) 11766 / 11927 1.3 748.0 1.0X
Native ORC Vectorized 12101 / 12301 1.3 769.3 1.0X
Native ORC Vectorized (Pushdown) 11983 / 12651 1.3 761.9 1.0X

@@ -341,6 +341,7 @@ class ParquetFileFormat
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold

@@ -366,7 +367,7 @@
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
.getFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate,
val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp,
pushDownStringStartWith, pushDownInFilterThreshold)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
@@ -17,14 +17,15 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.sql.Date
import java.lang.{Long => JLong}
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters.asScalaBufferConverter

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator}
import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
@@ -39,6 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[parquet] class ParquetFilters(
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int) {

@@ -57,6 +59,8 @@ private[parquet] class ParquetFilters(
private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, null)
private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null)

private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
@@ -89,6 +93,15 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[JLong]).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
}

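The micros and millis branches above differ only in precision. A minimal standalone sketch of the same mapping (hypothetical helper names; DateTimeUtils.fromJavaTimestamp is the Spark-internal equivalent of toMicros here):

import java.sql.Timestamp

// TIMESTAMP_MILLIS stores milliseconds since the epoch; Timestamp.getTime
// is already truncated to millisecond precision.
def toMillis(t: Timestamp): Long = t.getTime

// TIMESTAMP_MICROS stores microseconds since the epoch; the sub-millisecond
// remainder lives in getNanos, so add it back on top of the millis.
def toMicros(t: Timestamp): Long =
  t.getTime * 1000L + (t.getNanos / 1000) % 1000
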
private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -117,6 +130,15 @@ private[parquet] class ParquetFilters(
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[JLong]).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
}

private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -139,6 +161,14 @@
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.lt(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.lt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -161,6 +191,14 @@
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -183,6 +221,14 @@
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gt(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
@@ -205,6 +251,14 @@
case ParquetDateType if pushDownDate =>
(n: String, v: Any) =>
FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
}

/**
@@ -241,6 +295,7 @@ private[parquet] class ParquetFilters(
| ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType
| ParquetBinaryType => true
case ParquetDateType if pushDownDate => true
case ParquetTimestampMicrosType | ParquetTimestampMillisType if pushDownTimestamp => true
case _ => false
}

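To see the dispatch end to end, a hedged, test-style sketch (ParquetFilters is a Spark-internal class; the constructor arguments come from this diff, and the createFilter signature is assumed from the existing filter suite). The same predicate converts against a TIMESTAMP_MICROS column but yields None against INT96:

import java.sql.Timestamp
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.sources

val parquetFilters = new ParquetFilters(
  pushDownDate = true,
  pushDownTimestamp = true,
  pushDownStartWith = true,
  pushDownInFilterThreshold = 10)

// Two file schemas: an annotated INT64 timestamp and a plain INT96.
val microsSchema = MessageTypeParser.parseMessageType(
  "message spark_schema { optional int64 value (TIMESTAMP_MICROS); }")
val int96Schema = MessageTypeParser.parseMessageType(
  "message spark_schema { optional int96 value; }")

val pred = sources.GreaterThan("value", Timestamp.valueOf("2018-07-13 00:00:00"))

parquetFilters.createFilter(microsSchema, pred) // Some(gt(value, ...))
parquetFilters.createFilter(int96Schema, pred)  // None: INT96 is never pushed
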
@@ -28,7 +28,8 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
import org.apache.spark.util.{Benchmark, Utils}

/**
@@ -359,6 +360,40 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
}
}
}

ignore(s"Pushdown benchmark for Timestamp") {
withTempPath { dir =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) {
ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType =>
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", monotonically_increasing_id().cast(TimestampType))
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
.replace("value AND value", "value")
filterPushDownBenchmark(numRows, title, whereExpr)
}

val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
Seq(10, 50, 90).foreach { percent =>
filterPushDownBenchmark(
numRows,
s"Select $percent% timestamp stored as $fileType rows " +
s"(value < CAST(${numRows * percent / 100} AS timestamp))",
s"value < CAST(${numRows * percent / 100} as timestamp)",
selectExpr
)
}
}
}
}
}
}
}
}

trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>