Skip to content

Commit bd6705b

Browse files
Merge pull request apache#154 from sun-rui/SPARKR-150
[SPARKR-150] phase 1: implement sortBy() and sortByKey().
2 parents 0c6e071 + c7964c9 commit bd6705b

File tree

6 files changed

+234
-3
lines changed

6 files changed

+234
-3
lines changed

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ exportMethods(
4646
"sampleRDD",
4747
"saveAsTextFile",
4848
"saveAsObjectFile",
49+
"sortBy",
50+
"sortByKey",
4951
"take",
5052
"takeSample",
5153
"unionRDD",

pkg/R/RDD.R

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,6 +1264,36 @@ setMethod("flatMapValues",
12641264
flatMap(X, flatMapFunc)
12651265
})
12661266

1267+
#' Sort an RDD by the given key function.
#'
#' @param rdd An RDD to be sorted.
#' @param func A function used to compute the sort key for each element.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all elements are sorted.
#' @rdname sortBy
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(3, 2, 1))
#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
#'}
setGeneric("sortBy",
           function(rdd, func, ascending = TRUE, numPartitions = 1L) {
             standardGeneric("sortBy")
           })
1288+
1289+
#' @rdname sortBy
#' @aliases sortBy,RDD,RDD-method
setMethod("sortBy",
          signature(rdd = "RDD", func = "function"),
          function(rdd, func, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) {
            # Pair each element with its sort key, sort those pairs by key,
            # then keep only the original values.
            keyed <- keyBy(rdd, func)
            sorted <- sortByKey(keyed, ascending, numPartitions)
            values(sorted)
          })
1296+
12671297
############ Shuffle Functions ############
12681298

12691299
#' Partition an RDD by key
@@ -1858,6 +1888,76 @@ setMethod("cogroup",
18581888
group.func)
18591889
})
18601890

1891+
#' Sort a (k, v) pair RDD by k.
#'
#' @param rdd A (k, v) pair RDD to be sorted.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all (k, v) pair elements are sorted.
#' @rdname sortByKey
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
#' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
#'}
setGeneric("sortByKey",
           function(rdd, ascending = TRUE, numPartitions = 1L) {
             standardGeneric("sortByKey")
           })
1910+
1911+
#' @rdname sortByKey
#' @aliases sortByKey,RDD,RDD-method
setMethod("sortByKey",
          signature(rdd = "RDD"),
          function(rdd, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) {
            # Upper key bounds that split the key space into numPartitions
            # ranges. Left empty when there is only one partition, in which
            # case every element lands in partition 0 and only the local
            # per-partition sort below matters.
            rangeBounds <- list()

            if (numPartitions > 1) {
              rddSize <- count(rdd)
              # constant from Spark's RangePartitioner
              maxSampleSize <- numPartitions * 20
              fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)

              # Sample keys (without replacement, fixed seed 1L) to estimate
              # the key distribution.
              samples <- collect(keys(sampleRDD(rdd, FALSE, fraction, 1L)))

              # Note: the built-in R sort() function only works on atomic vectors
              samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)

              # Choose numPartitions - 1 evenly spaced sample keys as the
              # partition boundaries.
              if (length(samples) > 0) {
                rangeBounds <- lapply(seq_len(numPartitions - 1),
                                      function(i) {
                                        j <- ceiling(length(samples) * i / numPartitions)
                                        samples[j]
                                      })
              }
            }

            # Map a key to the index of the partition whose key range holds it.
            rangePartitionFunc <- function(key) {
              partition <- 0

              # TODO: Use binary search instead of linear search, similarly to Spark
              while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
                partition <- partition + 1
              }

              if (ascending) {
                partition
              } else {
                # Reverse the partition index so partitions are emitted in
                # descending key order.
                # NOTE(review): when ascending = FALSE the bounds above are in
                # decreasing order but the scan still uses `key >`, which looks
                # like it can leave middle partitions empty (skewed, though the
                # final ordering still appears correct) — worth confirming.
                numPartitions - partition - 1
              }
            }

            # Locally sort each partition's (k, v) pairs by key.
            partitionFunc <- function(part) {
              sortKeyValueList(part, decreasing = !ascending)
            }

            # Range-partition so partitions are globally ordered, then sort
            # within each partition.
            newRDD <- partitionBy(rdd, numPartitions, rangePartitionFunc)
            lapplyPartition(newRDD, partitionFunc)
          })
1960+
18611961
# TODO: Consider caching the name in the RDD's environment
18621962
#' Return an RDD's name.
18631963
#'

pkg/R/utils.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,9 @@ initAccumulator <- function() {
197197

198198
# Utility function to sort a list of key value pairs
# Used in unit tests
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
  # Extract the key (first element) from every pair, then reorder the
  # pair list by those keys.
  pair_keys <- sapply(kv_list, function(pair) pair[[1]])
  kv_list[order(pair_keys, decreasing = decreasing)]
}
204204

205205
# Utility function to generate compact R lists from grouped rdd

pkg/inst/tests/test_rdd.R

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,17 @@ test_that("keyBy on RDDs", {
267267
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
268268
})
269269

