OrcFilters.scala (org.apache.spark.sql.execution.datasources.orc)
@@ -54,7 +54,7 @@ import org.apache.spark.sql.types._
* builder methods mentioned above can only be found in test code, where all tested filters are
* known to be convertible.
*/
-private[orc] object OrcFilters {
+private[sql] object OrcFilters {

/**
* Create ORC filter as a SearchArgument instance.
@@ -71,12 +71,24 @@ private[orc] object OrcFilters {

for {
// Combines all convertible filters using `And` to produce a single conjunction
-conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
+conjunction <- buildTree(convertibleFilters)
Contributor:

Does Parquet have the same problem?

Contributor:

In Parquet, this is done as

filters
  .flatMap(ParquetFilters.createFilter(requiredSchema, _))
  .reduceOption(FilterApi.and)

Can we follow it?

dongjoon-hyun (Member Author), Sep 4, 2018:
For the first question, I don't think Parquet has the same issue: Parquet uses canMakeFilterOn, while ORC tries to build a full result (with a fresh builder) to check whether a filter is convertible.

For the second question,

  1. In ORC, we did the first half (flatMap) to compute convertibleFilters, but we can change it to filters.filter. I'll update it like that:
val convertibleFilters = for {
    filter <- filters
    _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
} yield filter
  2. The second half, reduceOption(FilterApi.and), was the original ORC code, which generated a skewed tree with exponential time complexity. We need to use buildTree.
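A minimal illustrative sketch (not part of the PR) of the two tree shapes, assuming a handful of LessThan filters on an arbitrary column a; it uses the And, LessThan, and buildTree referenced above, and would have to live inside the org.apache.spark.sql package since OrcFilters is private[sql] after this change:

import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.sources.{And, Filter, LessThan}

val filters: Seq[Filter] = (1 to 4).map(LessThan("a", _))

// reduceOption(And) folds the filters into a left-skewed tree whose depth
// grows linearly with the number of filters:
//   And(And(And(f1, f2), f3), f4)
val skewed: Option[Filter] = filters.reduceOption(And)

// buildTree splits the sequence in half recursively, so the conjunction is
// balanced and its depth is only ~log2(n):
//   And(And(f1, f2), And(f3, f4))
val balanced: Option[Filter] = OrcFilters.buildTree(filters)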

dongjoon-hyun (Member Author):
BTW, Parquet has another issue here due to .reduceOption(FilterApi.and). When I ran a benchmark, Parquet seemed unable to handle 1000 filters, @cloud-fan.

// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
} yield builder.build()
}

def buildTree(filters: Seq[Filter]): Option[Filter] = {
  import org.apache.spark.sql.sources.And
  filters match {
    case Seq() => None
    case Seq(filter) => Some(filter)
    case Seq(filter1, filter2) => Some(And(filter1, filter2))
    case _ => // length > 2
      val (left, right) = filters.splitAt(filters.length / 2)
      Some(And(buildTree(left).get, buildTree(right).get))
  }
}

/**
* Return true if this is a searchable type in ORC.
* Both CharType and VarcharType are cleaned at AstBuilder.
OrcFilterSuite.scala
@@ -23,6 +23,9 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.SpanSugar._
import scala.collection.JavaConverters._

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -39,7 +42,7 @@ import org.apache.spark.sql.types._
* - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package.
* - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package.
*/
-class OrcFilterSuite extends OrcTest with SharedSQLContext {
+class OrcFilterSuite extends OrcTest with SharedSQLContext with TimeLimits {

private def checkFilterPredicate(
df: DataFrame,
@@ -383,4 +386,13 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
)).get.toString
}
}

test("SPARK-25306 createFilter should not hang") {
import org.apache.spark.sql.sources._
val schema = new StructType(Array(StructField("a", IntegerType, nullable = true)))
val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter]
failAfter(2 seconds) {
OrcFilters.createFilter(schema, filters)
Contributor:

This test looks tricky... It's bad practice to assume some code will return within a certain time. Can we just add a microbenchmark for it?

dongjoon-hyun (Member Author), Sep 4, 2018:

Sure.

  1. Something like the test code in the PR description, marked as ignore(...) instead of test(...) here?
  2. Or do you want another test case in FilterPushdownBenchmark?

dongjoon-hyun (Member Author):
I'll choose (2), @cloud-fan.
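For reference, a rough self-contained timing sketch (not the actual FilterPushdownBenchmark case) of what option (2) could measure; the column name and filter counts are arbitrary, and it assumes it runs from test code inside the org.apache.spark.sql package:

import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.sources.{Filter, LessThan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = new StructType(Array(StructField("a", IntegerType, nullable = true)))

// Time createFilter for increasingly large conjunctions; with the skewed
// tree this hung well within the range covered by the SPARK-25306 test above.
Seq(100, 500, 1000, 2000).foreach { n =>
  val filters = (1 to n).map(LessThan("a", _)).toArray[Filter]
  val start = System.nanoTime()
  OrcFilters.createFilter(schema, filters)
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"$n filters: $elapsedMs ms")
}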

  }
}
}
OrcFilters.scala (Hive, org.apache.hadoop.hive.ql.io.sarg)
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

@@ -67,7 +68,7 @@ private[orc] object OrcFilters extends Logging {

for {
// Combines all convertible filters using `And` to produce a single conjunction
-conjunction <- convertibleFilters.reduceOption(And)
+conjunction <- buildTree(convertibleFilters)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
} yield builder.build()
HiveOrcFilterSuite.scala
@@ -23,6 +23,9 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.SpanSugar._
import scala.collection.JavaConverters._

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -36,7 +39,7 @@ import org.apache.spark.sql.types._
/**
* A test suite that tests Hive ORC filter API based filter pushdown optimization.
*/
-class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton {
+class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton with TimeLimits {

override val orcImp: String = "hive"

@@ -384,4 +387,13 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton {
)).get.toString
}
}

test("SPARK-25306 createFilter should not hang") {
import org.apache.spark.sql.sources._
val schema = new StructType(Array(StructField("a", IntegerType, nullable = true)))
val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter]
failAfter(2 seconds) {
OrcFilters.createFilter(schema, filters)
}
}
}