Skip to content

Commit b082a35

Browse files
committed
add reduceByKeyLocally
1 parent 08ff30b commit b082a35

File tree

5 files changed

+164
-54
lines changed

5 files changed

+164
-54
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ exportMethods(
4141
"persist",
4242
"reduce",
4343
"reduceByKey",
44+
"reduceByKeyLocally",
4445
"rightOuterJoin",
4546
"sampleRDD",
4647
"saveAsTextFile",

pkg/R/RDD.R

Lines changed: 80 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,26 +1382,18 @@ setMethod("groupByKey",
13821382
groupVals <- function(part) {
13831383
vals <- new.env()
13841384
keys <- new.env()
1385+
pred <- function(item) exists(item$hash, keys)
13851386
# Each item in the partition is list of (K, V)
13861387
lapply(part,
13871388
function(item) {
1388-
hashVal <- as.character(hashCode(item[[1]]))
1389-
if (exists(hashVal, vals)) {
1390-
acc <- vals[[hashVal]]
1391-
acc[[length(acc) + 1]] <- item[[2]]
1392-
vals[[hashVal]] <- acc
1393-
} else {
1394-
vals[[hashVal]] <- list(item[[2]])
1395-
keys[[hashVal]] <- item[[1]]
1396-
}
1389+
item$hash <- as.character(hashCode(item[[1]]))
1390+
updateOrCreatePair(item, keys, vals, pred,
1391+
function(vs, v) c(vs, list(v)),
1392+
function(x) list(x))
13971393
})
13981394
# Every key in the environment contains a list
13991395
# Convert that to list(K, Seq[V])
1400-
grouped <- lapply(ls(vals),
1401-
function(name) {
1402-
list(keys[[name]], vals[[name]])
1403-
})
1404-
grouped
1396+
convertEnvsToList(keys, vals)
14051397
}
14061398
lapplyPartition(shuffled, groupVals)
14071399
})
@@ -1442,28 +1434,79 @@ setMethod("reduceByKey",
14421434
reduceVals <- function(part) {
14431435
vals <- new.env()
14441436
keys <- new.env()
1437+
pred <- function(item) exists(item$hash, keys)
14451438
lapply(part,
14461439
function(item) {
1447-
hashVal <- as.character(hashCode(item[[1]]))
1448-
if (exists(hashVal, vals)) {
1449-
vals[[hashVal]] <- do.call(
1450-
combineFunc, list(vals[[hashVal]], item[[2]]))
1451-
} else {
1452-
vals[[hashVal]] <- item[[2]]
1453-
keys[[hashVal]] <- item[[1]]
1454-
}
1440+
item$hash <- as.character(hashCode(item[[1]]))
1441+
updateOrCreatePair(item, keys, vals, pred, combineFunc, function(x) x)
14551442
})
1456-
combined <- lapply(ls(vals),
1457-
function(name) {
1458-
list(keys[[name]], vals[[name]])
1459-
})
1460-
combined
1443+
convertEnvsToList(keys, vals)
14611444
}
14621445
locallyReduced <- lapplyPartition(rdd, reduceVals)
14631446
shuffled <- partitionBy(locallyReduced, numPartitions)
14641447
lapplyPartition(shuffled, reduceVals)
14651448
})
14661449

