[SPARK-28108][SQL][test-hadoop3.2] Simplify OrcFilters #24910
Closed
Changes from 2 of 6 commits:
1f641d8: refactor (gengliangwang)
c0cd6f3: rename (gengliangwang)
ce08f04: address comments (gengliangwang)
77eaff3: address comments (gengliangwang)
f2051ec: add blank lines (gengliangwang)
1792bb6: refactor buildSearchArgument (gengliangwang)
Diff view
@@ -64,26 +64,79 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
-      // Combines all filters using `And` to produce a single conjunction
-      conjunction <- buildTree(filters)
-      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
+      // Combines all convertible filters using `And` to produce a single conjunction
+      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
+      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
+      // The input predicate is fully convertible. There should not be any empty result in the
+      // following recursive method call `buildSearchArgument`.
+      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
     } yield builder.build()
   }
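For orientation: the first step of `createFilter` builds a name-to-type map from the schema, and `buildTree` (defined in `OrcFiltersBase`, not shown in this diff) combines the convertible filters into a single `And` conjunction. A rough, hypothetical stand-in, assuming only the standard `org.apache.spark.sql.sources` filter classes and using a left fold where the real helper builds a balanced tree:

```scala
import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical sketch, not part of the PR.
object ConjunctionSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    // First step of createFilter: attribute name -> data type, later used to pick the ORC leaf type.
    val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
    println(dataTypeMap) // e.g. Map(a -> IntegerType, b -> StringType)

    // Stand-in for buildTree: fold all (already convertible) filters into one conjunction.
    // The real helper in OrcFiltersBase builds a balanced tree of Ands rather than a left fold.
    def conjunction(filters: Seq[Filter]): Option[Filter] =
      filters.reduceLeftOption((l, r) => And(l, r))

    val filters = Seq(IsNotNull("a"), EqualTo("a", 1), GreaterThan("b", "x"))
    println(conjunction(filters)) // e.g. Some(And(And(IsNotNull(a),EqualTo(a,1)),GreaterThan(b,x)))
  }
}
```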
   def convertibleFilters(
       schema: StructType,
       dataTypeMap: Map[String, DataType],
       filters: Seq[Filter]): Seq[Filter] = {
-    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
-    filters.flatMap(orcFilterConverter.trimUnconvertibleFilters)
-  }
+    import org.apache.spark.sql.sources._
 
-}
+    def convertibleFiltersHelper(
+        filter: Filter,
+        canPartialPushDown: Boolean): Option[Filter] = filter match {
+      // At here, it is not safe to just convert one side and remove the other side
+      // if we do not understand what the parent filters are.
+      //
+      // Here is an example used to explain the reason.
+      // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+      // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+      // NOT(a = 2), which will generate wrong results.
+      //
+      // Pushing one side of AND down is only safe to do at the top level or in the child
+      // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
+      // can be safely removed.
+      case And(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        (leftResultOptional, rightResultOptional) match {
+          case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+          case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
+          case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
+          case _ => None
+        }
 
-private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
+      // The Or predicate is convertible when both of its children can be pushed down.
+      // That is to say, if one/both of the children can be partially pushed down, the Or
+      // predicate can be partially pushed down as well.
+      //
+      // Here is an example used to explain the reason.
+      // Let's say we have
+      // (a1 AND a2) OR (b1 AND b2),
+      // a1 and b1 is convertible, while a2 and b2 is not.
+      // The predicate can be converted as
+      // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+      // As per the logical in And predicate, we can push down (a1 OR b1).
+      case Or(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
+          None
+        } else {
+          Some(Or(leftResultOptional.get, rightResultOptional.get))
+        }
+      case Not(pred) =>
+        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        resultOptional.map(Not)
+      case other =>
+        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
+          Some(other)
+        } else {
+          None
+        }
+    }
+    filters.flatMap { filter =>
+      convertibleFiltersHelper(filter, true)
+    }
+  }
 
   /**
    * Get PredicateLeafType which is corresponding to the given DataType.
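The two comment blocks in `convertibleFiltersHelper` above carry the key reasoning of the trimming pass. Both claims can be checked with plain Scala booleans; the sketch below is not Spark code and the row values are invented:

```scala
// Hypothetical sketch, not part of the PR.
object PartialPushdownChecks {
  def main(args: Array[String]): Unit = {
    // Claim 1: under NOT it is unsafe to drop the unconvertible side of an AND.
    // Take a row with a = 2, b = '2' and pretend only `a = 2` is convertible.
    val (a, b) = (2, "2")
    val full    = !(a == 2 && Set("1").contains(b)) // NOT(a = 2 AND b IN ('1')) -> true, row must survive
    val trimmed = !(a == 2)                         // NOT(a = 2)                -> false, row wrongly dropped
    println(s"full = $full, trimmed = $trimmed")

    // Claim 2: relaxing (a1 AND a2) OR (b1 AND b2) to (a1 OR b1) is safe, because every row
    // matching the original predicate also matches the relaxed one. ORC may return extra rows,
    // and Spark still evaluates the original filter on the rows that come back.
    val safe = (for {
      a1 <- Seq(true, false); a2 <- Seq(true, false)
      b1 <- Seq(true, false); b2 <- Seq(true, false)
    } yield !((a1 && a2) || (b1 && b2)) || (a1 || b1)).forall(identity)
    println(s"implication holds for all 16 cases: $safe") // true
  }
}
```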
@@ -115,228 +168,90 @@ private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
     case _ => value
   }
 
-  import org.apache.spark.sql.sources._
-  import OrcFilters._
 
   /**
-   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
-   * only building the remaining convertible nodes.
-   *
-   * Doing the conversion in this way avoids the computational complexity problems introduced by
-   * checking whether a node is convertible while building it. The approach implemented here has
-   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
-   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
-   * the remaining nodes.
+   * Build a SearchArgument and return the builder so far.
    *
-   * The alternative approach of checking-while-building can (and did) result
-   * in exponential complexity in the height of the tree, causing perf problems with Filters with
-   * as few as ~35 nodes if they were skewed.
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @return the builder so far.
    */
-  private[sql] def buildSearchArgument(
+  private def buildSearchArgument(
+      dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    trimUnconvertibleFilters(expression).map { filter =>
-      updateBuilder(filter, builder)
-      builder
-    }
-  }
 
-  /**
-   * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument.
-   */
-  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
-    performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression)
-  }
 
-  /**
-   * Builds a SearchArgument for the given Filter. This method should only be called on Filters
-   * that have previously been trimmed to remove unsupported sub-Filters!
-   */
-  private def updateBuilder(expression: Filter, builder: Builder): Unit =
-    performAction(BuildSearchArgument(builder), expression)
 
-  sealed trait ActionType[ReturnType]
-  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
-    extends ActionType[Option[Filter]]
-  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
 
-  // The performAction method can run both the filtering and building operations for a given
-  // node - we signify which one we want with the `actionType` parameter.
-  //
-  // There are a couple of benefits to coupling the two operations like this:
-  // 1. All the logic for a given predicate is grouped logically in the same place. You don't
-  //    have to scroll across the whole file to see what the filter action for an And is while
-  //    you're looking at the build action.
-  // 2. It's much easier to keep the implementations of the two operations up-to-date with
-  //    each other. If the `filter` and `build` operations are implemented as separate case-matches
-  //    in different methods, it's very easy to change one without appropriately updating the
-  //    other. For example, if we add a new supported node type to `filter`, it would be very
-  //    easy to forget to update `build` to support it too, thus leading to conversion errors.
-  private def performAction[ReturnType](
-      actionType: ActionType[ReturnType],
-      expression: Filter): ReturnType = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
     import org.apache.spark.sql.sources._
 
     expression match {
       case And(left, right) =>
-        actionType match {
-          case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
-            // At here, it is not safe to just keep one side and remove the other side
-            // if we do not understand what the parent filters are.
-            //
-            // Here is an example used to explain the reason.
-            // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-            // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-            // NOT(a = 2), which will generate wrong results.
-            //
-            // Pushing one side of AND down is only safe to do at the top level or in the child
-            // AND before hitting NOT or OR conditions, and in this case, the unsupported
-            // predicate can be safely removed.
-            val lhs = performAction(t, left)
-            val rhs = performAction(t, right)
-            (lhs, rhs) match {
-              case (Some(l), Some(r)) => Some(And(l, r))
-              case (Some(_), None) if canPartialPushDownConjuncts => lhs
-              case (None, Some(_)) if canPartialPushDownConjuncts => rhs
-              case _ => None
-            }
-          case b @ BuildSearchArgument(builder) =>
-            builder.startAnd()
-            performAction(b, left)
-            performAction(b, right)
-            builder.end()
-            ()
-        }
+        for {
+          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
+          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
+        } yield rhs.end()
 
       case Or(left, right) =>
-        actionType match {
-          case t: TrimUnconvertibleFilters =>
-            // The Or predicate is convertible when both of its children can be pushed down.
-            // That is to say, if one/both of the children can be partially pushed down, the Or
-            // predicate can be partially pushed down as well.
-            //
-            // Here is an example used to explain the reason.
-            // Let's say we have
-            // (a1 AND a2) OR (b1 AND b2),
-            // a1 and b1 is convertible, while a2 and b2 is not.
-            // The predicate can be converted as
-            // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
-            // As per the logical in And predicate, we can push down (a1 OR b1).
-            for {
-              lhs: Filter <- performAction(t, left)
-              rhs: Filter <- performAction(t, right)
-            } yield Or(lhs, rhs)
-          case b @ BuildSearchArgument(builder) =>
-            builder.startOr()
-            performAction(b, left)
-            performAction(b, right)
-            builder.end()
-            ()
-        }
+        for {
+          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
+          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
+        } yield rhs.end()
 
       case Not(child) =>
-        actionType match {
-          case t: TrimUnconvertibleFilters =>
-            performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not)
-          case b @ BuildSearchArgument(builder) =>
-            builder.startNot()
-            performAction(b, child)
-            builder.end()
-            ()
-        }
+        for {
+          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
+        } yield negate.end()
 
-      // NOTE: For all case branches dealing with leaf predicates below, the additional
-      // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf
-      // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+      // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
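The NOTE above describes the shape the ORC builder expects: every leaf has to sit inside a started group, and every `start*()` must be balanced by `end()`. A hand-written illustration of the builder sequence the leaf cases below emit for `a = 1 AND b IS NOT NULL`; it assumes the `org.apache.hadoop.hive.ql.io.sarg` flavour of the API, while the other OrcFilters copy touched by this PR uses the shaded `org.apache.orc.storage.ql.io.sarg` package:

```scala
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}

// Hypothetical illustration, not code from this PR.
object SearchArgumentSketch {
  def main(args: Array[String]): Unit = {
    // Hand-built equivalent of what buildSearchArgument produces for `a = 1 AND b IS NOT NULL`.
    // The equality leaf is wrapped by startAnd(), and IS NOT NULL becomes startNot() + isNull().
    val sarg = SearchArgumentFactory.newBuilder()
      .startAnd()
      .equals("a", PredicateLeaf.Type.LONG, 1L) // integer literals are cast to Long for LONG leaves
      .startNot()
      .isNull("b", PredicateLeaf.Type.STRING)
      .end()
      .end()
      .build()
    println(sarg) // e.g. leaf-0 = (EQUALS a 1), leaf-1 = (IS_NULL b), expr = (and leaf-0 (not leaf-1))
  }
}
```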
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())
 
       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())
 
       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())
 
       case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())
 
       case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())
 
       case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())
 
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            builder.startAnd().isNull(quotedName, getType(attribute)).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
 
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            builder.startNot().isNull(quotedName, getType(attribute)).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
 
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-            builder.startAnd().in(quotedName, getType(attribute),
-              castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
+        Some(builder.startAnd().in(quotedName, getType(attribute),
+          castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
 
-      case _ =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => None
-          case BuildSearchArgument(builder) =>
-            throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}")
-        }
+      case _ => None
     }
   }
 }
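Finally, a rough end-to-end sketch of how the reworked entry point behaves. This is not part of the PR: the object name is invented, and because `OrcFilters` is `private[sql]` the snippet assumes it is compiled inside Spark's own `org.apache.spark.sql.execution.datasources.orc` package, for example next to the ORC test suites:

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

// Hypothetical usage sketch, not part of the PR.
object CreateFilterSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType)))

    // StringContains has no ORC equivalent, and under NOT the And cannot be partially pushed,
    // so the whole second filter is trimmed away and only GreaterThan("a", 10) reaches ORC.
    val filters = Seq(
      GreaterThan("a", 10),
      Not(And(EqualTo("a", 2), StringContains("b", "1"))))

    OrcFilters.createFilter(schema, filters) match {
      case Some(sarg) => println(s"Pushed to ORC: $sarg")
      case None => println("Nothing could be pushed down")
    }
  }
}
```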