diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index fb81cd772777..4c52303481fa 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -208,25 +208,22 @@ def test_array_type_correct(self): assert_frame_equal(expected, result) def test_register_grouped_map_udf(self): - with self.quiet(): - self.check_register_grouped_map_udf() - - def check_register_grouped_map_udf(self): - foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP) - - with self.assertRaises(PySparkTypeError) as pe: - self.spark.catalog.registerFunction("foo_udf", foo_udf) - - self.check_error( - exception=pe.exception, - errorClass="INVALID_UDF_EVAL_TYPE", - messageParameters={ - "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, " - "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, " - "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, " - "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF" - }, - ) + with self.quiet(), self.temp_func("foo_udf"): + foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP) + + with self.assertRaises(PySparkTypeError) as pe: + self.spark.catalog.registerFunction("foo_udf", foo_udf) + + self.check_error( + exception=pe.exception, + errorClass="INVALID_UDF_EVAL_TYPE", + messageParameters={ + "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, " + "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, " + "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, " + "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF" + }, + ) def test_decorator(self): df = self.data diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py index cfcbb96fcc36..3fd970061b30 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py @@ -522,17 +522,23 @@ def check_invalid_args(self): df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect() def test_register_vectorized_udf_basic(self): - sum_pandas_udf = pandas_udf( - lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF - ) + with self.temp_func("sum_pandas_udf"): + sum_pandas_udf = pandas_udf( + lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF + ) - self.assertEqual(sum_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF) - group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", sum_pandas_udf) - self.assertEqual(group_agg_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF) - q = "SELECT sum_pandas_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2" - actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect())) - expected = [1, 5] - self.assertEqual(actual, expected) + self.assertEqual(sum_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF) + group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", sum_pandas_udf) + self.assertEqual( + group_agg_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF + ) + q = """ + SELECT sum_pandas_udf(v1) + FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2 + """ + actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect())) + expected = [1, 5] + self.assertEqual(actual, expected) def test_grouped_with_empty_partition(self): data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)] @@ -551,10 +557,10 @@ def max_udf(v): return v.max() df = self.spark.range(0, 100) - self.spark.udf.register("max_udf", max_udf) - with self.tempView("table"): + with self.tempView("table"), self.temp_func("max_udf"): df.createTempView("table") + self.spark.udf.register("max_udf", max_udf) agg1 = df.agg(max_udf(df["id"])) agg2 = self.spark.sql("select max_udf(id) from table") @@ -579,7 +585,7 @@ def test_named_arguments(self): df = self.data weighted_mean = self.pandas_agg_weighted_mean_udf - with self.tempView("v"): + with self.tempView("v"), self.temp_func("weighted_mean"): df.createOrReplaceTempView("v") self.spark.udf.register("weighted_mean", weighted_mean) @@ -604,7 +610,7 @@ def test_named_arguments_negative(self): df = self.data weighted_mean = self.pandas_agg_weighted_mean_udf - with self.tempView("v"): + with self.tempView("v"), self.temp_func("weighted_mean"): df.createOrReplaceTempView("v") self.spark.udf.register("weighted_mean", weighted_mean) @@ -644,7 +650,7 @@ def weighted_mean(**kwargs): return np.average(kwargs["v"], weights=kwargs["w"]) - with self.tempView("v"): + with self.tempView("v"), self.temp_func("weighted_mean"): df.createOrReplaceTempView("v") self.spark.udf.register("weighted_mean", weighted_mean) @@ -684,7 +690,7 @@ def test_named_arguments_and_defaults(self): def biased_sum(v, w=None): return v.sum() + (w.sum() if w is not None else 100) - with self.tempView("v"): + with self.tempView("v"), self.temp_func("biased_sum"): df.createOrReplaceTempView("v") self.spark.udf.register("biased_sum", biased_sum) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py index e614d9039b61..3c2ae56067ae 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py @@ -293,13 +293,17 @@ def test_register_nondeterministic_vectorized_udf_basic(self): ).asNondeterministic() self.assertEqual(random_pandas_udf.deterministic, False) self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF) - nondeterministic_pandas_udf = self.spark.catalog.registerFunction( - "randomPandasUDF", random_pandas_udf - ) - self.assertEqual(nondeterministic_pandas_udf.deterministic, False) - self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF) - [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect() - self.assertEqual(row[0], 7) + + with self.temp_func("randomPandasUDF"): + nondeterministic_pandas_udf = self.spark.catalog.registerFunction( + "randomPandasUDF", random_pandas_udf + ) + self.assertEqual(nondeterministic_pandas_udf.deterministic, False) + self.assertEqual( + nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF + ) + [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect() + self.assertEqual(row[0], 7) def random_iter_udf(it): for i in it: @@ -310,15 +314,17 @@ def random_iter_udf(it): ).asNondeterministic() self.assertEqual(random_pandas_iter_udf.deterministic, False) self.assertEqual(random_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) - nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction( - "randomPandasIterUDF", random_pandas_iter_udf - ) - self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False) - self.assertEqual( - nondeterministic_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF - ) - [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect() - self.assertEqual(row[0], 7) + + with self.temp_func("randomPandasIterUDF"): + nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction( + "randomPandasIterUDF", random_pandas_iter_udf + ) + self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False) + self.assertEqual( + nondeterministic_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF + ) + [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect() + self.assertEqual(row[0], 7) def test_vectorized_udf_null_boolean(self): data = [(True,), (True,), (None,), (False,)] @@ -1397,14 +1403,16 @@ def iter_original_add(it): for original_add in [scalar_original_add, iter_original_add]: self.assertEqual(original_add.deterministic, True) - new_add = self.spark.catalog.registerFunction("add1", original_add) - res1 = df.select(new_add(col("a"), col("b"))) - res2 = self.spark.sql( - "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t" - ) - expected = df.select(expr("a + b")) - self.assertEqual(expected.collect(), res1.collect()) - self.assertEqual(expected.collect(), res2.collect()) + + with self.temp_func("add1"): + new_add = self.spark.catalog.registerFunction("add1", original_add) + res1 = df.select(new_add(col("a"), col("b"))) + res2 = self.spark.sql( + "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t" + ) + expected = df.select(expr("a + b")) + self.assertEqual(expected.collect(), res1.collect()) + self.assertEqual(expected.collect(), res2.collect()) def test_scalar_iter_udf_init(self): import numpy as np @@ -1788,92 +1796,96 @@ def test_named_arguments(self): def test_udf(a, b): return a + 10 * b - self.spark.udf.register("test_udf", test_udf) + with self.temp_func("test_udf"): + self.spark.udf.register("test_udf", test_udf) - for i, df in enumerate( - [ - self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)), - self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)), - self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))), - self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"), - self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"), - self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"), - ] - ): - with self.subTest(query_no=i): - assertDataFrameEqual(df, [Row(0), Row(101)]) + for i, df in enumerate( + [ + self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)), + self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)), + self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))), + self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"), + self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"), + self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"), + ] + ): + with self.subTest(query_no=i): + assertDataFrameEqual(df, [Row(0), Row(101)]) def test_named_arguments_negative(self): @pandas_udf("int") def test_udf(a, b): return a + b - self.spark.udf.register("test_udf", test_udf) + with self.temp_func("test_udf"): + self.spark.udf.register("test_udf", test_udf) - with self.assertRaisesRegex( - AnalysisException, - "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE", - ): - self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM range(2)").show() + with self.assertRaisesRegex( + AnalysisException, + "DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE", + ): + self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM range(2)").show() - with self.assertRaisesRegex(AnalysisException, "UNEXPECTED_POSITIONAL_ARGUMENT"): - self.spark.sql("SELECT test_udf(a => id, id * 10) FROM range(2)").show() + with self.assertRaisesRegex(AnalysisException, "UNEXPECTED_POSITIONAL_ARGUMENT"): + self.spark.sql("SELECT test_udf(a => id, id * 10) FROM range(2)").show() - with self.assertRaisesRegex( - PythonException, r"test_udf\(\) got an unexpected keyword argument 'c'" - ): - self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show() + with self.assertRaisesRegex( + PythonException, r"test_udf\(\) got an unexpected keyword argument 'c'" + ): + self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show() def test_kwargs(self): @pandas_udf("int") def test_udf(a, **kwargs): return a + 10 * kwargs["b"] - self.spark.udf.register("test_udf", test_udf) + with self.temp_func("test_udf"): + self.spark.udf.register("test_udf", test_udf) - for i, df in enumerate( - [ - self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)), - self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))), - self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"), - self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"), - ] - ): - with self.subTest(query_no=i): - assertDataFrameEqual(df, [Row(0), Row(101)]) + for i, df in enumerate( + [ + self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)), + self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))), + self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"), + self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"), + ] + ): + with self.subTest(query_no=i): + assertDataFrameEqual(df, [Row(0), Row(101)]) def test_named_arguments_and_defaults(self): @pandas_udf("int") def test_udf(a, b=0): return a + 10 * b - self.spark.udf.register("test_udf", test_udf) + with self.temp_func("test_udf"): + self.spark.udf.register("test_udf", test_udf) - # without "b" - for i, df in enumerate( - [ - self.spark.range(2).select(test_udf(col("id"))), - self.spark.range(2).select(test_udf(a=col("id"))), - self.spark.sql("SELECT test_udf(id) FROM range(2)"), - self.spark.sql("SELECT test_udf(a => id) FROM range(2)"), - ] - ): - with self.subTest(with_b=False, query_no=i): - assertDataFrameEqual(df, [Row(0), Row(1)]) + # without "b" + for i, df in enumerate( + [ + self.spark.range(2).select(test_udf(col("id"))), + self.spark.range(2).select(test_udf(a=col("id"))), + self.spark.sql("SELECT test_udf(id) FROM range(2)"), + self.spark.sql("SELECT test_udf(a => id) FROM range(2)"), + ] + ): + with self.subTest(with_b=False, query_no=i): + assertDataFrameEqual(df, [Row(0), Row(1)]) - # with "b" - for i, df in enumerate( - [ - self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)), - self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)), - self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))), - self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"), - self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"), - self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"), - ] - ): - with self.subTest(with_b=True, query_no=i): - assertDataFrameEqual(df, [Row(0), Row(101)]) + # with "b" + for i, df in enumerate( + [ + self.spark.range(2).select(test_udf(col("id"), b=col("id") * 10)), + self.spark.range(2).select(test_udf(a=col("id"), b=col("id") * 10)), + self.spark.range(2).select(test_udf(b=col("id") * 10, a=col("id"))), + self.spark.sql("SELECT test_udf(id, b => id * 10) FROM range(2)"), + self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM range(2)"), + self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM range(2)"), + ] + ): + with self.subTest(with_b=True, query_no=i): + assertDataFrameEqual(df, [Row(0), Row(101)]) def test_arrow_cast_enabled_numeric_to_decimal(self): import numpy as np diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py index fbc2b32d1c69..547e237902b3 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py @@ -408,7 +408,7 @@ def test_named_arguments(self): with self.subTest(bound=bound, query_no=i): assertDataFrameEqual(windowed, df.withColumn("wm", sf.mean(df.v).over(w))) - with self.tempView("v"): + with self.tempView("v"), self.temp_func("weighted_mean"): df.createOrReplaceTempView("v") self.spark.udf.register("weighted_mean", weighted_mean) @@ -436,7 +436,7 @@ def test_named_arguments_negative(self): df = self.data weighted_mean = self.pandas_agg_weighted_mean_udf - with self.tempView("v"): + with self.tempView("v"), self.temp_func("weighted_mean"): df.createOrReplaceTempView("v") self.spark.udf.register("weighted_mean", weighted_mean) @@ -504,7 +504,7 @@ def weighted_mean(**kwargs): with self.subTest(bound=bound, query_no=i): assertDataFrameEqual(windowed, df.withColumn("wm", sf.mean(df.v).over(w))) - with self.tempView("v"): + with self.tempView("v"), self.temp_func("weighted_mean"): df.createOrReplaceTempView("v") self.spark.udf.register("weighted_mean", weighted_mean) diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index b1fb42ad11ec..8d792a54e346 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -216,7 +216,7 @@ def test_single_udf_with_repeated_argument(self): self.assertEqual(tuple(row), (2,)) def test_multiple_udfs(self): - with self.temp_func("double_int"): + with self.temp_func("double_int", "add_int"): self.spark.catalog.registerFunction("double_int", lambda x: x * 2, IntegerType()) [row] = self.spark.sql("SELECT double_int(1), double_int(2)").collect() self.assertEqual(tuple(row), (2, 4)) @@ -224,6 +224,7 @@ def test_multiple_udfs(self): "SELECT double_int(double_int(1)), double_int(double_int(2) + 2)" ).collect() self.assertEqual(tuple(row), (4, 12)) + self.spark.catalog.registerFunction("add_int", lambda x, y: x + y, IntegerType()) [row] = self.spark.sql( "SELECT double_int(add_int(1, 2)), add_int(double_int(2), 1)"