Skip to content

Commit 0c6e071

Browse files
Merge pull request apache#157 from lythesia/master
[SPARKR-161] Support reduceByKeyLocally()
2 parents 343b6ab + f5038c0 commit 0c6e071

File tree

5 files changed

+175
-54
lines changed

5 files changed

+175
-54
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ exportMethods(
4141
"persist",
4242
"reduce",
4343
"reduceByKey",
44+
"reduceByKeyLocally",
4445
"rightOuterJoin",
4546
"sampleRDD",
4647
"saveAsTextFile",

pkg/R/RDD.R

Lines changed: 92 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,26 +1382,32 @@ setMethod("groupByKey",
13821382
groupVals <- function(part) {
13831383
vals <- new.env()
13841384
keys <- new.env()
1385+
pred <- function(item) exists(item$hash, keys)
1386+
appendList <- function(acc, x) {
1387+
addItemToAccumulator(acc, x)
1388+
acc
1389+
}
1390+
makeList <- function(x) {
1391+
acc <- initAccumulator()
1392+
addItemToAccumulator(acc, x)
1393+
acc
1394+
}
13851395
# Each item in the partition is list of (K, V)
13861396
lapply(part,
13871397
function(item) {
1388-
hashVal <- as.character(hashCode(item[[1]]))
1389-
if (exists(hashVal, vals)) {
1390-
acc <- vals[[hashVal]]
1391-
acc[[length(acc) + 1]] <- item[[2]]
1392-
vals[[hashVal]] <- acc
1393-
} else {
1394-
vals[[hashVal]] <- list(item[[2]])
1395-
keys[[hashVal]] <- item[[1]]
1396-
}
1398+
item$hash <- as.character(hashCode(item[[1]]))
1399+
updateOrCreatePair(item, keys, vals, pred,
1400+
appendList, makeList)
13971401
})
1402+
# extract out data field
1403+
vals <- eapply(vals,
1404+
function(x) {
1405+
length(x$data) <- x$counter
1406+
x$data
1407+
})
13981408
# Every key in the environment contains a list
13991409
# Convert that to list(K, Seq[V])
1400-
grouped <- lapply(ls(vals),
1401-
function(name) {
1402-
list(keys[[name]], vals[[name]])
1403-
})
1404-
grouped
1410+
convertEnvsToList(keys, vals)
14051411
}
14061412
lapplyPartition(shuffled, groupVals)
14071413
})
@@ -1442,28 +1448,78 @@ setMethod("reduceByKey",
14421448
reduceVals <- function(part) {
14431449
vals <- new.env()
14441450
keys <- new.env()
1451+
pred <- function(item) exists(item$hash, keys)
14451452
lapply(part,
14461453
function(item) {
1447-
hashVal <- as.character(hashCode(item[[1]]))
1448-
if (exists(hashVal, vals)) {
1449-
vals[[hashVal]] <- do.call(
1450-
combineFunc, list(vals[[hashVal]], item[[2]]))
1451-
} else {
1452-
vals[[hashVal]] <- item[[2]]
1453-
keys[[hashVal]] <- item[[1]]
1454-
}
1454+
item$hash <- as.character(hashCode(item[[1]]))
1455+
updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
14551456
})
1456-
combined <- lapply(ls(vals),
1457-
function(name) {
1458-
list(keys[[name]], vals[[name]])
1459-
})
1460-
combined
1457+
convertEnvsToList(keys, vals)
14611458
}
14621459
locallyReduced <- lapplyPartition(rdd, reduceVals)
14631460
shuffled <- partitionBy(locallyReduced, numPartitions)
14641461
lapplyPartition(shuffled, reduceVals)
14651462
})
14661463

1464+
#' Merge values by key locally
1465+
#'
1466+
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
1467+
#' and merges the values for each key using an associative reduce function, but returns the
1468+
#' results immediately to the driver as an R list.
1469+
#'
1470+
#' @param rdd The RDD to reduce by key. Should be an RDD where each element is
1471+
#' list(K, V) or c(K, V).
1472+
#' @param combineFunc The associative reduce function to use.
1473+
#' @return A list of elements of type list(K, V') where V' is the merged value for each key
1474+
#' @rdname reduceByKeyLocally
1475+
#' @seealso reduceByKey
1476+
#' @export
1477+
#' @examples
1478+
#'\dontrun{
1479+
#' sc <- sparkR.init()
1480+
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
1481+
#' rdd <- parallelize(sc, pairs)
1482+
#' reduced <- reduceByKeyLocally(rdd, "+")
1483+
#' reduced # list(list(1, 6), list(1.1, 3))
1484+
#'}
1485+
setGeneric("reduceByKeyLocally",
1486+
function(rdd, combineFunc) {
1487+
standardGeneric("reduceByKeyLocally")
1488+
})
1489+
1490+
#' @rdname reduceByKeyLocally
1491+
#' @aliases reduceByKeyLocally,RDD,ANY-method
1492+
setMethod("reduceByKeyLocally",
1493+
signature(rdd = "RDD", combineFunc = "ANY"),
1494+
function(rdd, combineFunc) {
1495+
reducePart <- function(part) {
1496+
vals <- new.env()
1497+
keys <- new.env()
1498+
pred <- function(item) exists(item$hash, keys)
1499+
lapply(part,
1500+
function(item) {
1501+
item$hash <- as.character(hashCode(item[[1]]))
1502+
updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
1503+
})
1504+
list(list(keys, vals)) # return hash to avoid re-compute in merge
1505+
}
1506+
mergeParts <- function(accum, x) {
1507+
pred <- function(item) {
1508+
exists(item$hash, accum[[1]])
1509+
}
1510+
lapply(ls(x[[1]]),
1511+
function(name) {
1512+
item <- list(x[[1]][[name]], x[[2]][[name]])
1513+
item$hash <- name
1514+
updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
1515+
})
1516+
accum
1517+
}
1518+
reduced <- mapPartitions(rdd, reducePart)
1519+
merged <- reduce(reduced, mergeParts)
1520+
convertEnvsToList(merged[[1]], merged[[2]])
1521+
})
1522+
14671523
#' Combine values by key
14681524
#'
14691525
#' Generic function to combine the elements for each key using a custom set of
@@ -1513,46 +1569,28 @@ setMethod("combineByKey",
15131569
combineLocally <- function(part) {
15141570
combiners <- new.env()
15151571
keys <- new.env()
1572+
pred <- function(item) exists(item$hash, keys)
15161573
lapply(part,
15171574
function(item) {
1518-
k <- as.character(item[[1]])
1519-
if (!exists(k, keys)) {
1520-
combiners[[k]] <- do.call(createCombiner,
1521-
list(item[[2]]))
1522-
keys[[k]] <- item[[1]]
1523-
} else {
1524-
combiners[[k]] <- do.call(mergeValue,
1525-
list(combiners[[k]],
1526-
item[[2]]))
1527-
}
1528-
})
1529-
lapply(ls(keys), function(k) {
1530-
list(keys[[k]], combiners[[k]])
1575+
item$hash <- as.character(item[[1]])
1576+
updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
15311577
})
1578+
convertEnvsToList(keys, combiners)
15321579
}
15331580
locallyCombined <- lapplyPartition(rdd, combineLocally)
15341581
shuffled <- partitionBy(locallyCombined, numPartitions)
15351582
mergeAfterShuffle <- function(part) {
15361583
combiners <- new.env()
15371584
keys <- new.env()
1585+
pred <- function(item) exists(item$hash, keys)
15381586
lapply(part,
15391587
function(item) {
1540-
k <- as.character(item[[1]])
1541-
if (!exists(k, combiners)) {
1542-
combiners[[k]] <- item[[2]]
1543-
keys[[k]] <- item[[1]]
1544-
} else {
1545-
combiners[[k]] <- do.call(mergeCombiners,
1546-
list(combiners[[k]],
1547-
item[[2]]))
1548-
}
1549-
})
1550-
lapply(ls(keys), function(k) {
1551-
list(keys[[k]], combiners[[k]])
1588+
item$hash <- as.character(item[[1]])
1589+
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
15521590
})
1591+
convertEnvsToList(keys, combiners)
15531592
}
1554-
combined <-lapplyPartition(shuffled, mergeAfterShuffle)
1555-
combined
1593+
lapplyPartition(shuffled, mergeAfterShuffle)
15561594
})
15571595

