
Conversation

@gatorsmile
Member

@gatorsmile gatorsmile commented Dec 7, 2016

What changes were proposed in this pull request?

Currently, when users use a Python UDF in a Filter, BatchEvalPython is always generated below FilterExec. However, not all of the predicates need to be evaluated after the Python UDF runs. This PR pushes the deterministic predicates down through BatchEvalPython.

>>> df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import BooleanType
>>> my_filter = udf(lambda a: a < 2, BooleanType())
>>> sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
>>> sel.explain(True)

Before the fix, the plan looks like

== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]

== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter ((isnotnull(value#1) && pythonUDF0#9) && (value#1 < 2))
   +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
      +- Scan ExistingRDD[key#0L,value#1]

After the fix, the plan looks like

== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]

== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter pythonUDF0#9: boolean
   +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
      +- *Filter (isnotnull(value#1) && (value#1 < 2))
         +- Scan ExistingRDD[key#0L,value#1]

How was this patch tested?

Added unit test cases for BatchEvalPythonExec and an end-to-end test case in the Python test suite.

@gatorsmile gatorsmile changed the title [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPython [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPython (Python UDF) Dec 7, 2016
val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
case f @ FilterExec(And(_: AttributeReference, _: AttributeReference), _) => f
case b: BatchEvalPythonExec => b
case f @ FilterExec(_: In, _) => f
Member Author

The physical plan has a few hidden nodes that are not shown in Explain output. Thus, I did not compare the result with the expected tree structure.

assert(qualifiedPlanNodes.size == 2)
}

test("Python UDF refers to the attributes from more than one child") {
Member Author

This test case is not directly related to this PR. In the future, we should add more unit test cases on the Scala side to verify BatchEvalPythonExec and improve test coverage.

}

// This rule is to push deterministic predicates through BatchEvalPythonExec
object PushPredicateThroughBatchEvalPython extends Rule[SparkPlan] with PredicateHelper {
Member Author

Most of this code comes from the optimizer rule PushDownPredicate. Not sure whether we should combine them, since this rule operates on SparkPlan.

Contributor

Having a predicate-pushdown rule for SparkPlan sounds bad. Can we try to do this in extract()? For example:

val splittedFilter = trySplitFilter(plan)
val newChildren = splittedFilter.children.map { child =>
  // ... extract the Python UDFs from each child as before ...
}

Member Author

Good idea! The new commit does it.

@gatorsmile
Member Author

cc @cloud-fan @davies @liancheng

 def apply(plan: SparkPlan): SparkPlan = plan transformUp {
-  case plan: SparkPlan => extract(plan)
+  case plan: SparkPlan =>
+    val newPlan = extract(plan)
Member Author

@gatorsmile gatorsmile Dec 7, 2016

extract is a recursive function. That is why I did not move the following logic into extract, for performance reasons.

from pyspark.sql.types import BooleanType

my_filter = udf(lambda a: a < 2, BooleanType())
sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
Contributor

does this test fail before this PR?

Member Author

Nope. This case works well.

@cloud-fan
Contributor

Would it be easier if we created a logical node for the Python evaluator? We had one in Spark 1.6, but it was removed in 2.0; not sure why.

@SparkQA

SparkQA commented Dec 7, 2016

Test build #69787 has finished for PR 16193 at commit eaf740a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.newProductEncoder
import testImplicits.localSeqToDatasetHolder
Member

nit: indentation?

// Only push down the predicates that is deterministic and all the referenced attributes
// come from grandchild.
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(filter.condition).span(_.deterministic)
Member

nit. Indentation?

@gatorsmile
Member Author

@cloud-fan Let me do a history search and see why we dropped the logical plan node EvaluatePython

@gatorsmile
Member Author

#12127 dropped the node EvaluatePython. Based on the PR description, we removed the node for the following reasons:

Currently we extract Python UDFs into a special logical plan EvaluatePython in the analyzer, but EvaluatePython is not part of Catalyst; many rules have no knowledge of it, which will break many things (for example, filter pushdown or column pruning).
We should treat Python UDFs as normal expressions until we want to evaluate them in the physical plan; we could extract them at the end of the optimizer, or in the physical plan.

@gatorsmile
Member Author

I also checked the plan on our 1.6.3 branch. The filter is not appropriately pushed down, even though we have the logical node EvaluatePython.

== Parsed Logical Plan ==
'Filter (PythonUDF#<lambda>('key) && (value#1 < 2))
+- Project [key#0L,value#1]
   +- LogicalRDD [key#0L,value#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Analyzed Logical Plan ==
key: bigint, value: string
Project [key#0L,value#1]
+- Filter (pythonUDF#2 && (value#1 < 2))
   +- EvaluatePython PythonUDF#<lambda>(key#0L), pythonUDF#2: boolean
      +- Project [key#0L,value#1]
         +- LogicalRDD [key#0L,value#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Optimized Logical Plan ==
Project [key#0L,value#1]
+- Filter (pythonUDF#2 && (value#1 < 2))
   +- EvaluatePython PythonUDF#<lambda>(key#0L), pythonUDF#2: boolean
      +- LogicalRDD [key#0L,value#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Physical Plan ==
Project [key#0L,value#1]
+- Filter (pythonUDF#2 && (value#1 < 2))
   +- !BatchPythonEvaluation PythonUDF#<lambda>(key#0L), [key#0L,value#1,pythonUDF#2]
      +- Scan ExistingRDD[key#0L,value#1]

@SparkQA

SparkQA commented Dec 7, 2016

Test build #69808 has finished for PR 16193 at commit 2c3b917.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please

@SparkQA

SparkQA commented Dec 8, 2016

Test build #69850 has started for PR 16193 at commit 2c3b917.

@viirya
Member

viirya commented Dec 8, 2016

Since we can push predicates down through a data source scan, those predicates should already be pushed down if they sit above a data source scan node in the query plan. This seems to only help an ExistingRDD scan, where the predicates cannot be pushed down.

So the question is, should we push predicates down to an ExistingRDD scan? I think there is not much benefit except for this case.

@viirya
Member

viirya commented Dec 8, 2016

If we really want to do this, I'd suggest pushing the predicates down to the RDD scan node during the query planning stage, so we don't need to push predicates through SparkPlan like this.

@gatorsmile
Member Author

@viirya I did not get your point. Why wouldn't pushing predicates down through the Python UDF have a significant benefit? Based on my understanding, it can greatly reduce the number of rows consumed/processed by the UDF. Normally, a UDF is much more expensive than built-in expressions.

@gatorsmile
Member Author

ExistingRDD might not always be the child of the Filter. For example,

>>> sel = df.select('key', 'value', rand()).filter((my_filter(col("key"))) & (df.value < "2"))
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- Project [key#0L, value#1, rand(9089678730530723370) AS rand(9089678730530723370)#20]
   +- LogicalRDD [key#0L, value#1]

== Physical Plan ==
*Project [key#0L, value#1, rand(9089678730530723370)#20]
+- *Filter pythonUDF0#26: boolean
   +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, rand(9089678730530723370)#20, pythonUDF0#26]
      +- *Filter (isnotnull(value#1) && (value#1 < 2))
         +- *Project [key#0L, value#1, rand(9089678730530723370) AS rand(9089678730530723370)#20]
            +- Scan ExistingRDD[key#0L,value#1]


if (pushDown.nonEmpty) {
val newChild = FilterExec(pushDown.reduceLeft(And), filter.child)
if (stayUp.nonEmpty) {
Contributor

There should be some UDFs here, so this will not be empty.

Member Author

True. : )

// come from child.
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(filter.condition).span(_.deterministic)
val (pushDown, rest) = candidates.partition(!hasPythonUDF(_))
Contributor

nit: splitConjunctivePredicates(filter.condition).span(e => e.deterministic && !hasPythonUDF(e))

Member Author

This will change the semantics. span and partition have different semantics. Thus, we still have to keep the existing behavior.

Let me write a comment to explain that PythonUDF is always assumed to be deterministic.
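
To illustrate the difference with a toy example (not from this PR): span cuts the sequence at the first element that fails the predicate and keeps the relative order of everything after the cut, while partition reorders elements from the whole sequence.

```scala
// Toy illustration of span vs. partition with the same predicate.
val xs = Seq(1, 2, -1, 3)
xs.span(_ > 0)       // (Seq(1, 2), Seq(-1, 3)) -- stops at the first failure; 3 stays after -1
xs.partition(_ > 0)  // (Seq(1, 2, 3), Seq(-1)) -- 3 moves ahead of -1, changing the relative order
```

So collapsing the two steps into a single span, as suggested, would stop pushing at the first predicate that contains a Python UDF instead of partitioning the remaining deterministic conjuncts around it.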

@SparkQA

SparkQA commented Dec 8, 2016

Test build #69878 has finished for PR 16193 at commit 3d9ba67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private def trySplitFilter(plan: SparkPlan): SparkPlan = {
plan match {
case filter: FilterExec =>
// Only push down the predicates that is deterministic and all the referenced attributes
Contributor

Only push down the first few predicates that are all deterministic

// come from child.
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(filter.condition).span(_.deterministic)
// Python UDF is always deterministic
Contributor

Is this one useful? We just won't push down expressions that have Python UDFs here.

@davies
Contributor

davies commented Dec 9, 2016

@cloud-fan There is no R UDF at this point.

@davies
Contributor

davies commented Dec 9, 2016

If there is no objection in the next two hours, I will merge this one into master.

@gatorsmile
Member Author

@davies Just updated the code comments, as you suggested. It does not affect the code logic. Sorry for the late update.

@gatorsmile
Member Author

gatorsmile commented Dec 9, 2016

@cloud-fan If the functions in dapply and gapply count as UDFs in SparkR, we have very limited support. In the plan output, they are represented as MapPartitionsInR and FlatMapGroupsInR.

More strictly, as @davies said, SparkR does not have actual SQL-level registered UDFs.

@gatorsmile
Member Author

@viirya I think your idea addresses a different issue. It does not apply to all the cases of PythonUDF pushdown.

@SparkQA

SparkQA commented Dec 9, 2016

Test build #69931 has finished for PR 16193 at commit 04b0e9c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Dec 10, 2016

Test build #69935 has finished for PR 16193 at commit 04b0e9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

 // Rewrite the child that has the input required for the UDF
-val newChildren = plan.children.map { child =>
+val newChildren =
+  splittedFilter.children.map { child =>
Contributor

nit: no need to start a new line here?

case f: FilterExec => f
case b: BatchEvalPythonExec => b
}
assert(qualifiedPlanNodes.size == 3)
Contributor

It's really hard to tell correctness just by checking the number of plan nodes...

Member Author

Let me improve them.
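
One possible way to tighten the check, as a sketch only (it reuses the suite's df and the FilterExec/BatchEvalPythonExec imports already in the test file, and is not necessarily what the final commit does): assert on the order of the collected operators rather than just their count.

```scala
// collect traverses the plan top-down, so for the expected shape
// Filter(pythonUDF) -> BatchEvalPython -> Filter(built-in predicates)
// the middle collected node should be the BatchEvalPythonExec.
val nodes = df.queryExecution.executedPlan.collect {
  case f: FilterExec => f
  case b: BatchEvalPythonExec => b
}
assert(nodes.length == 3)
assert(nodes(1).isInstanceOf[BatchEvalPythonExec])
```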

@cloud-fan
Contributor

It's a little hacky to me that we do optimization in the planner. How hard would it be to introduce a logical node for the Python evaluator? We could define an interface in Catalyst, e.g. ExternalUDFEvaluator, so that R (or other languages in the future) UDFs could also benefit from it.

@davies
Contributor

davies commented Dec 10, 2016

@cloud-fan It's not trivial to do this in the optimizer. For example, we would need to split one Filter into two, which conflicts with another optimizer rule that combines two Filters into one.
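
A simplified sketch (not the actual Spark rule, which also checks determinism) of the combining behavior being referred to: any two adjacent Filters get merged back into one, undoing a split done earlier in the optimizer.

```scala
import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Merges two adjacent Filters into a single Filter with a conjunctive condition,
// so Filter(udf(b), Filter(a, child)) becomes Filter(a && udf(b), child) again.
object CombineFiltersSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(outerCond, Filter(innerCond, grandChild)) =>
      Filter(And(innerCond, outerCond), grandChild)
  }
}
```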

@viirya
Member

viirya commented Dec 10, 2016

If we add a logical node for the Python evaluator, we'd push the Filter down through it, so the optimizer rule wouldn't combine the two Filters into one again?

@davies
Contributor

davies commented Dec 10, 2016

The reason we moved the Python UDF evaluator from the logical plan into the physical plan is that this one-off node breaks many things; many rules would need to treat it specially.

@davies
Contributor

davies commented Dec 10, 2016

Pushing predicates down into a data source also happens during planning, so I think this is not the first place we do optimization outside the Optimizer.

@SparkQA

SparkQA commented Dec 10, 2016

Test build #69961 has started for PR 16193 at commit 2c8e593.

@cloud-fan
Contributor

LGTM

@viirya
Member

viirya commented Dec 10, 2016

retest this please.

@viirya
Member

viirya commented Dec 10, 2016

LGTM

@SparkQA

SparkQA commented Dec 10, 2016

Test build #69965 has finished for PR 16193 at commit 2c8e593.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

Thanks! Merging to master!

@asfgit asfgit closed this in 422a45c Dec 10, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017