8 changes: 5 additions & 3 deletions R/pkg/R/functions.R
@@ -1757,7 +1757,8 @@ setMethod("toRadians",
#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
#' If the string cannot be parsed according to the specified format (or default),
#' the value of the column will be null.
-#' The default format is 'yyyy-MM-dd'.
+#' By default, it follows casting rules to a DateType if the format is omitted
+#' (equivalent to \code{cast(df$x, "date")}).
#'
#' @param x Column to parse.
#' @param format string to use to parse x Column to DateType. (optional)
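For illustration, a minimal SparkR sketch of the two documented paths (made-up input data; a string that fails to parse becomes null under either path):

```r
df <- createDataFrame(data.frame(d = c("2017-05-30", "not-a-date"),
                                 stringsAsFactors = FALSE))
head(select(df, to_date(df$d)))                # format omitted: cast-style parsing
head(select(df, to_date(df$d, "yyyy-MM-dd")))  # explicit SimpleDateFormat pattern
```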
@@ -1832,10 +1833,11 @@ setMethod("to_json", signature(x = "Column"),
#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
#' If the string cannot be parsed according to the specified format (or default),
#' the value of the column will be null.
-#' The default format is 'yyyy-MM-dd HH:mm:ss'.
+#' By default, it follows casting rules to a TimestampType if the format is omitted
+#' (equivalent to \code{cast(df$x, "timestamp")}).
#'
#' @param x Column to parse.
-#' @param format string to use to parse x Column to DateType. (optional)
+#' @param format string to use to parse x Column to TimestampType. (optional)
#'
#' @rdname to_timestamp
#' @name to_timestamp
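The same pattern applies here; a hedged sketch with made-up data:

```r
df <- createDataFrame(data.frame(t = "2017-05-30 10:20:30", stringsAsFactors = FALSE))
head(select(df, to_timestamp(df$t)))                         # format omitted: cast-style parsing
head(select(df, to_timestamp(df$t, "yyyy-MM-dd HH:mm:ss")))  # explicit pattern
```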
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/jarTest.R
@@ -16,7 +16,7 @@
#
library(SparkR)

-sc <- sparkR.session()
+sc <- sparkR.session(master = "local[1]")

helloTest <- SparkR:::callJStatic("sparkrtest.DummyClass",
"helloWorld",
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/packageInAJarTest.R
@@ -17,7 +17,7 @@
library(SparkR)
library(sparkPackageTest)

-sparkR.session()
+sparkR.session(master = "local[1]")

run1 <- myfunc(5L)

2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_Serde.R
@@ -17,7 +17,7 @@

context("SerDe functionality")

-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("SerDe of primitive types", {
skip_on_cran()
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_binaryFile.R
@@ -18,7 +18,7 @@
context("functions on binary files")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

mockFile <- c("Spark is pretty.", "Spark is awesome.")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_binary_function.R
@@ -18,7 +18,7 @@
context("binary functions")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Data
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_broadcast.R
@@ -18,7 +18,7 @@
context("broadcast variables")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Partitioned data
16 changes: 8 additions & 8 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -60,7 +60,7 @@ test_that("repeatedly starting and stopping SparkR", {
skip_on_cran()

for (i in 1:4) {
-sc <- suppressWarnings(sparkR.init())
+sc <- suppressWarnings(sparkR.init(master = sparkRTestMaster))
rdd <- parallelize(sc, 1:20, 2L)
expect_equal(countRDD(rdd), 20)
suppressWarnings(sparkR.stop())
@@ -69,7 +69,7 @@

test_that("repeatedly starting and stopping SparkSession", {
for (i in 1:4) {
-sparkR.session(enableHiveSupport = FALSE)
+sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
df <- createDataFrame(data.frame(dummy = 1:i))
expect_equal(count(df), i)
sparkR.session.stop()
@@ -79,12 +79,12 @@ test_that("repeatedly starting and stopping SparkSession", {
test_that("rdd GC across sparkR.stop", {
skip_on_cran()

-sc <- sparkR.sparkContext() # sc should get id 0
+sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0
rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
sparkR.session.stop()

-sc <- sparkR.sparkContext() # sc should get id 0 again
+sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0 again

# GC rdd1 before creating rdd3 and rdd2 after
rm(rdd1)
@@ -104,7 +104,7 @@ test_that("rdd GC across sparkR.stop", {
test_that("job group functions can be called", {
skip_on_cran()

-sc <- sparkR.sparkContext()
+sc <- sparkR.sparkContext(master = sparkRTestMaster)
setJobGroup("groupId", "job description", TRUE)
cancelJobGroup("groupId")
clearJobGroup()
@@ -118,7 +118,7 @@ test_that("job group functions can be called", {
test_that("utility function can be called", {
skip_on_cran()

-sparkR.sparkContext()
+sparkR.sparkContext(master = sparkRTestMaster)
setLogLevel("ERROR")
sparkR.session.stop()
})
@@ -175,7 +175,7 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
})

test_that("spark.lapply should perform simple transforms", {
-sparkR.sparkContext()
+sparkR.sparkContext(master = sparkRTestMaster)
doubled <- spark.lapply(1:10, function(x) { 2 * x })
expect_equal(doubled, as.list(2 * 1:10))
sparkR.session.stop()
@@ -184,7 +184,7 @@ test_that("spark.lapply should perform simple transforms", {
test_that("add and get file to be downloaded with Spark job on every node", {
skip_on_cran()

-sparkR.sparkContext()
+sparkR.sparkContext(master = sparkRTestMaster)
# Test add file.
path <- tempfile(pattern = "hello", fileext = ".txt")
filename <- basename(path)
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_includePackage.R
@@ -18,7 +18,7 @@
context("include R packages")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Partitioned data
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_jvm_api.R
@@ -17,7 +17,7 @@

context("JVM API")

-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("Create and call methods on object", {
jarr <- sparkR.newJObject("java.util.ArrayList")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib classification algorithms, except for tree-based algorithms")

# Tests for MLlib classification algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

absoluteSparkPath <- function(x) {
sparkHome <- sparkR.conf("spark.home")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_clustering.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib clustering algorithms")

# Tests for MLlib clustering algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

absoluteSparkPath <- function(x) {
sparkHome <- sparkR.conf("spark.home")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_fpm.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib frequent pattern mining")

# Tests for MLlib frequent pattern mining algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("spark.fpGrowth", {
data <- selectExpr(createDataFrame(data.frame(items = c(
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_recommendation.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib recommendation algorithms")

# Tests for MLlib recommendation algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("spark.als", {
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_regression.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib regression algorithms, except for tree-based algorithms")

# Tests for MLlib regression algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("formula of spark.glm", {
skip_on_cran()
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_stat.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib statistics algorithms")

# Tests for MLlib statistics algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("spark.kstest", {
data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5))
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_mllib_tree.R
@@ -20,7 +20,7 @@ library(testthat)
context("MLlib tree-based algorithms")

# Tests for MLlib tree-based algorithms in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

absoluteSparkPath <- function(x) {
sparkHome <- sparkR.conf("spark.home")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_parallelize_collect.R
@@ -33,7 +33,7 @@ numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
strPairs <- list(list(strList, strList), list(strList, strList))

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
jsc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Tests
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_rdd.R
@@ -18,7 +18,7 @@
context("basic RDD functions")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Data
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_shuffle.R
@@ -18,7 +18,7 @@
context("partitionBy, groupByKey, reduceByKey etc.")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Data
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -61,7 +61,7 @@ unsetHiveContext <- function() {
# Tests for SparkSQL functions in SparkR

filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
-sparkSession <- sparkR.session()
+sparkSession <- sparkR.session(master = sparkRTestMaster)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

mockLines <- c("{\"name\":\"Michael\"}",
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_streaming.R
@@ -21,7 +21,7 @@ context("Structured Streaming")

# Tests for Structured Streaming functions in SparkR

-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

jsonSubDir <- file.path("sparkr-test", "json", "")
if (.Platform$OS.type == "windows") {
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_take.R
@@ -30,7 +30,7 @@ strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
"raising me. But they're both dead now. I didn't kill them. Honest.")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

test_that("take() gives back the original elements in correct count and order", {
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_textFile.R
@@ -18,7 +18,7 @@
context("the textFile() function")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

mockFile <- c("Spark is pretty.", "Spark is awesome.")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_utils.R
@@ -18,7 +18,7 @@
context("functions in utils.R")

# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

test_that("convertJListToRList() gives back (deserializes) the original JLists
5 changes: 5 additions & 0 deletions R/pkg/tests/run-all.R
@@ -31,4 +31,9 @@ sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))

sparkRTestMaster <- "local[1]"
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
sparkRTestMaster <- ""
}

test_package("SparkR")
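A note on this switch: `master` defaults to `""` in `sparkR.session()`, so assigning `""` when `NOT_CRAN` is `"true"` is equivalent to not passing `master` at all, deferring to Spark's normal master resolution. A sketch of the two setups the test files end up with (assuming they pass `sparkRTestMaster` through unchanged):

```r
# On CRAN check machines (NOT_CRAN unset): pin each test session to one core.
sparkR.session(master = "local[1]", enableHiveSupport = FALSE)

# On Jenkins/dev machines (NOT_CRAN = "true"): "" defers to the default master.
sparkR.session(master = "", enableHiveSupport = FALSE)
```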
3 changes: 2 additions & 1 deletion R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -46,8 +46,9 @@ We use default settings in which it runs in local mode. It auto downloads Spark

```{r, include=FALSE}
install.spark()
sparkR.session(master = "local[1]")
```
-```{r, message=FALSE, results="hide"}
+```{r, eval=FALSE}
sparkR.session()
```

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -110,10 +110,10 @@ private[spark] class TaskContextImpl(

  /** Marks the task as completed and triggers the completion listeners. */
  @GuardedBy("this")
-  private[spark] def markTaskCompleted(): Unit = synchronized {
+  private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
    if (completed) return
    completed = true
-    invokeListeners(onCompleteCallbacks, "TaskCompletionListener", None) {
+    invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
      _.onTaskCompletion(this)
    }
  }
39 changes: 23 additions & 16 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
          case t: Throwable =>
            e.addSuppressed(t)
        }
+        context.markTaskCompleted(Some(e))
        throw e
    } finally {
-      // Call the task completion callbacks.
-      context.markTaskCompleted()
      try {
-        Utils.tryLogNonFatalError {
-          // Release memory used by this thread for unrolling blocks
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
-          // Notify any tasks waiting for execution memory to be freed to wake up and try to
-          // acquire memory again. This makes impossible the scenario where a task sleeps forever
-          // because there are no other tasks left to notify it. Since this is safe to do but may
-          // not be strictly necessary, we should revisit whether we can remove this in the future.
-          val memoryManager = SparkEnv.get.memoryManager
-          memoryManager.synchronized { memoryManager.notifyAll() }
-        }
+        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+        // one is no-op.
+        context.markTaskCompleted(None)
      } finally {
-        // Though we unset the ThreadLocal here, the context member variable itself is still queried
-        // directly in the TaskRunner to check for FetchFailedExceptions.
-        TaskContext.unset()
+        try {
+          Utils.tryLogNonFatalError {
+            // Release memory used by this thread for unrolling blocks
+            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
+            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
+              MemoryMode.OFF_HEAP)
+            // Notify any tasks waiting for execution memory to be freed to wake up and try to
+            // acquire memory again. This makes impossible the scenario where a task sleeps forever
+            // because there are no other tasks left to notify it. Since this is safe to do but may
+            // not be strictly necessary, we should revisit whether we can remove this in the
+            // future.
+            val memoryManager = SparkEnv.get.memoryManager
+            memoryManager.synchronized { memoryManager.notifyAll() }
+          }
+        } finally {
+          // Though we unset the ThreadLocal here, the context member variable itself is still
+          // queried directly in the TaskRunner to check for FetchFailedExceptions.
+          TaskContext.unset()
+        }
      }
    }
  }