Skip to content

Commit 08ff30b

Browse files
committed
Merge pull request apache#153 from hqzizania/master
[SPARKR-160] Support collectAsMap()
2 parents 554bda0 + 9767e8e commit 08ff30b

File tree

4 files changed

+60
-1
lines changed

4 files changed

+60
-1
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ exportMethods(
66
"checkpoint",
77
"cogroup",
88
"collect",
9+
"collectAsMap",
910
"collectPartition",
1011
"combineByKey",
1112
"count",

pkg/R/RDD.R

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ setMethod("collect",
358358
convertJListToRList(collected, flatten)
359359
})
360360

361+
361362
#' @rdname collect-methods
362363
#' @export
363364
#' @description
@@ -382,6 +383,29 @@ setMethod("collectPartition",
382383
convertJListToRList(jList, flatten = TRUE)
383384
})
384385

386+
#' @rdname collect-methods
387+
#' @export
388+
#' @description
389+
#' \code{collectAsMap} returns a named list as a map that contains all of the elements
390+
#' in a key-value pair RDD.
391+
#' @examples
392+
#'\dontrun{
393+
#' sc <- sparkR.init()
394+
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
395+
#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
396+
#'}
397+
setGeneric("collectAsMap", function(rdd) { standardGeneric("collectAsMap") })
398+
399+
#' @rdname collect-methods
400+
#' @aliases collectAsMap,RDD-method
401+
setMethod("collectAsMap",
402+
signature(rdd = "RDD"),
403+
function(rdd) {
404+
pairList <- collect(rdd)
405+
map <- new.env()
406+
lapply(pairList, function(x) { assign(as.character(x[[1]]), x[[2]], envir = map) })
407+
as.list(map)
408+
})
385409

386410
#' Look up elements of a key in an RDD
387411
#'

pkg/inst/tests/test_rdd.R

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,3 +373,22 @@ test_that("fullOuterJoin() on pairwise RDDs", {
373373
expect_equal(sortKeyValueList(actual),
374374
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
375375
})
376+
377+
test_that("collectAsMap() on a pairwise RDD", {
378+
rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
379+
vals <- collectAsMap(rdd)
380+
expect_equal(vals, list(`1` = 2, `3` = 4))
381+
382+
rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
383+
vals <- collectAsMap(rdd)
384+
expect_equal(vals, list(a = 1, b = 2))
385+
386+
rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
387+
vals <- collectAsMap(rdd)
388+
expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
389+
390+
rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
391+
vals <- collectAsMap(rdd)
392+
expect_equal(vals, list(`1` = "a", `2` = "b"))
393+
})
394+

pkg/man/collect-methods.Rd

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
% Generated by roxygen2 (4.0.2): do not edit by hand
1+
% Generated by roxygen2 (4.1.0): do not edit by hand
2+
% Please edit documentation in R/RDD.R
23
\docType{methods}
34
\name{collect}
45
\alias{collect}
56
\alias{collect,RDD-method}
7+
\alias{collectAsMap}
8+
\alias{collectAsMap,RDD-method}
69
\alias{collectPartition}
710
\alias{collectPartition,RDD,integer-method}
811
\alias{collectPartition,integer,RDD-method}
@@ -15,6 +18,10 @@ collect(rdd, ...)
1518
collectPartition(rdd, partitionId)
1619

1720
\S4method{collectPartition}{RDD,integer}(rdd, partitionId)
21+
22+
collectAsMap(rdd)
23+
24+
\S4method{collectAsMap}{RDD}(rdd)
1825
}
1926
\arguments{
2027
\item{rdd}{The RDD to collect}
@@ -33,6 +40,9 @@ a list containing elements in the RDD
3340

3441
\code{collectPartition} returns a list that contains all of the elements
3542
in the specified partition of the RDD.
43+
44+
\code{collectAsMap} returns a named list as a map that contains all of the elements
45+
in a key-value pair RDD.
3646
}
3747
\examples{
3848
\dontrun{
@@ -41,5 +51,10 @@ rdd <- parallelize(sc, 1:10, 2L)
4151
collect(rdd) # list from 1 to 10
4252
collectPartition(rdd, 0L) # list from 1 to 5
4353
}
54+
\dontrun{
55+
sc <- sparkR.init()
56+
rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
57+
collectAsMap(rdd) # list(`1` = 2, `3` = 4)
58+
}
4459
}
4560

0 commit comments

Comments
 (0)