
Commit d621dbc

Merge pull request apache#120 from sun-rui/objectFile

[SPARKR-146] Support read/save object files in SparkR.

2 parents: d83c017 + c4a44d7

6 files changed: 183 additions & 2 deletions
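In short, the change adds a save/load round trip for RDDs: saveAsObjectFile() writes an RDD as a SequenceFile of serialized R objects, and objectFile() reads it back. A minimal usage sketch based on the signatures in the diff below (the output path is hypothetical):

  sc <- sparkR.init()
  rdd <- parallelize(sc, list(1, 2, 3))
  saveAsObjectFile(rdd, "/tmp/sparkR-objects")    # hypothetical path; writes a SequenceFile directory
  loaded <- objectFile(sc, "/tmp/sparkR-objects")
  collect(loaded)                                 # list(1, 2, 3)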

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions

@@ -42,6 +42,7 @@ exportMethods(
   "reduceByKey",
   "rightOuterJoin",
   "sampleRDD",
+  "saveAsObjectFile",
   "take",
   "takeSample",
   "unionRDD",
@@ -53,6 +54,7 @@ exportMethods(
 # S3 methods exported
 export(
   "textFile",
+  "objectFile",
   "parallelize",
   "hashCode",
   "includePackage",

pkg/R/RDD.R

Lines changed: 30 additions & 0 deletions

@@ -1061,6 +1061,36 @@ setMethod("keyBy",
             lapply(rdd, apply.func)
           })
 
+#' Save this RDD as a SequenceFile of serialized objects.
+#'
+#' @param rdd The RDD to save
+#' @param path The directory where the file is saved
+#' @rdname saveAsObjectFile
+#' @seealso objectFile
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
+#'}
+setGeneric("saveAsObjectFile", function(rdd, path) { standardGeneric("saveAsObjectFile") })
+
+#' @rdname saveAsObjectFile
+#' @aliases saveAsObjectFile,RDD
+setMethod("saveAsObjectFile",
+          signature(rdd = "RDD", path = "character"),
+          function(rdd, path) {
+            # If the RDD holds unserialized strings, serialize it before saving,
+            # because objectFile() assumes serialized content when it loads the file.
+            if (!rdd@env$serialized) {
+              rdd <- reserialize(rdd)
+            }
+            .jcall(getJRDD(rdd), "V", "saveAsObjectFile", path)
+            # Return nothing
+            invisible(NULL)
+          })
+
 #' Return an RDD with the keys of each tuple.
 #'
 #' @param rdd The RDD from which the keys of each tuple are returned.
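A note on the serialization guard above: an RDD produced by textFile() holds plain strings (its rdd@env$serialized flag is FALSE), so saveAsObjectFile() reserializes it before handing it to the JVM; objectFile() always assumes serialized content on load. A sketch of the path that exercises this, with hypothetical file paths:

  sc <- sparkR.init()
  lines <- textFile(sc, "/tmp/input.txt")    # string RDD: rdd@env$serialized is FALSE
  saveAsObjectFile(lines, "/tmp/lines-obj")  # reserialize() runs before the Java call
  roundTrip <- objectFile(sc, "/tmp/lines-obj")
  collect(roundTrip)                         # the original lines, as a list of strings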

pkg/R/context.R

Lines changed: 31 additions & 2 deletions

@@ -18,17 +18,46 @@
 #' lines <- textFile(sc, "myfile.txt")
 #'}
 
-textFile <- function(sc, path, minSplits = NULL) {
+getMinSplits <- function(sc, minSplits) {
   if (is.null(minSplits)) {
     ssc <- .jcall(sc, "Lorg/apache/spark/SparkContext;", "sc")
     defaultParallelism <- .jcall(ssc, "I", "defaultParallelism")
     minSplits <- min(defaultParallelism, 2)
   }
+  as.integer(minSplits)
+}
+
+textFile <- function(sc, path, minSplits = NULL) {
   jrdd <- .jcall(sc, "Lorg/apache/spark/api/java/JavaRDD;", "textFile", path,
-                 as.integer(minSplits))
+                 getMinSplits(sc, minSplits))
   RDD(jrdd, FALSE)
 }
 
+#' Load an RDD saved as a SequenceFile containing serialized objects.
+#'
+#' The file to be loaded should be one that was previously generated by calling
+#' saveAsObjectFile() on an RDD.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of the file to read
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' value is chosen based on available parallelism.
+#' @return RDD containing serialized R objects.
+#' @seealso saveAsObjectFile
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- objectFile(sc, "myfile")
+#'}
+objectFile <- function(sc, path, minSplits = NULL) {
+  jrdd <- .jcall(sc, "Lorg/apache/spark/api/java/JavaRDD;", "objectFile", path,
+                 getMinSplits(sc, minSplits))
+  # Assume the RDD contains serialized R objects.
+  RDD(jrdd, TRUE)
+}
+
 #' Create an RDD from a homogeneous list or vector.
 #'
 #' This function creates an RDD from a local homogeneous list in R. The elements
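The new getMinSplits() helper centralizes the split-count default now shared by textFile() and objectFile(): with minSplits = NULL it returns min(defaultParallelism, 2), otherwise it coerces the given value to integer. A small sketch of both cases (the path is hypothetical):

  sc <- sparkR.init()
  rdd1 <- objectFile(sc, "/tmp/sparkR-objects")                 # default: min(defaultParallelism, 2) splits
  rdd2 <- objectFile(sc, "/tmp/sparkR-objects", minSplits = 4)  # explicit; coerced via as.integer()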

pkg/inst/tests/test_binaryFile.R

Lines changed: 57 additions & 0 deletions (new file)

@@ -0,0 +1,57 @@
+context("functions on binary files")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("saveAsObjectFile()/objectFile() following textFile() works", {
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1)
+  saveAsObjectFile(rdd, fileName2)
+  rdd <- objectFile(sc, fileName2)
+  expect_equal(collect(rdd), as.list(mockFile))
+
+  unlink(fileName1)
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+
+  l <- list(1, 2, 3)
+  rdd <- parallelize(sc, l)
+  saveAsObjectFile(rdd, fileName)
+  rdd <- objectFile(sc, fileName)
+  expect_equal(collect(rdd), l)
+
+  unlink(fileName, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1)
+
+  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+  wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+  counts <- reduceByKey(wordCount, "+", 2L)
+
+  saveAsObjectFile(counts, fileName2)
+  counts <- objectFile(sc, fileName2)
+
+  output <- collect(counts)
+  expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
+                   list("is", 2))
+  expect_equal(output, expected)
+
+  unlink(fileName1)
+  unlink(fileName2, recursive = TRUE)
+})

pkg/man/objectFile.Rd

Lines changed: 32 additions & 0 deletions (new file)

@@ -0,0 +1,32 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\name{objectFile}
+\alias{objectFile}
+\title{Load an RDD saved as a SequenceFile containing serialized objects.}
+\usage{
+objectFile(sc, path, minSplits = NULL)
+}
+\arguments{
+\item{sc}{SparkContext to use}
+
+\item{path}{Path of the file to read}
+
+\item{minSplits}{Minimum number of splits to be created. If NULL, the default
+value is chosen based on available parallelism.}
+}
+\value{
+RDD containing serialized R objects.
+}
+\description{
+The file to be loaded should be one that was previously generated by calling
+saveAsObjectFile() on an RDD.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- objectFile(sc, "myfile")
+}
+}
+\seealso{
+saveAsObjectFile
+}

pkg/man/saveAsObjectFile.Rd

Lines changed: 31 additions & 0 deletions (new file)

@@ -0,0 +1,31 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{saveAsObjectFile}
+\alias{saveAsObjectFile}
+\alias{saveAsObjectFile,RDD}
+\alias{saveAsObjectFile,RDD,character-method}
+\title{Save this RDD as a SequenceFile of serialized objects.}
+\usage{
+saveAsObjectFile(rdd, path)
+
+\S4method{saveAsObjectFile}{RDD,character}(rdd, path)
+}
+\arguments{
+\item{rdd}{The RDD to save}
+
+\item{path}{The directory where the file is saved}
+}
+\description{
+Save this RDD as a SequenceFile of serialized objects.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, 1:3)
+saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
+}
+}
+\seealso{
+objectFile
+}
