35 changes: 16 additions & 19 deletions python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -208,25 +208,22 @@ def test_array_type_correct(self):
         assert_frame_equal(expected, result)

     def test_register_grouped_map_udf(self):
-        with self.quiet():
-            self.check_register_grouped_map_udf()
-
-    def check_register_grouped_map_udf(self):
-        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
-
-        with self.assertRaises(PySparkTypeError) as pe:
-            self.spark.catalog.registerFunction("foo_udf", foo_udf)
-
-        self.check_error(
-            exception=pe.exception,
-            errorClass="INVALID_UDF_EVAL_TYPE",
-            messageParameters={
-                "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
-                "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
-                "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
-                "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF"
-            },
-        )
+        with self.quiet(), self.temp_func("foo_udf"):
+            foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
+
+            with self.assertRaises(PySparkTypeError) as pe:
+                self.spark.catalog.registerFunction("foo_udf", foo_udf)
+
+            self.check_error(
+                exception=pe.exception,
+                errorClass="INVALID_UDF_EVAL_TYPE",
+                messageParameters={
+                    "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
+                    "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
+                    "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
+                    "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF"
+                },
+            )

     def test_decorator(self):
         df = self.data
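Note: the temp_func helper these tests now use is not defined in this diff. Judging from its call sites, it is a context manager on the shared test base class that unregisters the named temporary function(s) when the block exits, a Python counterpart of the Scala-side withUserDefinedFunction test utility. A minimal sketch of such a helper, assuming cleanup goes through DROP TEMPORARY FUNCTION (the PR's actual implementation may differ):

from contextlib import contextmanager

@contextmanager
def temp_func(self, *names):
    """Run the enclosed block, then unregister the named temporary
    functions, even if the block raises."""
    try:
        yield
    finally:
        for name in names:
            # UDFs registered via spark.udf.register or
            # spark.catalog.registerFunction are session-scoped temporary
            # functions, so DROP TEMPORARY FUNCTION removes them.
            self.spark.sql(f"DROP TEMPORARY FUNCTION IF EXISTS {name}")

Whatever the exact body, the point of the pattern is that a UDF registered inside the block cannot leak into tests that run later in the same session.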
38 changes: 22 additions & 16 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -522,17 +522,23 @@ def check_invalid_args(self):
         df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()

     def test_register_vectorized_udf_basic(self):
-        sum_pandas_udf = pandas_udf(
-            lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
-        )
+        with self.temp_func("sum_pandas_udf"):
+            sum_pandas_udf = pandas_udf(
+                lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+            )

-        self.assertEqual(sum_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
-        group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", sum_pandas_udf)
-        self.assertEqual(group_agg_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
-        q = "SELECT sum_pandas_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
-        actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
-        expected = [1, 5]
-        self.assertEqual(actual, expected)
+            self.assertEqual(sum_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+            group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", sum_pandas_udf)
+            self.assertEqual(
+                group_agg_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+            )
+            q = """
+                SELECT sum_pandas_udf(v1)
+                FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2
+            """
+            actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
+            expected = [1, 5]
+            self.assertEqual(actual, expected)

     def test_grouped_with_empty_partition(self):
         data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
@@ -551,10 +557,10 @@ def max_udf(v):
             return v.max()

         df = self.spark.range(0, 100)
-        self.spark.udf.register("max_udf", max_udf)

-        with self.tempView("table"):
+        with self.tempView("table"), self.temp_func("max_udf"):
             df.createTempView("table")
+            self.spark.udf.register("max_udf", max_udf)

             agg1 = df.agg(max_udf(df["id"]))
             agg2 = self.spark.sql("select max_udf(id) from table")
@@ -579,7 +585,7 @@ def test_named_arguments(self):
         df = self.data
         weighted_mean = self.pandas_agg_weighted_mean_udf

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)

@@ -604,7 +610,7 @@ def test_named_arguments_negative(self):
         df = self.data
         weighted_mean = self.pandas_agg_weighted_mean_udf

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)

@@ -644,7 +650,7 @@ def weighted_mean(**kwargs):

             return np.average(kwargs["v"], weights=kwargs["w"])

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)

@@ -684,7 +690,7 @@ def test_named_arguments_and_defaults(self):
         def biased_sum(v, w=None):
             return v.sum() + (w.sum() if w is not None else 100)

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("biased_sum"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("biased_sum", biased_sum)

182 changes: 97 additions & 85 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -293,13 +293,17 @@ def test_register_nondeterministic_vectorized_udf_basic(self):
         ).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
         self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
-        nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
-            "randomPandasUDF", random_pandas_udf
-        )
-        self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
-        self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
-        [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
-        self.assertEqual(row[0], 7)
+
+        with self.temp_func("randomPandasUDF"):
+            nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
+                "randomPandasUDF", random_pandas_udf
+            )
+            self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
+            self.assertEqual(
+                nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF
+            )
+            [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
+            self.assertEqual(row[0], 7)

         def random_iter_udf(it):
             for i in it:
@@ -310,15 +314,17 @@ def random_iter_udf(it):
         ).asNondeterministic()
         self.assertEqual(random_pandas_iter_udf.deterministic, False)
         self.assertEqual(random_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
-        nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction(
-            "randomPandasIterUDF", random_pandas_iter_udf
-        )
-        self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False)
-        self.assertEqual(
-            nondeterministic_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
-        )
-        [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
-        self.assertEqual(row[0], 7)
+
+        with self.temp_func("randomPandasIterUDF"):
+            nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction(
+                "randomPandasIterUDF", random_pandas_iter_udf
+            )
+            self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False)
+            self.assertEqual(
+                nondeterministic_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
+            )
+            [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
+            self.assertEqual(row[0], 7)

     def test_vectorized_udf_null_boolean(self):
         data = [(True,), (True,), (None,), (False,)]
@@ -1397,14 +1403,16 @@ def iter_original_add(it):

         for original_add in [scalar_original_add, iter_original_add]:
             self.assertEqual(original_add.deterministic, True)
-            new_add = self.spark.catalog.registerFunction("add1", original_add)
-            res1 = df.select(new_add(col("a"), col("b")))
-            res2 = self.spark.sql(
-                "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t"
-            )
-            expected = df.select(expr("a + b"))
-            self.assertEqual(expected.collect(), res1.collect())
-            self.assertEqual(expected.collect(), res2.collect())
+
+            with self.temp_func("add1"):
+                new_add = self.spark.catalog.registerFunction("add1", original_add)
+                res1 = df.select(new_add(col("a"), col("b")))
+                res2 = self.spark.sql(
+                    "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t"
+                )
+                expected = df.select(expr("a + b"))
+                self.assertEqual(expected.collect(), res1.collect())
+                self.assertEqual(expected.collect(), res2.collect())

     def test_scalar_iter_udf_init(self):
         import numpy as np
@@ -1788,92 +1796,96 @@ def test_named_arguments(self):
         def test_udf(a, b):
             return a + 10 * b

-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)

-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)),
-                self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)),
-                self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))),
-                self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"),
-            ]
-        ):
-            with self.subTest(query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(101)])
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))),
+                    self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"),
+                ]
+            ):
+                with self.subTest(query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(101)])

     def test_named_arguments_negative(self):
         @pandas_udf("int")
         def test_udf(a, b):
             return a + b

-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)

-        with self.assertRaisesRegex(
-            AnalysisException,
-            "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
-        ):
-            self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM range(2)").show()
+            with self.assertRaisesRegex(
+                AnalysisException,
+                "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
+            ):
+                self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM range(2)").show()

-        with self.assertRaisesRegex(AnalysisException, "UNEXPECTED_POSITIONAL_ARGUMENT"):
-            self.spark.sql("SELECT test_udf(a => id, id * 10) FROM range(2)").show()
+            with self.assertRaisesRegex(AnalysisException, "UNEXPECTED_POSITIONAL_ARGUMENT"):
+                self.spark.sql("SELECT test_udf(a => id, id * 10) FROM range(2)").show()

-        with self.assertRaisesRegex(
-            PythonException, r"test_udf\(\) got an unexpected keyword argument 'c'"
-        ):
-            self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show()
+            with self.assertRaisesRegex(
+                PythonException, r"test_udf\(\) got an unexpected keyword argument 'c'"
+            ):
+                self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show()

     def test_kwargs(self):
         @pandas_udf("int")
         def test_udf(a, **kwargs):
             return a + 10 * kwargs["b"]

-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)

-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)),
-                self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))),
-                self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"),
-            ]
-        ):
-            with self.subTest(query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(101)])
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))),
+                    self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"),
+                ]
+            ):
+                with self.subTest(query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(101)])

     def test_named_arguments_and_defaults(self):
         @pandas_udf("int")
         def test_udf(a, b=0):
             return a + 10 * b

-        self.spark.udf.register("test_udf", test_udf)
+        with self.temp_func("test_udf"):
+            self.spark.udf.register("test_udf", test_udf)

-        # without "b"
-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(col("id"))),
-                self.spark.range(2).select(test_udf(a=col("id"))),
-                self.spark.sql("SELECT test_udf(id) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(a => id) FROM range(2)"),
-            ]
-        ):
-            with self.subTest(with_b=False, query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(1)])
+            # without "b"
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(col("id"))),
+                    self.spark.range(2).select(test_udf(a=col("id"))),
+                    self.spark.sql("SELECT test_udf(id) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(a => id) FROM range(2)"),
+                ]
+            ):
+                with self.subTest(with_b=False, query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(1)])

-        # with "b"
-        for i, df in enumerate(
-            [
-                self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)),
-                self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)),
-                self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))),
-                self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"),
-                self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"),
-            ]
-        ):
-            with self.subTest(with_b=True, query_no=i):
-                assertDataFrameEqual(df, [Row(0), Row(101)])
+            # with "b"
+            for i, df in enumerate(
+                [
+                    self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)),
+                    self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))),
+                    self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"),
+                    self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"),
+                ]
+            ):
+                with self.subTest(with_b=True, query_no=i):
+                    assertDataFrameEqual(df, [Row(0), Row(101)])

     def test_arrow_cast_enabled_numeric_to_decimal(self):
         import numpy as np
6 changes: 3 additions & 3 deletions python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
@@ -408,7 +408,7 @@ def test_named_arguments(self):
                 with self.subTest(bound=bound, query_no=i):
                     assertDataFrameEqual(windowed, df.withColumn("wm", sf.mean(df.v).over(w)))

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)

@@ -436,7 +436,7 @@ def test_named_arguments_negative(self):
         df = self.data
         weighted_mean = self.pandas_agg_weighted_mean_udf

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)

@@ -504,7 +504,7 @@ def weighted_mean(**kwargs):
                 with self.subTest(bound=bound, query_no=i):
                     assertDataFrameEqual(windowed, df.withColumn("wm", sf.mean(df.v).over(w)))

-        with self.tempView("v"):
+        with self.tempView("v"), self.temp_func("weighted_mean"):
             df.createOrReplaceTempView("v")
             self.spark.udf.register("weighted_mean", weighted_mean)

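Note on motivation: these suites share one SparkSession, so a UDF registered without cleanup stays in the session catalog after its test finishes and can affect later tests. A hedged illustration of the difference, using hypothetical names (leaky_udf, scoped_udf) inside a test method of one of these suites:

# Without cleanup, the registration outlives the test that created it:
self.spark.udf.register("leaky_udf", lambda x: x, "string")
assert "leaky_udf" in [f.name for f in self.spark.catalog.listFunctions()]

# With the pattern applied throughout this PR, the function is dropped
# on exit, even if an assertion inside the block fails:
with self.temp_func("scoped_udf"):
    self.spark.udf.register("scoped_udf", lambda x: x, "string")
    ...
# "scoped_udf" is no longer registered at this point.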