Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable

import org.apache.spark.SparkException
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -64,26 +65,72 @@ private[sql] object OrcFilters extends OrcFiltersBase {
*/
def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
for {
// Combines all filters using `And` to produce a single conjunction
conjunction <- buildTree(filters)
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate
builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
} yield builder.build()
// Combines all convertible filters using `And` to produce a single conjunction
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
// The input predicate is fully convertible. There should not be any empty result in the
// following recursive method call `buildSearchArgument`.
buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
}
}

def convertibleFilters(
schema: StructType,
dataTypeMap: Map[String, DataType],
filters: Seq[Filter]): Seq[Filter] = {
val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
filters.flatMap(orcFilterConverter.trimUnconvertibleFilters)
}
import org.apache.spark.sql.sources._

}
def convertibleFiltersHelper(
filter: Filter,
canPartialPushDown: Boolean): Option[Filter] = filter match {
// At here, it is not safe to just convert one side and remove the other side
// if we do not understand what the parent filters are.
//
// Here is an example used to explain the reason.
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
// NOT(a = 2), which will generate wrong results.
//
// Pushing one side of AND down is only safe to do at the top level or in the child
// AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
// can be safely removed.
case And(left, right) =>
val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
(leftResultOptional, rightResultOptional) match {
case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
case _ => None
}

private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
// The Or predicate is convertible when both of its children can be pushed down.
// That is to say, if one/both of the children can be partially pushed down, the Or
// predicate can be partially pushed down as well.
//
// Here is an example used to explain the reason.
// Let's say we have
// (a1 AND a2) OR (b1 AND b2),
// a1 and b1 is convertible, while a2 and b2 is not.
// The predicate can be converted as
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
// As per the logical in And predicate, we can push down (a1 OR b1).
case Or(left, right) =>
for {
lhs <- convertibleFiltersHelper(left, canPartialPushDown)
rhs <- convertibleFiltersHelper(right, canPartialPushDown)
} yield Or(lhs, rhs)
case Not(pred) =>
val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
childResultOptional.map(Not)
case other =>
for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other
}
filters.flatMap { filter =>
convertibleFiltersHelper(filter, true)
}
}

/**
* Get PredicateLeafType which is corresponding to the given DataType.
Expand Down Expand Up @@ -115,228 +162,108 @@ private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
case _ => value
}

import org.apache.spark.sql.sources._
import OrcFilters._

/**
* Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
* only building the remaining convertible nodes.
* Build a SearchArgument and return the builder so far.
*
* Doing the conversion in this way avoids the computational complexity problems introduced by
* checking whether a node is convertible while building it. The approach implemented here has
* complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
* a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
* the remaining nodes.
*
* The alternative approach of checking-while-building can (and did) result
* in exponential complexity in the height of the tree, causing perf problems with Filters with
* as few as ~35 nodes if they were skewed.
* @param dataTypeMap a map from the attribute name to its data type.
* @param expression the input predicates, which should be fully convertible to SearchArgument.
* @param builder the input SearchArgument.Builder.
* @return the builder so far.
*/
private[sql] def buildSearchArgument(
private def buildSearchArgument(
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
trimUnconvertibleFilters(expression).map { filter =>
updateBuilder(filter, builder)
builder
}
}

/**
* Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument.
*/
private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression)
}

/**
* Builds a SearchArgument for the given Filter. This method should only be called on Filters
* that have previously been trimmed to remove unsupported sub-Filters!
*/
private def updateBuilder(expression: Filter, builder: Builder): Unit =
performAction(BuildSearchArgument(builder), expression)

sealed trait ActionType[ReturnType]
case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
extends ActionType[Option[Filter]]
case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]

