# Patch summary (free text ahead of the first "diff --git" header; the hunks
# below are unchanged in content).
#
# python/pyspark/sql/tests/test_udf_profiler.py
#   * Imports `warnings`, `typing.Iterator` and `pandas_udf`.
#   * Adds two helpers that run iterator-style pandas code:
#     `exec_pandas_udf_iter_to_iter` (a `pandas_udf` over Iterator[pd.Series])
#     and `exec_map` (a function passed to `DataFrame.mapInPandas`).
#   * Adds `test_unsupported`, which records warnings while running each helper
#     and asserts that at least one UserWarning was captured and that the first
#     one contains "Profiling UDFs with iterators input/output is not supported".
#
# python/pyspark/sql/udf.py
#   * Removes the `from pyspark.profiler import Profiler` import and the
#     `profiler` / `memory_profiler` Optional locals in `__call__`.
#   * Reads `spark.python.profile` and `spark.python.profile.memory` directly;
#     the reads are no longer nested under `if sc.profiler_collector:`.
#   * When either flag is enabled and `self.evalType` is
#     SQL_SCALAR_PANDAS_ITER_UDF, SQL_MAP_PANDAS_ITER_UDF or
#     SQL_MAP_ARROW_ITER_UDF, `__call__` now emits the UserWarning and returns
#     the Column built from `self._judf` right away, instead of resetting both
#     flags to False.
#
# NOTE(review): the remainder of `__call__` lies outside the last hunk, so how
# the non-iterator profiling path consumes the two flags cannot be confirmed
# from this patch alone.
# NOTE(review): line breaks and indentation were reconstructed from the hunk
# line counts; confirm context-line whitespace against the target tree before
# applying.
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 136f423d0a35..776d5da88bb2 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -19,11 +19,13 @@
 import unittest
 import os
 import sys
+import warnings
 from io import StringIO
+from typing import Iterator
 
 from pyspark import SparkConf
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, pandas_udf
 from pyspark.profiler import UDFBasicProfiler
 
 
@@ -101,6 +103,47 @@ def add2(x):
         df = self.spark.range(10)
         df.select(add1("id"), add2("id"), add1("id")).collect()
 
+    # Unsupported
+    def exec_pandas_udf_iter_to_iter(self):
+        import pandas as pd
+
+        @pandas_udf("int")
+        def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]:
+            for ser in batch_ser:
+                yield ser + 1
+
+        self.spark.range(10).select(iter_to_iter("id")).collect()
+
+    # Unsupported
+    def exec_map(self):
+        import pandas as pd
+
+        def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
+            for pdf in pdfs:
+                yield pdf[pdf.id == 1]
+
+        df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))
+        df.mapInPandas(map, schema=df.schema).collect()
+
+    def test_unsupported(self):
+        with warnings.catch_warnings(record=True) as warns:
+            warnings.simplefilter("always")
+            self.exec_pandas_udf_iter_to_iter()
+            user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
+            self.assertTrue(len(user_warns) > 0)
+            self.assertTrue(
+                "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0])
+            )
+
+        with warnings.catch_warnings(record=True) as warns:
+            warnings.simplefilter("always")
+            self.exec_map()
+            user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
+            self.assertTrue(len(user_warns) > 0)
+            self.assertTrue(
+                "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0])
+            )
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_udf_profiler import *  # noqa: F401
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 16605bc12acc..ca38556431ad 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -28,7 +28,6 @@
 from py4j.java_gateway import JavaObject
 
 from pyspark import SparkContext
-from pyspark.profiler import Profiler
 from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
 from pyspark.sql.column import Column, _to_java_expr, _to_seq
 from pyspark.sql.types import (
@@ -403,24 +402,24 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column:
             for key, value in kwargs.items()
         ]
 
-        profiler: Optional[Profiler] = None
-        memory_profiler: Optional[Profiler] = None
-        if sc.profiler_collector:
-            profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true"
-            memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true"
+        profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true"
+        memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true"
 
+        if profiler_enabled or memory_profiler_enabled:
             # Disable profiling Pandas UDFs with iterators as input/output.
-            if profiler_enabled or memory_profiler_enabled:
-                if self.evalType in [
-                    PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-                    PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
-                    PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
-                ]:
-                    profiler_enabled = memory_profiler_enabled = False
-                    warnings.warn(
-                        "Profiling UDFs with iterators input/output is not supported.",
-                        UserWarning,
-                    )
+            if self.evalType in [
+                PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
+                PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+                PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+            ]:
+                warnings.warn(
+                    "Profiling UDFs with iterators input/output is not supported.",
+                    UserWarning,
+                )
+                judf = self._judf
+                jUDFExpr = judf.builder(_to_seq(sc, jexprs))
+                jPythonUDF = judf.fromUDFExpr(jUDFExpr)
+                return Column(jPythonUDF)
 
             # Disallow enabling two profilers at the same time.
             if profiler_enabled and memory_profiler_enabled: