
Conversation

@mgaido91 (Contributor) commented Dec 8, 2017:

What changes were proposed in this pull request?

In SPARK-20586 a deterministic flag was added to the Scala UDF, but it is not available for the Python UDF. This flag is useful for cases when the UDF's code can return different results with the same input. Due to optimization, duplicate invocations may be eliminated, or the function may even be invoked more times than it appears in the query. This can lead to unexpected behavior.

This PR adds the deterministic flag, via the asNondeterministic method, to let the user mark a function as non-deterministic and thereby avoid the optimizations which might otherwise lead to unexpected behavior.

How was this patch tested?

Manual tests:

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col =  udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten =  udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+                                                      
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello|   3|           13|
+-----+----+-------------+
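
For contrast, a minimal sketch of what can happen without the flag (variable names here are illustrative, and the mismatch is a possibility rather than a guarantee, since the optimizer is merely allowed to eliminate or duplicate the invocations):

>>> udf_random_det = udf(lambda: int(100*random.random()), IntegerType())
>>> df_det = spark.createDataFrame([{'name': 'hello'}]).withColumn('RAND', udf_random_det())
>>> # without asNondeterministic(), the UDF may be re-evaluated per reference,
>>> # so RAND and RAND_PLUS_TEN are not guaranteed to be consistent
>>> df_det.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()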

@SparkQA commented Dec 8, 2017:

Test build #84660 has finished for PR 19929 at commit 6187d5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor, Author):
@gatorsmile sorry, I saw that you did the patch for the Scala UDF. Could you help review this, please? Thanks.

@mgaido91 (Contributor, Author):
cc @cloud-fan @HyukjinKwon @zero323, maybe you can help review this too. Thanks.

@mgaido91 (Contributor, Author):
kindly ping @cloud-fan @gatorsmile @HyukjinKwon @zero323

@gatorsmile (Member) commented Dec 22, 2017:

We need test cases; manual tests are not enough. Also, please update registerPython.

I will try to review this tomorrow. Thanks!

@mgaido91 (Contributor, Author):
@gatorsmile I added the test, but I didn't get what needs to be updated in registerPython. Could you explain, please? Thanks.

  children: Seq[Expression],
- evalType: Int)
+ evalType: Int,
+ udfDeterministic: Boolean = true)
Contributor:
do we need the default value?

Contributor (Author):
no, it is not needed

@cloud-fan (Contributor) commented Dec 22, 2017:

UDFRegistration.registerPython needs a minor update for the log

@SparkQA commented Dec 22, 2017:

Test build #85309 has finished for PR 19929 at commit 187ff9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor, Author):
thank you @cloud-fan, changed.

@SparkQA commented Dec 22, 2017:

Test build #85316 has finished for PR 19929 at commit cc309b0.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor, Author):
Jenkins, retest this please

random.seed(1234)
udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
[row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect()
self.assertEqual(row[0] + 10, row[1])
Member:
Compare the values, since you already set the seed?

Contributor (Author):
I am not sure why, but setting the seed doesn't seem to take effect. I will remove the seed setting.
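
For reference, a minimal sketch of the test without the seed, asserting only the relationship between the two columns (names and setup are illustrative, assuming the usual PySpark test fixture providing self.spark; the merged test may differ):

random_udf = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic()
df = self.spark.range(1).select(random_udf().alias('RAND'))
udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
[row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect()
self.assertEqual(row[0] + 10, row[1])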


    def asNondeterministic(self):
        """
        Updates UserDefinedFunction to nondeterministic.
Member:
        """
        Updates UserDefinedFunction to nondeterministic.

        .. versionadded:: 2.3
        """

| envVars: ${udf.func.envVars}
| pythonIncludes: ${udf.func.pythonIncludes}
| pythonExec: ${udf.func.pythonExec}
| dataType: ${udf.dataType}
Member:
Could you also print out pythonEvalType?



@since(1.3)
def udf(f=None, returnType=StringType()):
Member:
Do we need to just add a deterministic parameter here? Is adding it at the end OK for PySpark, without breaking existing apps? cc @ueshin

Contributor (Author):
I followed what was done for the Scala UDF, where this parameter is not added; instead, there is a method to set it. If we add a parameter here, I'd suggest adding it to the Scala API as well.

Member:
Scala and Python are different, because the Scala one also serves the Java API.

Member:
@gatorsmile, however, wouldn't it be better to keep them consistent if possible?

Member:
I am saying this because I have had a few talks about this before, and I am pretty sure we usually keep them the same whenever possible.

Member:
Using asNondeterministic is not straightforward for users. On the Scala side, we have no choice if we want to avoid breaking the API. Anyway, I am fine with keeping it as it is now.

@SparkQA commented Dec 23, 2017:

Test build #85322 has finished for PR 19929 at commit cc309b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 23, 2017:

Test build #85337 has finished for PR 19929 at commit 462a92a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  duplicate invocations may be eliminated or the function may even be invoked more times than
- it is present in the query.
+ it is present in the query. If your function is not deterministic, call
+ `asNondeterministic`.
Member (@HyukjinKwon) commented Dec 23, 2017:
Let's say this more explicitly, e.g. "... call asNondeterministic() on the user-defined function". It's partly because I think UserDefinedFunction is not exposed in the PySpark API doc.

Member:
Yea, I think it is not clear here what to call asNondeterministic on. Maybe add a simple example.
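
Something along these lines could work as the docstring example (a sketch; the exact snippet that lands in the docs may differ):

>>> import random
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()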

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.api.java._
import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
Member:
UDFRegistration's doc:

 * @note The user-defined functions must be deterministic.

Looks obsolete.

  dataType: DataType,
- pythonEvalType: Int) {
+ pythonEvalType: Int,
+ udfDeterministic: Boolean = true) {
Member:
Don't we always pass in this parameter? Remove the default value?

"""Creates a user defined function (UDF).
.. note:: The user-defined functions must be deterministic. Due to optimization,
.. note:: The user-defined functions are considered deterministic. Due to optimization,
Member:
... are considered deterministic by default.

@SparkQA commented Dec 24, 2017:

Test build #85358 has finished for PR 19929 at commit a40ba73.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 24, 2017:

Test build #85359 has finished for PR 19929 at commit 47801c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) commented Dec 25, 2017:

@mgaido91 (Contributor, Author):
@gatorsmile, yes, the reason why the seed doesn't work lies in the way Python UDFs are executed: a new Python process is created for each partition to evaluate the UDF. Thus the seed is set only on the driver, not in the processes where the UDF is executed. This can easily be confirmed as follows:

>>> from pyspark.sql.functions import udf
>>> import os
>>> pid_udf = udf(lambda: str(os.getpid()))
>>> spark.range(2).select(pid_udf()).show()
+----------+                                                                    
|<lambda>()|
+----------+
|      4132|
|      4130|
+----------+
>>> os.getpid()
4070

Therefore there is no easy way to set the seed. If I set it inside the UDF, the UDF becomes deterministic. So I think the current test is the best option.
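
To illustrate the last point, a sketch with hypothetical names: re-seeding inside the function body means the next draw is the same on every call, so the UDF is effectively deterministic again:

>>> import random
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> def seeded_rand():
...     random.seed(1234)  # re-seeds on every call, so random.random() always returns the same value
...     return int(100 * random.random())
...
>>> seeded_udf = udf(seeded_rand, IntegerType())  # every invocation yields the same value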

@gatorsmile (Member):
Could you change the JIRA number to https://issues.apache.org/jira/browse/SPARK-22901 ?

@mgaido91 changed the title from [SPARK-22629][PYTHON] Add deterministic flag to pyspark UDF to [SPARK-22901][PYTHON] Add deterministic flag to pyspark UDF on Dec 26, 2017
@mgaido91 (Contributor, Author):
@gatorsmile done, thanks!

@gatorsmile (Member):
LGTM

Thanks! Merged to master.

@asfgit closed this in ff48b1b on Dec 26, 2017