// The performAction method can run both the filtering and building operations for a given
// node - we signify which one we want with the `actionType` parameter.
//
// There are a couple of benefits to coupling the two operations like this:
// 1. All the logic for a given predicate is grouped logically in the same place. You don't
// have to scroll across the whole file to see what the filter action for an And is while
// you're looking at the build action.
// 2. It's much easier to keep the implementations of the two operations up-to-date with
// each other. If the `filter` and `build` operations are implemented as separate case-matches
// in different methods, it's very easy to change one without appropriately updating the
// other. For example, if we add a new supported node type to `filter`, it would be very
// easy to forget to update `build` to support it too, thus leading to conversion errors.
private def performAction[ReturnType](
actionType: ActionType[ReturnType],
expression: Filter): ReturnType = {
def getType(attribute: String): PredicateLeaf.Type =
getPredicateLeafType(dataTypeMap(attribute))
builder: Builder): Builder = {
import org.apache.spark.sql.sources._

expression match {
case And(left, right) =>
actionType match {
case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
// At here, it is not safe to just keep one side and remove the other side
// if we do not understand what the parent filters are.
//
// Here is an example used to explain the reason.
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
// NOT(a = 2), which will generate wrong results.
//
// Pushing one side of AND down is only safe to do at the top level or in the child
// AND before hitting NOT or OR conditions, and in this case, the unsupported
// predicate can be safely removed.
val lhs = performAction(t, left)
val rhs = performAction(t, right)
(lhs, rhs) match {
case (Some(l), Some(r)) => Some(And(l, r))
case (Some(_), None) if canPartialPushDownConjuncts => lhs
case (None, Some(_)) if canPartialPushDownConjuncts => rhs
case _ => None
}
case b @ BuildSearchArgument(builder) =>
builder.startAnd()
performAction(b, left)
performAction(b, right)
builder.end()
()
}
val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd())
val rhs = buildSearchArgument(dataTypeMap, right, lhs)
rhs.end()

case Or(left, right) =>
actionType match {
case t: TrimUnconvertibleFilters =>
// The Or predicate is convertible when both of its children can be pushed down.
// That is to say, if one/both of the children can be partially pushed down, the Or
// predicate can be partially pushed down as well.
//
// Here is an example used to explain the reason.
// Let's say we have
// (a1 AND a2) OR (b1 AND b2),
// a1 and b1 is convertible, while a2 and b2 is not.
// The predicate can be converted as
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
// As per the logical in And predicate, we can push down (a1 OR b1).
for {
lhs: Filter <- performAction(t, left)
rhs: Filter <- performAction(t, right)
} yield Or(lhs, rhs)
case b @ BuildSearchArgument(builder) =>
builder.startOr()
performAction(b, left)
performAction(b, right)
builder.end()
()
}
val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr())
val rhs = buildSearchArgument(dataTypeMap, right, lhs)
rhs.end()

case Not(child) =>
actionType match {
case t: TrimUnconvertibleFilters =>
performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not)
case b @ BuildSearchArgument(builder) =>
builder.startNot()
performAction(b, child)
builder.end()
()
buildSearchArgument(dataTypeMap, child, builder.startNot()).end()

case other =>
buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
throw new SparkException(
"The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
}
}
}

// NOTE: For all case branches dealing with leaf predicates below, the additional
// `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf
// predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
/**
* Build a SearchArgument for a leaf predicate and return the builder so far.
*
* @param dataTypeMap a map from the attribute name to its data type.
* @param expression the input filter predicates.
* @param builder the input SearchArgument.Builder.
* @return the builder so far.
*/
private def buildLeafSearchArgument(
dataTypeMap: Map[String, DataType],
expression: Filter,
builder: Builder): Option[Builder] = {
def getType(attribute: String): PredicateLeaf.Type =
getPredicateLeafType(dataTypeMap(attribute))

import org.apache.spark.sql.sources._

// NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
// call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
expression match {
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())

case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())

case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())

case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())

case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())

case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())

case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
builder.startAnd().isNull(quotedName, getType(attribute)).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())

case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
builder.startNot().isNull(quotedName, getType(attribute)).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
Some(builder.startNot().isNull(quotedName, getType(attribute)).end())

case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
actionType match {
case _: TrimUnconvertibleFilters => Some(expression)
case BuildSearchArgument(builder) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
builder.startAnd().in(quotedName, getType(attribute),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
()
}
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
Some(builder.startAnd().in(quotedName, getType(attribute),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())

case _ =>
actionType match {
case _: TrimUnconvertibleFilters => None
case BuildSearchArgument(builder) =>
throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}")
}
case _ => None
}
}
}
Expand Down
Loading