Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(Interpreter classes (all .scala files in repl/src/main/scala
except for Main.scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala)
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.7 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.7 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.7 - http://www.scala-lang.org/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.7 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.11.7 - http://www.scala-lang.org/)
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Reflect (org.scala-lang:scala-reflect:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.8 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.11.8 - http://www.scala-lang.org/)
(BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org)
(BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org)
(BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org)
Expand Down
22 changes: 12 additions & 10 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ setMethod("registerTempTable",
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' df2 <- read.df(path2, "parquet")
#' createOrReplaceTempView(df, "table1")
#' saveAsTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
#' @note insertInto since 1.4.0
Expand Down Expand Up @@ -1125,7 +1125,8 @@ setMethod("dim",
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' collected <- collect(df)
#' firstName <- collected[[1]]$name
#' class(collected)
#' firstName <- names(collected)[1]
#' }
#' @note collect since 1.4.0
setMethod("collect",
Expand Down Expand Up @@ -2814,7 +2815,7 @@ setMethod("except",
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' write.df(df, "myfile", "parquet", "overwrite")
#' saveDF(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
#' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
#' }
#' @note write.df since 1.4.0
setMethod("write.df",
Expand Down Expand Up @@ -3097,8 +3098,8 @@ setMethod("fillna",
#' @family SparkDataFrame functions
#' @aliases as.data.frame,SparkDataFrame-method
#' @rdname as.data.frame
#' @examples \dontrun{
#'
#' @examples
#' \dontrun{
#' irisDF <- createDataFrame(iris)
#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ])
#' }
Expand Down Expand Up @@ -3175,7 +3176,8 @@ setMethod("with",
#' @aliases str,SparkDataFrame-method
#' @family SparkDataFrame functions
#' @param object a SparkDataFrame
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' # Create a SparkDataFrame from the Iris dataset
#' irisDF <- createDataFrame(iris)
#'
Expand Down Expand Up @@ -3667,8 +3669,8 @@ setMethod("checkpoint",
#' mean(cube(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(cube(carsDF), mean(carsDF$mpg))
#' agg(carsDF, mean(carsDF$mpg))
#' agg(cube(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note cube since 2.3.0
#' @seealso \link{agg}, \link{groupBy}, \link{rollup}
Expand Down Expand Up @@ -3702,8 +3704,8 @@ setMethod("cube",
#' mean(rollup(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(rollup(carsDF), mean(carsDF$mpg))
#' agg(carsDF, mean(carsDF$mpg))
#' agg(rollup(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note rollup since 2.3.0
#' @seealso \link{agg}, \link{cube}, \link{groupBy}
Expand Down
3 changes: 2 additions & 1 deletion R/pkg/R/WindowSpec.R
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ setMethod("rangeBetween",
#' @aliases over,Column,WindowSpec-method
#' @family colum_func
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#'
#' # Partition by am (transmission) and order by hp (horsepower)
Expand Down
6 changes: 4 additions & 2 deletions R/pkg/R/column.R
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ createMethods()
#' @aliases alias,Column-method
#' @family colum_func
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(iris)
#'
#' head(select(
Expand Down Expand Up @@ -244,7 +245,8 @@ setMethod("between", signature(x = "Column"),
#' @family colum_func
#' @aliases cast,Column-method
#'
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' cast(df$age, "string")
#' }
#' @note cast since 1.4.0
Expand Down
51 changes: 34 additions & 17 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -3257,7 +3257,8 @@ setMethod("when", signature(condition = "Column", value = "ANY"),
#' @aliases ifelse,Column-method
#' @seealso \link{when}
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' ifelse(df$a > 1 & df$b > 2, 0, 1)
#' ifelse(df$a > 1, df$a, 1)
#' }
Expand Down Expand Up @@ -3292,7 +3293,8 @@ setMethod("ifelse",
#' @family window functions
#' @aliases cume_dist,missing-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' ws <- orderBy(windowPartitionBy("am"), "hp")
#' out <- select(df, over(cume_dist(), ws), df$hp, df$am)
Expand Down Expand Up @@ -3321,7 +3323,8 @@ setMethod("cume_dist",
#' @family window functions
#' @aliases dense_rank,missing-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' ws <- orderBy(windowPartitionBy("am"), "hp")
#' out <- select(df, over(dense_rank(), ws), df$hp, df$am)
Expand Down Expand Up @@ -3352,7 +3355,8 @@ setMethod("dense_rank",
#' @aliases lag,characterOrColumn-method
#' @family window functions
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#'
#' # Partition by am (transmission) and order by hp (horsepower)
Expand Down Expand Up @@ -3395,7 +3399,8 @@ setMethod("lag",
#' @family window functions
#' @aliases lead,characterOrColumn,numeric-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#'
#' # Partition by am (transmission) and order by hp (horsepower)
Expand Down Expand Up @@ -3434,7 +3439,8 @@ setMethod("lead",
#' @aliases ntile,numeric-method
#' @family window functions
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#'
#' # Partition by am (transmission) and order by hp (horsepower)
Expand Down Expand Up @@ -3466,7 +3472,8 @@ setMethod("ntile",
#' @family window functions
#' @aliases percent_rank,missing-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' ws <- orderBy(windowPartitionBy("am"), "hp")
#' out <- select(df, over(percent_rank(), ws), df$hp, df$am)
Expand Down Expand Up @@ -3496,7 +3503,8 @@ setMethod("percent_rank",
#' @family window functions
#' @aliases rank,missing-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' ws <- orderBy(windowPartitionBy("am"), "hp")
#' out <- select(df, over(rank(), ws), df$hp, df$am)
Expand Down Expand Up @@ -3533,7 +3541,8 @@ setMethod("rank",
#' @aliases row_number,missing-method
#' @family window functions
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' ws <- orderBy(windowPartitionBy("am"), "hp")
#' out <- select(df, over(row_number(), ws), df$hp, df$am)
Expand Down Expand Up @@ -3761,7 +3770,8 @@ setMethod("collect_set",
#' @family string functions
#' @aliases split_string,Column-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- read.text("README.md")
#'
#' head(select(df, split_string(df$value, "\\s+")))
Expand Down Expand Up @@ -3790,7 +3800,8 @@ setMethod("split_string",
#' @family string functions
#' @aliases repeat_string,Column-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- read.text("README.md")
#'
#' first(select(df, repeat_string(df$value, 3)))
Expand Down Expand Up @@ -3819,7 +3830,8 @@ setMethod("repeat_string",
#' @family collection functions
#' @aliases explode_outer,Column-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(data.frame(
#' id = c(1, 2, 3), text = c("a,b,c", NA, "d,e")
#' ))
Expand Down Expand Up @@ -3847,7 +3859,8 @@ setMethod("explode_outer",
#' @family collection functions
#' @aliases posexplode_outer,Column-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(data.frame(
#' id = c(1, 2, 3), text = c("a,b,c", NA, "d,e")
#' ))
Expand Down Expand Up @@ -3875,7 +3888,8 @@ setMethod("posexplode_outer",
#' @aliases not,Column-method
#' @family non-aggregate functions
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(data.frame(
#' is_true = c(TRUE, FALSE, NA),
#' flag = c(1, 0, 1)
Expand Down Expand Up @@ -3908,7 +3922,8 @@ setMethod("not",
#' @family aggregate functions
#' @aliases grouping_bit,Column-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#'
#' # With cube
Expand Down Expand Up @@ -3949,7 +3964,8 @@ setMethod("grouping_bit",
#' @family aggregate functions
#' @aliases grouping_id,Column-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#'
#' # With cube
Expand Down Expand Up @@ -3987,7 +4003,8 @@ setMethod("grouping_id",
#' @family non-aggregate functions
#' @aliases input_file_name,missing-method
#' @export
#' @examples \dontrun{
#' @examples
#' \dontrun{
#' df <- read.text("README.md")
#'
#' head(select(df, input_file_name()))
Expand Down
30 changes: 28 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
import org.apache.spark.util.{MutablePair, Utils}

abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
Expand Down Expand Up @@ -277,19 +277,45 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// Delete one of the local shuffle blocks.
val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
assert(hashFile.exists() || sortFile.exists())
val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))

if (hashFile.exists()) {
hashFile.delete()
}
if (sortFile.exists()) {
sortFile.delete()
}
if (indexFile.exists()) {
indexFile.delete()
}

// This count should retry the execution of the previous stage and rerun shuffle.
rdd.count()
}

test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
  sc = new SparkContext("local", "test", conf.clone())
  val reduced = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)

  // Looks up, in a fixed order, the files that the disk block manager associates with the
  // hash, sort-data and sort-index block ids (0, 0, 0). Every call performs a fresh lookup.
  def lookupShuffleFiles() = {
    val hash = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
    val data = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
    val index = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
    (hash, data, index)
  }

  // No action has run yet, so none of the three files may exist.
  val (hashBefore, dataBefore, indexBefore) = lookupShuffleFiles()
  assert(!hashBefore.exists() && !dataBefore.exists() && !indexBefore.exists())

  // Trigger the job.
  reduced.count()

  // After the job, either the hash file exists, or both the sort data file and its index do.
  val (hashAfter, dataAfter, indexAfter) = lookupShuffleFiles()
  assert(hashAfter.exists() || (dataAfter.exists() && indexAfter.exists()))
}

test("metrics for shuffle without aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val numRecords = 10000
Expand Down
2 changes: 1 addition & 1 deletion docs/_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ include:
SPARK_VERSION: 2.3.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.3.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
SCALA_VERSION: "2.11.8"
MESOS_VERSION: 1.0.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
Loading