5 changes: 3 additions & 2 deletions python/pyspark/sql/functions.py
@@ -2075,9 +2075,10 @@ class PandasUDFType(object):
def udf(f=None, returnType=StringType()):
Member: Do we need to just add a `deterministic` parameter? Is adding it to the end OK for PySpark, without breaking existing apps? cc @ueshin

Contributor Author: I followed what was done for the Scala UDF, where this parameter is not added but there is a method to set it. If we add a parameter here, I'd suggest adding it to the Scala API as well.

Member: Scala and Python are different, because the Scala API also serves the Java API.

Member: @gatorsmile, however, wouldn't it be better to keep them consistent if possible?

Member: I am saying this because I have had a few discussions about this before, and I am pretty sure we usually keep them the same whenever possible.

Member: Using `asNondeterministic` is not straightforward for users. On the Scala side, we have no choice if we want to avoid breaking the API. Anyway, I am fine with keeping it as is for now.

"""Creates a user defined function (UDF).

.. note:: The user-defined functions must be deterministic. Due to optimization,
.. note:: The user-defined functions are considered deterministic. Due to optimization,
Member: `... are considered deterministic by default.`

duplicate invocations may be eliminated or the function may even be invoked more times than
it is present in the query.
it is present in the query. If your function is not deterministic, call
`asNondeterministic`.
Member (@HyukjinKwon, Dec 23, 2017): Let's say this more explicitly, like `... call asNondeterministic() on the user-defined function`. It's partly because I think `UserDefinedFunction` is not exposed in the PySpark API doc.

Member: Yeah, I think it is not clear here what to call `asNondeterministic` on. Maybe add a simple example.


.. note:: The user-defined functions do not support conditional expressions or short-circuiting
in boolean expressions and it ends up with being executed all internally. If the functions
10 changes: 10 additions & 0 deletions python/pyspark/sql/tests.py
@@ -434,6 +434,16 @@ def test_udf_with_array_type(self):
self.assertEqual(list(range(3)), l1)
self.assertEqual(1, l2)

def test_nondeterministic_udf(self):
from pyspark.sql.functions import udf
import random
udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic()
df = self.spark.createDataFrame([Row(1)]).select(udf_random_col().alias('RAND'))
random.seed(1234)
udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
[row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect()
self.assertEqual(row[0] + 10, row[1])
Member: Compare the values, since you already set the seed?

Contributor Author: I am not sure why, but setting the seed doesn't seem to take effect. I will remove setting the seed.
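A likely explanation, as an editorial aside: the UDF body is executed in Python worker processes on the executors, so seeding the `random` module in the driver never reaches those workers. A small illustration of the two scopes (hypothetical, not part of the diff):

import random

random.seed(1234)              # seeds the random module in the driver process only
local_value = random.random()  # deterministic here: the driver RNG was seeded

# A UDF body, by contrast, is pickled and shipped to separate Python
# worker processes on the executors; each worker has its own
# random-module state, which this driver-side seed never touches.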


def test_broadcast_in_udf(self):
bar = {"a": "aa", "b": "bb", "c": "abc"}
foo = self.sc.broadcast(bar)
11 changes: 10 additions & 1 deletion python/pyspark/sql/udf.py
@@ -88,6 +88,7 @@ def __init__(self, func,
func.__name__ if hasattr(func, '__name__')
else func.__class__.__name__)
self.evalType = evalType
self._deterministic = True

@property
def returnType(self):
@@ -125,7 +126,7 @@ def _create_judf(self):
wrapped_func = _wrap_function(sc, self.func, self.returnType)
jdt = spark._jsparkSession.parseDataType(self.returnType.json())
judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
self._name, wrapped_func, jdt, self.evalType)
self._name, wrapped_func, jdt, self.evalType, self._deterministic)
return judf

def __call__(self, *cols):
@@ -157,5 +158,13 @@ def wrapper(*args):
wrapper.func = self.func
wrapper.returnType = self.returnType
wrapper.evalType = self.evalType
wrapper.asNondeterministic = self.asNondeterministic

return wrapper

def asNondeterministic(self):
"""
Updates UserDefinedFunction to nondeterministic.
Member (suggested revision):
        """
        Updates UserDefinedFunction to nondeterministic.

        .. versionadded:: 2.3
        """

"""
self._deterministic = False
return self
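Taken together, a small sketch of how the flag flows through this file (editorial addition; per the diff, `udf(...)` returns a wrapped function that forwards `asNondeterministic` to the underlying `UserDefinedFunction`):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import random

wrapped = udf(lambda: random.randint(0, 100), IntegerType())
# asNondeterministic() flips _deterministic on the UserDefinedFunction
# and returns it; _create_judf later forwards that flag to the JVM-side
# UserDefinedPythonFunction constructor.
rand_udf = wrapped.asNondeterministic()
assert rand_udf._deterministic is False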
@@ -58,6 +58,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
| pythonIncludes: ${udf.func.pythonIncludes}
| pythonExec: ${udf.func.pythonExec}
| dataType: ${udf.dataType}
Member: Could you also print out pythonEvalType?

| udfDeterministic: ${udf.udfDeterministic}
""".stripMargin)

functionRegistry.createOrReplaceTempFunction(name, udf.builder)
@@ -29,9 +29,12 @@ case class PythonUDF(
func: PythonFunction,
dataType: DataType,
children: Seq[Expression],
evalType: Int)
evalType: Int,
udfDeterministic: Boolean)
extends Expression with Unevaluable with NonSQLExpression with UserDefinedExpression {

override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)

override def toString: String = s"$name(${children.mkString(", ")})"

override def nullable: Boolean = true
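The `deterministic` override above encodes a simple rule: a PythonUDF expression is deterministic only if the UDF itself was not flagged nondeterministic and every child expression is deterministic. A purely illustrative Python rendering of that rule (names are hypothetical):

def python_udf_is_deterministic(udf_deterministic, children):
    # Mirrors the Scala override: deterministic only when the UDF was not
    # flagged nondeterministic AND all child expressions are deterministic.
    return udf_deterministic and all(child.deterministic for child in children)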
@@ -29,10 +29,11 @@ case class UserDefinedPythonFunction(
name: String,
func: PythonFunction,
dataType: DataType,
pythonEvalType: Int) {
pythonEvalType: Int,
udfDeterministic: Boolean = true) {
Member: Don't we always pass in this parameter? Remove the default value?


def builder(e: Seq[Expression]): PythonUDF = {
PythonUDF(name, func, dataType, e, pythonEvalType)
PythonUDF(name, func, dataType, e, pythonEvalType, udfDeterministic)
}

/** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */