From 6187d5a0df7c409a49cd636eb74dea9323044c6b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 8 Dec 2017 21:20:25 +0100 Subject: [PATCH 1/6] [SPARK-22629][PYTHON] Add deterministic flag to pyspark UDF --- python/pyspark/sql/functions.py | 5 +++-- python/pyspark/sql/udf.py | 11 ++++++++++- .../apache/spark/sql/execution/python/PythonUDF.scala | 5 ++++- .../execution/python/UserDefinedPythonFunction.scala | 5 +++-- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 4e0faddb1c0d..ec14cd1f3794 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2075,9 +2075,10 @@ class PandasUDFType(object): def udf(f=None, returnType=StringType()): """Creates a user defined function (UDF). - .. note:: The user-defined functions must be deterministic. Due to optimization, + .. note:: The user-defined functions are considered deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than - it is present in the query. + it is present in the query. If your function is not deterministic, call + `asNondeterministic`. .. note:: The user-defined functions do not support conditional expressions or short curcuiting in boolean expressions and it ends up with being executed all internally. If the functions diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index c3301a41ccd5..9d5d99568669 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -88,6 +88,7 @@ def __init__(self, func, func.__name__ if hasattr(func, '__name__') else func.__class__.__name__) self.evalType = evalType + self._deterministic = True @property def returnType(self): @@ -125,7 +126,7 @@ def _create_judf(self): wrapped_func = _wrap_function(sc, self.func, self.returnType) jdt = spark._jsparkSession.parseDataType(self.returnType.json()) judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction( - self._name, wrapped_func, jdt, self.evalType) + self._name, wrapped_func, jdt, self.evalType, self._deterministic) return judf def __call__(self, *cols): @@ -157,5 +158,13 @@ def wrapper(*args): wrapper.func = self.func wrapper.returnType = self.returnType wrapper.evalType = self.evalType + wrapper.asNondeterministic = self.asNondeterministic return wrapper + + def asNondeterministic(self): + """ + Updates UserDefinedFunction to nondeterministic. + """ + self._deterministic = False + return self diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala index ef27fbc2db7d..a633132b1371 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala @@ -29,9 +29,12 @@ case class PythonUDF( func: PythonFunction, dataType: DataType, children: Seq[Expression], - evalType: Int) + evalType: Int, + udfDeterministic: Boolean = true) extends Expression with Unevaluable with NonSQLExpression with UserDefinedExpression { + override lazy val deterministic = udfDeterministic && children.forall(_.deterministic) + override def toString: String = s"$name(${children.mkString(", ")})" override def nullable: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala index 348e49e473ed..f9e10b4f4a86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala @@ -29,10 +29,11 @@ case class UserDefinedPythonFunction( name: String, func: PythonFunction, dataType: DataType, - pythonEvalType: Int) { + pythonEvalType: Int, + udfDeterministic: Boolean = true) { def builder(e: Seq[Expression]): PythonUDF = { - PythonUDF(name, func, dataType, e, pythonEvalType) + PythonUDF(name, func, dataType, e, pythonEvalType, udfDeterministic) } /** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */ From 187ff9a22edecdca582893b2fd836a343972f68b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 22 Dec 2017 12:50:33 +0100 Subject: [PATCH 2/6] add test --- python/pyspark/sql/tests.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b4d32d8de8a2..1f0460defff9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -434,6 +434,16 @@ def test_udf_with_array_type(self): self.assertEqual(list(range(3)), l1) self.assertEqual(1, l2) + def test_nondeterministic_udf(self): + from pyspark.sql.functions import udf + import random + udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() + df = self.spark.createDataFrame([Row(1)]).select(udf_random_col().alias('RAND')) + random.seed(1234) + udf_add_ten = udf(lambda rand: rand + 10, IntegerType()) + [row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect() + self.assertEqual(row[0] + 10, row[1]) + def test_broadcast_in_udf(self): bar = {"a": "aa", "b": "bb", "c": "abc"} foo = self.sc.broadcast(bar) From cc309b0ce2496365afd8c602c282e3d84aeed940 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 22 Dec 2017 19:32:26 +0100 Subject: [PATCH 3/6] address reviews --- .../src/main/scala/org/apache/spark/sql/UDFRegistration.scala | 1 + .../org/apache/spark/sql/execution/python/PythonUDF.scala | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index 3ff476147b8b..d7d98a1ad3e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -58,6 +58,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends | pythonIncludes: ${udf.func.pythonIncludes} | pythonExec: ${udf.func.pythonExec} | dataType: ${udf.dataType} + | udfDeterministic: ${udf.udfDeterministic} """.stripMargin) functionRegistry.createOrReplaceTempFunction(name, udf.builder) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala index a633132b1371..d3f743d9eb61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDF.scala @@ -30,10 +30,10 @@ case class PythonUDF( dataType: DataType, children: Seq[Expression], evalType: Int, - udfDeterministic: Boolean = true) + udfDeterministic: Boolean) extends Expression with Unevaluable with NonSQLExpression with UserDefinedExpression { - override lazy val deterministic = udfDeterministic && children.forall(_.deterministic) + override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) override def toString: String = s"$name(${children.mkString(", ")})" From 462a92a4237deb63d0b7128ff3585bb7595692fe Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 23 Dec 2017 10:36:55 +0100 Subject: [PATCH 4/6] address review comments --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 7 +++++++ python/pyspark/sql/tests.py | 1 - python/pyspark/sql/udf.py | 2 ++ .../main/scala/org/apache/spark/sql/UDFRegistration.scala | 2 ++ 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 93d508c28ebb..1ec0e717fac2 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -39,6 +39,13 @@ private[spark] object PythonEvalType { val SQL_PANDAS_SCALAR_UDF = 200 val SQL_PANDAS_GROUP_MAP_UDF = 201 + + def toString(pythonEvalType: Int): String = pythonEvalType match { + case NON_UDF => "NON_UDF" + case SQL_BATCHED_UDF => "SQL_BATCHED_UDF" + case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF" + case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF" + } } /** diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1f0460defff9..d69c812db5c2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -439,7 +439,6 @@ def test_nondeterministic_udf(self): import random udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() df = self.spark.createDataFrame([Row(1)]).select(udf_random_col().alias('RAND')) - random.seed(1234) udf_add_ten = udf(lambda rand: rand + 10, IntegerType()) [row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect() self.assertEqual(row[0] + 10, row[1]) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 9d5d99568669..bea3828049a2 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -165,6 +165,8 @@ def wrapper(*args): def asNondeterministic(self): """ Updates UserDefinedFunction to nondeterministic. + + .. versionadded:: 2.3 """ self._deterministic = False return self diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index d7d98a1ad3e4..aa36e70e4220 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -23,6 +23,7 @@ import scala.reflect.runtime.universe.TypeTag import scala.util.Try import org.apache.spark.annotation.InterfaceStability +import org.apache.spark.api.python.PythonEvalType import org.apache.spark.internal.Logging import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection} @@ -58,6 +59,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends | pythonIncludes: ${udf.func.pythonIncludes} | pythonExec: ${udf.func.pythonExec} | dataType: ${udf.dataType} + | pythonEvalType: ${PythonEvalType.toString(udf.pythonEvalType)} | udfDeterministic: ${udf.udfDeterministic} """.stripMargin) From a40ba7384db1030b6facb14b741349da09562d1f Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 24 Dec 2017 12:13:49 +0100 Subject: [PATCH 5/6] update docs according to comments --- python/pyspark/sql/functions.py | 12 ++++++++---- .../scala/org/apache/spark/sql/UDFRegistration.scala | 2 -- .../execution/python/UserDefinedPythonFunction.scala | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index ec14cd1f3794..3a347c30e57d 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2075,10 +2075,14 @@ class PandasUDFType(object): def udf(f=None, returnType=StringType()): """Creates a user defined function (UDF). - .. note:: The user-defined functions are considered deterministic. Due to optimization, - duplicate invocations may be eliminated or the function may even be invoked more times than - it is present in the query. If your function is not deterministic, call - `asNondeterministic`. + .. note:: The user-defined functions are considered deterministic by default. Due to + optimization, duplicate invocations may be eliminated or the function may even be invoked + more times than it is present in the query. If your function is not deterministic, call + `asNondeterministic` on the user defined function. E.g.: + + >>> from pyspark.sql.types import IntegerType + >>> import random + >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() .. note:: The user-defined functions do not support conditional expressions or short curcuiting in boolean expressions and it ends up with being executed all internally. If the functions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index aa36e70e4220..dc2468a721e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -42,8 +42,6 @@ import org.apache.spark.util.Utils * spark.udf * }}} * - * @note The user-defined functions must be deterministic. - * * @since 1.3.0 */ @InterfaceStability.Stable diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala index f9e10b4f4a86..50dca32cb786 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala @@ -30,7 +30,7 @@ case class UserDefinedPythonFunction( func: PythonFunction, dataType: DataType, pythonEvalType: Int, - udfDeterministic: Boolean = true) { + udfDeterministic: Boolean) { def builder(e: Seq[Expression]): PythonUDF = { PythonUDF(name, func, dataType, e, pythonEvalType, udfDeterministic) From 47801c7dc532aa9a19d59cdef1fe021c61a0b2c8 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 24 Dec 2017 12:42:03 +0100 Subject: [PATCH 6/6] fix MyDummyPythonUDF --- .../spark/sql/execution/python/BatchEvalPythonExecSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 53d3f3456751..9e4a2e877695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -109,4 +109,5 @@ class MyDummyPythonUDF extends UserDefinedPythonFunction( name = "dummyUDF", func = new DummyUDF, dataType = BooleanType, - pythonEvalType = PythonEvalType.SQL_BATCHED_UDF) + pythonEvalType = PythonEvalType.SQL_BATCHED_UDF, + udfDeterministic = true)