1450+
#' Merge values by key locally
1451+
#'
1452+
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
1453+
#' and merges the values for each key using an associative reduce function, but return the
1454+
#' results immediately to master as R list.
1455+
#'
1456+
#' @param rdd The RDD to reduce by key. Should be an RDD where each element is
1457+
#' list(K, V) or c(K, V).
1458+
#' @param combineFunc The associative reduce function to use.
1459+
#' @return An list where each element is list(K, V') where V' is the merged
1460+
#' value
1461+
#' @rdname reduceByKeyLocally
1462+
#' @seealso reduceByKey
1463+
#' @export
1464+
#' @examples
1465+
#'\dontrun{
1466+
#' sc <- sparkR.init()
1467+
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
1468+
#' rdd <- parallelize(sc, pairs)
1469+
#' reduced <- reduceByKeyLocally(rdd, "+")
1470+
#' reduced[[1]] # Should be a list(1, 6)
1471+
#'}
1472+
setGeneric("reduceByKeyLocally",
1473+
function(rdd, combineFunc) {
1474+
standardGeneric("reduceByKeyLocally")
1475+
})
1476+
1477+
#' @rdname reduceByKeyLocally
1478+
#' @aliases reduceByKeyLocally,RDD,integer-method
1479+
setMethod("reduceByKeyLocally",
1480+
signature(rdd = "RDD", combineFunc = "ANY"),
1481+
function(rdd, combineFunc) {
1482+
reducePart <- function(part) {
1483+
vals <- new.env()
1484+
keys <- new.env()
1485+
pred <- function(item) exists(item$hash, keys)
1486+
lapply(part,
1487+
function(item) {
1488+
item$hash <- as.character(hashCode(item[[1]]))
1489+
updateOrCreatePair(item, keys, vals, pred, combineFunc, function(x) x)
1490+
})
1491+
list(list(keys, vals)) # return hash to avoid re-compute in merge
1492+
}
1493+
mergeParts <- function(accum, x) {
1494+
pred <- function(item) {
1495+
exists(item$hash, accum[[1]])
1496+
}
1497+
lapply(ls(x[[1]]),
1498+
function(name) {
1499+
item <- list(x[[1]][[name]], x[[2]][[name]])
1500+
item$hash <- name
1501+
updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, function(x) x)
1502+
})
1503+
accum
1504+
}
1505+
reduced <- mapPartitions(rdd, reducePart)
1506+
merged <- reduce(reduced, mergeParts)
1507+
convertEnvsToList(merged[[1]], merged[[2]])
1508+
})
1509+
14671510
#' Combine values by key
14681511
#'
14691512
#' Generic function to combine the elements for each key using a custom set of
@@ -1513,46 +1556,29 @@ setMethod("combineByKey",
15131556
combineLocally <- function(part) {
15141557
combiners <- new.env()
15151558
keys <- new.env()
1559+
pred <- function(item) exists(item$hash, keys)
15161560
lapply(part,
15171561
function(item) {
1518-
k <- as.character(item[[1]])
1519-
if (!exists(k, keys)) {
1520-
combiners[[k]] <- do.call(createCombiner,
1521-
list(item[[2]]))
1522-
keys[[k]] <- item[[1]]
1523-
} else {
1524-
combiners[[k]] <- do.call(mergeValue,
1525-
list(combiners[[k]],
1526-
item[[2]]))
1527-
}
1528-
})
1529-
lapply(ls(keys), function(k) {
1530-
list(keys[[k]], combiners[[k]])
1562+
item$hash <- as.character(item[[1]])
1563+
updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
15311564
})
1565+
convertEnvsToList(keys, combiners)
15321566
}
15331567
locallyCombined <- lapplyPartition(rdd, combineLocally)
15341568
shuffled <- partitionBy(locallyCombined, numPartitions)
15351569
mergeAfterShuffle <- function(part) {
15361570
combiners <- new.env()
15371571
keys <- new.env()
1572+
pred <- function(item) exists(item$hash, keys)
15381573
lapply(part,
15391574
function(item) {
1540-
k <- as.character(item[[1]])
1541-
if (!exists(k, combiners)) {
1542-
combiners[[k]] <- item[[2]]
1543-
keys[[k]] <- item[[1]]
1544-
} else {
1545-
combiners[[k]] <- do.call(mergeCombiners,
1546-
list(combiners[[k]],
1547-
item[[2]]))
1548-
}
1549-
})
1550-
lapply(ls(keys), function(k) {
1551-
list(keys[[k]], combiners[[k]])
1575+
item$hash <- as.character(item[[1]])
1576+
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners,
1577+
function(x) x)
15521578
})
1579+
convertEnvsToList(keys, combiners)
15531580
}
1554-
combined <-lapplyPartition(shuffled, mergeAfterShuffle)
1555-
combined
1581+
lapplyPartition(shuffled, mergeAfterShuffle)
15561582
})
15571583

