34 changes: 34 additions & 0 deletions sql/core/benchmarks/FilterPushdownBenchmark-results.txt
@@ -702,3 +702,37 @@ Parquet Vectorized (Pushdown) 11766 / 11927 1.3 7
Native ORC Vectorized 12101 / 12301 1.3 769.3 1.0X
Native ORC Vectorized (Pushdown) 11983 / 12651 1.3 761.9 1.0X


================================================================================================
Pushdown benchmark with many filters
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

Select 1 row with 1 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 158 / 182 0.0 158442969.0 1.0X
Parquet Vectorized (Pushdown) 150 / 158 0.0 149718289.0 1.1X
Native ORC Vectorized 141 / 148 0.0 141259852.0 1.1X
Native ORC Vectorized (Pushdown) 142 / 147 0.0 142016472.0 1.1X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

Select 1 row with 250 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 1013 / 1026 0.0 1013194322.0 1.0X
Parquet Vectorized (Pushdown) 1326 / 1332 0.0 1326301956.0 0.8X
Native ORC Vectorized 1005 / 1010 0.0 1005266379.0 1.0X
Native ORC Vectorized (Pushdown) 1068 / 1071 0.0 1067964993.0 0.9X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

Select 1 row with 500 filters: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 3598 / 3614 0.0 3598001202.0 1.0X
Parquet Vectorized (Pushdown) 4282 / 4333 0.0 4281849770.0 0.8X
Native ORC Vectorized 3594 / 3619 0.0 3593551548.0 1.0X
Native ORC Vectorized (Pushdown) 3834 / 3840 0.0 3834240570.0 0.9X
OrcFilters.scala (org.apache.spark.sql.execution.datasources.orc)
@@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument, SearchArgumentFactory}
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types._

/**
@@ -54,7 +55,17 @@ import org.apache.spark.sql.types._
* builder methods mentioned above can only be found in test code, where all tested filters are
* known to be convertible.
*/
private[orc] object OrcFilters {
private[sql] object OrcFilters {
private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
filters match {
case Seq() => None
case Seq(filter) => Some(filter)
case Seq(filter1, filter2) => Some(And(filter1, filter2))
case _ => // length > 2
val (left, right) = filters.splitAt(filters.length / 2)
Some(And(buildTree(left).get, buildTree(right).get))
}
}

/**
* Create ORC filter as a SearchArgument instance.
@@ -66,14 +77,14 @@ private[orc] object OrcFilters {
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
filter <- filters
_ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
_ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
} yield filter

for {
// Combines all convertible filters using `And` to produce a single conjunction
conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
conjunction <- buildTree(convertibleFilters)
Contributor:

Does Parquet have the same problem?

Contributor:

In Parquet, this is done as

filters
  .flatMap(ParquetFilters.createFilter(requiredSchema, _))
  .reduceOption(FilterApi.and)

Can we follow it?

Member Author (@dongjoon-hyun, Sep 4, 2018):

For the first question, I don't think Parquet has the same issue, because Parquet uses canMakeFilterOn, while ORC tries to build a full result (with a fresh builder) to check whether each filter is convertible.

For the second question:

1. In ORC, the first half (flatMap) computes convertibleFilters, but we could change it to use filters.filter instead. I'll update it like that.

val convertibleFilters = for {
    filter <- filters
    _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
} yield filter

2. The second half, reduceOption(FilterApi.and), was the original ORC code, which generated a skewed tree with exponential time complexity. We need to use buildTree instead.
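
To make the shape difference concrete, here is a minimal, self-contained sketch (not part of the patch) that contrasts the left-deep conjunction produced by reduceOption(And) with the balanced conjunction produced by a buildTree-style split. The helper names skewedConjunction and balancedConjunction are illustrative only; the sketch assumes only that the spark-sql classes org.apache.spark.sql.sources.{And, EqualTo, Filter} are on the classpath, and it needs no SparkSession.

import org.apache.spark.sql.sources.{And, EqualTo, Filter}

object ConjunctionShapes {
  // Left-deep tree, as produced by the original `reduceOption(And)`:
  // And(And(And(c1, c2), c3), c4) -- nesting depth grows linearly with the
  // number of filters, which is the deeply nested shape the ORC
  // SearchArgument construction struggles with (per the discussion above).
  def skewedConjunction(filters: Seq[Filter]): Option[Filter] =
    filters.reduceOption(And)

  // Balanced tree, in the spirit of `buildTree`:
  // And(And(c1, c2), And(c3, c4)) -- nesting depth grows only logarithmically.
  def balancedConjunction(filters: Seq[Filter]): Option[Filter] = filters match {
    case Seq()  => None
    case Seq(f) => Some(f)
    case _ =>
      val (left, right) = filters.splitAt(filters.length / 2)
      Some(And(balancedConjunction(left).get, balancedConjunction(right).get))
  }

  def main(args: Array[String]): Unit = {
    val filters: Seq[Filter] = (1 to 4).map(i => EqualTo(s"c$i", 0))
    println(skewedConjunction(filters))
    // And(And(And(EqualTo(c1,0),EqualTo(c2,0)),EqualTo(c3,0)),EqualTo(c4,0))
    println(balancedConjunction(filters))
    // And(And(EqualTo(c1,0),EqualTo(c2,0)),And(EqualTo(c3,0),EqualTo(c4,0)))
  }
}

With 500 filters, the left-deep form nests 499 levels deep, while the balanced form is only about 9 levels deep, which matches the motivation for buildTree described above.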

Member Author:

BTW, Parquet has another issue here due to .reduceOption(FilterApi.and). When I run the benchmark, Parquet seems unable to handle 1000 filters, @cloud-fan.

// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
} yield builder.build()
}

@@ -127,8 +138,6 @@ private[orc] object OrcFilters {
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
def newBuilder = SearchArgumentFactory.newBuilder()

def getType(attribute: String): PredicateLeaf.Type =
getPredicateLeafType(dataTypeMap(attribute))

FilterPushdownBenchmark.scala
@@ -242,7 +242,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter

ignore("Pushdown for many distinct value case") {
withTempPath { dir =>
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
Seq(true, false).foreach { useStringForValue =>
prepareTable(dir, numRows, width, useStringForValue)
if (useStringForValue) {
@@ -259,7 +259,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
withTempPath { dir =>
val numDistinctValues = 200

withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
prepareStringDictTable(dir, numRows, numDistinctValues, width)
runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
}
@@ -268,7 +268,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter

ignore("Pushdown benchmark for StringStartsWith") {
withTempPath { dir =>
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
prepareTable(dir, numRows, width, true)
Seq(
"value like '10%'",
@@ -296,7 +296,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
monotonically_increasing_id()
}
val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = $mid").foreach { whereExpr =>
@@ -320,7 +320,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter

ignore("Pushdown benchmark for InSet -> InFilters") {
withTempPath { dir =>
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
prepareTable(dir, numRows, width, false)
Seq(5, 10, 50, 100).foreach { count =>
Seq(10, 50, 90).foreach { distribution =>
@@ -341,7 +341,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
.orderBy("value")
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
@@ -373,7 +373,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", monotonically_increasing_id().cast(TimestampType))
withTempTable("orcTable", "patquetTable") {
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
@@ -398,6 +398,24 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
}
}
}

test(s"Pushdown benchmark with many filters") {
val numRows = 1
val width = 500

withTempPath { dir =>
val columns = (1 to width).map(i => s"id c$i")
val df = spark.range(1).selectExpr(columns: _*)
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)
Seq(1, 250, 500).foreach { numFilter =>
val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
// Note: InferFiltersFromConstraints will add more filters on top of the given filters
filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
}
}
}
}
}

trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite =>
OrcFilters.scala (org.apache.spark.sql.hive.orc)
@@ -17,10 +17,12 @@

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

@@ -62,23 +64,21 @@ private[orc] object OrcFilters extends Logging {
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
filter <- filters
_ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
_ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
} yield filter

for {
// Combines all convertible filters using `And` to produce a single conjunction
conjunction <- convertibleFilters.reduceOption(And)
conjunction <- buildTree(convertibleFilters)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
} yield builder.build()
}

private def buildSearchArgument(
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
def newBuilder = SearchArgumentFactory.newBuilder()

def isSearchableType(dataType: DataType): Boolean = dataType match {
// Only the values in the Spark types below can be recognized by
// the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.