15581596
############ Binary Functions #############

pkg/R/utils.R

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,32 @@ joinTaggedList <- function(tagged_list, cnull) {
259259
lists <- genCompactLists(tagged_list, cnull)
260260
mergeCompactLists(lists[[1]], lists[[2]])
261261
}
262+
263+
# Utility function to reduce a key-value list with predicate
264+
# Used in *ByKey functions
265+
# param
266+
# pair key-value pair
267+
# keys/vals env of key/value with hashes
268+
# updateOrCreatePred predicate function
269+
# updateFn update or merge function for an existing pair, similar to `mergeValue` in combineByKey
270+
# createFn create function for a new pair, similar to `createCombiner` in combineByKey
271+
updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
272+
# assumes the hash value is bound to `pair$hash`, with key/val at indices 1/2
273+
hashVal <- pair$hash
274+
key <- pair[[1]]
275+
val <- pair[[2]]
276+
if (updateOrCreatePred(pair)) {
277+
assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
278+
} else {
279+
assign(hashVal, do.call(createFn, list(val)), envir = vals)
280+
assign(hashVal, key, envir=keys)
281+
}
282+
}
283+
284+
# Utility function to convert the key and value envs into a list of key-value pairs
285+
convertEnvsToList <- function(keys, vals) {
286+
lapply(ls(keys),
287+
function(name) {
288+
list(keys[[name]], vals[[name]])
289+
})
290+
}

pkg/inst/tests/test_rdd.R

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,19 @@ test_that("flatMapValues() on pairwise RDDs", {
229229
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
230230
})
231231

232+
test_that("reduceByKeyLocally() on PairwiseRDDs", {
233+
pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
234+
actual <- reduceByKeyLocally(pairs, "+")
235+
expect_equal(sortKeyValueList(actual),
236+
sortKeyValueList(list(list(1, 6), list(1.1, 3))))
237+
238+
pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
239+
list("bb", 5)), 4L)
240+
actual <- reduceByKeyLocally(pairs, "+")
241+
expect_equal(sortKeyValueList(actual),
242+
sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
243+
})
244+
232245
test_that("distinct() on RDDs", {
233246
nums.rep2 <- rep(1:10, 2)
234247
rdd.rep2 <- parallelize(sc, nums.rep2, 2L)

pkg/man/reduceByKeyLocally.Rd

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
% Generated by roxygen2 (4.1.0): do not edit by hand
2+
% Please edit documentation in R/RDD.R
3+
\docType{methods}
4+
\name{reduceByKeyLocally}
5+
\alias{reduceByKeyLocally}
6+
\alias{reduceByKeyLocally,RDD,integer-method}
7+
\alias{reduceByKeyLocally,RDD-method}
8+
\title{Merge values by key locally}
9+
\usage{
10+
reduceByKeyLocally(rdd, combineFunc)
11+
12+
\S4method{reduceByKeyLocally}{RDD}(rdd, combineFunc)
13+
}
14+
\arguments{
15+
\item{rdd}{The RDD to reduce by key. Should be an RDD where each element is
16+
list(K, V) or c(K, V).}
17+
18+
\item{combineFunc}{The associative reduce function to use.}
19+
}
20+
\value{
21+
A list of elements of type list(K, V') where V' is the merged value for each key
22+
}
23+
\description{
24+
This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
25+
and merges the values for each key using an associative reduce function, but return the
26+
results immediately to the driver as an R list.
27+
}
28+
\examples{
29+
\dontrun{
30+
sc <- sparkR.init()
31+
pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
32+
rdd <- parallelize(sc, pairs)
33+
reduced <- reduceByKeyLocally(rdd, "+")
34+
reduced # list(list(1, 6), list(1.1, 3))
35+
}
36+
}
37+
\seealso{
38+
reduceByKey
39+
}
40+

0 commit comments

Comments
 (0)