3 changes: 2 additions & 1 deletion R/pkg/R/generics.R
@@ -20,7 +20,8 @@
 # @rdname aggregateRDD
 # @seealso reduce
 # @export
-setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
+setGeneric("aggregateRDD",
+           function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
 
 # @rdname cache-methods
 # @export
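
A quick usage sketch of the generic wrapped above (illustration only, not part of the diff; assumes a SparkR context `sc` from sparkR.init()):

    # Sum and count the elements of an RDD in a single pass.
    rdd <- parallelize(sc, 1:10)
    zeroValue <- list(0, 0)    # list(running sum, running count)
    seqOp <- function(acc, x) { list(acc[[1]] + x, acc[[2]] + 1) }
    combOp <- function(a, b) { list(a[[1]] + b[[1]], a[[2]] + b[[2]]) }
    aggregateRDD(rdd, zeroValue, seqOp, combOp)    # returns list(55, 10)
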
12 changes: 6 additions & 6 deletions R/pkg/R/pairRDD.R
@@ -560,8 +560,8 @@ setMethod("join",
 # Left outer join two RDDs
 #
 # @description
-# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
+# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
+# the form list(K, V). The key types of the two RDDs should be the same.
 #
 # @param x An RDD to be joined. Should be an RDD where each element is
 #   list(K, V).
@@ -597,8 +597,8 @@ setMethod("leftOuterJoin",
 # Right outer join two RDDs
 #
 # @description
-# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
+# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
+# the form list(K, V). The key types of the two RDDs should be the same.
 #
 # @param x An RDD to be joined. Should be an RDD where each element is
 #   list(K, V).
@@ -634,8 +634,8 @@ setMethod("rightOuterJoin",
 # Full outer join two RDDs
 #
 # @description
-# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
+# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
+# the form list(K, V). The key types of the two RDDs should be the same.
 #
 # @param x An RDD to be joined. Should be an RDD where each element is
 #   list(K, V).
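
The three reflowed @description blocks share one contract: both inputs are RDDs of list(K, V) pairs with matching key types. A small sketch of the resulting semantics (illustration only; the data mirrors the fullOuterJoin tests further down):

    rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
    rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
    collect(leftOuterJoin(rdd1, rdd2, 2L))
    # Keys missing from rdd2 pair with NULL, e.g. list(3, list(3, NULL));
    # fullOuterJoin would also keep rdd2-only keys, e.g. list(2, list(NULL, 4)).
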
9 changes: 6 additions & 3 deletions R/pkg/R/sparkR.R
@@ -105,7 +105,8 @@ sparkR.init <- function(
   sparkPackages = "") {
 
   if (exists(".sparkRjsc", envir = .sparkREnv)) {
-    cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
+    cat(paste("Re-using existing Spark Context.",
+              "Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
     return(get(".sparkRjsc", envir = .sparkREnv))
   }
 
@@ -180,14 +181,16 @@ sparkR.init <- function(
 
   sparkExecutorEnvMap <- new.env()
   if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
-    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
+    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
+      paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
   }
   for (varname in names(sparkExecutorEnv)) {
     sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
   }
 
   nonEmptyJars <- Filter(function(x) { x != "" }, jars)
-  localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
+  localJarPaths <- sapply(nonEmptyJars,
+                          function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
 
   # Set the start time to identify jobjs
   # Seconds resolution is good enough for this purpose, so use ints
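
For context, the guard whose message is reflowed above makes repeated initialization a no-op; a hypothetical session (assumes a local Spark build on the path):

    sc1 <- sparkR.init(master = "local")
    sc2 <- sparkR.init(master = "local")    # prints the re-use message, starts nothing new
    identical(sc1, sc2)                     # TRUE: the cached .sparkRjsc object is returned
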
31 changes: 18 additions & 13 deletions R/pkg/R/utils.R
@@ -334,18 +334,21 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                          "MEMORY_ONLY_SER_2",
                                          "OFF_HEAP")) {
   match.arg(newLevel)
+  storageLevelClass <- "org.apache.spark.storage.StorageLevel"
   storageLevel <- switch(newLevel,
-                         "DISK_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY"),
-                         "DISK_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY_2"),
-                         "MEMORY_AND_DISK" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK"),
-                         "MEMORY_AND_DISK_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_2"),
-                         "MEMORY_AND_DISK_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER"),
-                         "MEMORY_AND_DISK_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER_2"),
-                         "MEMORY_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY"),
-                         "MEMORY_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_2"),
-                         "MEMORY_ONLY_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER"),
-                         "MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"),
-                         "OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
+                         "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
+                         "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
+                         "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
+                         "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
+                         "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
+                                                             "MEMORY_AND_DISK_SER"),
+                         "MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
+                                                               "MEMORY_AND_DISK_SER_2"),
+                         "MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
+                         "MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
+                         "MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
+                         "MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
+                         "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
 }
 
 # Utility function for functions where an argument needs to be integer but we want to allow
@@ -545,9 +548,11 @@ mergePartitions <- function(rdd, zip) {
       lengthOfKeys <- part[[len - lengthOfValues]]
       stopifnot(len == lengthOfKeys + lengthOfValues)
 
-      # For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
+      # For zip operation, check if corresponding partitions
+      # of both RDDs have the same number of elements.
       if (zip && lengthOfKeys != lengthOfValues) {
-        stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
+        stop(paste("Can only zip RDDs with same number of elements",
+                   "in each pair of corresponding partitions."))
       }
 
       if (lengthOfKeys > 1) {
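
The reworded error above enforces zip's pairing contract. A hypothetical illustration (assumes a context `sc`; zipRDD() is, to my reading, the caller that passes zip = TRUE into mergePartitions):

    rdd1 <- parallelize(sc, 1:4, 2L)         # two partitions of 2 elements each
    rdd2 <- parallelize(sc, 101:105, 2L)     # two partitions of 2 and 3 elements
    # collect(zipRDD(rdd1, rdd2))            # would stop(): partition lengths differ
    rdd3 <- parallelize(sc, 101:104, 2L)
    collect(zipRDD(rdd1, rdd3))              # ok: list(list(1, 101), list(2, 102), ...)
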
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_includeJAR.R
@@ -18,8 +18,8 @@ context("include an external JAR in SparkContext")
 
 runScript <- function() {
   sparkHome <- Sys.getenv("SPARK_HOME")
-  jarPath <- paste("--jars",
-                   shQuote(file.path(sparkHome, "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar")))
+  sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
+  jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
   scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
   submitPath <- file.path(sparkHome, "bin/spark-submit")
   res <- system2(command = submitPath,
12 changes: 8 additions & 4 deletions R/pkg/inst/tests/test_rdd.R
@@ -669,27 +669,31 @@ test_that("fullOuterJoin() on pairwise RDDs", {
   rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
   rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
   actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
+  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
+                   list(2, list(NULL, 4)), list(3, list(3, NULL)))
   expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
 
   rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
   rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
   actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
-  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
+  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
+                   list("a", list(3, 1)), list("c", list(1, NULL)))
   expect_equal(sortKeyValueList(actual),
                sortKeyValueList(expected))
 
   rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
   rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
   actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
   expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+               sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
+                                     list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
 
   rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
   rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
   actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
   expect_equal(sortKeyValueList(actual),
-               sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+               sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
+                                     list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
 })
 
 test_that("sortByKey() on pairwise RDDs", {
11 changes: 9 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -391,7 +391,7 @@ test_that("collect() and take() on a DataFrame return the same number of rows an
   expect_equal(ncol(collect(df)), ncol(take(df, 10)))
 })
 
-test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", {
+test_that("multiple pipeline transformations result in an RDD with the correct values", {
   df <- jsonFile(sqlContext, jsonPath)
   first <- lapply(df, function(row) {
     row$age <- row$age + 5
@@ -756,7 +756,14 @@ test_that("toJSON() returns an RDD of the correct values", {
 test_that("showDF()", {
   df <- jsonFile(sqlContext, jsonPath)
   s <- capture.output(showDF(df))
-  expect_output(s , "+----+-------+\n| age|   name|\n+----+-------+\n|null|Michael|\n|  30|   Andy|\n|  19| Justin|\n+----+-------+\n")
+  expected <- paste("+----+-------+\n",
+                    "| age|   name|\n",
+                    "+----+-------+\n",
+                    "|null|Michael|\n",
+                    "|  30|   Andy|\n",
+                    "|  19| Justin|\n",
+                    "+----+-------+\n", sep="")
+  expect_output(s , expected)
 })
 
 test_that("isLocal()", {
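
For readability, the table encoded by the rewritten expectation above, as showDF() renders it for the test's JSON fixture (the same characters as the paste() call, just unescaped; column padding reconstructed to match the 4- and 7-wide borders):

    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+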