270+
# Uses fixtures defined earlier in the test file: sc, rdd, nums —
# presumably nums is a vector of positive numbers parallelized into rdd;
# verify against the file header.
test_that("sortBy() on RDDs", {
  # Descending sort keyed by x^2; for positive nums this matches a plain
  # descending sort of the values — TODO confirm nums are all positive.
  sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
  actual <- collect(sortedRdd)
  expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))

  # Default ascending sort restores natural order from a reversed,
  # 2-partition RDD.
  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
  actual <- collect(sortedRdd2)
  expect_equal(actual, as.list(nums))
})
280+
270281
test_that("keys() on RDDs", {
271282
keys <- keys(intRdd)
272283
actual <- collect(keys)
@@ -387,6 +398,55 @@ test_that("fullOuterJoin() on pairwise RDDs", {
387398
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
388399
})
389400

401+
# Uses fixtures defined earlier in the test file: sc, rdd, nums.
test_that("sortByKey() on pairwise RDDs", {
  # Descending sort of (x, x) pairs; expected result computed with the
  # local sortKeyValueList() helper.
  numPairsRdd <- map(rdd, function(x) { list (x, x) })
  sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
  actual <- collect(sortedRdd)
  numPairs <- lapply(nums, function(x) { list (x, x) })
  expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))

  # Default ascending sort restores order from a reversed 2-partition RDD.
  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
  sortedRdd2 <- sortByKey(numPairsRdd2)
  actual <- collect(sortedRdd2)
  expect_equal(actual, numPairs)

  # sort by string keys (digits sort before letters lexicographically)
  l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
  rdd3 <- parallelize(sc, l, 2L)
  sortedRdd3 <- sortByKey(rdd3)
  actual <- collect(sortedRdd3)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # test on the boundary cases

  # boundary case 1: the RDD to be sorted has only 1 partition
  rdd4 <- parallelize(sc, l, 1L)
  sortedRdd4 <- sortByKey(rdd4)
  actual <- collect(sortedRdd4)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # boundary case 2: the sorted RDD has only 1 partition
  rdd5 <- parallelize(sc, l, 2L)
  sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
  actual <- collect(sortedRdd5)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # boundary case 3: the RDD to be sorted has only 1 element
  l2 <- list(list("a", 1))
  rdd6 <- parallelize(sc, l2, 2L)
  sortedRdd6 <- sortByKey(rdd6)
  actual <- collect(sortedRdd6)
  expect_equal(actual, l2)

  # boundary case 4: the RDD to be sorted has 0 element
  l3 <- list()
  rdd7 <- parallelize(sc, l3, 2L)
  sortedRdd7 <- sortByKey(rdd7)
  actual <- collect(sortedRdd7)
  expect_equal(actual, l3)
})
449+
390450
test_that("collectAsMap() on a pairwise RDD", {
391451
rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
392452
vals <- collectAsMap(rdd)
@@ -404,4 +464,3 @@ test_that("collectAsMap() on a pairwise RDD", {
404464
vals <- collectAsMap(rdd)
405465
expect_equal(vals, list(`1` = "a", `2` = "b"))
406466
})
407-

pkg/man/sortBy.Rd

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{sortBy}
4+
\alias{sortBy}
5+
\alias{sortBy,RDD,RDD-method}
6+
\alias{sortBy,RDD,function,missingOrLogical,missingOrInteger-method}
7+
\title{Sort an RDD by the given key function.}
8+
\usage{
9+
sortBy(rdd, func, ascending, numPartitions)
10+
11+
\S4method{sortBy}{RDD,`function`,missingOrLogical,missingOrInteger}(rdd, func,
12+
ascending, numPartitions)
13+
}
14+
\arguments{
15+
\item{rdd}{An RDD to be sorted.}
16+
17+
\item{func}{A function used to compute the sort key for each element.}
18+
19+
\item{ascending}{A flag to indicate whether the sorting is ascending or descending.}
20+
21+
\item{numPartitions}{Number of partitions to create.}
22+
}
23+
\value{
24+
An RDD where all elements are sorted.
25+
}
26+
\description{
27+
Sort an RDD by the given key function.
28+
}
29+
\examples{
30+
\dontrun{
31+
sc <- sparkR.init()
32+
rdd <- parallelize(sc, list(3, 2, 1))
33+
collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
34+
}
35+
}
36+

pkg/man/sortByKey.Rd

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{sortByKey}
4+
\alias{sortByKey}
5+
\alias{sortByKey,RDD,RDD-method}
6+
\alias{sortByKey,RDD,missingOrLogical,missingOrInteger-method}
7+
\title{Sort a (k, v) pair RDD by k.}
8+
\usage{
9+
sortByKey(rdd, ascending, numPartitions)
10+
11+
\S4method{sortByKey}{RDD,missingOrLogical,missingOrInteger}(rdd, ascending,
12+
numPartitions)
13+
}
14+
\arguments{
15+
\item{rdd}{A (k, v) pair RDD to be sorted.}
16+
17+
\item{ascending}{A flag to indicate whether the sorting is ascending or descending.}
18+
19+
\item{numPartitions}{Number of partitions to create.}
20+
}
21+
\value{
22+
An RDD where all (k, v) pair elements are sorted.
23+
}
24+
\description{
25+
Sort a (k, v) pair RDD by k.
26+
}
27+
\examples{
28+
\dontrun{
29+
sc <- sparkR.init()
30+
rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
31+
collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
32+
}
33+
}
34+

0 commit comments

Comments
 (0)