
Commit ebb77b2

Sun Rui authored and shivaram committed
[SPARK-7033] [SPARKR] Clean usage of split. Use partition instead where applicable.
Author: Sun Rui <[email protected]>

Closes #5628 from sun-rui/SPARK-7033 and squashes the following commits:

046bc9e [Sun Rui] Clean split usage in tests.
d531c86 [Sun Rui] [SPARK-7033][SPARKR] Clean usage of split. Use partition instead where applicable.
1 parent 6e57d57 commit ebb77b2
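
For context, a minimal sketch of the user-facing convention this commit settles on, taken from the lapplyPartitionsWithIndex example updated below (assumes a local SparkR session):

sc <- sparkR.init()
rdd <- parallelize(sc, 1:10, 5L)
# Per-partition functions are now written as function(partIndex, part) rather than function(split, ...)
prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
  partIndex * Reduce("+", part)
})
collect(prod, flatten = FALSE)  # 0, 7, 22, 45, 76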

File tree

5 files changed: +39 -39 lines changed

R/pkg/R/RDD.R

Lines changed: 18 additions & 18 deletions
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
       # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
       # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
     } else {
-      pipelinedFunc <- function(split, iterator) {
-        func(split, prev@func(split, iterator))
+      pipelinedFunc <- function(partIndex, part) {
+        func(partIndex, prev@func(partIndex, part))
       }
       .Object@func <- cleanClosure(pipelinedFunc)
       .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +306,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)
-            partitions <- callJMethod(jrdd, "splits")
+            partitions <- callJMethod(jrdd, "partitions")
             callJMethod(partitions, "size")
           })

@@ -452,8 +452,8 @@ setMethod("countByValue",
 setMethod("lapply",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            func <- function(split, iterator) {
-              lapply(iterator, FUN)
+            func <- function(partIndex, part) {
+              lapply(part, FUN)
             }
             lapplyPartitionsWithIndex(X, func)
           })
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#'                                          split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#'                                          partIndex * Reduce("+", part) })
 #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
 #'}
 #' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
 #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
 #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
 #'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
           function(x, withReplacement, fraction, seed) {

             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
+            samplingFunc <- function(partIndex, part) {
               set.seed(seed)
               res <- vector("list", length(part))
               len <- 0

               # Discards some random values to ensure each partition has a
               # different random seed.
-              runif(split)
+              runif(partIndex)

               for (elem in part) {
                 if (withReplacement) {
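
A plain-R illustration (not part of the patch) of the seeding trick in samplingFunc above: every partition starts from the same set.seed(seed), then discards partIndex uniform draws so each partition's random stream diverges deterministically. The helper name streamStart is made up for this sketch.

streamStart <- function(seed, partIndex) {
  set.seed(seed)
  runif(partIndex)   # discard partIndex values; runif(0) discards nothing
  runif(1)           # first value this partition would actually consume
}
streamStart(1618L, 0)  # partition 0
streamStart(1618L, 1)  # partition 1: a different value, but the same on every run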
@@ -989,8 +989,8 @@ setMethod("coalesce",
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
             if (shuffle || numPartitions > SparkR::numPartitions(x)) {
-              func <- function(s, part) {
-                set.seed(s)  # split as seed
+              func <- function(partIndex, part) {
+                set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(sample(numPartitions, 1) - 1)
                 lapply(seq_along(part),
                        function(i) {
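
The hunk cuts off before the body of the inner function, so the following is only a hypothetical sketch of the idea behind this shuffle path: each input partition seeds the RNG with its own index, picks a random starting bucket, and then deals its elements out round-robin. The helper name spread and the exact modulo expression are assumptions, not quotes of the actual code.

spread <- function(partIndex, part, numPartitions) {
  set.seed(partIndex)                              # partIndex as seed, as above
  start <- as.integer(sample(numPartitions, 1) - 1)
  lapply(seq_along(part), function(i) {
    list((start + i) %% numPartitions, part[[i]])  # (target bucket, element)
  })
}
spread(0L, as.list(1:5), 3L)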
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
 #' Save this RDD as a text file, using string representations of elements.
 #'
 #' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
           function(x) {
             n <- numPartitions(x)

-            partitionFunc <- function(split, part) {
+            partitionFunc <- function(partIndex, part) {
               mapply(
                 function(item, index) {
-                  list(item, (index - 1) * n + split)
+                  list(item, (index - 1) * n + partIndex)
                 },
                 part,
                 seq_along(part),
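
A worked check (plain R, not SparkR) of the id formula above: with n partitions, the i-th element (1-based) of partition partIndex gets id (i - 1) * n + partIndex, so ids interleave across partitions without colliding.

n <- 2                         # pretend the RDD has two partitions
idsFor <- function(partIndex, part) (seq_along(part) - 1) * n + partIndex
idsFor(0, c("a", "b", "c"))    # 0 2 4
idsFor(1, c("d", "e"))         # 1 3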
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
               startIndices <- Reduce("+", nums, accumulate = TRUE)
             }

-            partitionFunc <- function(split, part) {
-              if (split == 0) {
+            partitionFunc <- function(partIndex, part) {
+              if (partIndex == 0) {
                 startIndex <- 0
               } else {
-                startIndex <- startIndices[[split]]
+                startIndex <- startIndices[[partIndex]]
               }

               mapply(
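
A small plain-R check of the offset bookkeeping above: startIndices is the running total of partition lengths, and a partition starts numbering at 0 if it is the first one, otherwise at startIndices[[partIndex]]. The sizes 3, 2, 4 are invented for the example.

nums <- list(3, 2, 4)                                 # pretend partition sizes
startIndices <- Reduce("+", nums, accumulate = TRUE)  # 3, 5, 9
offsetFor <- function(partIndex) {
  if (partIndex == 0) 0 else startIndices[[partIndex]]
}
sapply(0:2, offsetFor)                                # 0 3 5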

R/pkg/R/context.R

Lines changed: 10 additions & 10 deletions
@@ -17,12 +17,12 @@

 # context.R: SparkContext driven functions

-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+  if (is.null(minPartitions)) {
     defaultParallelism <- callJMethod(sc, "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
+    minPartitions <- min(defaultParallelism, 2)
   }
-  as.integer(minSplits)
+  as.integer(minPartitions)
 }

 #' Create an RDD from a text file.
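
To show the fallback in getMinPartitions above without a JVM call, here is a hypothetical stand-alone variant in which defaultParallelism is passed in directly instead of being fetched with callJMethod:

resolveMinPartitions <- function(defaultParallelism, minPartitions = NULL) {
  if (is.null(minPartitions)) {
    minPartitions <- min(defaultParallelism, 2)
  }
  as.integer(minPartitions)
}
resolveMinPartitions(8)      # 2  (NULL falls back to min(defaultParallelism, 2))
resolveMinPartitions(8, 6L)  # 6  (explicit values pass through unchanged)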
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
 #' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
 #' sc <- sparkR.init()
 #' lines <- textFile(sc, "myfile.txt")
 #'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")

-  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
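
A usage sketch of the renamed argument; the file path is illustrative and a running SparkR session is assumed:

sc <- sparkR.init()
lines <- textFile(sc, "myfile.txt", minPartitions = 4L)
numPartitions(lines)  # minPartitions is a lower bound hint, so typically >= 4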
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
 #' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' sc <- sparkR.init()
 #' rdd <- objectFile(sc, "myfile")
 #'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")

-  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
   # Assume the RDD contains serialized R objects.
   RDD(jrdd, "byte")
 }
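
A round-trip sketch pairing objectFile with saveAsObjectFile (the function referenced in the @seealso above); the output path is illustrative:

sc <- sparkR.init()
rdd <- parallelize(sc, 1:10, 2L)
saveAsObjectFile(rdd, "/tmp/sparkr-objfile")
restored <- objectFile(sc, "/tmp/sparkr-objfile", minPartitions = 2L)
collect(restored)  # the original 1:10, read back as serialized R objects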

R/pkg/R/pairRDD.R

Lines changed: 4 additions & 4 deletions
@@ -206,8 +206,8 @@ setMethod("partitionBy",
                                get(name, .broadcastNames) })
             jrdd <- getJRDD(x)

-            # We create a PairwiseRRDD that extends RDD[(Array[Byte],
-            # Array[Byte])], where the key is the hashed split, the value is
+            # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
+            # where the key is the target partition number, the value is
             # the content (key-val pairs).
             pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
                                        callJMethod(jrdd, "rdd"),
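
The test updated at the bottom of this commit exercises exactly this path; condensed here for reference, it partitions pairs by key parity and then tags each partition with its index:

pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
parts <- partitionBy(pairsRDD, 2L, partitionByParity)
collect(lapplyPartitionsWithIndex(parts,
                                  function(partIndex, part) list(partIndex, part)),
        flatten = FALSE)
# partition 0 holds the odd keys (1, 3), partition 1 the even key (4)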
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
             }

             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
-              set.seed(bitwXor(seed, split))
+            samplingFunc <- function(partIndex, part) {
+              set.seed(bitwXor(seed, partIndex))
               res <- vector("list", length(part))
               len <- 0
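
A quick plain-R check of the per-partition seed derivation above: XOR-ing the user seed with the partition index gives each partition a distinct but reproducible seed (1618L is just an example value).

seed <- 1618L
sapply(0:3, function(partIndex) bitwXor(seed, partIndex))  # 1618 1619 1616 1617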

R/pkg/R/utils.R

Lines changed: 1 addition & 1 deletion
@@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
 # A result RDD.
 mergePartitions <- function(rdd, zip) {
   serializerMode <- getSerializedMode(rdd)
-  partitionFunc <- function(split, part) {
+  partitionFunc <- function(partIndex, part) {
     len <- length(part)
     if (len > 0) {
       if (serializerMode == "byte") {

R/pkg/inst/tests/test_rdd.R

Lines changed: 6 additions & 6 deletions
@@ -105,8 +105,8 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
   rdd2 <- rdd
   for (i in 1:12)
     rdd2 <- lapplyPartitionsWithIndex(
-              rdd2, function(split, part) {
-                part <- as.list(unlist(part) * split + i)
+              rdd2, function(partIndex, part) {
+                part <- as.list(unlist(part) * partIndex + i)
               })
   rdd2 <- lapply(rdd2, function(x) x + x)
   actual <- collect(rdd2)
@@ -121,8 +121,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   # PipelinedRDD
   rdd2 <- lapplyPartitionsWithIndex(
             rdd2,
-            function(split, part) {
-              part <- as.list(unlist(part) * split)
+            function(partIndex, part) {
+              part <- as.list(unlist(part) * partIndex)
             })

   cache(rdd2)
@@ -174,13 +174,13 @@ test_that("lapply with dependency", {
 })

 test_that("lapplyPartitionsWithIndex on RDDs", {
-  func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
+  func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
   actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
   expect_equal(actual, list(list(0, 15), list(1, 40)))

   pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
   partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
-  mkTup <- function(splitIndex, part) { list(splitIndex, part) }
+  mkTup <- function(partIndex, part) { list(partIndex, part) }
   actual <- collect(lapplyPartitionsWithIndex(
                       partitionBy(pairsRDD, 2L, partitionByParity),
                       mkTup),
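
For the first assertion above, the expected values follow from how rdd is built earlier in the test file, assumed here to be parallelize(sc, 1:10, 2L), i.e. partitions 1:5 and 6:10:

Reduce("+", 1:5)   # 15 -> list(0, 15) for partition 0
Reduce("+", 6:10)  # 40 -> list(1, 40) for partition 1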

0 commit comments