15581584
############ Binary Functions #############

pkg/R/utils.R

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,32 @@ joinTaggedList <- function(tagged_list, cnull) {
259259
lists <- genCompactLists(tagged_list, cnull)
260260
mergeCompactLists(lists[[1]], lists[[2]])
261261
}
262+
263+
# Utility function to reduce a key-value list with predicate
264+
# Used in *ByKey functions
265+
# param
266+
# item key-val pair
267+
# keys/vals env of key/value with hashes
268+
# pred predicate function
269+
# update_fn update or merge function for existing pair, similar with `mergeVal` @combineByKey
270+
# create_fn create function for new pair, similar with `createCombiner` @combinebykey
271+
updateOrCreatePair <- function(item, keys, vals, pred, update_fn, create_fn) {
272+
# assum hashval bind to `$hash`, key/val with index 1/2
273+
hashVal <- item$hash
274+
key <- item[[1]]
275+
val <- item[[2]]
276+
if (pred(item)) {
277+
assign(hashVal, do.call(update_fn, list(get(hashVal, envir=vals), val)), envir=vals)
278+
} else {
279+
assign(hashVal, do.call(create_fn, list(val)), envir=vals)
280+
assign(hashVal, key, envir=keys)
281+
}
282+
}
283+
284+
# Utility function to convert key&values envs into key-val list
285+
convertEnvsToList <- function(keys, vals) {
286+
lapply(ls(keys),
287+
function(name) {
288+
list(keys[[name]], vals[[name]])
289+
})
290+
}

pkg/inst/tests/test_rdd.R

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,19 @@ test_that("flatMapValues() on pairwise RDDs", {
229229
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
230230
})
231231

232+
test_that("reduceByKeyLocally() on PairwiseRDDs", {
233+
pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
234+
actual <- reduceByKeyLocally(pairs, "+")
235+
expect_equal(sortKeyValueList(actual),
236+
sortKeyValueList(list(list(1, 6), list(1.1, 3))))
237+
238+
pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
239+
list("bb", 5)), 4L)
240+
actual <- reduceByKeyLocally(pairs, "+")
241+
expect_equal(sortKeyValueList(actual),
242+
sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
243+
})
244+
232245
test_that("distinct() on RDDs", {
233246
nums.rep2 <- rep(1:10, 2)
234247
rdd.rep2 <- parallelize(sc, nums.rep2, 2L)

pkg/man/reduceByKeyLocally.Rd

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
% Generated by roxygen2 (4.1.0): do not edit by hand
2+
% Please edit documentation in R/RDD.R
3+
\docType{methods}
4+
\name{reduceByKeyLocally}
5+
\alias{reduceByKeyLocally}
6+
\alias{reduceByKeyLocally,RDD,integer-method}
7+
\alias{reduceByKeyLocally,RDD-method}
8+
\title{Merge values by key locally}
9+
\usage{
10+
reduceByKeyLocally(rdd, combineFunc)
11+
12+
\S4method{reduceByKeyLocally}{RDD}(rdd, combineFunc)
13+
}
14+
\arguments{
15+
\item{rdd}{The RDD to reduce by key. Should be an RDD where each element is
16+
list(K, V) or c(K, V).}
17+
18+
\item{combineFunc}{The associative reduce function to use.}
19+
}
20+
\value{
21+
An list where each element is list(K, V') where V' is the merged
22+
value
23+
}
24+
\description{
25+
This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
26+
and merges the values for each key using an associative reduce function, but return the
27+
results immediately to master as R list.
28+
}
29+
\examples{
30+
\dontrun{
31+
sc <- sparkR.init()
32+
pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
33+
rdd <- parallelize(sc, pairs)
34+
reduced <- reduceByKeyLocally(rdd, "+")
35+
reduced[[1]] # Should be a list(1, 6)
36+
}
37+
}
38+
\seealso{
39+
reduceByKey
40+
}
41+

0 commit comments

Comments
 (0)