Skip to content

Commit bf52b17

Browse files
committed
Merge remote-tracking branch 'amplab-sparkr/master' into sparkr-runner
Conflicts: pkg/R/sparkR.R
2 parents 88bf97f + 08102b0 commit bf52b17

File tree

12 files changed

+382
-27
lines changed

12 files changed

+382
-27
lines changed

README.md

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@ SparkR requires Scala 2.10 and Spark version >= 0.9.0. Current build by default
1313
Apache Spark 1.1.0. You can also build SparkR against a
1414
different Spark version (>= 0.9.0) by modifying `pkg/src/build.sbt`.
1515

16-
SparkR also requires the R package `rJava` to be installed. To install `rJava`,
17-
you can run the following command in R:
18-
19-
install.packages("rJava")
20-
21-
2216
### Package installation
2317
To develop SparkR, you can build the scala package and the R package using
2418

@@ -31,9 +25,9 @@ If you wish to try out the package directly from github, you can use [`install_g
3125

3226
SparkR by default uses Apache Spark 1.1.0. You can switch to a different Spark
3327
version by setting the environment variable `SPARK_VERSION`. For example, to
34-
use Apache Spark 1.2.0, you can run
28+
use Apache Spark 1.3.0, you can run
3529

36-
SPARK_VERSION=1.2.0 ./install-dev.sh
30+
SPARK_VERSION=1.3.0 ./install-dev.sh
3731

3832
SparkR by default links to Hadoop 1.0.4. To use SparkR with other Hadoop
3933
versions, you will need to rebuild SparkR with the same version that [Spark is
@@ -97,8 +91,9 @@ To run one of them, use `./sparkR <filename> <args>`. For example:
9791

9892
./sparkR examples/pi.R local[2]
9993

100-
You can also run the unit-tests for SparkR by running
94+
You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first):
10195

96+
R -e 'install.packages("testthat", repos="http://cran.us.r-project.org")'
10297
./run-tests.sh
10398

10499
## Running on EC2
@@ -110,7 +105,7 @@ Instructions for running SparkR on EC2 can be found in the
110105
Currently, SparkR supports running on YARN with the `yarn-client` mode. These steps show how to build SparkR with YARN support and run SparkR programs on a YARN cluster:
111106

112107
```
113-
# assumes Java, R, rJava, yarn, spark etc. are installed on the whole cluster.
108+
# assumes Java, R, yarn, spark etc. are installed on the whole cluster.
114109
cd SparkR-pkg/
115110
USE_YARN=1 SPARK_YARN_VERSION=2.4.0 SPARK_HADOOP_VERSION=2.4.0 ./install-dev.sh
116111
```

pkg/NAMESPACE

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
exportClasses("RDD")
33
exportClasses("Broadcast")
44
exportMethods(
5+
"aggregateByKey",
56
"aggregateRDD",
67
"cache",
78
"checkpoint",
@@ -19,6 +20,7 @@ exportMethods(
1920
"flatMap",
2021
"flatMapValues",
2122
"fold",
23+
"foldByKey",
2224
"foreach",
2325
"foreachPartition",
2426
"fullOuterJoin",
@@ -41,6 +43,7 @@ exportMethods(
4143
"numPartitions",
4244
"partitionBy",
4345
"persist",
46+
"pipeRDD",
4447
"reduce",
4548
"reduceByKey",
4649
"reduceByKeyLocally",

pkg/R/RDD.R

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,10 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
110110
computeFunc <- function(split, part) {
111111
rdd@func(split, part)
112112
}
113-
serializedFuncArr <- serialize("computeFunc", connection = NULL,
114-
ascii = TRUE)
113+
serializedFuncArr <- serialize("computeFunc", connection = NULL)
115114

116115
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
117-
connection = NULL,
118-
ascii = TRUE)
116+
connection = NULL)
119117

120118
broadcastArr <- lapply(ls(.broadcastNames),
121119
function(name) { get(name, .broadcastNames) })
@@ -1275,6 +1273,43 @@ setMethod("aggregateRDD",
12751273
Reduce(combOp, partitionList, zeroValue)
12761274
})
12771275

1276+
#' Pipes elements to a forked external process.
1277+
#'
1278+
#' The same as 'pipe()' in Spark.
1279+
#'
1280+
#' @param rdd The RDD whose elements are piped to the forked external process.
1281+
#' @param command The command to fork an external process.
1282+
#' @param env A named list to set environment variables of the external process.
1283+
#' @return A new RDD created by piping all elements to a forked external process.
1284+
#' @rdname pipeRDD
1285+
#' @export
1286+
#' @examples
1287+
#'\dontrun{
1288+
#' sc <- sparkR.init()
1289+
#' rdd <- parallelize(sc, 1:10)
1290+
#' collect(pipeRDD(rdd, "more")
1291+
#' Output: c("1", "2", ..., "10")
1292+
#'}
1293+
setGeneric("pipeRDD", function(rdd, command, env = list()) {
1294+
standardGeneric("pipeRDD")
1295+
})
1296+
1297+
#' @rdname pipeRDD
1298+
#' @aliases pipeRDD,RDD,character-method
1299+
setMethod("pipeRDD",
1300+
signature(rdd = "RDD", command = "character"),
1301+
function(rdd, command, env = list()) {
1302+
func <- function(part) {
1303+
trim.trailing.func <- function(x) {
1304+
sub("[\r\n]*$", "", toString(x))
1305+
}
1306+
input <- unlist(lapply(part, trim.trailing.func))
1307+
res <- system2(command, stdout = TRUE, input = input, env = env)
1308+
lapply(res, trim.trailing.func)
1309+
}
1310+
lapplyPartition(rdd, func)
1311+
})
1312+
12781313
# TODO: Consider caching the name in the RDD's environment
12791314
#' Return an RDD's name.
12801315
#'

pkg/R/context.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ includePackage <- function(sc, pkg) {
179179
#'}
180180
broadcast <- function(sc, object) {
181181
objName <- as.character(substitute(object))
182-
serializedObj <- serialize(object, connection = NULL, ascii = TRUE)
182+
serializedObj <- serialize(object, connection = NULL)
183183

184184
jBroadcast <- callJMethod(sc, "broadcast", serializedObj)
185185
id <- as.character(callJMethod(jBroadcast, "id"))

pkg/R/pairRDD.R

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,10 @@ setMethod("partitionBy",
212212
depsBinArr <- getDependencies(partitionFunc)
213213

214214
serializedHashFuncBytes <- serialize(as.character(substitute(partitionFunc)),
215-
connection = NULL,
216-
ascii = TRUE)
215+
connection = NULL)
217216

218217
packageNamesArr <- serialize(.sparkREnv$.packages,
219-
connection = NULL,
220-
ascii = TRUE)
218+
connection = NULL)
221219
broadcastArr <- lapply(ls(.broadcastNames), function(name) {
222220
get(name, .broadcastNames) })
223221
jrdd <- getJRDD(rdd)
@@ -497,6 +495,88 @@ setMethod("combineByKey",
497495
lapplyPartition(shuffled, mergeAfterShuffle)
498496
})
499497

498+
#' Aggregate a pair RDD by each key.
499+
#'
500+
#' Aggregate the values of each key in an RDD, using given combine functions
501+
#' and a neutral "zero value". This function can return a different result type,
502+
#' U, than the type of the values in this RDD, V. Thus, we need one operation
503+
#' for merging a V into a U and one operation for merging two U's, The former
504+
#' operation is used for merging values within a partition, and the latter is
505+
#' used for merging values between partitions. To avoid memory allocation, both
506+
#' of these functions are allowed to modify and return their first argument
507+
#' instead of creating a new U.
508+
#'
509+
#' @param rdd An RDD.
510+
#' @param zeroValue A neutral "zero value".
511+
#' @param seqOp A function to aggregate the values of each key. It may return
512+
#' a different result type from the type of the values.
513+
#' @param combOp A function to aggregate results of seqOp.
514+
#' @return An RDD containing the aggregation result.
515+
#' @rdname aggregateByKey
516+
#' @seealso foldByKey, combineByKey
517+
#' @export
518+
#' @examples
519+
#'\dontrun{
520+
#' sc <- sparkR.init()
521+
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
522+
#' zeroValue <- list(0, 0)
523+
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
524+
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
525+
#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
526+
#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
527+
#'}
528+
setGeneric("aggregateByKey",
529+
function(rdd, zeroValue, seqOp, combOp, numPartitions) {
530+
standardGeneric("aggregateByKey")
531+
})
532+
533+
#' @rdname aggregateByKey
534+
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
535+
setMethod("aggregateByKey",
536+
signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY",
537+
combOp = "ANY", numPartitions = "integer"),
538+
function(rdd, zeroValue, seqOp, combOp, numPartitions) {
539+
createCombiner <- function(v) {
540+
do.call(seqOp, list(zeroValue, v))
541+
}
542+
543+
combineByKey(rdd, createCombiner, seqOp, combOp, numPartitions)
544+
})
545+
546+
#' Fold a pair RDD by each key.
547+
#'
548+
#' Aggregate the values of each key in an RDD, using an associative function "func"
549+
#' and a neutral "zero value" which may be added to the result an arbitrary
550+
#' number of times, and must not change the result (e.g., 0 for addition, or
551+
#' 1 for multiplication.).
552+
#'
553+
#' @param rdd An RDD.
554+
#' @param zeroValue A neutral "zero value".
555+
#' @param func An associative function for folding values of each key.
556+
#' @return An RDD containing the aggregation result.
557+
#' @rdname foldByKey
558+
#' @seealso aggregateByKey, combineByKey
559+
#' @export
560+
#' @examples
561+
#'\dontrun{
562+
#' sc <- sparkR.init()
563+
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
564+
#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
565+
#'}
566+
setGeneric("foldByKey",
567+
function(rdd, zeroValue, func, numPartitions) {
568+
standardGeneric("foldByKey")
569+
})
570+
571+
#' @rdname foldByKey
572+
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
573+
setMethod("foldByKey",
574+
signature(rdd = "RDD", zeroValue = "ANY",
575+
func = "ANY", numPartitions = "integer"),
576+
function(rdd, zeroValue, func, numPartitions) {
577+
aggregateByKey(rdd, zeroValue, func, func, numPartitions)
578+
})
579+
500580
############ Binary Functions #############
501581

502582
#' Join two RDDs

pkg/R/sparkR.R

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ assemblyJarName <- "sparkr-assembly-0.1.jar"
44

55
sparkR.onLoad <- function(libname, pkgname) {
66
assemblyJarPath <- paste(libname, "/SparkR/", assemblyJarName, sep = "")
7-
assemblyJarPath <- gsub(" ", "\\ ", assemblyJarPath, fixed = T)
87
packageStartupMessage("[SparkR] Initializing with classpath ", assemblyJarPath, "\n")
98

109
.sparkREnv$libname <- libname
@@ -90,17 +89,28 @@ sparkR.init <- function(
9089
sparkExecutorEnv = list(),
9190
sparkJars = "",
9291
sparkRLibDir = "",
93-
sparkRBackendPort = as.integer(Sys.getenv("SPARKR_BACKEND_PORT", "12345"))) {
92+
sparkRBackendPort = as.integer(Sys.getenv("SPARKR_BACKEND_PORT", "12345")),
93+
sparkRRetryCount = 6) {
9494

9595
if (exists(".sparkRjsc", envir = .sparkREnv)) {
9696
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
9797
return(get(".sparkRjsc", envir = .sparkREnv))
9898
}
9999

100100
sparkMem <- Sys.getenv("SPARK_MEM", "512m")
101-
jars <- c(as.character(.sparkREnv$assemblyJarPath), as.character(sparkJars))
102-
103-
cp <- paste0(jars, collapse = ":")
101+
jars <- suppressWarnings(
102+
normalizePath(c(as.character(.sparkREnv$assemblyJarPath), as.character(sparkJars))))
103+
104+
# Classpath separator is ";" on Windows
105+
# URI needs four /// as from http://stackoverflow.com/a/18522792
106+
if (.Platform$OS.type == "unix") {
107+
collapseChar <- ":"
108+
uriSep <- "//"
109+
} else {
110+
collapseChar <- ";"
111+
uriSep <- "////"
112+
}
113+
cp <- paste0(jars, collapse = collapseChar)
104114

105115
yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
106116
if (yarn_conf_dir != "") {
@@ -126,10 +136,30 @@ sparkR.init <- function(
126136
sparkHome = sparkHome,
127137
sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
128138
}
129-
Sys.sleep(2) # Wait for backend to come up
130139
}
140+
131141
.sparkREnv$sparkRBackendPort <- sparkRBackendPort
132-
connectBackend("localhost", sparkRBackendPort) # Connect to it
142+
cat("Waiting for JVM to come up...\n")
143+
tries <- 0
144+
while (tries < sparkRRetryCount) {
145+
if (!connExists(.sparkREnv)) {
146+
Sys.sleep(2 ^ tries)
147+
tryCatch({
148+
connectBackend("localhost", .sparkREnv$sparkRBackendPort)
149+
}, error = function(err) {
150+
cat("Error in Connection, retrying...\n")
151+
}, warning = function(war) {
152+
cat("No Connection Found, retrying...\n")
153+
})
154+
tries <- tries + 1
155+
} else {
156+
cat("Connection ok.\n")
157+
break
158+
}
159+
}
160+
if (tries == sparkRRetryCount) {
161+
stop(sprintf("Failed to connect JVM after %d tries.\n", sparkRRetryCount))
162+
}
133163

134164
if (nchar(sparkHome) != 0) {
135165
sparkHome <- normalizePath(sparkHome)
@@ -153,7 +183,7 @@ sparkR.init <- function(
153183
}
154184

155185
nonEmptyJars <- Filter(function(x) { x != "" }, jars)
156-
localJarPaths <- sapply(nonEmptyJars, function(j) { paste("file://", j, sep = "") })
186+
localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
157187

158188
assign(
159189
".sparkRjsc",

pkg/R/sparkRClient.R

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ launchBackend <- function(
3535
} else {
3636
java_bin <- java_bin_name
3737
}
38+
# Quote the classpath to make sure it handles spaces on Windows
39+
classPath <- shQuote(classPath)
3840
combinedArgs <- paste(javaOpts, "-cp", classPath, mainClass, args, sep = " ")
3941
cat("Launching java with command ", java_bin, " ", combinedArgs, "\n")
4042
invisible(system2(java_bin, combinedArgs, wait = F))

pkg/inst/tests/test_rdd.R

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,23 @@ test_that("values() on RDDs", {
336336
expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
337337
})
338338

339+
test_that("pipeRDD() on RDDs", {
340+
actual <- collect(pipeRDD(rdd, "more"))
341+
expected <- as.list(as.character(1:10))
342+
expect_equal(actual, expected)
343+
344+
trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
345+
actual <- collect(pipeRDD(trailed.rdd, "sort"))
346+
expected <- list("", "1", "2", "3")
347+
expect_equal(actual, expected)
348+
349+
rev.nums <- 9:0
350+
rev.rdd <- parallelize(sc, rev.nums, 2L)
351+
actual <- collect(pipeRDD(rev.rdd, "sort"))
352+
expected <- as.list(as.character(c(5:9, 0:4)))
353+
expect_equal(actual, expected)
354+
})
355+
339356
test_that("join() on pairwise RDDs", {
340357
rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
341358
rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))

0 commit comments

Comments
 (0)