Merged
Conversation
…F test base ## What changes were proposed in this pull request? This PR adds some tests converted from 'udaf.sql' to test UDFs <details><summary>Diff comparing to 'udaf.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out index f4455bb..e1747f4 100644 --- a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out -3,6 +3,8 -- !query 0 +-- This test file was converted from udaf.sql. + CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1), (2), (3), (4) as t1(int_col1) -21,15 +23,15 struct<> -- !query 2 -SELECT default.myDoubleAvg(int_col1) as my_avg from t1 +SELECT default.myDoubleAvg(udf(int_col1)) as my_avg, udf(default.myDoubleAvg(udf(int_col1))) as my_avg2, udf(default.myDoubleAvg(int_col1)) as my_avg3 from t1 -- !query 2 schema -struct<my_avg:double> +struct<my_avg:double,my_avg2:double,my_avg3:double> -- !query 2 output -102.5 +102.5 102.5 102.5 -- !query 3 -SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1 +SELECT default.myDoubleAvg(udf(int_col1), udf(3)) as my_avg from t1 -- !query 3 schema struct<> -- !query 3 output -46,12 +48,12 struct<> -- !query 5 -SELECT default.udaf1(int_col1) as udaf1 from t1 +SELECT default.udaf1(udf(int_col1)) as udaf1, udf(default.udaf1(udf(int_col1))) as udaf2, udf(default.udaf1(int_col1)) as udaf3 from t1 -- !query 5 schema struct<> -- !query 5 output org.apache.spark.sql.AnalysisException -Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 7 +Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 94 -- !query 6 ``` </p> </details> ## How was this patch tested? Tested as guided in SPARK-27921. Closes #25194 from vinodkc/br_Fix_SPARK-27921_3. Authored-by: Vinod KC <vinod.kc.in@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…nto UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from ```outer-join.sql``` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). <details><summary>Diff comparing to 'outer-join.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-outer-join.sql.out index 5db3bae..819f786 100644 --- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-outer-join.sql.out -24,17 +24,17 struct<> -- !query 2 SELECT - (SUM(COALESCE(t1.int_col1, t2.int_col0))), - ((COALESCE(t1.int_col1, t2.int_col0)) * 2) + (udf(SUM(udf(COALESCE(t1.int_col1, t2.int_col0))))), + (udf(COALESCE(t1.int_col1, t2.int_col0)) * 2) FROM t1 RIGHT JOIN t2 - ON (t2.int_col0) = (t1.int_col1) -GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)), + ON udf(t2.int_col0) = udf(t1.int_col1) +GROUP BY udf(GREATEST(COALESCE(udf(t2.int_col1), 109), COALESCE(t1.int_col1, udf(-449)))), COALESCE(t1.int_col1, t2.int_col0) -HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0))) - > ((COALESCE(t1.int_col1, t2.int_col0)) * 2) +HAVING (udf(SUM(COALESCE(udf(t1.int_col1), udf(t2.int_col0))))) + > (udf(COALESCE(t1.int_col1, t2.int_col0)) * 2) -- !query 2 schema -struct<sum(coalesce(int_col1, int_col0)):bigint,(coalesce(int_col1, int_col0) * 2):int> +struct<CAST(udf(cast(sum(cast(cast(udf(cast(coalesce(int_col1, int_col0) as string)) as int) as bigint)) as string)) AS BIGINT):bigint,(CAST(udf(cast(coalesce(int_col1, int_col0) as string)) AS INT) * 2):int> -- !query 2 output -367 -734 -507 -1014 -70,10 +70,10 spark.sql.crossJoin.enabled true SELECT * FROM ( SELECT - COALESCE(t2.int_col1, t1.int_col1) AS int_col + udf(COALESCE(udf(t2.int_col1), udf(t1.int_col1))) AS int_col FROM t1 LEFT JOIN t2 ON false -) t where (t.int_col) is not null +) t where (udf(t.int_col)) is not null -- !query 6 schema struct<int_col:int> -- !query 6 output ``` </p> </details> ## How was this patch tested? Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). Closes #25103 from huaxingao/spark-28285. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from ```except.sql``` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). <details><summary>Diff comparing to 'except.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/except.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-except.sql.out index c9b712d..27ca7ea 100644 --- a/sql/core/src/test/resources/sql-tests/results/except.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-except.sql.out -30,16 +30,16 struct<> -- !query 2 -SELECT * FROM t1 EXCEPT SELECT * FROM t2 +SELECT udf(k), udf(v) FROM t1 EXCEPT SELECT udf(k), udf(v) FROM t2 -- !query 2 schema -struct<k:string,v:int> +struct<CAST(udf(cast(k as string)) AS STRING):string,CAST(udf(cast(v as string)) AS INT):int> -- !query 2 output three 3 two 2 -- !query 3 -SELECT * FROM t1 EXCEPT SELECT * FROM t1 where v <> 1 and v <> 2 +SELECT * FROM t1 EXCEPT SELECT * FROM t1 where udf(v) <> 1 and v <> udf(2) -- !query 3 schema struct<k:string,v:int> -- !query 3 output -49,7 +49,7 two 2 -- !query 4 -SELECT * FROM t1 where v <> 1 and v <> 22 EXCEPT SELECT * FROM t1 where v <> 2 and v >= 3 +SELECT * FROM t1 where udf(v) <> 1 and v <> udf(22) EXCEPT SELECT * FROM t1 where udf(v) <> 2 and v >= udf(3) -- !query 4 schema struct<k:string,v:int> -- !query 4 output -59,7 +59,7 two 2 -- !query 5 SELECT t1.* FROM t1, t2 where t1.k = t2.k EXCEPT -SELECT t1.* FROM t1, t2 where t1.k = t2.k and t1.k != 'one' +SELECT t1.* FROM t1, t2 where t1.k = t2.k and t1.k != udf('one') -- !query 5 schema struct<k:string,v:int> -- !query 5 output -68,7 +68,7 one NULL -- !query 6 -SELECT * FROM t2 where v >= 1 and v <> 22 EXCEPT SELECT * FROM t1 +SELECT * FROM t2 where v >= udf(1) and udf(v) <> 22 EXCEPT SELECT * FROM t1 -- !query 6 schema struct<k:string,v:int> -- !query 6 output -77,9 +77,9 one 5 -- !query 7 -SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1 +SELECT (SELECT min(udf(k)) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1 MINUS -SELECT (SELECT min(k) FROM t2) abs_min_t2 FROM t1 WHERE t1.k = 'one' +SELECT (SELECT udf(min(k)) FROM t2) abs_min_t2 FROM t1 WHERE t1.k = udf('one') -- !query 7 schema struct<min_t2:string> -- !query 7 output -90,16 +90,17 two -- !query 8 SELECT t1.k FROM t1 -WHERE t1.v <= (SELECT max(t2.v) +WHERE t1.v <= (SELECT udf(max(udf(t2.v))) FROM t2 - WHERE t2.k = t1.k) + WHERE udf(t2.k) = udf(t1.k)) MINUS SELECT t1.k FROM t1 -WHERE t1.v >= (SELECT min(t2.v) +WHERE udf(t1.v) >= (SELECT min(udf(t2.v)) FROM t2 WHERE t2.k = t1.k) -- !query 8 schema -struct<k:string> +struct<> -- !query 8 output -two +java.lang.UnsupportedOperationException +Cannot evaluate expression: udf(cast(null as string)) ``` </p> </details> ## How was this patch tested? Tested as guided in [SPARK-27921.](https://issues.apache.org/jira/browse/SPARK-27921) Closes #25101 from huaxingao/spark-28277. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…r change at migration guide ## What changes were proposed in this pull request? This PR proposes to add one example to describe 'add_months' behaviour change by #25153. **Spark 2.4:** ```sql select add_months(DATE'2019-02-28', 1) ``` ``` +--------------------------------+ |add_months(DATE '2019-02-28', 1)| +--------------------------------+ | 2019-03-31| +--------------------------------+ ``` **Current master:** ```sql select add_months(DATE'2019-02-28', 1) ``` ``` +--------------------------------+ |add_months(DATE '2019-02-28', 1)| +--------------------------------+ | 2019-03-28| +--------------------------------+ ``` ## How was this patch tested? Manually tested on Spark 2.4.1 and the current master. Closes #25199 from HyukjinKwon/SPARK-28389. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Performance issue using explode was found when a complex field contains huge array is to get duplicated as the number of exploded array elements. Given example:
```scala
val df = spark.sparkContext.parallelize(Seq(("1",
Array.fill(M)({
val i = math.random
(i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
})))).toDF("col", "arr")
.selectExpr("col", "struct(col, arr) as st")
.selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
```
The explode causes `st` to be duplicated as many as the exploded elements.
Benchmarks it:
```
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
[info] Intel(R) Core(TM) i7-8750H CPU 2.20GHz
[info] generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] generate big nested struct array wholestage off 52668 53162 699 0.0 877803.4 1.0X
[info] generate big nested struct array wholestage on 47261 49093 1125 0.0 787690.2 1.1X
[info]
```
The query plan:
```
== Physical Plan ==
Project [col#508, st#512.col AS col1#515, arr_col#519]
+- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519]
+- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504]
+- Scan[obj#534]
```
This patch takes nested column pruning approach to prune unnecessary nested fields. It adds a projection of the needed nested fields as aliases on the child of `Generate`, and substitutes them by alias attributes on the projection on top of `Generate`.
Benchmarks it after the change:
```
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
[info] Intel(R) Core(TM) i7-8750H CPU 2.20GHz
[info] generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] generate big nested struct array wholestage off 311 331 28 0.2 5188.6 1.0X
[info] generate big nested struct array wholestage on 297 312 15 0.2 4947.3 1.0X
[info]
```
The query plan:
```
== Physical Plan ==
Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603]
+- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603]
+- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(in
put[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4),
if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#588]
+- Scan[obj#586]
```
This behavior is controlled by a SQL config `spark.sql.optimizer.expression.nestedPruning.enabled`.
## How was this patch tested?
Added benchmark.
Closes #24637 from viirya/SPARK-27707.
Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
… values ## What changes were proposed in this pull request? Use `org.apache.spark.mllib.util.TestingUtils` object across `MLLIB` component to compare floating point values in tests. ## How was this patch tested? `build/mvn test` - existing tests against updated code. Closes #25191 from eugen-prokhorenko/mllib-testingutils-double-comparison. Authored-by: Ievgen Prokhorenko <eugen.prokhorenko@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…on.sql' into UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from `join-empty-relation.sql` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). <details><summary>Diff comparing to 'join-empty-relation.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/join-empty-relation.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-join-empty-relation.sql.out index 857073a..e79d01f 100644 --- a/sql/core/src/test/resources/sql-tests/results/join-empty-relation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-join-empty-relation.sql.out -27,111 +27,111 struct<> -- !query 3 -SELECT * FROM t1 INNER JOIN empty_table +SELECT udf(t1.a), udf(empty_table.a) FROM t1 INNER JOIN empty_table ON (udf(t1.a) = udf(udf(empty_table.a))) -- !query 3 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int> -- !query 3 output -- !query 4 -SELECT * FROM t1 CROSS JOIN empty_table +SELECT udf(t1.a), udf(udf(empty_table.a)) FROM t1 CROSS JOIN empty_table ON (udf(udf(t1.a)) = udf(empty_table.a)) -- !query 4 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 4 output -- !query 5 -SELECT * FROM t1 LEFT OUTER JOIN empty_table +SELECT udf(udf(t1.a)), empty_table.a FROM t1 LEFT OUTER JOIN empty_table ON (udf(t1.a) = udf(empty_table.a)) -- !query 5 schema -struct<a:int,a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int,a:int> -- !query 5 output 1 NULL -- !query 6 -SELECT * FROM t1 RIGHT OUTER JOIN empty_table +SELECT udf(t1.a), udf(empty_table.a) FROM t1 RIGHT OUTER JOIN empty_table ON (udf(t1.a) = udf(empty_table.a)) -- !query 6 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int> -- !query 6 output -- !query 7 -SELECT * FROM t1 FULL OUTER JOIN empty_table +SELECT udf(t1.a), empty_table.a FROM t1 FULL OUTER JOIN empty_table ON (udf(t1.a) = udf(empty_table.a)) -- !query 7 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,a:int> -- !query 7 output 1 NULL -- !query 8 -SELECT * FROM t1 LEFT SEMI JOIN empty_table +SELECT udf(udf(t1.a)) FROM t1 LEFT SEMI JOIN empty_table ON (udf(t1.a) = udf(udf(empty_table.a))) -- !query 8 schema -struct<a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 8 output -- !query 9 -SELECT * FROM t1 LEFT ANTI JOIN empty_table +SELECT udf(t1.a) FROM t1 LEFT ANTI JOIN empty_table ON (udf(t1.a) = udf(empty_table.a)) -- !query 9 schema -struct<a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 9 output 1 -- !query 10 -SELECT * FROM empty_table INNER JOIN t1 +SELECT udf(empty_table.a), udf(t1.a) FROM empty_table INNER JOIN t1 ON (udf(udf(empty_table.a)) = udf(t1.a)) -- !query 10 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int> -- !query 10 output -- !query 11 -SELECT * FROM empty_table CROSS JOIN t1 +SELECT udf(empty_table.a), udf(udf(t1.a)) FROM empty_table CROSS JOIN t1 ON (udf(empty_table.a) = udf(udf(t1.a))) -- !query 11 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 11 output -- !query 12 -SELECT * FROM empty_table LEFT OUTER JOIN t1 +SELECT udf(udf(empty_table.a)), udf(t1.a) FROM empty_table LEFT OUTER JOIN t1 ON (udf(empty_table.a) = udf(t1.a)) -- !query 12 schema -struct<a:int,a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int> -- !query 12 output -- !query 13 -SELECT * FROM empty_table RIGHT OUTER JOIN t1 +SELECT empty_table.a, udf(t1.a) FROM empty_table RIGHT OUTER JOIN t1 ON (udf(empty_table.a) = udf(t1.a)) -- !query 13 schema -struct<a:int,a:int> +struct<a:int,CAST(udf(cast(a as string)) AS INT):int> -- !query 13 output NULL 1 -- !query 14 -SELECT * FROM empty_table FULL OUTER JOIN t1 +SELECT empty_table.a, udf(udf(t1.a)) FROM empty_table FULL OUTER JOIN t1 ON (udf(empty_table.a) = udf(t1.a)) -- !query 14 schema -struct<a:int,a:int> +struct<a:int,CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 14 output NULL 1 -- !query 15 -SELECT * FROM empty_table LEFT SEMI JOIN t1 +SELECT udf(udf(empty_table.a)) FROM empty_table LEFT SEMI JOIN t1 ON (udf(empty_table.a) = udf(udf(t1.a))) -- !query 15 schema -struct<a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 15 output -- !query 16 -SELECT * FROM empty_table LEFT ANTI JOIN t1 +SELECT empty_table.a FROM empty_table LEFT ANTI JOIN t1 ON (udf(empty_table.a) = udf(t1.a)) -- !query 16 schema struct<a:int> -- !query 16 output -139,56 +139,56 struct<a:int> -- !query 17 -SELECT * FROM empty_table INNER JOIN empty_table +SELECT udf(empty_table.a) FROM empty_table INNER JOIN empty_table AS empty_table2 ON (udf(empty_table.a) = udf(udf(empty_table2.a))) -- !query 17 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 17 output -- !query 18 -SELECT * FROM empty_table CROSS JOIN empty_table +SELECT udf(udf(empty_table.a)) FROM empty_table CROSS JOIN empty_table AS empty_table2 ON (udf(udf(empty_table.a)) = udf(empty_table2.a)) -- !query 18 schema -struct<a:int,a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 18 output -- !query 19 -SELECT * FROM empty_table LEFT OUTER JOIN empty_table +SELECT udf(empty_table.a) FROM empty_table LEFT OUTER JOIN empty_table AS empty_table2 ON (udf(empty_table.a) = udf(empty_table2.a)) -- !query 19 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 19 output -- !query 20 -SELECT * FROM empty_table RIGHT OUTER JOIN empty_table +SELECT udf(udf(empty_table.a)) FROM empty_table RIGHT OUTER JOIN empty_table AS empty_table2 ON (udf(empty_table.a) = udf(udf(empty_table2.a))) -- !query 20 schema -struct<a:int,a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 20 output -- !query 21 -SELECT * FROM empty_table FULL OUTER JOIN empty_table +SELECT udf(empty_table.a) FROM empty_table FULL OUTER JOIN empty_table AS empty_table2 ON (udf(empty_table.a) = udf(empty_table2.a)) -- !query 21 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 21 output -- !query 22 -SELECT * FROM empty_table LEFT SEMI JOIN empty_table +SELECT udf(udf(empty_table.a)) FROM empty_table LEFT SEMI JOIN empty_table AS empty_table2 ON (udf(empty_table.a) = udf(empty_table2.a)) -- !query 22 schema -struct<a:int> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as int) as string)) AS INT):int> -- !query 22 output -- !query 23 -SELECT * FROM empty_table LEFT ANTI JOIN empty_table +SELECT udf(empty_table.a) FROM empty_table LEFT ANTI JOIN empty_table AS empty_table2 ON (udf(empty_table.a) = udf(empty_table2.a)) -- !query 23 schema -struct<a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 23 output ``` </p> </details> ## How was this patch tested? Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). Closes #25127 from imback82/join-empty-relation-sql. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…e API ## What changes were proposed in this pull request? SPARK-28199 (#24996) hid implementations of Triggers into `private[sql]` and encourage end users to use `Trigger.xxx` methods instead. As I got some post review comment on 7548a88#r34366934 we could remove annotations which are meant to be used with public API. ## How was this patch tested? N/A Closes #25200 from HeartSaVioR/SPARK-28199-FOLLOWUP. Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…comparison assertions ## What changes were proposed in this pull request? This PR removes a few hardware-dependent assertions which can cause a failure in `aarch64`. **x86_64** ``` rootdonotdel-openlab-allinone-l00242678:/home/ubuntu# uname -a Linux donotdel-openlab-allinone-l00242678 4.4.0-154-generic #181-Ubuntu SMP Tue Jun 25 05:29:03 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux scala> import java.lang.Float.floatToRawIntBits import java.lang.Float.floatToRawIntBits scala> floatToRawIntBits(0.0f/0.0f) res0: Int = -4194304 scala> floatToRawIntBits(Float.NaN) res1: Int = 2143289344 ``` **aarch64** ``` [rootarm-huangtianhua spark]# uname -a Linux arm-huangtianhua 4.14.0-49.el7a.aarch64 #1 SMP Tue Apr 10 17:22:26 UTC 2018 aarch64 aarch64 aarch64 GNU/Linux scala> import java.lang.Float.floatToRawIntBits import java.lang.Float.floatToRawIntBits scala> floatToRawIntBits(0.0f/0.0f) res1: Int = 2143289344 scala> floatToRawIntBits(Float.NaN) res2: Int = 2143289344 ``` ## How was this patch tested? Pass the Jenkins (This removes the test coverage). Closes #25186 from huangtianhua/special-test-case-for-aarch64. Authored-by: huangtianhua <huangtianhua@huawei.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ql' into UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from group-analytics.sql to test UDFs. Please see contribution guide of this umbrella ticket - SPARK-27921. <details><summary>Diff comparing to 'group-analytics.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out index 31e9e08..3439a05 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out -13,9 +13,9 struct<> -- !query 1 -SELECT a + b, b, udf(SUM(a - b)) FROM testData GROUP BY a + b, b WITH CUBE +SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH CUBE -- !query 1 schema -struct<(a + b):int,b:int,CAST(udf(cast(sum(cast((a - b) as bigint)) as string)) AS BIGINT):bigint> +struct<(a + b):int,b:int,sum((a - b)):bigint> -- !query 1 output 2 1 0 2 NULL 0 -33,9 +33,9 NULL NULL 3 -- !query 2 -SELECT a, udf(b), SUM(b) FROM testData GROUP BY a, b WITH CUBE +SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH CUBE -- !query 2 schema -struct<a:int,CAST(udf(cast(b as string)) AS INT):int,sum(b):bigint> +struct<a:int,b:int,sum(b):bigint> -- !query 2 output 1 1 1 1 2 2 -52,9 +52,9 NULL NULL 9 -- !query 3 -SELECT udf(a + b), b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP +SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP -- !query 3 schema -struct<CAST(udf(cast((a + b) as string)) AS INT):int,b:int,sum((a - b)):bigint> +struct<(a + b):int,b:int,sum((a - b)):bigint> -- !query 3 output 2 1 0 2 NULL 0 -70,9 +70,9 NULL NULL 3 -- !query 4 -SELECT a, b, udf(SUM(b)) FROM testData GROUP BY a, b WITH ROLLUP +SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH ROLLUP -- !query 4 schema -struct<a:int,b:int,CAST(udf(cast(sum(cast(b as bigint)) as string)) AS BIGINT):bigint> +struct<a:int,b:int,sum(b):bigint> -- !query 4 output 1 1 1 1 2 2 -97,7 +97,7 struct<> -- !query 6 -SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY udf(course), year +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year -- !query 6 schema struct<course:string,year:int,sum(earnings):bigint> -- !query 6 output -111,7 +111,7 dotNET 2013 48000 -- !query 7 -SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, udf(year) +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year -- !query 7 schema struct<course:string,year:int,sum(earnings):bigint> -- !query 7 output -127,9 +127,9 dotNET 2013 48000 -- !query 8 -SELECT course, udf(year), SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year) +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year) -- !query 8 schema -struct<course:string,CAST(udf(cast(year as string)) AS INT):int,sum(earnings):bigint> +struct<course:string,year:int,sum(earnings):bigint> -- !query 8 output Java NULL 50000 NULL 2012 35000 -138,26 +138,26 dotNET NULL 63000 -- !query 9 -SELECT course, year, udf(SUM(earnings)) FROM courseSales GROUP BY course, year GROUPING SETS(course) +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course) -- !query 9 schema -struct<course:string,year:int,CAST(udf(cast(sum(cast(earnings as bigint)) as string)) AS BIGINT):bigint> +struct<course:string,year:int,sum(earnings):bigint> -- !query 9 output Java NULL 50000 dotNET NULL 63000 -- !query 10 -SELECT udf(course), year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year) +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year) -- !query 10 schema -struct<CAST(udf(cast(course as string)) AS STRING):string,year:int,sum(earnings):bigint> +struct<course:string,year:int,sum(earnings):bigint> -- !query 10 output NULL 2012 35000 NULL 2013 78000 -- !query 11 -SELECT course, udf(SUM(earnings)) AS sum FROM courseSales -GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, udf(sum) +SELECT course, SUM(earnings) AS sum FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum -- !query 11 schema struct<course:string,sum:bigint> -- !query 11 output -173,7 +173,7 dotNET 63000 -- !query 12 SELECT course, SUM(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales -GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY udf(course), sum +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum -- !query 12 schema struct<course:string,sum:bigint,grouping_id(course, earnings):int> -- !query 12 output -188,10 +188,10 dotNET 63000 1 -- !query 13 -SELECT udf(course), udf(year), GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales +SELECT course, year, GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales GROUP BY CUBE(course, year) -- !query 13 schema -struct<CAST(udf(cast(course as string)) AS STRING):string,CAST(udf(cast(year as string)) AS INT):int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int> +struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint,grouping_id(course, year):int> -- !query 13 output Java 2012 0 0 0 Java 2013 0 0 0 -205,7 +205,7 dotNET NULL 0 1 1 -- !query 14 -SELECT course, udf(year), GROUPING(course) FROM courseSales GROUP BY course, year +SELECT course, year, GROUPING(course) FROM courseSales GROUP BY course, year -- !query 14 schema struct<> -- !query 14 output -214,7 +214,7 grouping() can only be used with GroupingSets/Cube/Rollup; -- !query 15 -SELECT course, udf(year), GROUPING_ID(course, year) FROM courseSales GROUP BY course, year +SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY course, year -- !query 15 schema struct<> -- !query 15 output -223,7 +223,7 grouping_id() can only be used with GroupingSets/Cube/Rollup; -- !query 16 -SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, udf(year) +SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year -- !query 16 schema struct<course:string,year:int,grouping__id:int> -- !query 16 output -240,7 +240,7 NULL NULL 3 -- !query 17 SELECT course, year FROM courseSales GROUP BY CUBE(course, year) -HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, udf(year) +HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, year -- !query 17 schema struct<course:string,year:int> -- !query 17 output -250,7 +250,7 dotNET NULL -- !query 18 -SELECT course, udf(year) FROM courseSales GROUP BY course, year HAVING GROUPING(course) > 0 +SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING(course) > 0 -- !query 18 schema struct<> -- !query 18 output -259,7 +259,7 grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup; -- !query 19 -SELECT course, udf(udf(year)) FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0 +SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0 -- !query 19 schema struct<> -- !query 19 output -268,9 +268,9 grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup; -- !query 20 -SELECT udf(course), year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0 +SELECT course, year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0 -- !query 20 schema -struct<CAST(udf(cast(course as string)) AS STRING):string,year:int> +struct<course:string,year:int> -- !query 20 output Java NULL NULL 2012 -281,7 +281,7 dotNET NULL -- !query 21 SELECT course, year, GROUPING(course), GROUPING(year) FROM courseSales GROUP BY CUBE(course, year) -ORDER BY GROUPING(course), GROUPING(year), course, udf(year) +ORDER BY GROUPING(course), GROUPING(year), course, year -- !query 21 schema struct<course:string,year:int,grouping(course):tinyint,grouping(year):tinyint> -- !query 21 output -298,7 +298,7 NULL NULL 1 1 -- !query 22 SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY CUBE(course, year) -ORDER BY GROUPING(course), GROUPING(year), course, udf(year) +ORDER BY GROUPING(course), GROUPING(year), course, year -- !query 22 schema struct<course:string,year:int,grouping_id(course, year):int> -- !query 22 output -314,7 +314,7 NULL NULL 3 -- !query 23 -SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING(course) +SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING(course) -- !query 23 schema struct<> -- !query 23 output -323,7 +323,7 grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup; -- !query 24 -SELECT course, udf(year) FROM courseSales GROUP BY course, udf(year) ORDER BY GROUPING_ID(course) +SELECT course, year FROM courseSales GROUP BY course, year ORDER BY GROUPING_ID(course) -- !query 24 schema struct<> -- !query 24 output -332,7 +332,7 grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup; -- !query 25 -SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, udf(course), year +SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY grouping__id, course, year -- !query 25 schema struct<course:string,year:int> -- !query 25 output -348,7 +348,7 NULL NULL -- !query 26 -SELECT udf(a + b) AS k1, udf(b) AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2) +SELECT a + b AS k1, b AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2) -- !query 26 schema struct<k1:int,k2:int,sum((a - b)):bigint> -- !query 26 output -368,7 +368,7 NULL NULL 3 -- !query 27 -SELECT udf(udf(a + b)) AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b) +SELECT a + b AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b) -- !query 27 schema struct<k:int,b:int,sum((a - b)):bigint> -- !query 27 output -386,9 +386,9 NULL NULL 3 -- !query 28 -SELECT udf(a + b), udf(udf(b)) AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k) +SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k) -- !query 28 schema -struct<CAST(udf(cast((a + b) as string)) AS INT):int,k:int,sum((a - b)):bigint> +struct<(a + b):int,k:int,sum((a - b)):bigint> -- !query 28 output NULL 1 3 NULL 2 0 ``` </p> </details> ## How was this patch tested? Tested as guided in SPARK-27921. Verified pandas & pyarrow versions: ```$python3 Python 3.6.8 (default, Jan 14 2019, 11:02:34) [GCC 8.0.1 20180414 (experimental) [trunk revision 259383]] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import pandas >>> import pyarrow >>> pyarrow.__version__ '0.14.0' >>> pandas.__version__ '0.24.2' ``` From the sql output it seems that sql statements are evaluated correctly given that udf returns a string and may change results as Null will be returned as None and will be counted in returned values. Closes #25196 from skonto/group-analytics.sql. Authored-by: Stavros Kontopoulos <st.kontopoulos@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
… into UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from `inline-table.sql` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). <details><summary>Diff comparing to 'inline-table.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out index 4e80f0b..2cf24e5 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out -3,33 +3,33 -- !query 0 -select * from values ("one", 1) +select udf(col1), udf(col2) from values ("one", 1) -- !query 0 schema -struct<col1:string,col2:int> +struct<CAST(udf(cast(col1 as string)) AS STRING):string,CAST(udf(cast(col2 as string)) AS INT):int> -- !query 0 output one 1 -- !query 1 -select * from values ("one", 1) as data +select udf(col1), udf(udf(col2)) from values ("one", 1) as data -- !query 1 schema -struct<col1:string,col2:int> +struct<CAST(udf(cast(col1 as string)) AS STRING):string,CAST(udf(cast(cast(udf(cast(col2 as string)) as int) as string)) AS INT):int> -- !query 1 output one 1 -- !query 2 -select * from values ("one", 1) as data(a, b) +select udf(a), b from values ("one", 1) as data(a, b) -- !query 2 schema -struct<a:string,b:int> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:int> -- !query 2 output one 1 -- !query 3 -select * from values 1, 2, 3 as data(a) +select udf(a) from values 1, 2, 3 as data(a) -- !query 3 schema -struct<a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 3 output 1 2 -37,9 +37,9 struct<a:int> -- !query 4 -select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b) +select udf(a), b from values ("one", 1), ("two", 2), ("three", null) as data(a, b) -- !query 4 schema -struct<a:string,b:int> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:int> -- !query 4 output one 1 three NULL -47,107 +47,107 two 2 -- !query 5 -select * from values ("one", null), ("two", null) as data(a, b) +select udf(a), b from values ("one", null), ("two", null) as data(a, b) -- !query 5 schema -struct<a:string,b:null> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:null> -- !query 5 output one NULL two NULL -- !query 6 -select * from values ("one", 1), ("two", 2L) as data(a, b) +select udf(a), b from values ("one", 1), ("two", 2L) as data(a, b) -- !query 6 schema -struct<a:string,b:bigint> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:bigint> -- !query 6 output one 1 two 2 -- !query 7 -select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) +select udf(udf(a)), udf(b) from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) -- !query 7 schema -struct<a:string,b:bigint> +struct<CAST(udf(cast(cast(udf(cast(a as string)) as string) as string)) AS STRING):string,CAST(udf(cast(b as string)) AS BIGINT):bigint> -- !query 7 output one 1 two 4 -- !query 8 -select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) +select udf(a), b from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) -- !query 8 schema -struct<a:string,b:array<int>> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:array<int>> -- !query 8 output one [0,1] two [2,3] -- !query 9 -select * from values ("one", 2.0), ("two", 3.0D) as data(a, b) +select udf(a), b from values ("one", 2.0), ("two", 3.0D) as data(a, b) -- !query 9 schema -struct<a:string,b:double> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:double> -- !query 9 output one 2.0 two 3.0 -- !query 10 -select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) +select udf(a), b from values ("one", rand(5)), ("two", 3.0D) as data(a, b) -- !query 10 schema struct<> -- !query 10 output org.apache.spark.sql.AnalysisException -cannot evaluate expression rand(5) in inline table definition; line 1 pos 29 +cannot evaluate expression rand(5) in inline table definition; line 1 pos 37 -- !query 11 -select * from values ("one", 2.0), ("two") as data(a, b) +select udf(a), udf(b) from values ("one", 2.0), ("two") as data(a, b) -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 1; line 1 pos 14 +expected 2 columns but found 1 columns in row 1; line 1 pos 27 -- !query 12 -select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) +select udf(a), udf(b) from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException -incompatible types found in column b for inline table; line 1 pos 14 +incompatible types found in column b for inline table; line 1 pos 27 -- !query 13 -select * from values ("one"), ("two") as data(a, b) +select udf(a), udf(b) from values ("one"), ("two") as data(a, b) -- !query 13 schema struct<> -- !query 13 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 0; line 1 pos 14 +expected 2 columns but found 1 columns in row 0; line 1 pos 27 -- !query 14 -select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +select udf(a), udf(b) from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) -- !query 14 schema struct<> -- !query 14 output org.apache.spark.sql.AnalysisException -Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 42 -- !query 15 -select * from values ("one", count(1)), ("two", 2) as data(a, b) +select udf(a), udf(b) from values ("one", count(1)), ("two", 2) as data(a, b) -- !query 15 schema struct<> -- !query 15 output org.apache.spark.sql.AnalysisException -cannot evaluate expression count(1) in inline table definition; line 1 pos 29 +cannot evaluate expression count(1) in inline table definition; line 1 pos 42 -- !query 16 -select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b) +select udf(a), b from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b) -- !query 16 schema -struct<a:timestamp,b:array<timestamp>> +struct<CAST(udf(cast(a as string)) AS TIMESTAMP):timestamp,b:array<timestamp>> -- !query 16 output 1991-12-06 00:00:00 [1991-12-06 01:00:00.0,1991-12-06 12:00:00.0] ``` </p> </details> ## How was this patch tested? Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). Closes #25124 from imback82/inline-table-sql. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…hDownPredicate` ## What changes were proposed in this pull request? The optimize rule `PushDownPredicate` has been combined into `PushDownPredicates`, update the comment that references the old rule. ## How was this patch tested? N/A Closes #25207 from jiangxb1987/comment. Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…samplingRate from Python TreeEnsembleParams ## What changes were proposed in this pull request? Remove deprecated setFeatureSubsetStrategy and setSubsamplingRate from Python TreeEnsembleParams ## How was this patch tested? Use existing tests. Closes #25046 from huaxingao/spark-28243. Authored-by: Huaxin Gao <huaxing@us.ibm.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
…eSuite ## What changes were proposed in this pull request? This pr is to remove the unnecessary test in DataFrameSuite. ## How was this patch tested? N/A Closes #25216 from maropu/SPARK-28189-FOLLOWUP. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Adding doc for the kafka source minPartitions option to "Structured Streaming + Kafka Integration Guide" The text is based on the content in https://docs.databricks.com/spark/latest/structured-streaming/kafka.html#configuration Closes #25219 from arunpandianp/SPARK-28464. Authored-by: Arun Pandian <apandian@groupon.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ames in PlanTestBase.comparePlans failures ## What changes were proposed in this pull request? This pr proposes to add a prefix '*' to non-nullable attribute names in PlanTestBase.comparePlans failures. In the current master, nullability mismatches might generate the same error message for left/right logical plans like this; ``` // This failure message was extracted from #24765 - constraints should be inferred from aliased literals *** FAILED *** == FAIL: Plans do not match === !'Join Inner, (two#0 = a#0) 'Join Inner, (two#0 = a#0) :- Filter (isnotnull(a#0) AND (2 <=> a#0)) :- Filter (isnotnull(a#0) AND (2 <=> a#0)) : +- LocalRelation <empty>, [a#0, b#0, c#0] : +- LocalRelation <empty>, [a#0, b#0, c#0] +- Project [2 AS two#0] +- Project [2 AS two#0] +- LocalRelation <empty>, [a#0, b#0, c#0] +- LocalRelation <empty>, [a#0, b#0, c#0] (PlanTest.scala:145) ``` With this pr, this error message is changed to one below; ``` - constraints should be inferred from aliased literals *** FAILED *** == FAIL: Plans do not match === !'Join Inner, (*two#0 = a#0) 'Join Inner, (*two#0 = *a#0) :- Filter (isnotnull(a#0) AND (2 <=> a#0)) :- Filter (isnotnull(a#0) AND (2 <=> a#0)) : +- LocalRelation <empty>, [a#0, b#0, c#0] : +- LocalRelation <empty>, [a#0, b#0, c#0] +- Project [2 AS two#0] +- Project [2 AS two#0] +- LocalRelation <empty>, [a#0, b#0, c#0] +- LocalRelation <empty>, [a#0, b#0, c#0] (PlanTest.scala:145) ``` ## How was this patch tested? N/A Closes #25213 from maropu/MarkForNullability. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…fka-integration ## What changes were proposed in this pull request? This PR is a follow-up PR to recover three links from [the previous commit](https://github.com/apache/spark/pull/22703/files#diff-21245da8f8dbfef6401c5500f559f0bc). Currently, those three are broken. ``` $ git grep structured-streaming-kafka-0-10-integration structured-streaming-programming-guide.md: - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-0-10-integration.html) for more details. structured-streaming-programming-guide.md: See the <a href="structured-streaming-kafka-0-10-integration.html">Kafka Integration Guide</a>. structured-streaming-programming-guide.md: <td>See the <a href="structured-streaming-kafka-0-10-integration.html">Kafka Integration Guide</a></td> ``` It's because we have `structured-streaming-kafka-integration.html` instead of `structured-streaming-kafka-0-10-integration.html`. ``` $ find . -name structured-streaming-kafka-0-10-integration.md $ find . -name structured-streaming-kafka-integration.md ./structured-streaming-kafka-integration.md ``` ## How was this patch tested? Manual. Closes #25221 from dongjoon-hyun/SPARK-25705. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…n ScalaUDF result ## What changes were proposed in this pull request? When a `ScalaUDF` returns a value which overflows, currently it returns null regardless of the value of the config `spark.sql.decimalOperations.nullOnOverflow`. The PR makes it respect the above-mentioned config and behave accordingly. ## How was this patch tested? added UT Closes #25144 from mgaido91/SPARK-28369. Authored-by: Marco Gaido <marcogaido91@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…T with V2 ## What changes were proposed in this pull request? Implements the `REPLACE TABLE` and `REPLACE TABLE AS SELECT` logical plans. `REPLACE TABLE` is now a valid operation in spark-sql provided that the tables being modified are managed by V2 catalogs. This also introduces an atomic mix-in that table catalogs can choose to implement. Table catalogs can now implement `TransactionalTableCatalog`. The semantics of this API are that table creation and replacement can be "staged" and then "committed". On the execution of `REPLACE TABLE AS SELECT`, `REPLACE TABLE`, and `CREATE TABLE AS SELECT`, if the catalog implements transactional operations, the physical plan will use said functionality. Otherwise, these operations fall back on non-atomic variants. For `REPLACE TABLE` in particular, the usage of non-atomic operations can unfortunately lead to inconsistent state. ## How was this patch tested? Unit tests - multiple additions to `DataSourceV2SQLSuite`. Closes #24798 from mccheah/spark-27724. Authored-by: mcheah <mcheah@palantir.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ing Encoder without touching Scala Reflection
## What changes were proposed in this pull request?
Because `Encoder` is not thread safe, the user cannot reuse an `Encoder` in multiple `Dataset`s. However, creating an `Encoder` for a complicated class is slow due to Scala Reflection. To eliminate the cost of Scala Reflection, right now I usually use the private API `ExpressionEncoder.copy` as follows:
```scala
object FooEncoder {
private lazy val _encoder: ExpressionEncoder[Foo] = ExpressionEncoder[Foo]()
implicit def encoder: ExpressionEncoder[Foo] = _encoder.copy()
}
```
This PR proposes a new method `makeCopy` in `Encoder` so that the above codes can be rewritten using public APIs.
```scala
object FooEncoder {
private lazy val _encoder: Encoder[Foo] = Encoders.product[Foo]()
implicit def encoder: Encoder[Foo] = _encoder.makeCopy
}
```
The method name is consistent with `TreeNode.makeCopy`.
## How was this patch tested?
Jenkins
Closes #25209 from zsxwing/encoder-copy.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…UDF's internal behaviour change ## What changes were proposed in this pull request? This PR proposes to add a note in the migration guide. See #25108 (comment) ## How was this patch tested? N/A Closes #25224 from HyukjinKwon/SPARK-28321-doc. Lead-authored-by: HyukjinKwon <gurwls223@apache.org> Co-authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…o UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from `group-by.sql` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). <details><summary>Diff comparing to 'group-by.sql'</summary> <p> ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out index 3a5df25..0118c05b1d 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out -13,26 +13,26 struct<> -- !query 1 -SELECT a, COUNT(b) FROM testData +SELECT udf(a), udf(COUNT(b)) FROM testData -- !query 1 schema struct<> -- !query 1 output org.apache.spark.sql.AnalysisException -grouping expressions sequence is empty, and 'testdata.`a`' is not an aggregate function. Wrap '(count(testdata.`b`) AS `count(b)`)' in windowing function(s) or wrap 'testdata.`a`' in first() (or first_value) if you don't care which value you get.; +grouping expressions sequence is empty, and 'testdata.`a`' is not an aggregate function. Wrap '(CAST(udf(cast(count(b) as string)) AS BIGINT) AS `CAST(udf(cast(count(b) as string)) AS BIGINT)`)' in windowing function(s) or wrap 'testdata.`a`' in first() (or first_value) if you don't care which value you get.; -- !query 2 -SELECT COUNT(a), COUNT(b) FROM testData +SELECT COUNT(udf(a)), udf(COUNT(b)) FROM testData -- !query 2 schema -struct<count(a):bigint,count(b):bigint> +struct<count(CAST(udf(cast(a as string)) AS INT)):bigint,CAST(udf(cast(count(b) as string)) AS BIGINT):bigint> -- !query 2 output 7 7 -- !query 3 -SELECT a, COUNT(b) FROM testData GROUP BY a +SELECT udf(a), COUNT(udf(b)) FROM testData GROUP BY a -- !query 3 schema -struct<a:int,count(b):bigint> +struct<CAST(udf(cast(a as string)) AS INT):int,count(CAST(udf(cast(b as string)) AS INT)):bigint> -- !query 3 output 1 2 2 2 -41,7 +41,7 NULL 1 -- !query 4 -SELECT a, COUNT(b) FROM testData GROUP BY b +SELECT udf(a), udf(COUNT(udf(b))) FROM testData GROUP BY b -- !query 4 schema struct<> -- !query 4 output -50,9 +50,9 expression 'testdata.`a`' is neither present in the group by, nor is it an aggre -- !query 5 -SELECT COUNT(a), COUNT(b) FROM testData GROUP BY a +SELECT COUNT(udf(a)), COUNT(udf(b)) FROM testData GROUP BY udf(a) -- !query 5 schema -struct<count(a):bigint,count(b):bigint> +struct<count(CAST(udf(cast(a as string)) AS INT)):bigint,count(CAST(udf(cast(b as string)) AS INT)):bigint> -- !query 5 output 0 1 2 2 -61,15 +61,15 struct<count(a):bigint,count(b):bigint> -- !query 6 -SELECT 'foo', COUNT(a) FROM testData GROUP BY 1 +SELECT 'foo', COUNT(udf(a)) FROM testData GROUP BY 1 -- !query 6 schema -struct<foo:string,count(a):bigint> +struct<foo:string,count(CAST(udf(cast(a as string)) AS INT)):bigint> -- !query 6 output foo 7 -- !query 7 -SELECT 'foo' FROM testData WHERE a = 0 GROUP BY 1 +SELECT 'foo' FROM testData WHERE a = 0 GROUP BY udf(1) -- !query 7 schema struct<foo:string> -- !query 7 output -77,25 +77,25 struct<foo:string> -- !query 8 -SELECT 'foo', APPROX_COUNT_DISTINCT(a) FROM testData WHERE a = 0 GROUP BY 1 +SELECT 'foo', udf(APPROX_COUNT_DISTINCT(udf(a))) FROM testData WHERE a = 0 GROUP BY 1 -- !query 8 schema -struct<foo:string,approx_count_distinct(a):bigint> +struct<foo:string,CAST(udf(cast(approx_count_distinct(cast(udf(cast(a as string)) as int), 0.05, 0, 0) as string)) AS BIGINT):bigint> -- !query 8 output -- !query 9 -SELECT 'foo', MAX(STRUCT(a)) FROM testData WHERE a = 0 GROUP BY 1 +SELECT 'foo', MAX(STRUCT(udf(a))) FROM testData WHERE a = 0 GROUP BY 1 -- !query 9 schema -struct<foo:string,max(named_struct(a, a)):struct<a:int>> +struct<foo:string,max(named_struct(col1, CAST(udf(cast(a as string)) AS INT))):struct<col1:int>> -- !query 9 output -- !query 10 -SELECT a + b, COUNT(b) FROM testData GROUP BY a + b +SELECT udf(a + b), udf(COUNT(b)) FROM testData GROUP BY a + b -- !query 10 schema -struct<(a + b):int,count(b):bigint> +struct<CAST(udf(cast((a + b) as string)) AS INT):int,CAST(udf(cast(count(b) as string)) AS BIGINT):bigint> -- !query 10 output 2 1 3 2 -105,7 +105,7 NULL 1 -- !query 11 -SELECT a + 2, COUNT(b) FROM testData GROUP BY a + 1 +SELECT udf(a + 2), udf(COUNT(b)) FROM testData GROUP BY a + 1 -- !query 11 schema struct<> -- !query 11 output -114,37 +114,35 expression 'testdata.`a`' is neither present in the group by, nor is it an aggre -- !query 12 -SELECT a + 1 + 1, COUNT(b) FROM testData GROUP BY a + 1 +SELECT udf(a + 1 + 1), udf(COUNT(b)) FROM testData GROUP BY udf(a + 1) -- !query 12 schema -struct<((a + 1) + 1):int,count(b):bigint> +struct<> -- !query 12 output -3 2 -4 2 -5 2 -NULL 1 +org.apache.spark.sql.AnalysisException +expression 'testdata.`a`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.; -- !query 13 -SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a), AVG(a), VARIANCE(a), STDDEV(a), SUM(a), COUNT(a) +SELECT SKEWNESS(udf(a)), udf(KURTOSIS(a)), udf(MIN(a)), MAX(udf(a)), udf(AVG(udf(a))), udf(VARIANCE(a)), STDDEV(udf(a)), udf(SUM(a)), udf(COUNT(a)) FROM testData -- !query 13 schema -struct<skewness(CAST(a AS DOUBLE)):double,kurtosis(CAST(a AS DOUBLE)):double,min(a):int,max(a):int,avg(a):double,var_samp(CAST(a AS DOUBLE)):double,stddev_samp(CAST(a AS DOUBLE)):double,sum(a):bigint,count(a):bigint> +struct<skewness(CAST(CAST(udf(cast(a as string)) AS INT) AS DOUBLE)):double,CAST(udf(cast(kurtosis(cast(a as double)) as string)) AS DOUBLE):double,CAST(udf(cast(min(a) as string)) AS INT):int,max(CAST(udf(cast(a as string)) AS INT)):int,CAST(udf(cast(avg(cast(cast(udf(cast(a as string)) as int) as bigint)) as string)) AS DOUBLE):double,CAST(udf(cast(var_samp(cast(a as double)) as string)) AS DOUBLE):double,stddev_samp(CAST(CAST(udf(cast(a as string)) AS INT) AS DOUBLE)):double,CAST(udf(cast(sum(cast(a as bigint)) as string)) AS BIGINT):bigint,CAST(udf(cast(count(a) as string)) AS BIGINT):bigint> -- !query 13 output -0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7 -- !query 14 -SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a +SELECT COUNT(DISTINCT udf(b)), udf(COUNT(DISTINCT b, c)) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a -- !query 14 schema -struct<count(DISTINCT b):bigint,count(DISTINCT b, c):bigint> +struct<count(DISTINCT CAST(udf(cast(b as string)) AS INT)):bigint,CAST(udf(cast(count(distinct b, c) as string)) AS BIGINT):bigint> -- !query 14 output 1 1 -- !query 15 -SELECT a AS k, COUNT(b) FROM testData GROUP BY k +SELECT a AS k, COUNT(udf(b)) FROM testData GROUP BY k -- !query 15 schema -struct<k:int,count(b):bigint> +struct<k:int,count(CAST(udf(cast(b as string)) AS INT)):bigint> -- !query 15 output 1 2 2 2 -153,21 +151,21 NULL 1 -- !query 16 -SELECT a AS k, COUNT(b) FROM testData GROUP BY k HAVING k > 1 +SELECT a AS k, udf(COUNT(b)) FROM testData GROUP BY k HAVING k > 1 -- !query 16 schema -struct<k:int,count(b):bigint> +struct<k:int,CAST(udf(cast(count(b) as string)) AS BIGINT):bigint> -- !query 16 output 2 2 3 2 -- !query 17 -SELECT COUNT(b) AS k FROM testData GROUP BY k +SELECT udf(COUNT(b)) AS k FROM testData GROUP BY k -- !query 17 schema struct<> -- !query 17 output org.apache.spark.sql.AnalysisException -aggregate functions are not allowed in GROUP BY, but found count(testdata.`b`); +aggregate functions are not allowed in GROUP BY, but found CAST(udf(cast(count(b) as string)) AS BIGINT); -- !query 18 -180,7 +178,7 struct<> -- !query 19 -SELECT k AS a, COUNT(v) FROM testDataHasSameNameWithAlias GROUP BY a +SELECT k AS a, udf(COUNT(udf(v))) FROM testDataHasSameNameWithAlias GROUP BY a -- !query 19 schema struct<> -- !query 19 output -197,32 +195,32 spark.sql.groupByAliases false -- !query 21 -SELECT a AS k, COUNT(b) FROM testData GROUP BY k +SELECT a AS k, udf(COUNT(udf(b))) FROM testData GROUP BY k -- !query 21 schema struct<> -- !query 21 output org.apache.spark.sql.AnalysisException -cannot resolve '`k`' given input columns: [testdata.a, testdata.b]; line 1 pos 47 +cannot resolve '`k`' given input columns: [testdata.a, testdata.b]; line 1 pos 57 -- !query 22 -SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a +SELECT a, COUNT(udf(1)) FROM testData WHERE false GROUP BY a -- !query 22 schema -struct<a:int,count(1):bigint> +struct<a:int,count(CAST(udf(cast(1 as string)) AS INT)):bigint> -- !query 22 output -- !query 23 -SELECT COUNT(1) FROM testData WHERE false +SELECT udf(COUNT(1)) FROM testData WHERE false -- !query 23 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 23 output 0 -- !query 24 -SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t +SELECT 1 FROM (SELECT udf(COUNT(1)) FROM testData WHERE false) t -- !query 24 schema struct<1:int> -- !query 24 output -232,7 +230,7 struct<1:int> -- !query 25 SELECT 1 from ( SELECT 1 AS z, - MIN(a.x) + udf(MIN(a.x)) FROM (select 1 as x) a WHERE false ) b -244,32 +242,32 struct<1:int> -- !query 26 -SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*) +SELECT corr(DISTINCT x, y), udf(corr(DISTINCT y, x)), count(*) FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y) -- !query 26 schema -struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint> +struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,CAST(udf(cast(corr(distinct cast(y as double), cast(x as double)) as string)) AS DOUBLE):double,count(1):bigint> -- !query 26 output 1.0 1.0 3 -- !query 27 -SELECT 1 FROM range(10) HAVING true +SELECT udf(1) FROM range(10) HAVING true -- !query 27 schema -struct<1:int> +struct<CAST(udf(cast(1 as string)) AS INT):int> -- !query 27 output 1 -- !query 28 -SELECT 1 FROM range(10) HAVING MAX(id) > 0 +SELECT udf(udf(1)) FROM range(10) HAVING MAX(id) > 0 -- !query 28 schema -struct<1:int> +struct<CAST(udf(cast(cast(udf(cast(1 as string)) as int) as string)) AS INT):int> -- !query 28 output 1 -- !query 29 -SELECT id FROM range(10) HAVING id > 0 +SELECT udf(id) FROM range(10) HAVING id > 0 -- !query 29 schema struct<> -- !query 29 output -291,33 +289,33 struct<> -- !query 31 -SELECT every(v), some(v), any(v) FROM test_agg WHERE 1 = 0 +SELECT udf(every(v)), udf(some(v)), any(v) FROM test_agg WHERE 1 = 0 -- !query 31 schema -struct<every(v):boolean,some(v):boolean,any(v):boolean> +struct<CAST(udf(cast(every(v) as string)) AS BOOLEAN):boolean,CAST(udf(cast(some(v) as string)) AS BOOLEAN):boolean,any(v):boolean> -- !query 31 output NULL NULL NULL -- !query 32 -SELECT every(v), some(v), any(v) FROM test_agg WHERE k = 4 +SELECT udf(every(udf(v))), some(v), any(v) FROM test_agg WHERE k = 4 -- !query 32 schema -struct<every(v):boolean,some(v):boolean,any(v):boolean> +struct<CAST(udf(cast(every(cast(udf(cast(v as string)) as boolean)) as string)) AS BOOLEAN):boolean,some(v):boolean,any(v):boolean> -- !query 32 output NULL NULL NULL -- !query 33 -SELECT every(v), some(v), any(v) FROM test_agg WHERE k = 5 +SELECT every(v), udf(some(v)), any(v) FROM test_agg WHERE k = 5 -- !query 33 schema -struct<every(v):boolean,some(v):boolean,any(v):boolean> +struct<every(v):boolean,CAST(udf(cast(some(v) as string)) AS BOOLEAN):boolean,any(v):boolean> -- !query 33 output false true true -- !query 34 -SELECT k, every(v), some(v), any(v) FROM test_agg GROUP BY k +SELECT k, every(v), udf(some(v)), any(v) FROM test_agg GROUP BY k -- !query 34 schema -struct<k:int,every(v):boolean,some(v):boolean,any(v):boolean> +struct<k:int,every(v):boolean,CAST(udf(cast(some(v) as string)) AS BOOLEAN):boolean,any(v):boolean> -- !query 34 output 1 false true true 2 true true true -327,9 +325,9 struct<k:int,every(v):boolean,some(v):boolean,any(v):boolean> -- !query 35 -SELECT k, every(v) FROM test_agg GROUP BY k HAVING every(v) = false +SELECT udf(k), every(v) FROM test_agg GROUP BY k HAVING every(v) = false -- !query 35 schema -struct<k:int,every(v):boolean> +struct<CAST(udf(cast(k as string)) AS INT):int,every(v):boolean> -- !query 35 output 1 false 3 false -337,16 +335,16 struct<k:int,every(v):boolean> -- !query 36 -SELECT k, every(v) FROM test_agg GROUP BY k HAVING every(v) IS NULL +SELECT k, udf(every(v)) FROM test_agg GROUP BY k HAVING every(v) IS NULL -- !query 36 schema -struct<k:int,every(v):boolean> +struct<k:int,CAST(udf(cast(every(v) as string)) AS BOOLEAN):boolean> -- !query 36 output 4 NULL -- !query 37 SELECT k, - Every(v) AS every + udf(Every(v)) AS every FROM test_agg WHERE k = 2 AND v IN (SELECT Any(v) -360,7 +358,7 struct<k:int,every:boolean> -- !query 38 -SELECT k, +SELECT udf(udf(k)), Every(v) AS every FROM test_agg WHERE k = 2 -369,45 +367,45 WHERE k = 2 WHERE k = 1) GROUP BY k -- !query 38 schema -struct<k:int,every:boolean> +struct<CAST(udf(cast(cast(udf(cast(k as string)) as int) as string)) AS INT):int,every:boolean> -- !query 38 output -- !query 39 -SELECT every(1) +SELECT every(udf(1)) -- !query 39 schema struct<> -- !query 39 output org.apache.spark.sql.AnalysisException -cannot resolve 'every(1)' due to data type mismatch: Input to function 'every' should have been boolean, but it's [int].; line 1 pos 7 +cannot resolve 'every(CAST(udf(cast(1 as string)) AS INT))' due to data type mismatch: Input to function 'every' should have been boolean, but it's [int].; line 1 pos 7 -- !query 40 -SELECT some(1S) +SELECT some(udf(1S)) -- !query 40 schema struct<> -- !query 40 output org.apache.spark.sql.AnalysisException -cannot resolve 'some(1S)' due to data type mismatch: Input to function 'some' should have been boolean, but it's [smallint].; line 1 pos 7 +cannot resolve 'some(CAST(udf(cast(1 as string)) AS SMALLINT))' due to data type mismatch: Input to function 'some' should have been boolean, but it's [smallint].; line 1 pos 7 -- !query 41 -SELECT any(1L) +SELECT any(udf(1L)) -- !query 41 schema struct<> -- !query 41 output org.apache.spark.sql.AnalysisException -cannot resolve 'any(1L)' due to data type mismatch: Input to function 'any' should have been boolean, but it's [bigint].; line 1 pos 7 +cannot resolve 'any(CAST(udf(cast(1 as string)) AS BIGINT))' due to data type mismatch: Input to function 'any' should have been boolean, but it's [bigint].; line 1 pos 7 -- !query 42 -SELECT every("true") +SELECT udf(every("true")) -- !query 42 schema struct<> -- !query 42 output org.apache.spark.sql.AnalysisException -cannot resolve 'every('true')' due to data type mismatch: Input to function 'every' should have been boolean, but it's [string].; line 1 pos 7 +cannot resolve 'every('true')' due to data type mismatch: Input to function 'every' should have been boolean, but it's [string].; line 1 pos 11 -- !query 43 -428,9 +426,9 struct<k:int,v:boolean,every(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST -- !query 44 -SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg +SELECT k, udf(udf(v)), some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg -- !query 44 schema -struct<k:int,v:boolean,some(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):boolean> +struct<k:int,CAST(udf(cast(cast(udf(cast(v as string)) as boolean) as string)) AS BOOLEAN):boolean,some(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):boolean> -- !query 44 output 1 false false 1 true true -445,9 +443,9 struct<k:int,v:boolean,some(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST R -- !query 45 -SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg +SELECT udf(udf(k)), v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg -- !query 45 schema -struct<k:int,v:boolean,any(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):boolean> +struct<CAST(udf(cast(cast(udf(cast(k as string)) as int) as string)) AS INT):int,v:boolean,any(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):boolean> -- !query 45 output 1 false false 1 true true -462,17 +460,17 struct<k:int,v:boolean,any(v) OVER (PARTITION BY k ORDER BY v ASC NULLS FIRST RA -- !query 46 -SELECT count(*) FROM test_agg HAVING count(*) > 1L +SELECT udf(count(*)) FROM test_agg HAVING count(*) > 1L -- !query 46 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 46 output 10 -- !query 47 -SELECT k, max(v) FROM test_agg GROUP BY k HAVING max(v) = true +SELECT k, udf(max(v)) FROM test_agg GROUP BY k HAVING max(v) = true -- !query 47 schema -struct<k:int,max(v):boolean> +struct<k:int,CAST(udf(cast(max(v) as string)) AS BOOLEAN):boolean> -- !query 47 output 1 true 2 true -480,7 +478,7 struct<k:int,max(v):boolean> -- !query 48 -SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1L +SELECT * FROM (SELECT udf(COUNT(*)) AS cnt FROM test_agg) WHERE cnt > 1L -- !query 48 schema struct<cnt:bigint> -- !query 48 output -488,7 +486,7 struct<cnt:bigint> -- !query 49 -SELECT count(*) FROM test_agg WHERE count(*) > 1L +SELECT udf(count(*)) FROM test_agg WHERE count(*) > 1L -- !query 49 schema struct<> -- !query 49 output -500,7 +498,7 Invalid expressions: [count(1)]; -- !query 50 -SELECT count(*) FROM test_agg WHERE count(*) + 1L > 1L +SELECT udf(count(*)) FROM test_agg WHERE count(*) + 1L > 1L -- !query 50 schema struct<> -- !query 50 output -512,7 +510,7 Invalid expressions: [count(1)]; -- !query 51 -SELECT count(*) FROM test_agg WHERE k = 1 or k = 2 or count(*) + 1L > 1L or max(k) > 1 +SELECT udf(count(*)) FROM test_agg WHERE k = 1 or k = 2 or count(*) + 1L > 1L or max(k) > 1 -- !query 51 schema struct<> -- !query 51 output ``` </p> </details> ## How was this patch tested? Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). Verified pandas & pyarrow versions: ```$python3 Python 3.6.8 (default, Jan 14 2019, 11:02:34) [GCC 8.0.1 20180414 (experimental) [trunk revision 259383]] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import pandas >>> import pyarrow >>> pyarrow.__version__ '0.14.0' >>> pandas.__version__ '0.24.2' ``` From the sql output it seems that sql statements are evaluated correctly given that udf returns a string and may change results as Null will be returned as None and will be counted in returned values. Closes #25098 from skonto/group-by.sql. Authored-by: Stavros Kontopoulos <st.kontopoulos@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…time This would cause the timeout time to be negative, so executors would be timed out immediately (instead of never). I also tweaked a couple of log messages that could get pretty long when lots of executors were active. Added unit test (which failed without the fix). Closes #25208 from vanzin/SPARK-28455. Authored-by: Marcelo Vanzin <vanzin@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request? New function `make_date()` takes 3 columns `year`, `month` and `day`, and makes new column of the `DATE` type. If values in the input columns are `null` or out of valid ranges, the function returns `null`. Valid ranges are: - `year` - `[1, 9999]` - `month` - `[1, 12]` - `day` - `[1, 31]` Also constructed date must be valid otherwise `make_date` returns `null`. The function is implemented similarly to `make_date` in PostgreSQL: https://www.postgresql.org/docs/11/functions-datetime.html to maintain feature parity with it. Here is an example: ```sql select make_date(2013, 7, 15); 2013-07-15 ``` ## How was this patch tested? Added new tests to `DateExpressionsSuite`. Closes #25210 from MaxGekk/make_date-timestamp. Authored-by: Maxim Gekk <max.gekk@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…'s parsing and writing
## What changes were proposed in this pull request?
Fix CSV datasource to throw `com.univocity.parsers.common.TextParsingException` with large size message, which will make log output consume large disk space.
This issue is troublesome when sometimes we need parse CSV with large size column.
This PR proposes to set CSV parser/writer settings by `setErrorContentLength(1000)` to limit the error message length.
## How was this patch tested?
Manually.
```
val s = "a" * 40 * 1000000
Seq(s).toDF.write.mode("overwrite").csv("/tmp/bogdan/es4196.csv")
spark.read .option("maxCharsPerColumn", 30000000) .csv("/tmp/bogdan/es4196.csv").count
```
**Before:**
The thrown message will include error content of about 30MB size (The column size exceed the max value 30MB, so the error content include the whole parsed content, so it is 30MB).
**After:**
The thrown message will include error content like "...aaa...aa" (the number of 'a' is 1024), i.e. limit the content size to be 1024.
Closes #25184 from WeichenXu123/limit_csv_exception_size.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…esentation from calendarinterval to interval ## What changes were proposed in this pull request? This PR change `CalendarIntervalType`'s readable string representation from `calendarinterval` to `interval`. ## How was this patch tested? Existing UT Closes #25225 from wangyum/SPARK-28469. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…nd planner ## What changes were proposed in this pull request? query plan was designed to be immutable, but sometimes we do allow it to carry mutable states, because of the complexity of the SQL system. One example is `TreeNodeTag`. It's a state of `TreeNode` and can be carried over during copy and transform. The adaptive execution framework relies on it to link the logical and physical plans. This leads to a problem: when we get `QueryExecution#analyzed`, the plan can be changed unexpectedly because it's mutable. I hit a real issue in #25107 : I use `TreeNodeTag` to carry dataset id in logical plans. However, the analyzed plan ends up with many duplicated dataset id tags in different nodes. It turns out that, the optimizer transforms the logical plan and add the tag to more nodes. For example, the logical plan is `SubqueryAlias(Filter(...))`, and I expect only the `SubqueryAlais` has the dataset id tag. However, the optimizer removes `SubqueryAlias` and carries over the dataset id tag to `Filter`. When I go back to the analyzed plan, both `SubqueryAlias` and `Filter` has the dataset id tag, which breaks my assumption. Since now query plan is mutable, I think it's better to limit the life cycle of a query plan instance. We can clone the query plan between analyzer, optimizer and planner, so that the life cycle is limited in one stage. ## How was this patch tested? new test Closes #25111 from cloud-fan/clone. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…the issue ## What changes were proposed in this pull request? The bug fixed by #24886 is caused by Hive's `loadDynamicPartitions`. It's better to keep the fix surgical and put it right before we call `loadDynamicPartitions`. This also makes the fix safer, instead of analyzing all the callers of `saveAsHiveFile` and proving that they are safe. ## How was this patch tested? N/A Closes #25234 from cloud-fan/minor. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Change the format of the build command in the README to start with a `./` prefix
./build/mvn -DskipTests clean package
This increases stylistic consistency across the README- all the other commands have a `./` prefix. Having a visible `./` prefix also makes it clear to the user that the shell command requires the current working directory to be at the repository root.
## How was this patch tested?
README.md was reviewed both in raw markdown and in the Github rendered landing page for stylistic consistency.
Closes #25231 from Mister-Meeseeks/master.
Lead-authored-by: Douglas R Colkitt <douglas.colkitt@gmail.com>
Co-authored-by: Mister-Meeseeks <douglas.colkitt@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…icit.sql' into UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from 'pgSQL/select_implicit.sql' to test UDFs <details><summary>Diff comparing to 'pgSQL/select_implicit.sql'</summary> <p> ```diff ... diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out index 0675820..e6a5995 100755 --- a/sql/core/src/test/resources/sql-tests/results/pgSQL/select_implicit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-select_implicit.sql.out -91,9 +91,11 struct<> -- !query 11 -SELECT c, count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c +SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY +test_missing_target.c +ORDER BY udf(c) -- !query 11 schema -struct<c:string,count(1):bigint> +struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 11 output ABAB 2 BBBB 2 -104,9 +106,10 cccc 2 -- !query 12 -SELECT count(*) FROM test_missing_target GROUP BY test_missing_target.c ORDER BY c +SELECT udf(count(*)) FROM test_missing_target GROUP BY test_missing_target.c +ORDER BY udf(c) -- !query 12 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 12 output 2 2 -117,18 +120,18 struct<count(1):bigint> -- !query 13 -SELECT count(*) FROM test_missing_target GROUP BY a ORDER BY b +SELECT udf(count(*)) FROM test_missing_target GROUP BY a ORDER BY udf(b) -- !query 13 schema struct<> -- !query 13 output org.apache.spark.sql.AnalysisException -cannot resolve '`b`' given input columns: [count(1)]; line 1 pos 61 +cannot resolve '`b`' given input columns: [CAST(udf(cast(count(1) as string)) AS BIGINT)]; line 1 pos 70 -- !query 14 -SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b +SELECT udf(count(*)) FROM test_missing_target GROUP BY b ORDER BY udf(b) -- !query 14 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 14 output 1 2 -137,10 +140,10 struct<count(1):bigint> -- !query 15 -SELECT test_missing_target.b, count(*) - FROM test_missing_target GROUP BY b ORDER BY b +SELECT udf(test_missing_target.b), udf(count(*)) + FROM test_missing_target GROUP BY b ORDER BY udf(b) -- !query 15 schema -struct<b:int,count(1):bigint> +struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 15 output 1 1 2 2 -149,9 +152,9 struct<b:int,count(1):bigint> -- !query 16 -SELECT c FROM test_missing_target ORDER BY a +SELECT udf(c) FROM test_missing_target ORDER BY udf(a) -- !query 16 schema -struct<c:string> +struct<CAST(udf(cast(c as string)) AS STRING):string> -- !query 16 output XXXX ABAB -166,9 +169,9 CCCC -- !query 17 -SELECT count(*) FROM test_missing_target GROUP BY b ORDER BY b desc +SELECT udf(count(*)) FROM test_missing_target GROUP BY b ORDER BY udf(b) desc -- !query 17 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 17 output 4 3 -177,17 +180,17 struct<count(1):bigint> -- !query 18 -SELECT count(*) FROM test_missing_target ORDER BY 1 desc +SELECT udf(count(*)) FROM test_missing_target ORDER BY udf(1) desc -- !query 18 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 18 output 10 -- !query 19 -SELECT c, count(*) FROM test_missing_target GROUP BY 1 ORDER BY 1 +SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 1 ORDER BY 1 -- !query 19 schema -struct<c:string,count(1):bigint> +struct<CAST(udf(cast(c as string)) AS STRING):string,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 19 output ABAB 2 BBBB 2 -198,18 +201,18 cccc 2 -- !query 20 -SELECT c, count(*) FROM test_missing_target GROUP BY 3 +SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 3 -- !query 20 schema struct<> -- !query 20 output org.apache.spark.sql.AnalysisException -GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 53 +GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 63 -- !query 21 -SELECT count(*) FROM test_missing_target x, test_missing_target y - WHERE x.a = y.a - GROUP BY b ORDER BY b +SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y + WHERE udf(x.a) = udf(y.a) + GROUP BY b ORDER BY udf(b) -- !query 21 schema struct<> -- !query 21 output -218,10 +221,10 Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10 -- !query 22 -SELECT a, a FROM test_missing_target - ORDER BY a +SELECT udf(a), udf(a) FROM test_missing_target + ORDER BY udf(a) -- !query 22 schema -struct<a:int,a:int> +struct<CAST(udf(cast(a as string)) AS INT):int,CAST(udf(cast(a as string)) AS INT):int> -- !query 22 output 0 0 1 1 -236,10 +239,10 struct<a:int,a:int> -- !query 23 -SELECT a/2, a/2 FROM test_missing_target - ORDER BY a/2 +SELECT udf(udf(a)/2), udf(udf(a)/2) FROM test_missing_target + ORDER BY udf(udf(a)/2) -- !query 23 schema -struct<(a div 2):int,(a div 2):int> +struct<CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int,CAST(udf(cast((cast(udf(cast(a as string)) as int) div 2) as string)) AS INT):int> -- !query 23 output 0 0 0 0 -254,10 +257,10 struct<(a div 2):int,(a div 2):int> -- !query 24 -SELECT a/2, a/2 FROM test_missing_target - GROUP BY a/2 ORDER BY a/2 +SELECT udf(a/2), udf(a/2) FROM test_missing_target + GROUP BY a/2 ORDER BY udf(a/2) -- !query 24 schema -struct<(a div 2):int,(a div 2):int> +struct<CAST(udf(cast((a div 2) as string)) AS INT):int,CAST(udf(cast((a div 2) as string)) AS INT):int> -- !query 24 output 0 0 1 1 -267,11 +270,11 struct<(a div 2):int,(a div 2):int> -- !query 25 -SELECT x.b, count(*) FROM test_missing_target x, test_missing_target y - WHERE x.a = y.a - GROUP BY x.b ORDER BY x.b +SELECT udf(x.b), udf(count(*)) FROM test_missing_target x, test_missing_target y + WHERE udf(x.a) = udf(y.a) + GROUP BY x.b ORDER BY udf(x.b) -- !query 25 schema -struct<b:int,count(1):bigint> +struct<CAST(udf(cast(b as string)) AS INT):int,CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 25 output 1 1 2 2 -280,11 +283,11 struct<b:int,count(1):bigint> -- !query 26 -SELECT count(*) FROM test_missing_target x, test_missing_target y - WHERE x.a = y.a - GROUP BY x.b ORDER BY x.b +SELECT udf(count(*)) FROM test_missing_target x, test_missing_target y + WHERE udf(x.a) = udf(y.a) + GROUP BY x.b ORDER BY udf(x.b) -- !query 26 schema -struct<count(1):bigint> +struct<CAST(udf(cast(count(1) as string)) AS BIGINT):bigint> -- !query 26 output 1 2 -293,22 +296,22 struct<count(1):bigint> -- !query 27 -SELECT a%2, count(b) FROM test_missing_target +SELECT a%2, udf(count(udf(b))) FROM test_missing_target GROUP BY test_missing_target.a%2 -ORDER BY test_missing_target.a%2 +ORDER BY udf(test_missing_target.a%2) -- !query 27 schema -struct<(a % 2):int,count(b):bigint> +struct<(a % 2):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint> -- !query 27 output 0 5 1 5 -- !query 28 -SELECT count(c) FROM test_missing_target +SELECT udf(count(c)) FROM test_missing_target GROUP BY lower(test_missing_target.c) -ORDER BY lower(test_missing_target.c) +ORDER BY udf(lower(test_missing_target.c)) -- !query 28 schema -struct<count(c):bigint> +struct<CAST(udf(cast(count(c) as string)) AS BIGINT):bigint> -- !query 28 output 2 3 -317,18 +320,18 struct<count(c):bigint> -- !query 29 -SELECT count(a) FROM test_missing_target GROUP BY a ORDER BY b +SELECT udf(count(udf(a))) FROM test_missing_target GROUP BY a ORDER BY udf(b) -- !query 29 schema struct<> -- !query 29 output org.apache.spark.sql.AnalysisException -cannot resolve '`b`' given input columns: [count(a)]; line 1 pos 61 +cannot resolve '`b`' given input columns: [CAST(udf(cast(count(cast(udf(cast(a as string)) as int)) as string)) AS BIGINT)]; line 1 pos 75 -- !query 30 -SELECT count(b) FROM test_missing_target GROUP BY b/2 ORDER BY b/2 +SELECT udf(count(b)) FROM test_missing_target GROUP BY b/2 ORDER BY udf(b/2) -- !query 30 schema -struct<count(b):bigint> +struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint> -- !query 30 output 1 5 -336,10 +339,10 struct<count(b):bigint> -- !query 31 -SELECT lower(test_missing_target.c), count(c) - FROM test_missing_target GROUP BY lower(c) ORDER BY lower(c) +SELECT udf(lower(test_missing_target.c)), udf(count(udf(c))) + FROM test_missing_target GROUP BY lower(c) ORDER BY udf(lower(c)) -- !query 31 schema -struct<lower(c):string,count(c):bigint> +struct<CAST(udf(cast(lower(c) as string)) AS STRING):string,CAST(udf(cast(count(cast(udf(cast(c as string)) as string)) as string)) AS BIGINT):bigint> -- !query 31 output abab 2 bbbb 3 -348,9 +351,9 xxxx 1 -- !query 32 -SELECT a FROM test_missing_target ORDER BY upper(d) +SELECT udf(a) FROM test_missing_target ORDER BY udf(upper(udf(d))) -- !query 32 schema -struct<a:int> +struct<CAST(udf(cast(a as string)) AS INT):int> -- !query 32 output 0 1 -365,19 +368,19 struct<a:int> -- !query 33 -SELECT count(b) FROM test_missing_target - GROUP BY (b + 1) / 2 ORDER BY (b + 1) / 2 desc +SELECT udf(count(b)) FROM test_missing_target + GROUP BY (b + 1) / 2 ORDER BY udf((b + 1) / 2) desc -- !query 33 schema -struct<count(b):bigint> +struct<CAST(udf(cast(count(b) as string)) AS BIGINT):bigint> -- !query 33 output 7 3 -- !query 34 -SELECT count(x.a) FROM test_missing_target x, test_missing_target y - WHERE x.a = y.a - GROUP BY b/2 ORDER BY b/2 +SELECT udf(count(udf(x.a))) FROM test_missing_target x, test_missing_target y + WHERE udf(x.a) = udf(y.a) + GROUP BY b/2 ORDER BY udf(b/2) -- !query 34 schema struct<> -- !query 34 output -386,11 +389,12 Reference 'b' is ambiguous, could be: x.b, y.b.; line 3 pos 10 -- !query 35 -SELECT x.b/2, count(x.b) FROM test_missing_target x, test_missing_target y - WHERE x.a = y.a - GROUP BY x.b/2 ORDER BY x.b/2 +SELECT udf(x.b/2), udf(count(udf(x.b))) FROM test_missing_target x, +test_missing_target y + WHERE udf(x.a) = udf(y.a) + GROUP BY x.b/2 ORDER BY udf(x.b/2) -- !query 35 schema -struct<(b div 2):int,count(b):bigint> +struct<CAST(udf(cast((b div 2) as string)) AS INT):int,CAST(udf(cast(count(cast(udf(cast(b as string)) as int)) as string)) AS BIGINT):bigint> -- !query 35 output 0 1 1 5 -398,14 +402,14 struct<(b div 2):int,count(b):bigint> -- !query 36 -SELECT count(b) FROM test_missing_target x, test_missing_target y - WHERE x.a = y.a +SELECT udf(count(udf(b))) FROM test_missing_target x, test_missing_target y + WHERE udf(x.a) = udf(y.a) GROUP BY x.b/2 -- !query 36 schema struct<> -- !query 36 output org.apache.spark.sql.AnalysisException -Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 13 +Reference 'b' is ambiguous, could be: x.b, y.b.; line 1 pos 21 -- !query 37 ``` </p> </details> ## How was this patch tested? Tested as Guided in SPARK-27921 Closes #25233 from Udbhav30/master. Authored-by: Udbhav30 <u.agrawal30@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request? Implement `RobustScaler` Since the transformation is quite similar to `StandardScaler`, I refactor the transform function so that it can be reused in both scalers. ## How was this patch tested? existing and added tests Closes #25160 from zhengruifeng/robust_scaler. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
Launcher is implemented as a Java application and sometimes I'd like to apply Java options.
One situation I have met is the time I try to attach debugger to Launcher.
Launcher is launched from bin/spark-class but there is no room to apply Java options.
```
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$"
printf "%d\0" $?
}
```
Considering that it's not so many times to apply Java options to Launcher, one compromise would just modify spark-class by user like as follows.
```
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$"
printf "%d\0" $?
}
```
But it doesn't work when any text related to Java options is output to standard output because whole output is used as command-string for spark-shell and spark-submit in current implementation.
One example is jdwp. When apply agentlib option to use jdwp for debug, we will get output like as follows.
```
Listening for transport dt_socket at address: 9876
```
The output shown above is not a command-string so spark-submit and spark-shell will fail.
To enable Java options for Launcher, we need treat command-string and others.
I changed launcher/Main.java and bin/spark-class to print separator-character and treat it.
## How was this patch tested?
Tested manually using Spark Shell with / without LAUNCHER_JAVA_OPTIONS like as follows.
```
SPARK_LAUNCHER_OPTS="-agentlib:jdwp=transport=dt_socket,suspend=y,address=localhost:9876,server=y" bin/spark-shell
```
Closes #25265 from sarutak/add-spark-launcher-opts.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request? As part of the shuffle storage API proposed in SPARK-25299, this introduces an API for persisting shuffle data in arbitrary storage systems. This patch introduces several concepts: * `ShuffleDataIO`, which is the root of the entire plugin tree that will be proposed over the course of the shuffle API project. * `ShuffleExecutorComponents` - the subset of plugins for managing shuffle-related components for each executor. This will in turn instantiate shuffle readers and writers. * `ShuffleMapOutputWriter` interface - instantiated once per map task. This provides child `ShufflePartitionWriter` instances for persisting the bytes for each partition in the map task. The default implementation of these plugins exactly mirror what was done by the existing shuffle writing code - namely, writing the data to local disk and writing an index file. We leverage the APIs in the `BypassMergeSortShuffleWriter` only. Follow-up PRs will use the APIs in `SortShuffleWriter` and `UnsafeShuffleWriter`, but are left as future work to minimize the review surface area. ## How was this patch tested? New unit tests were added. Micro-benchmarks indicate there's no slowdown in the affected code paths. Closes #25007 from mccheah/spark-shuffle-writer-refactor. Lead-authored-by: mcheah <mcheah@palantir.com> Co-authored-by: mccheah <mcheah@palantir.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request? This PR makes the optimizer rule PullupCorrelatedPredicates idempotent. ## How was this patch tested? A new test PullupCorrelatedPredicatesSuite Closes #25268 from dilipbiswal/pr-25164. Authored-by: Dilip Biswal <dbiswal@us.ibm.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
… to devnull in daemon ## What changes were proposed in this pull request? PySpark worker daemon reads from stdin the worker PIDs to kill. https://github.com/apache/spark/blob/1bb60ab8392adf8b896cc04fb1d060620cf09d8a/python/pyspark/daemon.py#L127 However, the worker process is a forked process from the worker daemon process and we didn't close stdin on the child after fork. This means the child and user program can read stdin as well, which blocks daemon from receiving the PID to kill. This can cause issues because the task reaper might detect the task was not terminated and eventually kill the JVM. This PR fix this by redirecting the standard input of the forked child to devnull. ## How was this patch tested? Manually test. In `pyspark`, run: ``` import subprocess def task(_): subprocess.check_output(["cat"]) sc.parallelize(range(1), 1).mapPartitions(task).count() ``` Before: The job will get stuck and press Ctrl+C to exit the job but the python worker process do not exit. After: The job finish correctly. The "cat" print nothing (because the dummay stdin is "/dev/null"). The python worker process exit normally. Please review https://spark.apache.org/contributing.html before opening a pull request. Closes #25138 from WeichenXu123/SPARK-26175. Authored-by: WeichenXu <weichen.xu@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request? This PR is to port text.sql from PostgreSQL regression tests. https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/text.sql The expected results can be found in the link: https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/expected/text.out When porting the test cases, found a PostgreSQL specific features that do not exist in Spark SQL: [SPARK-28037](https://issues.apache.org/jira/browse/SPARK-28037): Add built-in String Functions: quote_literal Also, found three inconsistent behavior: [SPARK-27930](https://issues.apache.org/jira/browse/SPARK-27930): Spark SQL's format_string can not fully support PostgreSQL's format [SPARK-28036](https://issues.apache.org/jira/browse/SPARK-28036): Built-in udf left/right has inconsistent behavior [SPARK-28033](https://issues.apache.org/jira/browse/SPARK-28033): String concatenation should low priority than other operators ## How was this patch tested? N/A Closes #24862 from wangyum/SPARK-28038. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request? The latest docs http://spark.apache.org/docs/latest/configuration.html contains some description as below: spark.ui.showConsoleProgress | true | Show the progress bar in the console. The progress bar shows the progress of stages that run for longer than 500ms. If multiple stages run at the same time, multiple progress bars will be displayed on the same line. -- | -- | -- But the class `org.apache.spark.internal.config.UI` define the config `spark.ui.showConsoleProgress` as below: ``` val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress") .doc("When true, show the progress bar in the console.") .booleanConf .createWithDefault(false) ``` So I think there are exists some little mistake and lead to confuse reader. ## How was this patch tested? No need UT. Closes #25297 from beliefer/inconsistent-desc-showConsoleProgress. Authored-by: gengjiaan <gengjiaan@360.cn> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request? I remove the deprecate `ImageSchema.readImages`. Move some useful methods from class `ImageSchema` into class `ImageFileFormat`. In pyspark, I rename `ImageSchema` class to be `ImageUtils`, and keep some useful python methods in it. ## How was this patch tested? UT. Please review https://spark.apache.org/contributing.html before opening a pull request. Closes #25245 from WeichenXu123/remove_image_schema. Authored-by: WeichenXu <weichen.xu@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request? This PR aims to support ANSI SQL `Boolean-Predicate` syntax. ```sql expression IS [NOT] TRUE expression IS [NOT] FALSE expression IS [NOT] UNKNOWN ``` There are some mainstream database support this syntax. - **PostgreSQL:** https://www.postgresql.org/docs/9.1/functions-comparison.html - **Hive:** https://issues.apache.org/jira/browse/HIVE-13583 - **Redshift:** https://docs.aws.amazon.com/redshift/latest/dg/r_Boolean_type.html - **Vertica:** https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Predicates/Boolean-predicate.htm For example: ```sql spark-sql> select null is true, null is not true; false true spark-sql> select false is true, false is not true; false true spark-sql> select true is true, true is not true; true false spark-sql> select null is false, null is not false; false true spark-sql> select false is false, false is not false; true false spark-sql> select true is false, true is not false; false true spark-sql> select null is unknown, null is not unknown; true false spark-sql> select false is unknown, false is not unknown; false true spark-sql> select true is unknown, true is not unknown; false true ``` **Note**: A null input is treated as the logical value "unknown". ## How was this patch tested? Pass the Jenkins with the newly added test cases. Closes #25074 from beliefer/ansi-sql-boolean-test. Lead-authored-by: gengjiaan <gengjiaan@360.cn> Co-authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
… support input_file_name with Python UDF) ## What changes were proposed in this pull request? This PR proposes to use `AtomicReference` so that parent and child threads can access to the same file block holder. Python UDF expressions are turned to a plan and then it launches a separate thread to consume the input iterator. In the separate child thread, the iterator sets `InputFileBlockHolder.set` before the parent does which the parent thread is unable to read later. 1. In this separate child thread, if it happens to call `InputFileBlockHolder.set` first without initialization of the parent's thread local (which is done when the `ThreadLocal.get()` is first called), the child thread seems calling its own `initialValue` to initialize. 2. After that, the parent calls its own `initialValue` to initializes at the first call of `ThreadLocal.get()`. 3. Both now have two different references. Updating at child isn't reflected to parent. This PR fixes it via initializing parent's thread local with `AtomicReference` for file status so that they can be used in each task, and children thread's update is reflected. I also tried to explain this a bit more at #24958 (comment). ## How was this patch tested? Manually tested and unittest was added. Closes #24958 from HyukjinKwon/SPARK-28153. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request? This PR moves `replaceFunctionName(usage: String, functionName: String)` from `DescribeFunctionCommand` to `ExpressionInfo` in order to make `ExpressionInfo` returns actual name instead of placeholder. We can get `ExpressionInfo`s directly through `SessionCatalog.lookupFunctionInfo` API and get the real names. ## How was this patch tested? unit tests Closes #25314 from wangyum/SPARK-28581. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ttempt id ## What changes were proposed in this pull request? When we set ```spark.history.ui.maxApplications``` to a small value, we can't get some apps from the page search. If the url is spliced (http://localhost:18080/history/local-xxx), it can be accessed if the app has no attempt. But in the case of multiple attempted apps, such a url cannot be accessed, and the page displays Not Found. ## How was this patch tested? Add UT Closes #25301 from cxzl25/hs_app_last_attempt_id. Authored-by: sychen <sychen@ctrip.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…blacklist test The issue is that the test tried to stop an existing scheduler and replace it with a new one set up for the test. That can cause issues because both were sharing the same RpcEnv underneath, and unregistering RpcEndpoints is actually asynchronous (see comment in Dispatcher.unregisterRpcEndpoint). So that could lead to races where the new scheduler tried to register before the old one was fully unregistered. The updated test avoids the issue by using a separate RpcEnv / scheduler instance altogether, and also avoids a misleading NPE in the test logs. Closes #25318 from vanzin/SPARK-24352. Authored-by: Marcelo Vanzin <vanzin@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…t result ## What changes were proposed in this pull request? When an overflow occurs performing an arithmetic operation, we are returning an incorrect value. Instead, we should throw an exception, as stated in the SQL standard. ## How was this patch tested? added UT + existing UTs (improved) Closes #21599 from mgaido91/SPARK-24598. Authored-by: Marco Gaido <marcogaido91@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ittable file
## What changes were proposed in this pull request?
Logging in driver when loading single large unsplittable file via `sc.textFile` or csv/json datasouce.
Current condition triggering logging is
* only generate one partition
* file is unsplittable, possible reason is:
- compressed by unsplittable compression algo such as gzip.
- multiLine mode in csv/json datasource
- wholeText mode in text datasource
* file size exceed the config threshold `spark.io.warning.largeFileThreshold` (default value is 1GB)
## How was this patch tested?
Manually test.
Generate one gzip file exceeding 1GB,
```
base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
```
then launch spark-shell,
run
```
sc.textFile("file:///path/to/file1.gz").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec
```
run
```
sc.textFile("file:///path/to/file1.txt").count()
```
Will print log like:
```
WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile
```
run
```
spark.read.csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec
```
run
```
spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count
```
Will print log like:
```
WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode
```
JSON and Text datasource also tested with similar cases.
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes #25134 from WeichenXu123/log_gz.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request? Prior to this change, in an executor, on each heartbeat, memory metrics are polled and sent in the heartbeat. The heartbeat interval is 10s by default. With this change, in an executor, memory metrics can optionally be polled in a separate poller at a shorter interval. For each executor, we use a map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks) to track what stages are active as well as the per-stage memory metric peaks. When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat. We also keep a map of taskId to memory metric peaks. This tracks the metric peaks during the lifetime of the task. The polling thread updates this as well. At end of a task, we send the peak metric values in the task result. In case of task failure, we send the peak metric values in the `TaskFailedReason`. We continue to do the stage-level aggregation in the EventLoggingListener. For the driver, we still only poll on heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before. ## How was this patch tested? Unit tests. Manually tested applications on an actual system and checked the event logs; the metrics appear in the SparkListenerTaskEnd and SparkListenerStageExecutorMetrics events. Closes #23767 from wypoon/wypoon_SPARK-26329. Authored-by: Wing Yew Poon <wypoon@cloudera.com> Signed-off-by: Imran Rashid <irashid@cloudera.com>
## What changes were proposed in this pull request? it seems that doc for libsvm datasource is not added in #22675. This pr is to add it. ## How was this patch tested? doc built locally  Closes #25286 from zhengruifeng/doc_libsvm_data_source. Authored-by: zhengruifeng <ruifengz@foxmail.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request? This PR proposes to make `merge_spark_pr.py` script Python 3 compatible. ## How was this patch tested? Manually tested against my forked remote with the PR and JIRA below: #25321 #25286 https://issues.apache.org/jira/browse/SPARK-28153 Closes #25322 from HyukjinKwon/merge-script. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
….api visible ## What changes were proposed in this pull request? This PR proposes to make Javadoc in org.apache.spark.shuffle.api visible. ## How was this patch tested? Manually built the doc and checked:  Closes #25323 from HyukjinKwon/SPARK-28568. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
There's a small, probably very hard to hit thread-safety issue in the blacklist abort timers in the task scheduler, where they access a non-thread-safe map without locks. In the tests, the code was also calling methods on the TaskSetManager without holding the proper locks, which could cause threads to call non-thread-safe TSM methods concurrently. Closes #25317 from vanzin/SPARK-28584. Authored-by: Marcelo Vanzin <vanzin@cloudera.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request? ```sql spark-sql> select cast(1); 19/07/26 00:54:17 ERROR SparkSQLDriver: Failed in [select cast(1)] java.lang.UnsupportedOperationException: empty.init at scala.collection.TraversableLike$class.init(TraversableLike.scala:451) at scala.collection.mutable.ArrayOps$ofInt.scala$collection$IndexedSeqOptimized$$super$init(ArrayOps.scala:234) at scala.collection.IndexedSeqOptimized$class.init(IndexedSeqOptimized.scala:135) at scala.collection.mutable.ArrayOps$ofInt.init(ArrayOps.scala:234) at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:565) at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7$$anonfun$11.apply(FunctionRegistry.scala:558) at scala.Option.getOrElse(Option.scala:121) ``` The reason is that we did not handle the case [`validParametersCount.length == 0`](https://github.com/apache/spark/blob/2d74f14d74e7b24109f347822600ebf9819b04c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L588) because the [parameter types](https://github.com/apache/spark/blob/2d74f14d74e7b24109f347822600ebf9819b04c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L589) can be `Expression`, `DataType` and `Option`. This PR makes it handle the case `validParametersCount.length == 0`. ## How was this patch tested? unit tests Closes #25261 from wangyum/SPARK-28521. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
ulysses-you
pushed a commit
that referenced
this pull request
Oct 31, 2019
### What changes were proposed in this pull request? `org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite` failed lately. After had a look at the logs it just shows the following fact without any details: ``` Caused by: sbt.ForkMain$ForkError: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - Server not found in Kerberos database ``` Since the issue is intermittent and not able to reproduce it we should add more debug information and wait for reproduction with the extended logs. ### Why are the changes needed? Failing test doesn't give enough debug information. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? I've started the test manually and checked that such additional debug messages show up: ``` >>> KrbApReq: APOptions are 00000000 00000000 00000000 00000000 >>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType Looking for keys for: kafka/localhostEXAMPLE.COM Added key: 17version: 0 Added key: 23version: 0 Added key: 16version: 0 Found unsupported keytype (3) for kafka/localhostEXAMPLE.COM >>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType Using builtin default etypes for permitted_enctypes default etypes for permitted_enctypes: 17 16 23. >>> EType: sun.security.krb5.internal.crypto.Aes128CtsHmacSha1EType MemoryCache: add 1571936500/174770/16C565221B70AAB2BEFE31A83D13A2F4/client/localhostEXAMPLE.COM to client/localhostEXAMPLE.COM|kafka/localhostEXAMPLE.COM MemoryCache: Existing AuthList: #3: 1571936493/200803/8CD70D280B0862C5DA1FF901ECAD39FE/client/localhostEXAMPLE.COM #2: 1571936499/985009/BAD33290D079DD4E3579A8686EC326B7/client/localhostEXAMPLE.COM #1: 1571936499/995208/B76B9D78A9BE283AC78340157107FD40/client/localhostEXAMPLE.COM ``` Closes apache#26252 from gaborgsomogyi/SPARK-29580. Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
ulysses-you
pushed a commit
that referenced
this pull request
Jun 12, 2020
### What changes were proposed in this pull request?
This PR proposes to make `PythonFunction` holds `Seq[Byte]` instead of `Array[Byte]` to be able to compare if the byte array has the same values for the cache manager.
### Why are the changes needed?
Currently the cache manager doesn't use the cache for `udf` if the `udf` is created again even if the functions is the same.
```py
>>> func = lambda x: x
>>> df = spark.range(1)
>>> df.select(udf(func)("id")).cache()
```
```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#14 AS <lambda>(id)#12]
+- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#14]
+- *(1) Range (0, 1, step=1, splits=12)
```
This is because `PythonFunction` holds `Array[Byte]`, and `equals` method of array equals only when the both array is the same instance.
### Does this PR introduce _any_ user-facing change?
Yes, if the user reuse the Python function for the UDF, the cache manager will detect the same function and use the cache for it.
### How was this patch tested?
I added a test case and manually.
```py
>>> df.select(udf(func)("id")).explain()
== Physical Plan ==
InMemoryTableScan [<lambda>(id)#12]
+- InMemoryRelation [<lambda>(id)#12], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) Project [pythonUDF0#5 AS <lambda>(id)#3]
+- BatchEvalPython [<lambda>(id#0L)], [pythonUDF0#5]
+- *(1) Range (0, 1, step=1, splits=12)
```
Closes apache#28774 from ueshin/issues/SPARK-31945/udf_cache.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
ulysses-you
added a commit
that referenced
this pull request
Jul 8, 2020
… without WindowExpression
### What changes were proposed in this pull request?
Add WindowFunction check at `CheckAnalysis`.
### Why are the changes needed?
Provide friendly error msg.
**BEFORE**
```scala
scala> sql("select rank() from values(1)").show
java.lang.UnsupportedOperationException: Cannot generate code for expression: rank()
```
**AFTER**
```scala
scala> sql("select rank() from values(1)").show
org.apache.spark.sql.AnalysisException: Window function rank() requires an OVER clause.;;
Project [rank() AS RANK()#3]
+- LocalRelation [col1#2]
```
### Does this PR introduce _any_ user-facing change?
Yes, user wiill be given a better error msg.
### How was this patch tested?
Pass the newly added UT.
Closes apache#28808 from ulysses-you/SPARK-31975.
Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you
pushed a commit
that referenced
this pull request
Oct 14, 2022
…ly equivalent children in `RewriteDistinctAggregates` ### What changes were proposed in this pull request? In `RewriteDistinctAggregates`, when grouping aggregate expressions by function children, treat children that are semantically equivalent as the same. ### Why are the changes needed? This PR will reduce the number of projections in the Expand operator when there are multiple distinct aggregations with superficially different children. In some cases, it will eliminate the need for an Expand operator. Example: In the following query, the Expand operator creates 3\*n rows (where n is the number of incoming rows) because it has a projection for each of function children `b + 1`, `1 + b` and `c`. ``` create or replace temp view v1 as select * from values (1, 2, 3.0), (1, 3, 4.0), (2, 4, 2.5), (2, 3, 1.0) v1(a, b, c); select a, count(distinct b + 1), avg(distinct 1 + b) filter (where c > 0), sum(c) from v1 group by a; ``` The Expand operator has three projections (each producing a row for each incoming row): ``` [a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation) [a#87, (b#88 + 1), null, 1, null, null], <== projection #2 (for distinct aggregation of b + 1) [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b) ``` In reality, the Expand only needs one projection for `1 + b` and `b + 1`, because they are semantically equivalent. With the proposed change, the Expand operator's projections look like this: ``` [a#67, null, 0, null, UnscaledValue(c#69)], <== projection #1 (for regular aggregations) [a#67, (b#68 + 1), 1, (c#69 > 0.0), null]], <== projection #2 (for distinct aggregation on b + 1 and 1 + b) ``` With one less projection, Expand produces 2\*n rows instead of 3\*n rows, but still produces the correct result. In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all. Benchmark code in the JIRA (SPARK-40382). Before the PR: ``` distinct aggregates: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ all semantically equivalent 14721 14859 195 5.7 175.5 1.0X some semantically equivalent 14569 14572 5 5.8 173.7 1.0X none semantically equivalent 14408 14488 113 5.8 171.8 1.0X ``` After the PR: ``` distinct aggregates: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ all semantically equivalent 3658 3692 49 22.9 43.6 1.0X some semantically equivalent 9124 9214 127 9.2 108.8 0.4X none semantically equivalent 14601 14777 250 5.7 174.1 0.3X ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests. Closes apache#37825 from bersprockets/rewritedistinct_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you
pushed a commit
that referenced
this pull request
Mar 22, 2023
…edExpression() ### What changes were proposed in this pull request? In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`. ### Why are the changes needed? This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`. One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error). ### Does this PR introduce _any_ user-facing change? This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions. Example running the SQL command: ```sql select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2) ``` example error message before the fix: ``` java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3] ``` after the fix this error is gone. ### How was this patch tested? Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom. Closes apache#40473 from rednaxelafx/spark-42851. Authored-by: Kris Mok <kris.mok@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you
pushed a commit
that referenced
this pull request
Nov 14, 2023
…edExpression() ### What changes were proposed in this pull request? In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`. ### Why are the changes needed? This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`. One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error). ### Does this PR introduce _any_ user-facing change? This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions. Example running the SQL command: ```sql select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2) ``` example error message before the fix: ``` java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3] ``` after the fix this error is gone. ### How was this patch tested? Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom. Closes apache#40473 from rednaxelafx/spark-42851. Authored-by: Kris Mok <kris.mok@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit ef0a76e) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review https://spark.apache.org/contributing.html before opening a pull request.