@xguo27
Contributor

@xguo27 xguo27 commented Jan 27, 2016

When the ExtractPythonUDFs rule is applied to an Aggregate operator, the operator becomes a Project. This change fixes that and keeps the Aggregate operator as its original type.

@AmplabJenkins

Can one of the admins verify this patch?

@xguo27
Contributor Author

xguo27 commented Feb 26, 2016

@rxin Does this fix look good to you?

@rxin
Contributor

rxin commented Feb 27, 2016

cc @davies


if (plan.isInstanceOf[Aggregate]) {
  transformed
}
Contributor

a style nit: put else on the same line as the previous }

also can you add some comment explaining what's happening

@xguo27
Contributor Author

xguo27 commented Mar 1, 2016

Using these two functionally equivalent code snippets:

Scala

import org.apache.spark.sql.functions.col
import sqlContext.implicits._  // needed for toDF
val data = Seq((1, "1"), (2, "2"), (3, "2"), (1, "3")).toDF("a", "b")
val my_filter = sqlContext.udf.register("my_filter", (a: Int) => a == 1)
data.select(col("a")).distinct().filter(my_filter(col("a")))

Python

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
data = sqlContext.createDataFrame([(1, "1"), (2, "2"), (3, "2"), (1, "3")], ["a", "b"])
my_filter = udf(lambda a: a == 1, BooleanType())
data.select(col("a")).distinct().filter(my_filter(col("a")))

The logical plan that comes out of execute(aggregatedCondition) here is as follows:

val resolvedOperator = execute(aggregatedCondition)

Scala

Aggregate [a#8], [UDF(a#8) AS havingCondition#11]
+- Project [a#8]
   +- Project [_1#6 AS a#8,_2#7 AS b#9]
      +- LocalRelation [_1#6,_2#7], [[1,1],[2,2],[3,2],[1,3]]

Python

Project [havingCondition#2]
+- Aggregate [a#0L], [pythonUDF#3 AS havingCondition#2]
   +- EvaluatePython PythonUDF#<lambda>(a#0L), pythonUDF#3: boolean
      +- Project [a#0L]
         +- LogicalRDD [a#0L,b#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

We can see that in the Python case an extra Project is injected when execute(aggregatedCondition) goes through ExtractPythonUDFs, but ResolveAggregateFunctions expects an Aggregate here:

val resolvedOperator = execute(aggregatedCondition)
def resolvedAggregateFilter =
  resolvedOperator
    .asInstanceOf[Aggregate]
    .aggregateExpressions.head
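
To make the failure concrete, here is a minimal pure-Python sketch of the mismatch. It does not use Spark: the `Aggregate`, `Project` and `EvaluatePython` classes are toy stand-ins, and `extract_python_udfs_old` and `resolved_aggregate_filter` are hypothetical names that only imitate the shape of the pre-fix rule and of the cast shown above.

```python
# Toy stand-ins for Catalyst operators; these are NOT Spark's real classes.
class Aggregate:
    def __init__(self, grouping, aggregate_expressions, child):
        self.grouping = grouping
        self.aggregate_expressions = aggregate_expressions
        self.child = child


class Project:
    def __init__(self, project_list, child):
        self.project_list = project_list
        self.child = child


class EvaluatePython:
    def __init__(self, udf, result_attr, child):
        self.udf = udf
        self.result_attr = result_attr
        self.child = child


def extract_python_udfs_old(agg):
    """Hypothetical model of the pre-fix rule: evaluate the UDF below the
    Aggregate, then wrap the rewritten Aggregate in an extra Project."""
    evaluated = EvaluatePython("PythonUDF#<lambda>(a)", "pythonUDF", agg.child)
    rewritten = Aggregate(agg.grouping, ["pythonUDF AS havingCondition"], evaluated)
    return Project(["havingCondition"], rewritten)


def resolved_aggregate_filter(resolved_operator):
    """Imitates `.asInstanceOf[Aggregate].aggregateExpressions.head`."""
    if not isinstance(resolved_operator, Aggregate):
        raise TypeError(type(resolved_operator).__name__ + " is not an Aggregate")
    return resolved_operator.aggregate_expressions[0]


aggregated_condition = Aggregate(
    ["a"], ["PythonUDF#<lambda>(a) AS havingCondition"], "LogicalRDD"
)
resolved = extract_python_udfs_old(aggregated_condition)

try:
    resolved_aggregate_filter(resolved)
    cast_error = None
except TypeError as err:
    # The root of the plan is a Project, so the cast-like check fails.
    cast_error = str(err)

print(type(resolved).__name__, "->", cast_error)
```

In this sketch the Aggregate is still present, but one level below the root, which is why a check on the root operator alone rejects the plan.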

With this fix, ExtractPythonUDFs no longer wraps the plan in a Project when the operator is an Aggregate. The Python plan is then consistent with its Scala counterpart and gives ResolveAggregateFunctions the Aggregate it expects:

After fix, Python:

Aggregate [a#0L], [pythonUDF#3 AS havingCondition#2]
+- EvaluatePython PythonUDF#<lambda>(a#0L), pythonUDF#3: boolean
   +- Project [a#0L]
      +- LogicalRDD [a#0L,b#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2
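
The branch proposed in this PR can be sketched in plain Python as follows. This is a toy model, not the Spark rule: `Node` and `extract_python_udfs_fixed` are hypothetical names, expressions are plain strings, and the assumption that non-Aggregate operators keep their wrapping Project is made only for illustration.

```python
class Node:
    """Toy logical operator: a kind, its expressions, and at most one child."""

    def __init__(self, kind, exprs, child=None):
        self.kind = kind
        self.exprs = exprs
        self.child = child

    def kinds(self):
        """Operator kinds from the root down, for easy comparison."""
        node, out = self, []
        while node is not None:
            out.append(node.kind)
            node = node.child
        return out


def extract_python_udfs_fixed(plan):
    """Hypothetical model of the rule with the proposed Aggregate check."""
    # Evaluate the UDF in a new node directly below the operator.
    evaluated = Node("EvaluatePython", ["pythonUDF"], plan.child)
    transformed = Node(
        plan.kind,
        [e.replace("PythonUDF(a)", "pythonUDF") for e in plan.exprs],
        evaluated,
    )
    if plan.kind == "Aggregate":
        # Keep the Aggregate at the root so later rules still see one.
        return transformed
    else:
        # Assumed for illustration: other operators keep the wrapping Project.
        return Node("Project", ["<original output>"], transformed)


relation = Node("Project", ["a"], Node("LogicalRDD", ["a", "b"]))

fixed_agg = extract_python_udfs_fixed(
    Node("Aggregate", ["PythonUDF(a) AS havingCondition"], relation)
)
fixed_filter = extract_python_udfs_fixed(Node("Filter", ["PythonUDF(a)"], relation))

print(fixed_agg.kinds())
print(fixed_filter.kinds())
```

The operator order printed for `fixed_agg` has the same shape as the "After fix" plan above: Aggregate, EvaluatePython, Project, LogicalRDD.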

@davies
Contributor

davies commented Apr 2, 2016

@xguo27 Thanks for working on this. I think the root cause here is that we extract Python UDFs too early (in the analyzer). EvaluatePython is a special logical plan that many rules have no knowledge of, which will break many things. We should extract Python UDFs later, at the end of the optimizer or in the physical plan. I will send a PR to fix that.
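
As a rough illustration of the point about rules having no knowledge of EvaluatePython, the sketch below models a filter push-down rule in plain Python. Everything in it is hypothetical (`Node`, `KNOWN_PASS_THROUGH`, `push_down_filter`), and it is far simpler than any real optimizer rule; it only shows how an operator the rule does not recognise stops the rewrite.

```python
class Node:
    """Toy logical operator: a kind, its expressions, and at most one child."""

    def __init__(self, kind, exprs, child=None):
        self.kind = kind
        self.exprs = exprs
        self.child = child

    def kinds(self):
        """Operator kinds from the root down."""
        node, out = self, []
        while node is not None:
            out.append(node.kind)
            node = node.child
        return out


# Operators this toy rule knows how to move a Filter through.
# EvaluatePython is deliberately absent: the rule has never heard of it.
KNOWN_PASS_THROUGH = {"Project"}


def push_down_filter(plan):
    """Toy filter push-down: sink a Filter below operators the rule recognises."""
    child = plan.child
    if plan.kind == "Filter" and child is not None and child.kind in KNOWN_PASS_THROUGH:
        pushed = push_down_filter(Node("Filter", plan.exprs, child.child))
        return Node(child.kind, child.exprs, pushed)
    return plan


def scan():
    return Node("Project", ["a", "b"], Node("LogicalRDD", ["a", "b"]))


# UDF extracted early: an EvaluatePython node sits between Filter and Project.
early = push_down_filter(
    Node("Filter", ["b > 1"], Node("EvaluatePython", ["pythonUDF"], scan()))
)

# UDF left as an ordinary expression: no special node is in the way.
late = push_down_filter(Node("Filter", ["b > 1"], scan()))

print(early.kinds())
print(late.kinds())
```

In the `early` plan the Filter stays at the top because the rule does not recognise the node beneath it, while in the `late` plan it sinks below the Project.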

@xguo27
Contributor Author

xguo27 commented Apr 3, 2016

Sure @davies . I will close this PR.

@xguo27 xguo27 closed this Apr 3, 2016
@xguo27 xguo27 deleted the SPARK-12981 branch April 3, 2016 01:08
asfgit pushed a commit that referenced this pull request Apr 4, 2016
## What changes were proposed in this pull request?

Currently we extract Python UDFs into a special logical plan, EvaluatePython, in the analyzer. But EvaluatePython is not part of Catalyst, so many rules have no knowledge of it, which breaks many things (for example, filter push down or column pruning).

We should treat Python UDFs as normal expressions until we want to evaluate them in the physical plan; we could extract them at the end of the optimizer or in the physical plan.

This PR extracts Python UDFs in the physical plan.

Closes #10935

## How was this patch tested?

Added regression tests.

Author: Davies Liu <[email protected]>

Closes #12127 from davies/py_udf.
tarnfeld added a commit to duedil-ltd/spark that referenced this pull request May 24, 2016
We've attempted to backport the following patch for a pretty major bug
in 1.6 dataframes, hopefully it works...

apache#10935