@@ -237,21 +237,77 @@ private[hive] trait HiveStrategies {
* applied.
*/
object HiveTableScans extends Strategy {

  def constructBinaryOperators(left: Expression, right: Expression, opType: String): Expression = {
    // Combine the surviving partition-only predicates from the two sides.
    // A null side means no partition-only predicate survived there.
    opType match {
      case "and" =>
        // Dropping one conjunct only weakens the predicate, which is safe
        // for pruning, so keep whichever side survived.
        (left == null, right == null) match {
          case (true, true) => null
          case (true, false) => right
          case (false, true) => left
          case (false, false) => And(left, right)
        }
      case "or" =>
        // Dropping one disjunct would strengthen the predicate and could
        // prune partitions that still contain matching rows, so the whole
        // disjunction survives only if both sides do.
        if (left == null || right == null) null else Or(left, right)
      case _ => null
    }
  }

  def resolveAndExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
    expr match {
      case And(left, right) =>
        // Keep whichever conjunct references only partition keys.
        constructBinaryOperators(
          resolvePredicatesExpression(left, partitionKeyIds),
          resolvePredicatesExpression(right, partitionKeyIds),
          "and")
      case _ =>
        resolvePredicatesExpression(expr, partitionKeyIds)
    }
  }

  def resolveOrExpression(or: Or, partitionKeyIds: AttributeSet): Expression = {
    // Recurse into nested ORs; any other child is treated as a (possibly
    // AND-joined) leaf predicate.
    def resolveSide(side: Expression): Expression = side match {
      case nested: Or => resolveOrExpression(nested, partitionKeyIds)
      case other => resolveAndExpression(other, partitionKeyIds)
    }
    constructBinaryOperators(resolveSide(or.left), resolveSide(or.right), "or")
  }

  def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
    // A predicate survives only if it references partition keys and nothing else.
    if (expr.references.nonEmpty && expr.references.subsetOf(partitionKeyIds)) expr else null
  }

  def extractPushDownPredicate(predicates: Seq[Expression], partitionKeyIds: AttributeSet): Seq[Expression] = {
    // Map each predicate to its partition-only skeleton; predicates that
    // cannot contribute to pruning come back as null and are filtered out
    // by the caller.
    predicates.map {
      case or: Or => resolveOrExpression(or, partitionKeyIds)
      case predicate => resolvePredicatesExpression(predicate, partitionKeyIds)
    }
  }
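
  // Illustration (hypothetical example, not part of the diff): assuming a
  // table whose only partition column is `pt` and whose data column is `c`,
  // the extraction keeps the partition-only skeleton of a predicate:
  //
  //   val pt = AttributeReference("pt", IntegerType)()
  //   val c = AttributeReference("c", IntegerType)()
  //   val partitionKeyIds = AttributeSet(pt :: Nil)
  //
  //   // (pt = 1 AND c = 2) OR pt = 3
  //   val predicate = Or(
  //     And(EqualTo(pt, Literal(1)), EqualTo(c, Literal(2))),
  //     EqualTo(pt, Literal(3)))
  //
  //   // `c = 2` references a non-partition column and is dropped, so:
  //   extractPushDownPredicate(Seq(predicate), partitionKeyIds)
  //     == Seq(Or(EqualTo(pt, Literal(1)), EqualTo(pt, Literal(3))))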

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
      // Filter out all predicates that deal only with partition keys; these are
      // given to the Hive table scan operator to be used for partition pruning.
      val partitionKeyIds = AttributeSet(relation.partitionCols)
      val (_, otherPredicates) = predicates.partition { predicate =>
        predicate.references.nonEmpty &&
          predicate.references.subsetOf(partitionKeyIds)
      }

      val extractedPruningPredicates =
        extractPushDownPredicate(predicates, partitionKeyIds).filter(_ != null)
HyukjinKwon (Member) commented:

@AngersZhuuuu, just for clarification: this code path does support OR expressions, but you want to do a partial pushdown, right? Considering it needs a lot of code, as @wangyum pointed out, I think we had better promote using (or converting to) Spark's Parquet or ORC readers instead. This looks like overkill to me.

AngersZhuuuu (Contributor Author) replied:

@HyukjinKwon What I do is extract the conditions that concern partition keys. The old code was:
val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) }

If an expression also references a non-partition key, it is not pushed to HiveTableScanExec at all. My fix extracts every condition that concerns partition keys and pushes those to HiveTableScanExec, which then handles the complex combined expressions.
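
For example, a minimal sketch of the difference (the table `logs`, partition column `pt`, and data column `c` are hypothetical):

    // Old behavior: the predicate references both pt and c, so nothing is
    // pushed for partition pruning and every partition is scanned.
    // New behavior: `pt = '2019-06-01' OR pt = '2019-06-02'` is extracted
    // and pushed to HiveTableScanExec for pruning.
    spark.sql(
      """SELECT * FROM logs
        |WHERE (pt = '2019-06-01' AND c > 10)
        |   OR (pt = '2019-06-02' AND c < 5)""".stripMargin)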

AngersZhuuuu (Contributor Author) added:

@HyukjinKwon
Spark's Parquet and ORC readers are great and can push down filter conditions, but they don't solve this problem: when we read a Hive table, the first step is the scan. What this PR aims to do is reduce the time spent resolving file info and partition metadata, and the number of files we scan. When the file or partition count is large, that step takes too long.

HyukjinKwon (Member) replied:

I think we should convert Hive table reading operations into Spark's, via, for instance, the spark.sql.hive.convertMetastoreParquet conf. If the diff were small I might be fine with this, but it looks like overkill to me. I haven't taken a close look, but it virtually looks like we need a fix along the lines of #24598.

I won't object if some other committers are fine with it.
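
For reference, a minimal sketch of enabling that conversion (the session setup is illustrative; both confs are existing Spark settings):

    import org.apache.spark.sql.SparkSession

    // With these confs enabled, Spark replaces the Hive serde read path for
    // metastore Parquet/ORC tables with its native readers, which support
    // filter pushdown.
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .config("spark.sql.hive.convertMetastoreParquet", "true")
      .config("spark.sql.hive.convertMetastoreOrc", "true")
      .getOrCreate()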

AngersZhuuuu (Contributor Author) replied on Jun 27, 2019:

@HyukjinKwon I know it's better to convert Hive table reading operations into Spark's, but that can't fix every situation. In our production environment we only changed the default storage type of Hive data to ORC. For a partitioned table whose partitions don't all share the same serde, the conversion fails, because during conversion every partition's files are checked against the table-level serde.


      pruneFilterProject(
        projectList,
        otherPredicates,
        identity[Seq[Expression]],
        HiveTableScanExec(_, relation, extractedPruningPredicates)(sparkSession)) :: Nil
    case _ =>
      Nil
  }