Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion python/pyspark/sql/tests/test_udf_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import unittest
import os
import sys
import warnings
from io import StringIO
from typing import Iterator

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import udf, pandas_udf
from pyspark.profiler import UDFBasicProfiler


Expand Down Expand Up @@ -101,6 +103,47 @@ def add2(x):
df = self.spark.range(10)
df.select(add1("id"), add2("id"), add1("id")).collect()

# Unsupported
def exec_pandas_udf_iter_to_iter(self):
import pandas as pd

@pandas_udf("int")
def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]:
for ser in batch_ser:
yield ser + 1

self.spark.range(10).select(iter_to_iter("id")).collect()

# Unsupported
def exec_map(self):
import pandas as pd

def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for pdf in pdfs:
yield pdf[pdf.id == 1]

df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))
df.mapInPandas(map, schema=df.schema).collect()

def test_unsupported(self):
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
self.exec_pandas_udf_iter_to_iter()
user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
self.assertTrue(len(user_warns) > 0)
self.assertTrue(
"Profiling UDFs with iterators input/output is not supported" in str(user_warns[0])
)

with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter("always")
self.exec_map()
user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)]
self.assertTrue(len(user_warns) > 0)
self.assertTrue(
"Profiling UDFs with iterators input/output is not supported" in str(user_warns[0])
)


if __name__ == "__main__":
from pyspark.sql.tests.test_udf_profiler import * # noqa: F401
Expand Down
33 changes: 16 additions & 17 deletions python/pyspark/sql/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from py4j.java_gateway import JavaObject

from pyspark import SparkContext
from pyspark.profiler import Profiler
from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
from pyspark.sql.column import Column, _to_java_expr, _to_seq
from pyspark.sql.types import (
Expand Down Expand Up @@ -403,24 +402,24 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column:
for key, value in kwargs.items()
]

profiler: Optional[Profiler] = None
memory_profiler: Optional[Profiler] = None
if sc.profiler_collector:
profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true"
memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true"
profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true"
memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true"

if profiler_enabled or memory_profiler_enabled:
# Disable profiling Pandas UDFs with iterators as input/output.
if profiler_enabled or memory_profiler_enabled:
if self.evalType in [
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
]:
profiler_enabled = memory_profiler_enabled = False
warnings.warn(
"Profiling UDFs with iterators input/output is not supported.",
UserWarning,
)
if self.evalType in [
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
]:
warnings.warn(
"Profiling UDFs with iterators input/output is not supported.",
UserWarning,
)
judf = self._judf
jUDFExpr = judf.builder(_to_seq(sc, jexprs))
jPythonUDF = judf.fromUDFExpr(jUDFExpr)
return Column(jPythonUDF)

# Disallow enabling two profilers at the same time.
if profiler_enabled and memory_profiler_enabled:
Expand Down