From 55a6cc4549b19aa92099e70feb66c2948dddf038 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 9 Jan 2024 17:25:16 -0800 Subject: [PATCH 1/6] fix --- python/pyspark/sql/udf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 16605bc12acc..6be698754078 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -448,7 +448,7 @@ def func(*args: Any, **kwargs: Any) -> Any: jPythonUDF = judf.fromUDFExpr(jUDFExpr) id = jUDFExpr.resultId().id() sc.profiler_collector.add_profiler(id, profiler) - else: # memory_profiler_enabled + elif memory_profiler_enabled: f = self.func memory_profiler = sc.profiler_collector.new_memory_profiler(sc) (sub_lines, start_line) = inspect.getsourcelines(f.__code__) From d412cf74e25e727d9297b7372636ea36eb69dd0d Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 10 Jan 2024 13:54:31 -0800 Subject: [PATCH 2/6] fix --- python/pyspark/sql/udf.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 6be698754078..0181deb25c3f 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -403,24 +403,25 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column: for key, value in kwargs.items() ] - profiler: Optional[Profiler] = None - memory_profiler: Optional[Profiler] = None - if sc.profiler_collector: - profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true" - memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" - - # Disable profiling Pandas UDFs with iterators as input/output. - if profiler_enabled or memory_profiler_enabled: - if self.evalType in [ - PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, - PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, - PythonEvalType.SQL_MAP_ARROW_ITER_UDF, - ]: - profiler_enabled = memory_profiler_enabled = False - warnings.warn( - "Profiling UDFs with iterators input/output is not supported.", - UserWarning, - ) + profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true" + memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" + + if profiler_enabled or memory_profiler_enabled: + if self.evalType in [ + PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, + PythonEvalType.SQL_MAP_ARROW_ITER_UDF, + ]: + # Disable profiling Pandas UDFs with iterators as input/output. + profiler_enabled = memory_profiler_enabled = False + warnings.warn( + "Profiling UDFs with iterators input/output is not supported.", + UserWarning, + ) + judf = self._judf + jUDFExpr = judf.builder(_to_seq(sc, jexprs)) + jPythonUDF = judf.fromUDFExpr(jUDFExpr) + return Column(jPythonUDF) # Disallow enabling two profilers at the same time. if profiler_enabled and memory_profiler_enabled: @@ -448,7 +449,7 @@ def func(*args: Any, **kwargs: Any) -> Any: jPythonUDF = judf.fromUDFExpr(jUDFExpr) id = jUDFExpr.resultId().id() sc.profiler_collector.add_profiler(id, profiler) - elif memory_profiler_enabled: + else: # memory_profiler_enabled f = self.func memory_profiler = sc.profiler_collector.new_memory_profiler(sc) (sub_lines, start_line) = inspect.getsourcelines(f.__code__) From 2012eb440c52c6b36209b2e5ef315c971bcbe214 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 10 Jan 2024 13:55:20 -0800 Subject: [PATCH 3/6] comment --- python/pyspark/sql/udf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 0181deb25c3f..195a422ec204 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -407,12 +407,12 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column: memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" if profiler_enabled or memory_profiler_enabled: + # Disable profiling Pandas UDFs with iterators as input/output. if self.evalType in [ PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, PythonEvalType.SQL_MAP_ARROW_ITER_UDF, ]: - # Disable profiling Pandas UDFs with iterators as input/output. profiler_enabled = memory_profiler_enabled = False warnings.warn( "Profiling UDFs with iterators input/output is not supported.", From ecc38ac99bfb891bf8ae5bbaffba3222dcdac968 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 10 Jan 2024 13:56:04 -0800 Subject: [PATCH 4/6] rmv --- python/pyspark/sql/udf.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 195a422ec204..4d4a9e3ca7f7 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -413,7 +413,6 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column: PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, PythonEvalType.SQL_MAP_ARROW_ITER_UDF, ]: - profiler_enabled = memory_profiler_enabled = False warnings.warn( "Profiling UDFs with iterators input/output is not supported.", UserWarning, From 7ec983fd1bae5e3f3e91f0236300d9e301534dc0 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 10 Jan 2024 17:32:46 -0800 Subject: [PATCH 5/6] rmv import --- python/pyspark/sql/udf.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 4d4a9e3ca7f7..ca38556431ad 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -28,7 +28,6 @@ from py4j.java_gateway import JavaObject from pyspark import SparkContext -from pyspark.profiler import Profiler from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType from pyspark.sql.column import Column, _to_java_expr, _to_seq from pyspark.sql.types import ( From a0ea1d4b1f59197cb97974971cb6802ec3cd792b Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 11 Jan 2024 17:26:44 -0800 Subject: [PATCH 6/6] test --- python/pyspark/sql/tests/test_udf_profiler.py | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 136f423d0a35..776d5da88bb2 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -19,11 +19,13 @@ import unittest import os import sys +import warnings from io import StringIO +from typing import Iterator from pyspark import SparkConf from pyspark.sql import SparkSession -from pyspark.sql.functions import udf +from pyspark.sql.functions import udf, pandas_udf from pyspark.profiler import UDFBasicProfiler @@ -101,6 +103,47 @@ def add2(x): df = self.spark.range(10) df.select(add1("id"), add2("id"), add1("id")).collect() + # Unsupported + def exec_pandas_udf_iter_to_iter(self): + import pandas as pd + + @pandas_udf("int") + def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]: + for ser in batch_ser: + yield ser + 1 + + self.spark.range(10).select(iter_to_iter("id")).collect() + + # Unsupported + def exec_map(self): + import pandas as pd + + def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: + for pdf in pdfs: + yield pdf[pdf.id == 1] + + df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v")) + df.mapInPandas(map, schema=df.schema).collect() + + def test_unsupported(self): + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter("always") + self.exec_pandas_udf_iter_to_iter() + user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0]) + ) + + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter("always") + self.exec_map() + user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0]) + ) + if __name__ == "__main__": from pyspark.sql.tests.test_udf_profiler import * # noqa: F401