diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql index 3e877333c07f..d829a5c1159f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part1.sql @@ -9,12 +9,10 @@ -- SET extra_float_digits = 0; -- This test file was converted from pgSQL/aggregates_part1.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. -SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek; +SELECT avg(udf(four)) AS avg_1 FROM onek; -SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100; +SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100; -- In 7.1, avg(float4) is computed using float8 arithmetic. -- Round the result to 3 digits to avoid platform-specific results. 
@@ -23,32 +21,32 @@ select CAST(avg(udf(b)) AS Decimal(10,3)) AS avg_107_943 FROM aggtest; -- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766] -- SELECT avg(gpa) AS avg_3_4 FROM ONLY student; -SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek; +SELECT sum(udf(four)) AS sum_1500 FROM onek; SELECT udf(sum(a)) AS sum_198 FROM aggtest; -SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest; +SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest; -- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766] -- SELECT sum(gpa) AS avg_6_8 FROM ONLY student; SELECT udf(max(four)) AS max_3 FROM onek; -SELECT max(CAST(udf(a) AS int)) AS max_100 FROM aggtest; -SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest; +SELECT max(udf(a)) AS max_100 FROM aggtest; +SELECT udf(udf(max(aggtest.b))) AS max_324_78 FROM aggtest; -- `student` has a column with data type POINT, which is not supported by Spark [SPARK-27766] -- SELECT max(student.gpa) AS max_3_7 FROM student; -SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest; -SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest; -SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest; -SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest; +SELECT stddev_pop(udf(b)) FROM aggtest; +SELECT udf(stddev_samp(b)) FROM aggtest; +SELECT var_pop(udf(b)) FROM aggtest; +SELECT udf(var_samp(b)) FROM aggtest; -SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest; -SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest; -SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest; -SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest; +SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest; +SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest; +SELECT 
udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest; +SELECT var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest; -- population variance is defined for a single tuple, sample variance -- is not -SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0)); -SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))); +SELECT udf(var_pop(1.0)), var_samp(udf(2.0)); +SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))); -- verify correct results for null and NaN inputs @@ -76,9 +74,9 @@ FROM (VALUES ('-Infinity'), ('Infinity')) v(x); -- test accuracy with a large input offset -SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x); -SELECT CAST(avg(udf(x)) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (7000000000005), (7000000000007)) v(x); -- SQL2003 binary aggregates [SPARK-23907] @@ -89,8 +87,8 @@ FROM (VALUES (7000000000005), (7000000000007)) v(x); -- SELECT regr_avgx(b, a), regr_avgy(b, a) FROM aggtest; -- SELECT regr_r2(b, a) FROM aggtest; -- SELECT regr_slope(b, a), regr_intercept(b, a) FROM aggtest; -SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest; -SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest; +SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest; +SELECT corr(b, udf(a)) FROM aggtest; -- test accum and combine functions directly [SPARK-23907] @@ -122,7 +120,7 @@ SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest; SELECT count(udf(four)) AS cnt_1000 FROM onek; SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek; -select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek 
+select ten, udf(count(*)), sum(udf(four)) from onek group by ten order by ten; select ten, count(udf(four)), udf(sum(DISTINCT four)) from onek diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql index 57491a32c48f..5636537398a8 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-aggregates_part2.sql @@ -6,8 +6,6 @@ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L145-L350 -- -- This test file was converted from pgSQL/aggregates_part2.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. create temporary view int4_tbl as select * from values (0), diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql index b05c21d24b36..63cecc674f1d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql @@ -10,8 +10,6 @@ -- Thus, we set spark.sql.crossJoin.enabled to true. -- This test file was converted from pgSQL/case.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. set spark.sql.crossJoin.enabled=true; CREATE TABLE CASE_TBL ( diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql index 6ae34ae589fa..ff8573ad7e56 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql @@ -1,6 +1,4 @@ -- This test file was converted from having.sql. 
--- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. create temporary view hav as select * from values ("one", 1), diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql index 686268317800..e5eb812d69a1 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql @@ -4,8 +4,6 @@ --SET spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false -- This test file was converted from natural-join.sql. --- Note that currently registered UDF returns a string. So there are some differences, for instance --- in string cast within UDF in Scala and Python. create temporary view nt1 as select * from values ("one", 1), diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-special-values.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-special-values.sql new file mode 100644 index 000000000000..0e85dd73edbd --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-special-values.sql @@ -0,0 +1,9 @@ +-- This file tests special values such as NaN, Infinity and NULL. 
+ +SELECT udf(x) FROM (VALUES (1), (2), (NULL)) v(x); +SELECT udf(x) FROM (VALUES ('A'), ('B'), (NULL)) v(x); +SELECT udf(x) FROM (VALUES ('NaN'), ('1'), ('2')) v(x); +SELECT udf(x) FROM (VALUES ('Infinity'), ('1'), ('2')) v(x); +SELECT udf(x) FROM (VALUES ('-Infinity'), ('1'), ('2')) v(x); +SELECT udf(x) FROM (VALUES 0.00000001D, 0.00000002D, 0.00000003D) v(x); +SELECT array(1, 2, x), map('a', x), struct(x) FROM (VALUES (1), (2), (3)) v(x); diff --git a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out index 5c08245fd320..3360350f7826 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out @@ -3,19 +3,19 @@ -- !query 0 -SELECT CAST(avg(udf(four)) AS decimal(10,3)) AS avg_1 FROM onek +SELECT avg(udf(four)) AS avg_1 FROM onek -- !query 0 schema -struct +struct -- !query 0 output 1.5 -- !query 1 -SELECT CAST(udf(avg(a)) AS decimal(10,3)) AS avg_32 FROM aggtest WHERE a < 100 +SELECT udf(avg(a)) AS avg_32 FROM aggtest WHERE a < 100 -- !query 1 schema -struct +struct -- !query 1 output -32.667 +32.666666666666664 -- !query 2 @@ -27,9 +27,9 @@ struct -- !query 3 -SELECT CAST(sum(udf(four)) AS int) AS sum_1500 FROM onek +SELECT sum(udf(four)) AS sum_1500 FROM onek -- !query 3 schema -struct +struct -- !query 3 output 1500 @@ -37,29 +37,29 @@ struct -- !query 4 SELECT udf(sum(a)) AS sum_198 FROM aggtest -- !query 4 schema -struct +struct -- !query 4 output 198 -- !query 5 -SELECT CAST(udf(udf(sum(b))) AS decimal(10,3)) AS avg_431_773 FROM aggtest +SELECT udf(udf(sum(b))) AS avg_431_773 FROM aggtest -- !query 5 schema -struct +struct -- !query 5 output -431.773 +431.77260909229517 -- !query 6 SELECT udf(max(four)) AS max_3 FROM onek -- !query 6 schema -struct +struct -- !query 6 output 3 -- !query 7 -SELECT max(CAST(udf(a) 
AS int)) AS max_100 FROM aggtest +SELECT max(udf(a)) AS max_100 FROM aggtest -- !query 7 schema struct -- !query 7 output @@ -67,97 +67,97 @@ struct -- !query 8 -SELECT CAST(udf(udf(max(aggtest.b))) AS decimal(10,3)) AS max_324_78 FROM aggtest +SELECT udf(udf(max(aggtest.b))) AS max_324_78 FROM aggtest -- !query 8 schema -struct +struct -- !query 8 output 324.78 -- !query 9 -SELECT CAST(stddev_pop(udf(b)) AS decimal(10,3)) FROM aggtest +SELECT stddev_pop(udf(b)) FROM aggtest -- !query 9 schema -struct +struct -- !query 9 output -131.107 +131.10703231895047 -- !query 10 -SELECT CAST(udf(stddev_samp(b)) AS decimal(10,3)) FROM aggtest +SELECT udf(stddev_samp(b)) FROM aggtest -- !query 10 schema -struct +struct -- !query 10 output -151.389 +151.38936080399804 -- !query 11 -SELECT CAST(var_pop(udf(b)) AS decimal(10,3)) FROM aggtest +SELECT var_pop(udf(b)) FROM aggtest -- !query 11 schema -struct +struct -- !query 11 output -17189.054 +17189.053923482323 -- !query 12 -SELECT CAST(udf(var_samp(b)) AS decimal(10,3)) FROM aggtest +SELECT udf(var_samp(b)) FROM aggtest -- !query 12 schema -struct +struct -- !query 12 output -22918.739 +22918.738564643096 -- !query 13 -SELECT CAST(udf(stddev_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest +SELECT udf(stddev_pop(CAST(b AS Decimal(38,0)))) FROM aggtest -- !query 13 schema -struct +struct -- !query 13 output -131.181 +131.18117242958306 -- !query 14 -SELECT CAST(stddev_samp(CAST(udf(b) AS Decimal(38,0))) AS decimal(10,3)) FROM aggtest +SELECT stddev_samp(CAST(udf(b) AS Decimal(38,0))) FROM aggtest -- !query 14 schema -struct +struct -- !query 14 output -151.475 +151.47497042966097 -- !query 15 -SELECT CAST(udf(var_pop(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest +SELECT udf(var_pop(CAST(b AS Decimal(38,0)))) FROM aggtest -- !query 15 schema -struct +struct -- !query 15 output 17208.5 -- !query 16 -SELECT CAST(var_samp(udf(CAST(b AS Decimal(38,0)))) AS decimal(10,3)) FROM aggtest +SELECT 
var_samp(udf(CAST(b AS Decimal(38,0)))) FROM aggtest -- !query 16 schema -struct +struct -- !query 16 output -22944.667 +22944.666666666668 -- !query 17 -SELECT CAST(udf(var_pop(1.0)) AS int), var_samp(udf(2.0)) +SELECT udf(var_pop(1.0)), var_samp(udf(2.0)) -- !query 17 schema -struct +struct -- !query 17 output -0 NaN +0.0 NaN -- !query 18 -SELECT CAST(stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))) AS int), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))) +SELECT stddev_pop(udf(CAST(3.0 AS Decimal(38,0)))), stddev_samp(CAST(udf(4.0) AS Decimal(38,0))) -- !query 18 schema -struct +struct -- !query 18 output -0 NaN +0.0 NaN -- !query 19 select sum(udf(CAST(null AS int))) from range(1,4) -- !query 19 schema -struct +struct -- !query 19 output NULL @@ -165,7 +165,7 @@ NULL -- !query 20 select sum(udf(CAST(null AS long))) from range(1,4) -- !query 20 schema -struct +struct -- !query 20 output NULL @@ -173,7 +173,7 @@ NULL -- !query 21 select sum(udf(CAST(null AS Decimal(38,0)))) from range(1,4) -- !query 21 schema -struct +struct -- !query 21 output NULL @@ -181,7 +181,7 @@ NULL -- !query 22 select sum(udf(CAST(null AS DOUBLE))) from range(1,4) -- !query 22 schema -struct +struct -- !query 22 output NULL @@ -189,7 +189,7 @@ NULL -- !query 23 select avg(udf(CAST(null AS int))) from range(1,4) -- !query 23 schema -struct +struct -- !query 23 output NULL @@ -197,7 +197,7 @@ NULL -- !query 24 select avg(udf(CAST(null AS long))) from range(1,4) -- !query 24 schema -struct +struct -- !query 24 output NULL @@ -205,7 +205,7 @@ NULL -- !query 25 select avg(udf(CAST(null AS Decimal(38,0)))) from range(1,4) -- !query 25 schema -struct +struct -- !query 25 output NULL @@ -213,7 +213,7 @@ NULL -- !query 26 select avg(udf(CAST(null AS DOUBLE))) from range(1,4) -- !query 26 schema -struct +struct -- !query 26 output NULL @@ -262,37 +262,37 @@ NaN NaN -- !query 32 -SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS int), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x 
AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (100000003), (100000004), (100000006), (100000007)) v(x) -- !query 32 schema -struct +struct -- !query 32 output -100000005 2.5 +1.00000005E8 2.5 -- !query 33 -SELECT CAST(avg(udf(x)) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3)) +SELECT avg(udf(CAST(x AS DOUBLE))), udf(var_pop(CAST(x AS DOUBLE))) FROM (VALUES (7000000000005), (7000000000007)) v(x) -- !query 33 schema -struct +struct -- !query 33 output -7000000000006 1 +7.000000000006E12 1.0 -- !query 34 -SELECT CAST(udf(covar_pop(b, udf(a))) AS decimal(10,3)), CAST(covar_samp(udf(b), a) as decimal(10,3)) FROM aggtest +SELECT udf(covar_pop(b, udf(a))), covar_samp(udf(b), a) FROM aggtest -- !query 34 schema -struct +struct -- !query 34 output -653.629 871.505 +653.6289553875104 871.5052738500139 -- !query 35 -SELECT CAST(corr(b, udf(a)) AS decimal(10,3)) FROM aggtest +SELECT corr(b, udf(a)) FROM aggtest -- !query 35 schema -struct +struct -- !query 35 output -0.14 +0.1396345165178734 -- !query 36 @@ -306,16 +306,16 @@ struct -- !query 37 SELECT udf(count(DISTINCT four)) AS cnt_4 FROM onek -- !query 37 schema -struct +struct -- !query 37 output 4 -- !query 38 -select ten, udf(count(*)), CAST(sum(udf(four)) AS int) from onek +select ten, udf(count(*)), sum(udf(four)) from onek group by ten order by ten -- !query 38 schema -struct +struct -- !query 38 output 0 100 100 1 100 200 @@ -333,7 +333,7 @@ struct +struct -- !query 39 output 0 100 2 1 100 4 @@ -352,7 +352,7 @@ select ten, udf(sum(distinct four)) from onek a group by ten having exists (select 1 from onek b where udf(sum(distinct a.four)) = b.four) -- !query 40 schema -struct +struct -- !query 40 output 0 2 2 2 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out index d90aa11fc6ef..99c3ae8eb49c 100644 --- 
a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part2.sql.out @@ -59,7 +59,7 @@ true false true false true true true true true -- !query 3 select min(udf(unique1)) from tenk1 -- !query 3 schema -struct +struct -- !query 3 output 0 @@ -67,7 +67,7 @@ struct -- !query 4 select udf(max(unique1)) from tenk1 -- !query 4 schema -struct +struct -- !query 4 output 9999 @@ -115,7 +115,7 @@ struct -- !query 10 select distinct max(udf(unique2)) from tenk1 -- !query 10 schema -struct +struct -- !query 10 output 9999 @@ -139,7 +139,7 @@ struct -- !query 13 select udf(max(udf(unique2))) from tenk1 order by udf(max(unique2))+1 -- !query 13 schema -struct +struct -- !query 13 output 9999 @@ -147,7 +147,7 @@ struct -- !query 14 select t1.max_unique2, udf(g) from (select max(udf(unique2)) as max_unique2 FROM tenk1) t1 LATERAL VIEW explode(array(1,2,3)) t2 AS g order by g desc -- !query 14 schema -struct +struct -- !query 14 output 9999 3 9999 2 @@ -157,6 +157,6 @@ struct -- !query 15 select udf(max(100)) from tenk1 -- !query 15 schema -struct +struct -- !query 15 output 100 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out index 55bef64338f4..a073f2bca3fb 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out @@ -129,7 +129,7 @@ SELECT '' AS `One`, WHEN 1 > 2 THEN udf(3) END AS `Simple default` -- !query 14 schema -struct +struct -- !query 14 output NULL @@ -141,7 +141,7 @@ SELECT '3' AS `One`, ELSE udf(4) END AS `Simple ELSE` -- !query 15 schema -struct +struct -- !query 15 output 3 3 @@ -192,7 +192,7 @@ struct +struct -- !query 20 output 1.0 @@ -200,18 +200,18 @@ struct 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl -- !query 21 schema -struct 100) 
THEN udf((cast(1 as double) / cast(0 as double))) ELSE udf(0) END:string> +struct 100) THEN udf((cast(1 as double) / cast(0 as double))) ELSE CAST(udf(0) AS DOUBLE) END:double> -- !query 21 output -0 -0 -0 -0 +0.0 +0.0 +0.0 +0.0 -- !query 22 SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END -- !query 22 schema -struct +struct -- !query 22 output 1 @@ -302,7 +302,7 @@ struct SELECT udf(COALESCE(a.f, b.i, b.j)) FROM CASE_TBL a, CASE2_TBL b -- !query 29 schema -struct +struct -- !query 29 output -30.3 -30.3 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out index 9476937abd9e..3cf2635e1f6d 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out @@ -17,7 +17,7 @@ SELECT udf(count(*)), udf(count(1)), udf(count(null)), udf(count(a)), udf(count(b)), udf(count(a + b)), udf(count((a, b))) FROM testData -- !query 1 schema -struct +struct -- !query 1 output 7 7 0 5 5 4 7 @@ -32,7 +32,7 @@ SELECT udf(count(DISTINCT (a, b))) FROM testData -- !query 2 schema -struct +struct -- !query 2 output 1 0 2 2 2 6 @@ -40,7 +40,7 @@ struct +struct -- !query 3 output 4 4 4 @@ -50,6 +50,6 @@ SELECT udf(count(DISTINCT a, b)), udf(count(DISTINCT b, a)), udf(count(DISTINCT *)), udf(count(DISTINCT testData.*)) FROM testData -- !query 4 schema -struct +struct -- !query 4 output 3 3 3 3 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out index 7cea2e5128f8..a6bc0eb4adcc 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out @@ -18,7 +18,7 @@ struct<> -- !query 1 SELECT udf(k) AS k, udf(sum(v)) FROM hav GROUP BY k HAVING udf(sum(v)) > 2 -- !query 1 schema -struct +struct -- !query 1 output one 6 three 
3 @@ -27,7 +27,7 @@ three 3 -- !query 2 SELECT udf(count(udf(k))) FROM hav GROUP BY v + 1 HAVING v + 1 = udf(2) -- !query 2 schema -struct +struct -- !query 2 output 1 @@ -35,7 +35,7 @@ struct -- !query 3 SELECT udf(MIN(t.v)) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(udf(COUNT(udf(1))) > 0) -- !query 3 schema -struct +struct -- !query 3 output 1 @@ -43,7 +43,7 @@ struct -- !query 4 SELECT udf(a + b) FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > udf(1) -- !query 4 schema -struct +struct -- !query 4 output 3 7 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out index 10952cb21e4f..120f2d39f73d 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inner-join.sql.out @@ -59,7 +59,7 @@ struct<> -- !query 6 SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag -- !query 6 schema -struct +struct -- !query 6 output 1 a 1 a diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out index 53ef177db0bb..1f0ee14d32b0 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out @@ -59,6 +59,6 @@ two 2 22 -- !query 5 SELECT udf(count(*)) FROM nt1 natural full outer join nt2 -- !query 5 schema -struct +struct -- !query 5 output 4 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-special-values.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-special-values.sql.out new file mode 100644 index 000000000000..5c76a10418ed --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-special-values.sql.out @@ -0,0 +1,72 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 
7 + + +-- !query 0 +SELECT udf(x) FROM (VALUES (1), (2), (NULL)) v(x) +-- !query 0 schema +struct +-- !query 0 output +1 +2 +NULL + + +-- !query 1 +SELECT udf(x) FROM (VALUES ('A'), ('B'), (NULL)) v(x) +-- !query 1 schema +struct +-- !query 1 output +A +B +NULL + + +-- !query 2 +SELECT udf(x) FROM (VALUES ('NaN'), ('1'), ('2')) v(x) +-- !query 2 schema +struct +-- !query 2 output +1 +2 +NaN + + +-- !query 3 +SELECT udf(x) FROM (VALUES ('Infinity'), ('1'), ('2')) v(x) +-- !query 3 schema +struct +-- !query 3 output +1 +2 +Infinity + + +-- !query 4 +SELECT udf(x) FROM (VALUES ('-Infinity'), ('1'), ('2')) v(x) +-- !query 4 schema +struct +-- !query 4 output +-Infinity +1 +2 + + +-- !query 5 +SELECT udf(x) FROM (VALUES 0.00000001D, 0.00000002D, 0.00000003D) v(x) +-- !query 5 schema +struct +-- !query 5 output +1.0E-8 +2.0E-8 +3.0E-8 + + +-- !query 6 +SELECT array(1, 2, x), map('a', x), struct(x) FROM (VALUES (1), (2), (3)) v(x) +-- !query 6 schema +struct,map(a, x):map,named_struct(x, x):struct> +-- !query 6 output +[1,2,1] {"a":1} {"x":1} +[1,2,2] {"a":2} {"x":2} +[1,2,3] {"a":3} {"x":3} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala index e379d6df867c..df7d617b293d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala @@ -22,21 +22,23 @@ import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ import scala.util.Try +import com.google.common.cache.CacheBuilder + import org.apache.spark.TestUtils import org.apache.spark.api.python.{PythonBroadcast, PythonEvalType, PythonFunction, PythonUtils} import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.config.Tests +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, PythonUDF} import org.apache.spark.sql.catalyst.plans.SQLHelper import 
org.apache.spark.sql.execution.python.UserDefinedPythonFunction import org.apache.spark.sql.expressions.SparkUserDefinedFunction -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StringType, StructType} /** * This object targets to integrate various UDF test cases so that Scalar UDF, Python UDF and * Scalar Pandas UDFs can be tested in SBT & Maven tests. * - * The available UDFs cast input to strings, which take one column as input and return a string - * type column as output. + * The available UDFs return their input as is; the output type is the same as the input type. * * To register Scala UDF in SQL: * {{{ @@ -125,19 +127,31 @@ object IntegratedUDFTestUtils extends SQLHelper { throw new RuntimeException(s"Python executable [$pythonExec] is unavailable.") } + private def makePyDataType(dataType: DataType): String = dataType match { + case st: StructType => + val fields = st.fields.map(f => s"StructField('${f.name}', ${makePyDataType(f.dataType)})") + s"StructType([${fields.mkString(", ")}])" + case at: ArrayType => + s"ArrayType(${makePyDataType(at.elementType)})" + case mt: MapType => + s"MapType(${makePyDataType(mt.keyType)}, ${makePyDataType(mt.valueType)})" + case other => s"${other.getClass.getSimpleName.stripSuffix("$")}()" + } + // Dynamically pickles and reads the Python instance into JVM side in order to mimic // Python native function within Python UDF. 
- private lazy val pythonFunc: Array[Byte] = if (shouldTestPythonUDFs) { + private def makePythonFunc(dataType: DataType): Array[Byte] = if (shouldTestPythonUDFs) { var binaryPythonFunc: Array[Byte] = null withTempPath { path => Process( Seq( pythonExec, "-c", - "from pyspark.sql.types import StringType; " + + "from pyspark.sql.types import *; " + "from pyspark.serializers import CloudPickleSerializer; " + s"f = open('$path', 'wb');" + - s"f.write(CloudPickleSerializer().dumps((lambda x: str(x), StringType())))"), + "f.write(CloudPickleSerializer().dumps((" + + s"lambda x: None if x is None else x, ${makePyDataType(dataType)})))"), None, "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! binaryPythonFunc = Files.readAllBytes(path.toPath) @@ -148,17 +162,19 @@ object IntegratedUDFTestUtils extends SQLHelper { throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.") } - private lazy val pandasFunc: Array[Byte] = if (shouldTestScalarPandasUDFs) { + private def makePandasFunc(dataType: DataType): Array[Byte] = if (shouldTestScalarPandasUDFs) { var binaryPandasFunc: Array[Byte] = null withTempPath { path => Process( Seq( pythonExec, "-c", - "from pyspark.sql.types import StringType; " + + "from pyspark.sql.types import *; " + "from pyspark.serializers import CloudPickleSerializer; " + s"f = open('$path', 'wb');" + - s"f.write(CloudPickleSerializer().dumps((lambda x: x.apply(str), StringType())))"), + "f.write(CloudPickleSerializer().dumps((" + + "lambda x: x.apply(" + + s"lambda v: None if v is None else v), ${makePyDataType(dataType)})))"), None, "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! 
binaryPandasFunc = Files.readAllBytes(path.toPath) @@ -169,6 +185,32 @@ object IntegratedUDFTestUtils extends SQLHelper { throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.") } + private val pythonFuncCache = CacheBuilder.newBuilder() + .maximumSize(16) + .build[DataType, Array[Byte]]() + + private val pandasFuncCache = CacheBuilder.newBuilder() + .maximumSize(16) + .build[DataType, Array[Byte]]() + + private def getOrCreatePythonFunc(dataType: DataType): Array[Byte] = { + var nativeFunc = pythonFuncCache.getIfPresent(dataType) + if (nativeFunc == null) { + nativeFunc = makePythonFunc(dataType) + pythonFuncCache.put(dataType, nativeFunc) + } + nativeFunc + } + + private def getOrCreatePandasFunc(dataType: DataType): Array[Byte] = { + var nativeFunc = pandasFuncCache.getIfPresent(dataType) + if (nativeFunc == null) { + nativeFunc = makePandasFunc(dataType) + pandasFuncCache.put(dataType, nativeFunc) + } + nativeFunc + } + // Make sure this map stays mutable - this map gets updated later in Python runners. private val workerEnv = new java.util.HashMap[String, String]() workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath") @@ -198,23 +240,32 @@ object IntegratedUDFTestUtils extends SQLHelper { } /** - * A Python UDF that takes one column and returns a string column. - * Equivalent to `udf(lambda x: str(x), "string")` + * A Python UDF that takes one column and returns it as is. It has the same output type + * as its input type. */ case class TestPythonUDF(name: String) extends TestUDF { private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction( name = name, func = PythonFunction( - command = pythonFunc, + command = Array.empty[Byte], // This will be replaced when it becomes an actual expression. 
envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], pythonIncludes = List.empty[String].asJava, pythonExec = pythonExec, pythonVer = pythonVer, broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, accumulator = null), - dataType = StringType, + dataType = NullType, // This will be replaced when it becomes an actual expression. pythonEvalType = PythonEvalType.SQL_BATCHED_UDF, - udfDeterministic = true) + udfDeterministic = true) { + + override def builder(e: Seq[Expression]): PythonUDF = { + assert(e.length == 1, "Defined UDF only has one column") + val expr = e.head + val pythonUDF = super.builder(e) + pythonUDF.copy(func = pythonUDF.func.copy( + command = getOrCreatePythonFunc(expr.dataType)), dataType = expr.dataType) + } + } def apply(exprs: Column*): Column = udf(exprs: _*) @@ -222,23 +273,32 @@ } /** - * A Scalar Pandas UDF that takes one column and returns a string column. - * Equivalent to `pandas_udf(lambda x: x.apply(str), "string", PandasUDFType.SCALAR)`. + * A Scalar Pandas UDF that takes one column and returns it as is. It has the same output type + * as its input type. */ case class TestScalarPandasUDF(name: String) extends TestUDF { private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction( name = name, func = PythonFunction( - command = pandasFunc, + command = Array.empty[Byte], // This will be replaced when it becomes an actual expression. envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], pythonIncludes = List.empty[String].asJava, pythonExec = pythonExec, pythonVer = pythonVer, broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, accumulator = null), - dataType = StringType, + dataType = NullType, // This will be replaced when it becomes an actual expression. 
pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF, - udfDeterministic = true) + udfDeterministic = true) { + + override def builder(e: Seq[Expression]): PythonUDF = { + assert(e.length == 1, "Defined UDF only has one column") + val expr = e.head + val pythonUDF = super.builder(e) + pythonUDF.copy(func = pythonUDF.func.copy( + command = getOrCreatePandasFunc(expr.dataType)), dataType = expr.dataType) + } + } def apply(exprs: Column*): Column = udf(exprs: _*) @@ -246,15 +306,26 @@ } /** - * A Scala UDF that takes one column and returns a string column. - * Equivalent to `udf((input: Any) => String.valueOf(input)`. + * A Scala UDF that takes one column and returns it as is. It has the same output type + * as its input type. */ case class TestScalaUDF(name: String) extends TestUDF { private[IntegratedUDFTestUtils] lazy val udf = new SparkUserDefinedFunction( - (input: Any) => String.valueOf(input), + (input: Any) => if (input == null) { + null + } else { + input: Any + }, StringType, inputSchemas = Seq.fill(1)(None), - name = Some(name)) + name = Some(name)) { + + override def apply(exprs: Column*): Column = { + assert(exprs.length == 1, "Defined UDF only has one column") + val expr = exprs.head.expr + Column(createScalaUDF(expr :: Nil).copy(dataType = expr.dataType)) + } + } def apply(exprs: Column*): Column = udf(exprs: _*)