From db65be567e8daeec7ee1323141629ed2c5ac0ce8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 16 Jun 2016 12:07:51 -0700 Subject: [PATCH 1/2] fix the Python UDF in Scala 2.10 --- .../apache/spark/sql/execution/python/ExtractPythonUDFs.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 668470ee6a29..da3087f48735 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -132,7 +132,7 @@ private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] { val validUdfs = udfs.filter { case udf => // Check to make sure that the UDF can be evaluated with only the input of this child. udf.references.subsetOf(child.outputSet) - } + }.toArray // force the iterator to an array, that can't be serialized in Scala 2.10 if (validUdfs.nonEmpty) { val resultAttrs = udfs.zipWithIndex.map { case (u, i) => AttributeReference(s"pythonUDF$i", u.dataType)() From 06e92654a127b02754c6d707bf4d0dee99e1652b Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 16 Jun 2016 22:23:56 -0700 Subject: [PATCH 2/2] Update ExtractPythonUDFs.scala --- .../apache/spark/sql/execution/python/ExtractPythonUDFs.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index da3087f48735..87583c82347e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -132,7 +132,7 @@ private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] { val validUdfs = udfs.filter { case udf => // Check to make sure that the UDF can be evaluated with only the input of this child. udf.references.subsetOf(child.outputSet) - }.toArray // force the iterator to an array, that can't be serialized in Scala 2.10 + }.toArray // Turn it into an array since iterators cannot be serialized in Scala 2.10 if (validUdfs.nonEmpty) { val resultAttrs = udfs.zipWithIndex.map { case (u, i) => AttributeReference(s"pythonUDF$i", u.dataType)()