diff --git a/LICENSE b/LICENSE index c21032a1fd274..66a2e8f132953 100644 --- a/LICENSE +++ b/LICENSE @@ -249,11 +249,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt. (Interpreter classes (all .scala files in repl/src/main/scala except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala), and for SerializableMapWrapper in JavaUtils.scala) - (BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.7 - http://www.scala-lang.org/) - (BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.7 - http://www.scala-lang.org/) - (BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.7 - http://www.scala-lang.org/) - (BSD-like) Scala Library (org.scala-lang:scala-library:2.11.7 - http://www.scala-lang.org/) - (BSD-like) Scalap (org.scala-lang:scalap:2.11.7 - http://www.scala-lang.org/) + (BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.8 - http://www.scala-lang.org/) + (BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.8 - http://www.scala-lang.org/) + (BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.8 - http://www.scala-lang.org/) + (BSD-like) Scala Library (org.scala-lang:scala-library:2.11.8 - http://www.scala-lang.org/) + (BSD-like) Scalap (org.scala-lang:scalap:2.11.8 - http://www.scala-lang.org/) (BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org) (BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org) (BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index aab2fc17aedaf..166b39813c14e 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -549,7 +549,7 @@ setMethod("registerTempTable", #' sparkR.session() #' df <- read.df(path, "parquet") #' df2 <- read.df(path2, "parquet") -#' createOrReplaceTempView(df, "table1") +#' saveAsTable(df, "table1") #' insertInto(df2, "table1", overwrite = TRUE) #'} #' @note 
insertInto since 1.4.0 @@ -1125,7 +1125,8 @@ setMethod("dim", #' path <- "path/to/file.json" #' df <- read.json(path) #' collected <- collect(df) -#' firstName <- collected[[1]]$name +#' class(collected) +#' firstName <- names(collected)[1] #' } #' @note collect since 1.4.0 setMethod("collect", @@ -2814,7 +2815,7 @@ setMethod("except", #' path <- "path/to/file.json" #' df <- read.json(path) #' write.df(df, "myfile", "parquet", "overwrite") -#' saveDF(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema) +#' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE) #' } #' @note write.df since 1.4.0 setMethod("write.df", @@ -3097,8 +3098,8 @@ setMethod("fillna", #' @family SparkDataFrame functions #' @aliases as.data.frame,SparkDataFrame-method #' @rdname as.data.frame -#' @examples \dontrun{ -#' +#' @examples +#' \dontrun{ #' irisDF <- createDataFrame(iris) #' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ]) #' } @@ -3175,7 +3176,8 @@ setMethod("with", #' @aliases str,SparkDataFrame-method #' @family SparkDataFrame functions #' @param object a SparkDataFrame -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' # Create a SparkDataFrame from the Iris dataset #' irisDF <- createDataFrame(iris) #' @@ -3667,8 +3669,8 @@ setMethod("checkpoint", #' mean(cube(df, "cyl", "gear", "am"), "mpg") #' #' # Following calls are equivalent -#' agg(cube(carsDF), mean(carsDF$mpg)) -#' agg(carsDF, mean(carsDF$mpg)) +#' agg(cube(df), mean(df$mpg)) +#' agg(df, mean(df$mpg)) #' } #' @note cube since 2.3.0 #' @seealso \link{agg}, \link{groupBy}, \link{rollup} @@ -3702,8 +3704,8 @@ setMethod("cube", #' mean(rollup(df, "cyl", "gear", "am"), "mpg") #' #' # Following calls are equivalent -#' agg(rollup(carsDF), mean(carsDF$mpg)) -#' agg(carsDF, mean(carsDF$mpg)) +#' agg(rollup(df), mean(df$mpg)) +#' agg(df, mean(df$mpg)) #' } #' @note rollup since 2.3.0 #' @seealso \link{agg}, \link{cube}, \link{groupBy} diff --git a/R/pkg/R/WindowSpec.R 
b/R/pkg/R/WindowSpec.R index 4ac83c29c6f7e..81beac9ea9925 100644 --- a/R/pkg/R/WindowSpec.R +++ b/R/pkg/R/WindowSpec.R @@ -203,7 +203,8 @@ setMethod("rangeBetween", #' @aliases over,Column,WindowSpec-method #' @family colum_func #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' #' # Partition by am (transmission) and order by hp (horsepower) diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R index 574078012adad..a5c2ea81f2490 100644 --- a/R/pkg/R/column.R +++ b/R/pkg/R/column.R @@ -135,7 +135,8 @@ createMethods() #' @aliases alias,Column-method #' @family colum_func #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(iris) #' #' head(select( @@ -244,7 +245,8 @@ setMethod("between", signature(x = "Column"), #' @family colum_func #' @aliases cast,Column-method #' -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' cast(df$age, "string") #' } #' @note cast since 1.4.0 diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index a6c2dea0ff2a7..06a90192bb12f 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3257,7 +3257,8 @@ setMethod("when", signature(condition = "Column", value = "ANY"), #' @aliases ifelse,Column-method #' @seealso \link{when} #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' ifelse(df$a > 1 & df$b > 2, 0, 1) #' ifelse(df$a > 1, df$a, 1) #' } @@ -3292,7 +3293,8 @@ setMethod("ifelse", #' @family window functions #' @aliases cume_dist,missing-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' ws <- orderBy(windowPartitionBy("am"), "hp") #' out <- select(df, over(cume_dist(), ws), df$hp, df$am) @@ -3321,7 +3323,8 @@ setMethod("cume_dist", #' @family window functions #' @aliases dense_rank,missing-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' ws <- orderBy(windowPartitionBy("am"), "hp") #' out <- select(df, over(dense_rank(), 
ws), df$hp, df$am) @@ -3352,7 +3355,8 @@ setMethod("dense_rank", #' @aliases lag,characterOrColumn-method #' @family window functions #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' #' # Partition by am (transmission) and order by hp (horsepower) @@ -3395,7 +3399,8 @@ setMethod("lag", #' @family window functions #' @aliases lead,characterOrColumn,numeric-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' #' # Partition by am (transmission) and order by hp (horsepower) @@ -3434,7 +3439,8 @@ setMethod("lead", #' @aliases ntile,numeric-method #' @family window functions #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' #' # Partition by am (transmission) and order by hp (horsepower) @@ -3466,7 +3472,8 @@ setMethod("ntile", #' @family window functions #' @aliases percent_rank,missing-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' ws <- orderBy(windowPartitionBy("am"), "hp") #' out <- select(df, over(percent_rank(), ws), df$hp, df$am) @@ -3496,7 +3503,8 @@ setMethod("percent_rank", #' @family window functions #' @aliases rank,missing-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' ws <- orderBy(windowPartitionBy("am"), "hp") #' out <- select(df, over(rank(), ws), df$hp, df$am) @@ -3533,7 +3541,8 @@ setMethod("rank", #' @aliases row_number,missing-method #' @family window functions #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' ws <- orderBy(windowPartitionBy("am"), "hp") #' out <- select(df, over(row_number(), ws), df$hp, df$am) @@ -3761,7 +3770,8 @@ setMethod("collect_set", #' @family string functions #' @aliases split_string,Column-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- read.text("README.md") #' #' 
head(select(df, split_string(df$value, "\\s+"))) @@ -3790,7 +3800,8 @@ setMethod("split_string", #' @family string functions #' @aliases repeat_string,Column-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- read.text("README.md") #' #' first(select(df, repeat_string(df$value, 3))) @@ -3819,7 +3830,8 @@ setMethod("repeat_string", #' @family collection functions #' @aliases explode_outer,Column-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(data.frame( #' id = c(1, 2, 3), text = c("a,b,c", NA, "d,e") #' )) @@ -3847,7 +3859,8 @@ setMethod("explode_outer", #' @family collection functions #' @aliases posexplode_outer,Column-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(data.frame( #' id = c(1, 2, 3), text = c("a,b,c", NA, "d,e") #' )) @@ -3875,7 +3888,8 @@ setMethod("posexplode_outer", #' @aliases not,Column-method #' @family non-aggregate functions #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(data.frame( #' is_true = c(TRUE, FALSE, NA), #' flag = c(1, 0, 1) @@ -3908,7 +3922,8 @@ setMethod("not", #' @family aggregate functions #' @aliases grouping_bit,Column-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' #' # With cube @@ -3949,7 +3964,8 @@ setMethod("grouping_bit", #' @family aggregate functions #' @aliases grouping_id,Column-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- createDataFrame(mtcars) #' #' # With cube @@ -3987,7 +4003,8 @@ setMethod("grouping_id", #' @family non-aggregate functions #' @aliases input_file_name,missing-method #' @export -#' @examples \dontrun{ +#' @examples +#' \dontrun{ #' df <- read.text("README.md") #' #' head(select(df, input_file_name())) diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 
58b865969f517..622f7985ba444 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.shuffle.ShuffleWriter -import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId} +import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId} import org.apache.spark.util.{MutablePair, Utils} abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext { @@ -277,7 +277,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // Delete one of the local shuffle blocks. val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)) val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0)) - assert(hashFile.exists() || sortFile.exists()) + val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0)) + assert(hashFile.exists() || (sortFile.exists() && indexFile.exists())) if (hashFile.exists()) { hashFile.delete() @@ -285,11 +286,36 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC if (sortFile.exists()) { sortFile.delete() } + if (indexFile.exists()) { + indexFile.delete() + } // This count should retry the execution of the previous stage and rerun shuffle. rdd.count() } + test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") { + sc = new SparkContext("local", "test", conf.clone()) + val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _) + + // Cannot find one of the local shuffle blocks. 
+ val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)) + val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0)) + val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0)) + assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists()) + + rdd.count() + + // Can find one of the local shuffle blocks. + val hashExistsFile = sc.env.blockManager.diskBlockManager + .getFile(new ShuffleBlockId(0, 0, 0)) + val sortExistsFile = sc.env.blockManager.diskBlockManager + .getFile(new ShuffleDataBlockId(0, 0, 0)) + val indexExistsFile = sc.env.blockManager.diskBlockManager + .getFile(new ShuffleIndexBlockId(0, 0, 0)) + assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists())) + } + test("metrics for shuffle without aggregation") { sc = new SparkContext("local", "test", conf.clone()) val numRecords = 10000 diff --git a/docs/_config.yml b/docs/_config.yml index 21255ef7a5c45..dcc211204d766 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -17,7 +17,7 @@ include: SPARK_VERSION: 2.3.0-SNAPSHOT SPARK_VERSION_SHORT: 2.3.0 SCALA_BINARY_VERSION: "2.11" -SCALA_VERSION: "2.11.7" +SCALA_VERSION: "2.11.8" MESOS_VERSION: 1.0.0 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK SPARK_GITHUB_URL: https://github.com/apache/spark diff --git a/docs/ml-guide.md b/docs/ml-guide.md index 971761961b965..362e883e55e83 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -26,7 +26,7 @@ The primary Machine Learning API for Spark is now the [DataFrame](sql-programmin * MLlib will still support the RDD-based API in `spark.mllib` with bug fixes. * MLlib will not add new features to the RDD-based API. * In the Spark 2.x releases, MLlib will add features to the DataFrames-based API to reach feature parity with the RDD-based API. -* After reaching feature parity (roughly estimated for Spark 2.2), the RDD-based API will be deprecated. 
+* After reaching feature parity (roughly estimated for Spark 2.3), the RDD-based API will be deprecated. * The RDD-based API is expected to be removed in Spark 3.0. *Why is MLlib switching to the DataFrame-based API?* @@ -66,41 +66,59 @@ To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.4 [^1]: To learn more about the benefits and background of system optimised natives, you may wish to watch Sam Halliday's ScalaX talk on [High Performance Linear Algebra in Scala](http://fommil.github.io/scalax14/#/). +# Highlights in 2.2 + +The list below highlights some of the new features and enhancements added to MLlib in the `2.2` +release of Spark: + +* `ALS` methods for _top-k_ recommendations for all users or items, matching the functionality + in `mllib` ([SPARK-19535](https://issues.apache.org/jira/browse/SPARK-19535)). Performance + was also improved for both `ml` and `mllib` + ([SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968) and + [SPARK-20587](https://issues.apache.org/jira/browse/SPARK-20587)) +* `Correlation` and `ChiSquareTest` stats functions for `DataFrames` + ([SPARK-19636](https://issues.apache.org/jira/browse/SPARK-19636) and + [SPARK-19635](https://issues.apache.org/jira/browse/SPARK-19635)) +* `FPGrowth` algorithm for frequent pattern mining + ([SPARK-14503](https://issues.apache.org/jira/browse/SPARK-14503)) +* `GLM` now supports the full `Tweedie` family + ([SPARK-18929](https://issues.apache.org/jira/browse/SPARK-18929)) +* `Imputer` feature transformer to impute missing values in a dataset + ([SPARK-13568](https://issues.apache.org/jira/browse/SPARK-13568)) +* `LinearSVC` for linear Support Vector Machine classification + ([SPARK-14709](https://issues.apache.org/jira/browse/SPARK-14709)) +* Logistic regression now supports constraints on the coefficients during training + ([SPARK-20047](https://issues.apache.org/jira/browse/SPARK-20047)) + # Migration guide MLlib is under active development. 
The APIs marked `Experimental`/`DeveloperApi` may change in future releases, and the migration guide below will explain all changes between releases. -## From 2.0 to 2.1 +## From 2.1 to 2.2 ### Breaking changes - -**Deprecated methods removed** -* `setLabelCol` in `feature.ChiSqSelectorModel` -* `numTrees` in `classification.RandomForestClassificationModel` (This now refers to the Param called `numTrees`) -* `numTrees` in `regression.RandomForestRegressionModel` (This now refers to the Param called `numTrees`) -* `model` in `regression.LinearRegressionSummary` -* `validateParams` in `PipelineStage` -* `validateParams` in `Evaluator` +There are no breaking changes. ### Deprecations and changes of behavior **Deprecations** -* [SPARK-18592](https://issues.apache.org/jira/browse/SPARK-18592): - Deprecate all Param setter methods except for input/output column Params for `DecisionTreeClassificationModel`, `GBTClassificationModel`, `RandomForestClassificationModel`, `DecisionTreeRegressionModel`, `GBTRegressionModel` and `RandomForestRegressionModel` +There are no deprecations. **Changes of behavior** -* [SPARK-17870](https://issues.apache.org/jira/browse/SPARK-17870): - Fix a bug of `ChiSqSelector` which will likely change its result. Now `ChiSquareSelector` use pValue rather than raw statistic to select a fixed number of top features. -* [SPARK-3261](https://issues.apache.org/jira/browse/SPARK-3261): - `KMeans` returns potentially fewer than k cluster centers in cases where k distinct centroids aren't available or aren't selected. -* [SPARK-17389](https://issues.apache.org/jira/browse/SPARK-17389): - `KMeans` reduces the default number of steps from 5 to 2 for the k-means|| initialization mode. - +* [SPARK-19787](https://issues.apache.org/jira/browse/SPARK-19787): + Default value of `regParam` changed from `1.0` to `0.1` for `ALS.train` method (marked `DeveloperApi`). + **Note** this does _not affect_ the `ALS` Estimator or Model, nor MLlib's `ALS` class. 
+* [SPARK-14772](https://issues.apache.org/jira/browse/SPARK-14772): + Fixed inconsistency between Python and Scala APIs for `Param.copy` method. +* [SPARK-11569](https://issues.apache.org/jira/browse/SPARK-11569): + `StringIndexer` now handles `NULL` values in the same way as unseen values. Previously an exception + would always be thrown regardless of the setting of the `handleInvalid` parameter. + ## Previous Spark versions Earlier migration guides are archived [on this page](ml-migration-guides.html). diff --git a/docs/ml-migration-guides.md b/docs/ml-migration-guides.md index 58c3747ea6387..687d7c8930362 100644 --- a/docs/ml-migration-guides.md +++ b/docs/ml-migration-guides.md @@ -7,6 +7,35 @@ description: MLlib migration guides from before Spark SPARK_VERSION_SHORT The migration guide for the current Spark version is kept on the [MLlib Guide main page](ml-guide.html#migration-guide). +## From 2.0 to 2.1 + +### Breaking changes + +**Deprecated methods removed** + +* `setLabelCol` in `feature.ChiSqSelectorModel` +* `numTrees` in `classification.RandomForestClassificationModel` (This now refers to the Param called `numTrees`) +* `numTrees` in `regression.RandomForestRegressionModel` (This now refers to the Param called `numTrees`) +* `model` in `regression.LinearRegressionSummary` +* `validateParams` in `PipelineStage` +* `validateParams` in `Evaluator` + +### Deprecations and changes of behavior + +**Deprecations** + +* [SPARK-18592](https://issues.apache.org/jira/browse/SPARK-18592): + Deprecate all Param setter methods except for input/output column Params for `DecisionTreeClassificationModel`, `GBTClassificationModel`, `RandomForestClassificationModel`, `DecisionTreeRegressionModel`, `GBTRegressionModel` and `RandomForestRegressionModel` + +**Changes of behavior** + +* [SPARK-17870](https://issues.apache.org/jira/browse/SPARK-17870): + Fix a bug of `ChiSqSelector` which will likely change its result. 
Now `ChiSqSelector` uses the p-value rather than the raw statistic to select a fixed number of top features. +* [SPARK-3261](https://issues.apache.org/jira/browse/SPARK-3261): + `KMeans` returns potentially fewer than k cluster centers in cases where k distinct centroids aren't available or aren't selected. +* [SPARK-17389](https://issues.apache.org/jira/browse/SPARK-17389): + `KMeans` reduces the default number of steps from 5 to 2 for the k-means|| initialization mode. + ## From 1.6 to 2.0 ### Breaking changes diff --git a/external/docker/spark-test/base/Dockerfile b/external/docker/spark-test/base/Dockerfile index 76f550f886ce4..5a95a9387c310 100644 --- a/external/docker/spark-test/base/Dockerfile +++ b/external/docker/spark-test/base/Dockerfile @@ -25,7 +25,7 @@ RUN apt-get update && \ apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server && \ rm -rf /var/lib/apt/lists/* -ENV SCALA_VERSION 2.11.7 +ENV SCALA_VERSION 2.11.8 ENV CDH_VERSION cdh4 ENV SCALA_HOME /opt/scala-$SCALA_VERSION ENV SPARK_HOME /opt/spark diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 6fc154f8debcf..d2042ad00a816 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -234,6 +234,7 @@ object FunctionRegistry { expression[StringToMap]("str_to_map"), expression[Sqrt]("sqrt"), expression[Tan]("tan"), + expression[Cot]("cot"), expression[Tanh]("tanh"), expression[Add]("+"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 7e4c9089a2cb9..b358102d914bd 100644 ---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -50,10 +50,17 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro fieldTypes: Seq[DataType], bufferHolder: String): String = { val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) => - val fieldName = ctx.freshName("fieldName") - val code = s"final ${ctx.javaType(dt)} $fieldName = ${ctx.getValue(input, dt, i.toString)};" - val isNull = s"$input.isNullAt($i)" - ExprCode(code, isNull, fieldName) + val javaType = ctx.javaType(dt) + val isNullVar = ctx.freshName("isNull") + val valueVar = ctx.freshName("value") + val defaultValue = ctx.defaultValue(dt) + val readValue = ctx.getValue(input, dt, i.toString) + val code = + s""" + boolean $isNullVar = $input.isNullAt($i); + $javaType $valueVar = $isNullVar ? $defaultValue : $readValue; + """ + ExprCode(code, isNullVar, valueVar) } s""" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index de1a46dc47805..a7bf81e98be8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -543,6 +543,20 @@ case class Sqrt(child: Expression) extends UnaryMathExpression(math.sqrt, "SQRT" """) case class Tan(child: Expression) extends UnaryMathExpression(math.tan, "TAN") +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns the cotangent of `expr`.", + extended = """ + Examples: + > SELECT _FUNC_(1); + 0.6420926159343306 + """) +case class Cot(child: Expression) + extends UnaryMathExpression((x: Double) => 1 / math.tan(x), "COT") { + override def doGenCode(ctx: CodegenContext, 
ev: ExprCode): ExprCode = { + defineCodeGen(ctx, ev, c => s"${ev.value} = 1 / java.lang.Math.tan($c);") + } +} + @ExpressionDescription( usage = "_FUNC_(expr) - Returns the hyperbolic tangent of `expr`.", extended = """ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index b9a3d986bc58a..efb42292634ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -603,7 +603,14 @@ object DateTimeUtils { */ private[this] def getYearAndDayInYear(daysSince1970: SQLDate): (Int, Int) = { // add the difference (in days) between 1.1.1970 and the artificial year 0 (-17999) - val daysNormalized = daysSince1970 + toYearZero + var daysSince1970Tmp = daysSince1970 + // Since Julian calendar was replaced with the Gregorian calendar, + // the 10 days after Oct. 4 were skipped. 
+ // (1582-10-04) -141428 days since 1970-01-01 + if (daysSince1970 <= -141428) { + daysSince1970Tmp -= 10 + } + val daysNormalized = daysSince1970Tmp + toYearZero val numOfQuarterCenturies = daysNormalized / daysIn400Years val daysInThis400 = daysNormalized % daysIn400Years + 1 val (years, dayInYear) = numYears(daysInThis400) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index d3bac0a4d2773..4ce68538c87a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -76,6 +76,9 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } checkEvaluation(DayOfYear(Literal.create(null, DateType)), null) + + checkEvaluation(DayOfYear(Literal(new Date(sdf.parse("1582-10-15 13:10:15").getTime))), 288) + checkEvaluation(DayOfYear(Literal(new Date(sdf.parse("1582-10-04 13:10:15").getTime))), 277) checkConsistencyBetweenInterpretedAndCodegen(DayOfYear, DateType) } @@ -96,6 +99,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } } + checkEvaluation(Year(Literal(new Date(sdf.parse("1582-01-01 13:10:15").getTime))), 1582) + checkEvaluation(Year(Literal(new Date(sdf.parse("1581-12-31 13:10:15").getTime))), 1581) checkConsistencyBetweenInterpretedAndCodegen(Year, DateType) } @@ -116,6 +121,9 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } } + + checkEvaluation(Quarter(Literal(new Date(sdf.parse("1582-10-01 13:10:15").getTime))), 4) + checkEvaluation(Quarter(Literal(new Date(sdf.parse("1582-09-30 13:10:15").getTime))), 3) checkConsistencyBetweenInterpretedAndCodegen(Quarter, DateType) } @@ -125,6 +133,10 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper { checkEvaluation(Month(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 4) checkEvaluation(Month(Cast(Literal(ts), DateType, gmtId)), 11) + checkEvaluation(Month(Literal(new Date(sdf.parse("1582-04-28 13:10:15").getTime))), 4) + checkEvaluation(Month(Literal(new Date(sdf.parse("1582-10-04 13:10:15").getTime))), 10) + checkEvaluation(Month(Literal(new Date(sdf.parse("1582-10-15 13:10:15").getTime))), 10) + val c = Calendar.getInstance() (2003 to 2004).foreach { y => (0 to 3).foreach { m => @@ -146,6 +158,10 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(DayOfMonth(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 8) checkEvaluation(DayOfMonth(Cast(Literal(ts), DateType, gmtId)), 8) + checkEvaluation(DayOfMonth(Literal(new Date(sdf.parse("1582-04-28 13:10:15").getTime))), 28) + checkEvaluation(DayOfMonth(Literal(new Date(sdf.parse("1582-10-15 13:10:15").getTime))), 15) + checkEvaluation(DayOfMonth(Literal(new Date(sdf.parse("1582-10-04 13:10:15").getTime))), 4) + val c = Calendar.getInstance() (1999 to 2000).foreach { y => c.set(y, 0, 1, 0, 0, 0) @@ -186,6 +202,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(WeekOfYear(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 15) checkEvaluation(WeekOfYear(Cast(Literal(ts), DateType, gmtId)), 45) checkEvaluation(WeekOfYear(Cast(Literal("2011-05-06"), DateType, gmtId)), 18) + checkEvaluation(WeekOfYear(Literal(new Date(sdf.parse("1582-10-15 13:10:15").getTime))), 40) + checkEvaluation(WeekOfYear(Literal(new Date(sdf.parse("1582-10-04 13:10:15").getTime))), 40) checkConsistencyBetweenInterpretedAndCodegen(WeekOfYear, DateType) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala new file mode 100644 index 
0000000000000..e9d21f8a8ebcd --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjectionSuite.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.BoundReference +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{DataType, Decimal, StringType, StructType} +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +class GenerateUnsafeProjectionSuite extends SparkFunSuite { + test("Test unsafe projection string access pattern") { + val dataType = (new StructType).add("a", StringType) + val exprs = BoundReference(0, dataType, nullable = true) :: Nil + val projection = GenerateUnsafeProjection.generate(exprs) + val result = projection.apply(InternalRow(AlwaysNull)) + assert(!result.isNullAt(0)) + assert(result.getStruct(0, 1).isNullAt(0)) + } +} + +object AlwaysNull extends InternalRow { + override def numFields: Int = 1 + override def setNullAt(i: Int): Unit = {} + override def 
copy(): InternalRow = this + override def anyNull: Boolean = true + override def isNullAt(ordinal: Int): Boolean = true + override def update(i: Int, value: Any): Unit = notSupported + override def getBoolean(ordinal: Int): Boolean = notSupported + override def getByte(ordinal: Int): Byte = notSupported + override def getShort(ordinal: Int): Short = notSupported + override def getInt(ordinal: Int): Int = notSupported + override def getLong(ordinal: Int): Long = notSupported + override def getFloat(ordinal: Int): Float = notSupported + override def getDouble(ordinal: Int): Double = notSupported + override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = notSupported + override def getUTF8String(ordinal: Int): UTF8String = notSupported + override def getBinary(ordinal: Int): Array[Byte] = notSupported + override def getInterval(ordinal: Int): CalendarInterval = notSupported + override def getStruct(ordinal: Int, numFields: Int): InternalRow = notSupported + override def getArray(ordinal: Int): ArrayData = notSupported + override def getMap(ordinal: Int): MapData = notSupported + override def get(ordinal: Int, dataType: DataType): AnyRef = notSupported + private def notSupported: Nothing = throw new UnsupportedOperationException +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index a6ce4c2edc232..8b7b0e655b31d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -198,21 +198,25 @@ public boolean anyNull() { @Override public Decimal getDecimal(int ordinal, int precision, int scale) { + if (columns[ordinal].isNullAt(rowId)) return null; return columns[ordinal].getDecimal(rowId, precision, scale); } @Override public UTF8String getUTF8String(int ordinal) { + if 
(columns[ordinal].isNullAt(rowId)) return null; return columns[ordinal].getUTF8String(rowId); } @Override public byte[] getBinary(int ordinal) { + if (columns[ordinal].isNullAt(rowId)) return null; return columns[ordinal].getBinary(rowId); } @Override public CalendarInterval getInterval(int ordinal) { + if (columns[ordinal].isNullAt(rowId)) return null; final int months = columns[ordinal].getChildColumn(0).getInt(rowId); final long microseconds = columns[ordinal].getChildColumn(1).getLong(rowId); return new CalendarInterval(months, microseconds); @@ -220,11 +224,13 @@ public CalendarInterval getInterval(int ordinal) { @Override public InternalRow getStruct(int ordinal, int numFields) { + if (columns[ordinal].isNullAt(rowId)) return null; return columns[ordinal].getStruct(rowId); } @Override public ArrayData getArray(int ordinal) { + if (columns[ordinal].isNullAt(rowId)) return null; return columns[ordinal].getArray(rowId); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 38b0e33937f3c..63a8666f0d774 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -58,7 +58,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit private var schema: StructType = _ // `ValueWriter`s for all fields of the schema - private var rootFieldWriters: Seq[ValueWriter] = _ + private var rootFieldWriters: Array[ValueWriter] = _ // The Parquet `RecordConsumer` to which all `InternalRow`s are written private var recordConsumer: RecordConsumer = _ @@ -90,7 +90,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit } - this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) + 
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter] val messageType = new ParquetSchemaConverter(configuration).convert(schema) val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava @@ -116,7 +116,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit } private def writeFields( - row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = { + row: InternalRow, schema: StructType, fieldWriters: Array[ValueWriter]): Unit = { var i = 0 while (i < row.numFields) { if (!row.isNullAt(i)) { @@ -192,7 +192,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit makeDecimalWriter(precision, scale) case t: StructType => - val fieldWriters = t.map(_.dataType).map(makeWriter) + val fieldWriters = t.map(_.dataType).map(makeWriter).toArray[ValueWriter] (row: SpecializedGetters, ordinal: Int) => consumeGroup { writeFields(row.getStruct(ordinal, t.length), t, fieldWriters) diff --git a/sql/core/src/test/resources/sql-tests/inputs/operators.sql b/sql/core/src/test/resources/sql-tests/inputs/operators.sql index 6339d69ca6473..1920a108c6584 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/operators.sql @@ -53,3 +53,9 @@ explain select 2 * 4 + 3 || 'b'; explain select 3 + 1 || 'a' || 4 / 2; explain select 1 == 1 OR 'a' || 'b' == 'ab'; explain select 'a' || 'c' == 'ac' AND 2 == 3; + +-- math functions +select cot(1); +select cot(null); +select cot(0); +select cot(-1); diff --git a/sql/core/src/test/resources/sql-tests/results/operators.sql.out b/sql/core/src/test/resources/sql-tests/results/operators.sql.out index e0236f41187ec..abd18211c70d8 100644 --- a/sql/core/src/test/resources/sql-tests/results/operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/operators.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 34 
+-- Number of queries: 38 -- !query 0 @@ -284,3 +284,35 @@ struct<plan:string> == Physical Plan == *Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x] +- Scan OneRowRelation[] + + +-- !query 34 +select cot(1) +-- !query 34 schema +struct<COT(CAST(1 AS DOUBLE)):double> +-- !query 34 output +0.6420926159343306 + + +-- !query 35 +select cot(null) +-- !query 35 schema +struct<COT(CAST(NULL AS DOUBLE)):double> +-- !query 35 output +NULL + + +-- !query 36 +select cot(0) +-- !query 36 schema +struct<COT(CAST(0 AS DOUBLE)):double> +-- !query 36 output +Infinity + + +-- !query 37 +select cot(-1) +-- !query 37 schema +struct<COT(CAST(-1 AS DOUBLE)):double> +-- !query 37 output +-0.6420926159343306