Skip to content

Commit 0981dff

Browse files
Merge pull request apache#168 from sun-rui/SPARKR-153_2
[SPARKR-153] phase 2: implement aggregateByKey() and foldByKey().
2 parents fd8f8a9 + 0cda231 commit 0981dff

File tree

5 files changed

+243
-0
lines changed

5 files changed

+243
-0
lines changed

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
exportClasses("RDD")
33
exportClasses("Broadcast")
44
exportMethods(
5+
"aggregateByKey",
56
"aggregateRDD",
67
"cache",
78
"checkpoint",
@@ -19,6 +20,7 @@ exportMethods(
1920
"flatMap",
2021
"flatMapValues",
2122
"fold",
23+
"foldByKey",
2224
"foreach",
2325
"foreachPartition",
2426
"fullOuterJoin",

pkg/R/pairRDD.R

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,88 @@ setMethod("combineByKey",
495495
lapplyPartition(shuffled, mergeAfterShuffle)
496496
})
497497

498+
#' Aggregate a pair RDD by each key.
499+
#'
500+
#' Aggregate the values of each key in an RDD, using given combine functions
501+
#' and a neutral "zero value". This function can return a different result type,
502+
#' U, than the type of the values in this RDD, V. Thus, we need one operation
503+
#' for merging a V into a U and one operation for merging two U's, The former
504+
#' operation is used for merging values within a partition, and the latter is
505+
#' used for merging values between partitions. To avoid memory allocation, both
506+
#' of these functions are allowed to modify and return their first argument
507+
#' instead of creating a new U.
508+
#'
509+
#' @param rdd An RDD.
510+
#' @param zeroValue A neutral "zero value".
511+
#' @param seqOp A function to aggregate the values of each key. It may return
512+
#' a different result type from the type of the values.
513+
#' @param combOp A function to aggregate results of seqOp.
514+
#' @return An RDD containing the aggregation result.
515+
#' @rdname aggregateByKey
516+
#' @seealso foldByKey, combineByKey
517+
#' @export
518+
#' @examples
519+
#'\dontrun{
520+
#' sc <- sparkR.init()
521+
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
522+
#' zeroValue <- list(0, 0)
523+
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
524+
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
525+
#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
526+
#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
527+
#'}
528+
setGeneric("aggregateByKey",
529+
function(rdd, zeroValue, seqOp, combOp, numPartitions) {
530+
standardGeneric("aggregateByKey")
531+
})
532+
533+
#' @rdname aggregateByKey
534+
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
535+
setMethod("aggregateByKey",
536+
signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY",
537+
combOp = "ANY", numPartitions = "integer"),
538+
function(rdd, zeroValue, seqOp, combOp, numPartitions) {
539+
createCombiner <- function(v) {
540+
do.call(seqOp, list(zeroValue, v))
541+
}
542+
543+
combineByKey(rdd, createCombiner, seqOp, combOp, numPartitions)
544+
})
545+
546+
#' Fold a pair RDD by each key.
547+
#'
548+
#' Aggregate the values of each key in an RDD, using an associative function "func"
549+
#' and a neutral "zero value" which may be added to the result an arbitrary
550+
#' number of times, and must not change the result (e.g., 0 for addition, or
551+
#' 1 for multiplication.).
552+
#'
553+
#' @param rdd An RDD.
554+
#' @param zeroValue A neutral "zero value".
555+
#' @param func An associative function for folding values of each key.
556+
#' @return An RDD containing the aggregation result.
557+
#' @rdname foldByKey
558+
#' @seealso aggregateByKey, combineByKey
559+
#' @export
560+
#' @examples
561+
#'\dontrun{
562+
#' sc <- sparkR.init()
563+
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
564+
#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
565+
#'}
566+
setGeneric("foldByKey",
567+
function(rdd, zeroValue, func, numPartitions) {
568+
standardGeneric("foldByKey")
569+
})
570+
571+
#' @rdname foldByKey
572+
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
573+
setMethod("foldByKey",
574+
signature(rdd = "RDD", zeroValue = "ANY",
575+
func = "ANY", numPartitions = "integer"),
576+
function(rdd, zeroValue, func, numPartitions) {
577+
aggregateByKey(rdd, zeroValue, func, func, numPartitions)
578+
})
579+
498580
############ Binary Functions #############
499581

500582
#' Join two RDDs

pkg/inst/tests/test_shuffle.R

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,77 @@ test_that("combineByKey for doubles", {
7070
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
7171
})
7272

73+
test_that("aggregateByKey", {
74+
# test aggregateByKey for int keys
75+
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
76+
77+
zeroValue <- list(0, 0)
78+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
79+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
80+
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
81+
82+
actual <- collect(aggregatedRDD)
83+
84+
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
85+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
86+
87+
# test aggregateByKey for string keys
88+
rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
89+
90+
zeroValue <- list(0, 0)
91+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
92+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
93+
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
94+
95+
actual <- collect(aggregatedRDD)
96+
97+
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
98+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
99+
})
100+
101+
test_that("foldByKey", {
102+
# test foldByKey for int keys
103+
folded <- foldByKey(intRdd, 0, "+", 2L)
104+
105+
actual <- collect(folded)
106+
107+
expected <- list(list(2L, 101), list(1L, 199))
108+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
109+
110+
# test foldByKey for double keys
111+
folded <- foldByKey(doubleRdd, 0, "+", 2L)
112+
113+
actual <- collect(folded)
114+
115+
expected <- list(list(1.5, 199), list(2.5, 101))
116+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
117+
118+
# test foldByKey for string keys
119+
stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
120+
121+
stringKeyRDD <- parallelize(sc, stringKeyPairs)
122+
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
123+
124+
actual <- collect(folded)
125+
126+
expected <- list(list("b", 101), list("a", 199))
127+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
128+
129+
# test foldByKey for empty pair RDD
130+
rdd <- parallelize(sc, list())
131+
folded <- foldByKey(rdd, 0, "+", 2L)
132+
actual <- collect(folded)
133+
expected <- list()
134+
expect_equal(actual, expected)
135+
136+
# test foldByKey for RDD with only 1 pair
137+
rdd <- parallelize(sc, list(list(1, 1)))
138+
folded <- foldByKey(rdd, 0, "+", 2L)
139+
actual <- collect(folded)
140+
expected <- list(list(1, 1))
141+
expect_equal(actual, expected)
142+
})
143+
73144
test_that("partitionBy() partitions data correctly", {
74145
# Partition by magnitude
75146
partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }

pkg/man/aggregateByKey.Rd

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{aggregateByKey}
4+
\alias{aggregateByKey}
5+
\alias{aggregateByKey,RDD,ANY,ANY,ANY,integer-method}
6+
\title{Aggregate a pair RDD by each key.}
7+
\usage{
8+
aggregateByKey(rdd, zeroValue, seqOp, combOp, numPartitions)
9+
10+
\S4method{aggregateByKey}{RDD,ANY,ANY,ANY,integer}(rdd, zeroValue, seqOp,
11+
combOp, numPartitions)
12+
}
13+
\arguments{
14+
\item{rdd}{An RDD.}
15+
16+
\item{zeroValue}{A neutral "zero value".}
17+
18+
\item{seqOp}{A function to aggregate the values of each key. It may return
19+
a different result type from the type of the values.}
20+
21+
\item{combOp}{A function to aggregate results of seqOp.}
22+
}
23+
\value{
24+
An RDD containing the aggregation result.
25+
}
26+
\description{
27+
Aggregate the values of each key in an RDD, using given combine functions
28+
and a neutral "zero value". This function can return a different result type,
29+
U, than the type of the values in this RDD, V. Thus, we need one operation
30+
for merging a V into a U and one operation for merging two U's, The former
31+
operation is used for merging values within a partition, and the latter is
32+
used for merging values between partitions. To avoid memory allocation, both
33+
of these functions are allowed to modify and return their first argument
34+
instead of creating a new U.
35+
}
36+
\examples{
37+
\dontrun{
38+
sc <- sparkR.init()
39+
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
40+
zeroValue <- list(0, 0)
41+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
42+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
43+
aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
44+
# list(list(1, list(3, 2)), list(2, list(7, 2)))
45+
}
46+
}
47+
\seealso{
48+
foldByKey, combineByKey
49+
}
50+

pkg/man/foldByKey.Rd

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{foldByKey}
4+
\alias{foldByKey}
5+
\alias{foldByKey,RDD,ANY,ANY,integer-method}
6+
\title{Fold a pair RDD by each key.}
7+
\usage{
8+
foldByKey(rdd, zeroValue, func, numPartitions)
9+
10+
\S4method{foldByKey}{RDD,ANY,ANY,integer}(rdd, zeroValue, func, numPartitions)
11+
}
12+
\arguments{
13+
\item{rdd}{An RDD.}
14+
15+
\item{zeroValue}{A neutral "zero value".}
16+
17+
\item{func}{An associative function for folding values of each key.}
18+
}
19+
\value{
20+
An RDD containing the aggregation result.
21+
}
22+
\description{
23+
Aggregate the values of each key in an RDD, using an associative function "func"
24+
and a neutral "zero value" which may be added to the result an arbitrary
25+
number of times, and must not change the result (e.g., 0 for addition, or
26+
1 for multiplication.).
27+
}
28+
\examples{
29+
\dontrun{
30+
sc <- sparkR.init()
31+
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
32+
foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
33+
}
34+
}
35+
\seealso{
36+
aggregateByKey, combineByKey
37+
}
38+

0 commit comments

Comments
 (0)