Skip to content

Commit 0cda231

Browse files
author
Sun Rui
committed
[SPARKR-153] phase 2: implement aggregateByKey() and foldByKey().
1 parent 2271030 commit 0cda231

File tree

5 files changed

+243
-0
lines changed

5 files changed

+243
-0
lines changed

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
exportClasses("RDD")
33
exportClasses("Broadcast")
44
exportMethods(
5+
"aggregateByKey",
56
"aggregateRDD",
67
"cache",
78
"checkpoint",
@@ -19,6 +20,7 @@ exportMethods(
1920
"flatMap",
2021
"flatMapValues",
2122
"fold",
23+
"foldByKey",
2224
"foreach",
2325
"foreachPartition",
2426
"fullOuterJoin",

pkg/R/pairRDD.R

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,88 @@ setMethod("combineByKey",
497497
lapplyPartition(shuffled, mergeAfterShuffle)
498498
})
499499

500+
#' Aggregate a pair RDD by each key.
501+
#'
502+
#' Aggregate the values of each key in an RDD, using given combine functions
503+
#' and a neutral "zero value". This function can return a different result type,
504+
#' U, than the type of the values in this RDD, V. Thus, we need one operation
505+
#' for merging a V into a U and one operation for merging two U's. The former
506+
#' operation is used for merging values within a partition, and the latter is
507+
#' used for merging values between partitions. To avoid memory allocation, both
508+
#' of these functions are allowed to modify and return their first argument
509+
#' instead of creating a new U.
510+
#'
511+
#' @param rdd An RDD.
512+
#' @param zeroValue A neutral "zero value".
513+
#' @param seqOp A function to aggregate the values of each key. It may return
514+
#' a different result type from the type of the values.
515+
#' @param combOp A function to aggregate results of seqOp.
#' @param numPartitions An integer number of partitions; it is passed on
#' unchanged to combineByKey.
516+
#' @return An RDD containing the aggregation result.
517+
#' @rdname aggregateByKey
518+
#' @seealso foldByKey, combineByKey
519+
#' @export
520+
#' @examples
521+
#'\dontrun{
522+
#' sc <- sparkR.init()
523+
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
524+
#' zeroValue <- list(0, 0)
525+
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
526+
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
527+
#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
528+
#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
529+
#'}
530+
setGeneric("aggregateByKey",
531+
function(rdd, zeroValue, seqOp, combOp, numPartitions) {
532+
standardGeneric("aggregateByKey")
533+
})
534+
535+
#' @rdname aggregateByKey
536+
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
537+
# Method for an RDD with an integer numPartitions; zeroValue, seqOp and combOp
# are accepted as ANY, so seqOp may be a function or a function name
# (it is invoked through do.call below).
setMethod("aggregateByKey",
538+
signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY",
539+
combOp = "ANY", numPartitions = "integer"),
540+
function(rdd, zeroValue, seqOp, combOp, numPartitions) {
541+
# Turn a single value v into an initial aggregate by merging it into
# zeroValue with seqOp.
# NOTE(review): presumably combineByKey calls this once for the first value
# of each key -- confirm against combineByKey.
createCombiner <- function(v) {
542+
do.call(seqOp, list(zeroValue, v))
543+
}
544+
545+
# Delegate to combineByKey, handing seqOp and combOp through unchanged.
combineByKey(rdd, createCombiner, seqOp, combOp, numPartitions)
546+
})
547+
548+
#' Fold a pair RDD by each key.
549+
#'
550+
#' Aggregate the values of each key in an RDD, using an associative function "func"
551+
#' and a neutral "zero value" which may be added to the result an arbitrary
552+
#' number of times, and must not change the result (e.g., 0 for addition, or
553+
#' 1 for multiplication).
554+
#'
555+
#' @param rdd An RDD.
556+
#' @param zeroValue A neutral "zero value".
557+
#' @param func An associative function for folding values of each key.
#' @param numPartitions An integer number of partitions; it is passed on
#' unchanged to aggregateByKey.
558+
#' @return An RDD containing the aggregation result.
559+
#' @rdname foldByKey
560+
#' @seealso aggregateByKey, combineByKey
561+
#' @export
562+
#' @examples
563+
#'\dontrun{
564+
#' sc <- sparkR.init()
565+
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
566+
#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
567+
#'}
568+
setGeneric("foldByKey",
569+
function(rdd, zeroValue, func, numPartitions) {
570+
standardGeneric("foldByKey")
571+
})
572+
573+
#' @rdname foldByKey
574+
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
575+
# Method for an RDD with an integer numPartitions; zeroValue and func are
# accepted as ANY.
setMethod("foldByKey",
576+
signature(rdd = "RDD", zeroValue = "ANY",
577+
func = "ANY", numPartitions = "integer"),
578+
function(rdd, zeroValue, func, numPartitions) {
579+
# Folding is aggregateByKey with the same function used as both seqOp and
# combOp.
aggregateByKey(rdd, zeroValue, func, func, numPartitions)
580+
})
581+
500582
############ Binary Functions #############
501583

502584
#' Join two RDDs

pkg/inst/tests/test_shuffle.R

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,77 @@ test_that("combineByKey for doubles", {
7070
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
7171
})
7272

73+
# Checks aggregateByKey() with numeric and with string keys, using a
# list(sum, count) accumulator per key.
# `sc` and `sortKeyValueList` are not defined in this test; presumably they
# are set up elsewhere in the test suite.
test_that("aggregateByKey", {
74+
# test aggregateByKey for int keys
75+
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
76+
77+
# seqOp adds one value to the running sum and increments the count;
# combOp adds two (sum, count) accumulators element by element.
zeroValue <- list(0, 0)
78+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
79+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
80+
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
81+
82+
actual <- collect(aggregatedRDD)
83+
84+
# key 1: values 1 and 2 -> sum 3, count 2; key 2: values 3 and 4 -> sum 7, count 2
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
85+
# Both sides go through sortKeyValueList before being compared.
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
86+
87+
# test aggregateByKey for string keys
88+
rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
89+
90+
zeroValue <- list(0, 0)
91+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
92+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
93+
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
94+
95+
actual <- collect(aggregatedRDD)
96+
97+
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
98+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
99+
})
100+
101+
# Checks foldByKey() with "+" and zero value 0 for int, double and string
# keys, for an empty RDD, and for an RDD holding a single pair.
# `sc`, `intRdd`, `doubleRdd` and `sortKeyValueList` are not defined in this
# test; presumably they are set up elsewhere in the test suite.
test_that("foldByKey", {
102+
# test foldByKey for int keys
103+
folded <- foldByKey(intRdd, 0, "+", 2L)
104+
105+
actual <- collect(folded)
106+
107+
expected <- list(list(2L, 101), list(1L, 199))
108+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
109+
110+
# test foldByKey for double keys
111+
folded <- foldByKey(doubleRdd, 0, "+", 2L)
112+
113+
actual <- collect(folded)
114+
115+
expected <- list(list(1.5, 199), list(2.5, 101))
116+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
117+
118+
# test foldByKey for string keys
119+
stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
120+
121+
stringKeyRDD <- parallelize(sc, stringKeyPairs)
122+
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
123+
124+
actual <- collect(folded)
125+
126+
# "b": 100 + 1 = 101; "a": -1 + 200 = 199
expected <- list(list("b", 101), list("a", 199))
127+
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
128+
129+
# test foldByKey for empty pair RDD
130+
rdd <- parallelize(sc, list())
131+
folded <- foldByKey(rdd, 0, "+", 2L)
132+
actual <- collect(folded)
133+
expected <- list()
134+
# Compared directly, without sortKeyValueList.
expect_equal(actual, expected)
135+
136+
# test foldByKey for RDD with only 1 pair
137+
rdd <- parallelize(sc, list(list(1, 1)))
138+
folded <- foldByKey(rdd, 0, "+", 2L)
139+
actual <- collect(folded)
140+
# The single value 1 folded with the zero value 0 stays 1.
expected <- list(list(1, 1))
141+
expect_equal(actual, expected)
142+
})
143+
73144
test_that("partitionBy() partitions data correctly", {
74145
# Partition by magnitude
75146
partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }

pkg/man/aggregateByKey.Rd

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{aggregateByKey}
4+
\alias{aggregateByKey}
5+
\alias{aggregateByKey,RDD,ANY,ANY,ANY,integer-method}
6+
\title{Aggregate a pair RDD by each key.}
7+
\usage{
8+
aggregateByKey(rdd, zeroValue, seqOp, combOp, numPartitions)
9+
10+
\S4method{aggregateByKey}{RDD,ANY,ANY,ANY,integer}(rdd, zeroValue, seqOp,
11+
combOp, numPartitions)
12+
}
13+
\arguments{
14+
\item{rdd}{An RDD.}
15+
16+
\item{zeroValue}{A neutral "zero value".}
17+
18+
\item{seqOp}{A function to aggregate the values of each key. It may return
19+
a different result type from the type of the values.}
20+
21+
\item{combOp}{A function to aggregate results of seqOp.}

\item{numPartitions}{An integer number of partitions; it is passed on
unchanged to combineByKey.}
22+
}
23+
\value{
24+
An RDD containing the aggregation result.
25+
}
26+
\description{
27+
Aggregate the values of each key in an RDD, using given combine functions
28+
and a neutral "zero value". This function can return a different result type,
29+
U, than the type of the values in this RDD, V. Thus, we need one operation
30+
for merging a V into a U and one operation for merging two U's. The former
31+
operation is used for merging values within a partition, and the latter is
32+
used for merging values between partitions. To avoid memory allocation, both
33+
of these functions are allowed to modify and return their first argument
34+
instead of creating a new U.
35+
}
36+
\examples{
37+
\dontrun{
38+
sc <- sparkR.init()
39+
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
40+
zeroValue <- list(0, 0)
41+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
42+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
43+
aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
44+
# list(list(1, list(3, 2)), list(2, list(7, 2)))
45+
}
46+
}
47+
\seealso{
48+
foldByKey, combineByKey
49+
}
50+

pkg/man/foldByKey.Rd

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{foldByKey}
4+
\alias{foldByKey}
5+
\alias{foldByKey,RDD,ANY,ANY,integer-method}
6+
\title{Fold a pair RDD by each key.}
7+
\usage{
8+
foldByKey(rdd, zeroValue, func, numPartitions)
9+
10+
\S4method{foldByKey}{RDD,ANY,ANY,integer}(rdd, zeroValue, func, numPartitions)
11+
}
12+
\arguments{
13+
\item{rdd}{An RDD.}
14+
15+
\item{zeroValue}{A neutral "zero value".}
16+
17+
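\item{func}{An associative function for folding values of each key.}

\item{numPartitions}{An integer number of partitions; it is passed on
unchanged to aggregateByKey.}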
18+
}
19+
\value{
20+
An RDD containing the aggregation result.
21+
}
22+
\description{
23+
Aggregate the values of each key in an RDD, using an associative function "func"
24+
and a neutral "zero value" which may be added to the result an arbitrary
25+
number of times, and must not change the result (e.g., 0 for addition, or
26+
1 for multiplication).
27+
}
28+
\examples{
29+
\dontrun{
30+
sc <- sparkR.init()
31+
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
32+
foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
33+
}
34+
}
35+
\seealso{
36+
aggregateByKey, combineByKey
37+
}
38+

0 commit comments

Comments
 (0)