Skip to content

Conversation

@cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented May 22, 2019

What changes were proposed in this pull request?

In #22104 , we create the python-eval nodes at the end of the optimization phase, which causes a problem.

After the main optimization batch, Filter and Project nodes are usually pushed to the bottom, near the scan node. However, if we extract Python UDFs from Filter/Project, and create a python-eval node under Filter/Project, it will break column pruning/filter pushdown of the scan node.

There are some hacks in the ExtractPythonUDFs rule, to duplicate the column pruning and filter pushdown logic. However, it has some bugs as demonstrated in the new test case(only column pruning is broken). This PR removes the hacks and re-apply the column pruning and filter pushdown rules explicitly.

Before:

...
== Analyzed Logical Plan ==
a: bigint
Project [a#168L]
+- Filter dummyUDF(a#168L)
   +- Relation[a#168L,b#169L] parquet

== Optimized Logical Plan ==
Project [a#168L]
+- Project [a#168L, b#169L]
   +- Filter pythonUDF0#174: boolean
      +- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174]
         +- Relation[a#168L,b#169L] parquet

== Physical Plan ==
*(2) Project [a#168L]
+- *(2) Project [a#168L, b#169L]
   +- *(2) Filter pythonUDF0#174: boolean
      +- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174]
         +- *(1) FileScan parquet [a#168L,b#169L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2w0000gp/T/spark-798bae3c-a2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>

After:

...
== Analyzed Logical Plan ==
a: bigint
Project [a#168L]
+- Filter dummyUDF(a#168L)
   +- Relation[a#168L,b#169L] parquet

== Optimized Logical Plan ==
Project [a#168L]
+- Filter pythonUDF0#174: boolean
   +- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174]
      +- Project [a#168L]
         +- Relation[a#168L,b#169L] parquet

== Physical Plan ==
*(2) Project [a#168L]
+- *(2) Filter pythonUDF0#174: boolean
   +- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174]
      +- *(1) FileScan parquet [a#168L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2w0000gp/T/spark-9500cafb-78..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint>

How was this patch tested?

new test

Copy link
Contributor Author

@cloud-fan cloud-fan May 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to work with the ColumnPruning and PushDownPredicate rule, we must correctly implement the references method. resultAttrs are definitely not references.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If references only cover references in udfs, will some output attributes from child that aren't referred by udfs be pruned from BaseEvalPython?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, and this is "column pruning".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to work with the ColumnPruning rule, the python-eval node should be able to dynamically update its output if the child's output updated.

@cloud-fan
Copy link
Contributor Author

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for moving out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've to move out because I need to access them in PushdownPredicate, which is in catalyst module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is ExtractPythonUDFs newly added to nonExcludableRules? Is it also for the fix? Or just it should be there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be there. We can do it in another PR, but since I'm touching this file, I just fixed it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Just out of curiosity.

@SparkQA
Copy link

SparkQA commented May 22, 2019

Test build #105671 has finished for PR 24675 at commit bbc085d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait BaseEvalPython extends UnaryNode
  • case class BatchEvalPython(
  • case class ArrowEvalPython(
  • case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)

@cloud-fan
Copy link
Contributor Author

retest this please

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA
Copy link

SparkQA commented May 22, 2019

Test build #105678 has finished for PR 24675 at commit bbc085d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait BaseEvalPython extends UnaryNode
  • case class BatchEvalPython(
  • case class ArrowEvalPython(
  • case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)

@cloud-fan
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented May 22, 2019

Test build #105684 has finished for PR 24675 at commit bbc085d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait BaseEvalPython extends UnaryNode
  • case class BatchEvalPython(
  • case class ArrowEvalPython(
  • case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)

@SparkQA
Copy link

SparkQA commented May 22, 2019

Test build #105692 has finished for PR 24675 at commit 636b603.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait BaseEvalPython extends UnaryNode
  • case class BatchEvalPython(
  • case class ArrowEvalPython(
  • case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
  • abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)

case _: Repartition => true
case _: ScriptTransformation => true
case _: Sort => true
case _: BatchEvalPython => true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my benefit, would you mind explain what does canPushThrough define? Are these nodes that a projection and/or filter can be pushed through?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This defines the nodes that we can push filters through.


// Split the original FilterExec to two FilterExecs. Only push down the first few predicates
// that are all deterministic.
private def trySplitFilter(plan: LogicalPlan): LogicalPlan = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain a little why this is no longer needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quote from the PR description

There are some hacks in the ExtractPythonUDFs rule, to duplicate the column pruning and filter pushdown logic. However, it has some bugs as demonstrated in the new test case(only column pruning is broken). This PR removes the hacks and re-apply the column pruning and filter pushdown rules explicitly.

@HyukjinKwon HyukjinKwon changed the title [SPARK-27803][SQL] fix column pruning for python UDF [SPARK-27803][SQL][PYTHON] Fix column pruning for Python UDF May 23, 2019
@HyukjinKwon
Copy link
Member

makes sense to me.

override val producedAttributes = AttributeSet(output)
}

trait BaseEvalPython extends UnaryNode {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is producedAttributes missing from this? Previously, BatchEvalPython and ArrowEvalPython have it defined.

Copy link
Contributor Author

@cloud-fan cloud-fan May 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a problem I want to address later. I think producedAttributes makes no sense. It's only used to define missingInput, but we can overwrite reference to do the same thing.

More specifically, if reference is wrongly implemented, column pruning will be broken. If producedAttributes is not implemented, nothing serious will happen.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good. We should have column pruning at single place, not like separately in ExtractPythonUDFs, previously.

@HyukjinKwon
Copy link
Member

HyukjinKwon commented May 24, 2019

BTW, just to be sync'ed with you too @BryanCutler, @viirya and @icexelloss, I am planning to add a bunch of tests specific to regular Python UDF and Pandas Scalar UDF, which are possibly able to reused to Scala UDF too - I am trying to find a way to deduplicate as much as possible. I hopefully it makes sense to you guys.

This special rule ExtractPythonUDF[s|FromAggregate] has unevaluable expressions that always has to be wrapped with special plans. Seems like we remove some hacks now but I think we're not sure about the coverage.

I think we started to observe those issues since we turned those Python ones from physical plans to logical plans, which was (I think) right fix but couldn't catch many cases like this. My idea is basically to share (or partially duplicate) *.sql files for Python / Pandas / Scala UDFs - hope this idea prevents such issues in the future.

@HyukjinKwon
Copy link
Member

Will get this in in few days if there are no more comments.

@HyukjinKwon
Copy link
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants