Skip to content

Commit f10c607

Browse files
committed
Merge pull request apache#118 from sun-rui/saveAsTextFile
[SPARKR-144] Implement saveAsTextFile() in the RDD class.
2 parents d621dbc + 6c9bfc0 commit f10c607

File tree

6 files changed

+305
-188
lines changed

6 files changed

+305
-188
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ exportMethods(
4242
"reduceByKey",
4343
"rightOuterJoin",
4444
"sampleRDD",
45+
"saveAsTextFile",
4546
"saveAsObjectFile",
4647
"take",
4748
"takeSample",

pkg/R/RDD.R

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ PipelinedRDD <- function(prev, func) {
8585

8686

8787
# The jrdd accessor function.
88-
setGeneric("getJRDD", function(rdd) { standardGeneric("getJRDD") })
88+
setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
8989
setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd )
9090
setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
91-
function(rdd) {
91+
function(rdd, dataSerialization = TRUE) {
9292
if (!is.null(rdd@env$jrdd_val)) {
9393
return(rdd@env$jrdd_val)
9494
}
@@ -116,17 +116,29 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
116116

117117
prev_jrdd <- rdd@prev_jrdd
118118

119-
rddRef <- new(J("edu.berkeley.cs.amplab.sparkr.RRDD"),
120-
prev_jrdd$rdd(),
121-
serializedFuncArr,
122-
rdd@env$serialized,
123-
depsBinArr,
124-
packageNamesArr,
125-
as.character(.sparkREnv[["libname"]]),
126-
broadcastArr,
127-
prev_jrdd$classTag())
128-
# The RDD is serialized after we create a RRDD
129-
rdd@env$serialized <- TRUE
119+
if (dataSerialization) {
120+
rddRef <- new(J("edu.berkeley.cs.amplab.sparkr.RRDD"),
121+
prev_jrdd$rdd(),
122+
serializedFuncArr,
123+
rdd@env$serialized,
124+
depsBinArr,
125+
packageNamesArr,
126+
as.character(.sparkREnv[["libname"]]),
127+
broadcastArr,
128+
prev_jrdd$classTag())
129+
} else {
130+
rddRef <- new(J("edu.berkeley.cs.amplab.sparkr.StringRRDD"),
131+
prev_jrdd$rdd(),
132+
serializedFuncArr,
133+
rdd@env$serialized,
134+
depsBinArr,
135+
packageNamesArr,
136+
as.character(.sparkREnv[["libname"]]),
137+
broadcastArr,
138+
prev_jrdd$classTag())
139+
}
140+
# Save the serialization flag after we create a RRDD
141+
rdd@env$serialized <- dataSerialization
130142
rdd@env$jrdd_val <- rddRef$asJavaRDD()
131143
rdd@env$jrdd_val
132144
})
@@ -1091,6 +1103,34 @@ setMethod("saveAsObjectFile",
10911103
invisible(NULL)
10921104
})
10931105

1106+
#' Save this RDD as a text file, using string representations of elements.
1107+
#'
1108+
#' @param rdd The RDD to save
1109+
#' @param path The directory where the splits of the text file are saved
1110+
#' @rdname saveAsTextFile
1111+
#' @export
1112+
#' @examples
1113+
#'\dontrun{
1114+
#' sc <- sparkR.init()
1115+
#' rdd <- parallelize(sc, 1:3)
1116+
#' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
1117+
#'}
1118+
setGeneric("saveAsTextFile", function(rdd, path) { standardGeneric("saveAsTextFile") })
1119+
1120+
#' @rdname saveAsTextFile
1121+
#' @aliases saveAsTextFile,RDD
1122+
setMethod("saveAsTextFile",
1123+
signature(rdd = "RDD", path = "character"),
1124+
function(rdd, path) {
1125+
func <- function(x) {
1126+
toString(x)
1127+
}
1128+
stringRdd <- lapply(rdd, func)
1129+
.jcall(getJRDD(stringRdd, dataSerialization = FALSE), "V", "saveAsTextFile", path)
1130+
# Return nothing
1131+
invisible(NULL)
1132+
})
1133+
10941134
#' Return an RDD with the keys of each tuple.
10951135
#'
10961136
#' @param rdd The RDD from which the keys of each tuple is returned.

pkg/inst/tests/test_textFile.R

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,52 @@ test_that("several transformations on RDD created by textFile()", {
5858

5959
unlink(fileName)
6060
})
61+
62+
test_that("textFile() followed by a saveAsTextFile() returns the same content", {
63+
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
64+
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
65+
writeLines(mockFile, fileName1)
66+
67+
rdd <- textFile(sc, fileName1)
68+
saveAsTextFile(rdd, fileName2)
69+
rdd <- textFile(sc, fileName2)
70+
expect_equal(collect(rdd), as.list(mockFile))
71+
72+
unlink(fileName1)
73+
unlink(fileName2)
74+
})
75+
76+
test_that("saveAsTextFile() on a parallelized list works as expected", {
77+
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
78+
l <- list(1, 2, 3)
79+
rdd <- parallelize(sc, l)
80+
saveAsTextFile(rdd, fileName)
81+
rdd <- textFile(sc, fileName)
82+
expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))
83+
84+
unlink(fileName)
85+
})
86+
87+
test_that("textFile() and saveAsTextFile() word count works as expected", {
88+
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
89+
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
90+
writeLines(mockFile, fileName1)
91+
92+
rdd <- textFile(sc, fileName1)
93+
94+
words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
95+
wordCount <- lapply(words, function(word) { list(word, 1L) })
96+
97+
counts <- reduceByKey(wordCount, "+", 2L)
98+
99+
saveAsTextFile(counts, fileName2)
100+
rdd <- textFile(sc, fileName2)
101+
102+
output <- collect(rdd)
103+
expected <- list(list("awesome.", 1), list("Spark", 2),
104+
list("pretty.", 1), list("is", 2))
105+
expect_equal(output, lapply(expected, function(x) {toString(x)}))
106+
107+
unlink(fileName1)
108+
unlink(fileName2)
109+
})

pkg/inst/worker/serialize.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ writeInt <- function(con, value) {
4646
writeBin(as.integer(value), con, endian="big")
4747
}
4848

49+
writeStrings <- function(con, stringList) {
50+
writeLines(unlist(stringList), con)
51+
}
52+
4953
writeRaw <- function(con, batch, serialized = FALSE) {
5054
if (serialized) {
5155
outputSer <- batch

pkg/inst/worker/worker.R

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ splitIndex <- readInt(inputCon)
3333
execLen <- readInt(inputCon)
3434
execFunctionName <- unserialize(readRawLen(inputCon, execLen))
3535

36-
# read the isSerialized bit flag
37-
isSerialized <- readInt(inputCon)
36+
# read the isInputSerialized bit flag
37+
isInputSerialized <- readInt(inputCon)
38+
39+
# read the isOutputSerialized bit flag
40+
isOutputSerialized <- readInt(inputCon)
3841

3942
# Redirect stdout to stderr to prevent print statements from
4043
# interfering with outputStream
@@ -82,16 +85,20 @@ isEmpty <- readInt(inputCon)
8285
if (isEmpty != 0) {
8386

8487
if (numPartitions == -1) {
85-
if (isSerialized) {
88+
if (isInputSerialized) {
8689
# Now read as many characters as described in funcLen
8790
data <- readDeserialize(inputCon)
8891
} else {
8992
data <- readLines(inputCon)
9093
}
9194
output <- do.call(execFunctionName, list(splitIndex, data))
92-
writeRaw(outputCon, output)
95+
if (isOutputSerialized) {
96+
writeRaw(outputCon, output)
97+
} else {
98+
writeStrings(outputCon, output)
99+
}
93100
} else {
94-
if (isSerialized) {
101+
if (isInputSerialized) {
95102
# Now read as many characters as described in funcLen
96103
data <- readDeserialize(inputCon)
97104
} else {
@@ -127,7 +134,9 @@ if (isEmpty != 0) {
127134
}
128135

129136
# End of output
130-
writeInt(outputCon, 0L)
137+
if (isOutputSerialized) {
138+
writeInt(outputCon, 0L)
139+
}
131140

132141
close(outputCon)
133142

0 commit comments

Comments
 (0)