Skip to content

Commit b317aa7

Browse files
concretevitaminDavies Liu
authored andcommitted
Merge pull request apache#243 from hqzizania/master
[SPARKR-199] Change takeOrdered, top to fetch one partition at a time
1 parent 136a07e commit b317aa7

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

R/pkg/R/RDD.R

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,7 @@ setMethod("take",
738738
index <- -1
739739
jrdd <- getJRDD(x)
740740
numPartitions <- numPartitions(x)
741+
serializedModeRDD <- getSerializedMode(x)
741742

742743
# TODO(shivaram): Collect more than one partition based on size
743744
# estimates similar to the scala version of `take`.
@@ -756,13 +757,14 @@ setMethod("take",
756757
elems <- convertJListToRList(partition,
757758
flatten = TRUE,
758759
logicalUpperBound = size,
759-
serializedMode = getSerializedMode(x))
760-
# TODO: Check if this append is O(n^2)?
760+
serializedMode = serializedModeRDD)
761+
761762
resList <- append(resList, elems)
762763
}
763764
resList
764765
})
765766

767+
766768
#' First
767769
#'
768770
#' Return the first element of an RDD
@@ -1100,21 +1102,42 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
11001102
if (num < length(part)) {
11011103
# R limitation: order works only on primitive types!
11021104
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
1103-
list(part[ord[1:num]])
1105+
part[ord[1:num]]
11041106
} else {
1105-
list(part)
1107+
part
11061108
}
11071109
}
11081110

1109-
reduceFunc <- function(elems, part) {
1110-
newElems <- append(elems, part)
1111-
# R limitation: order works only on primitive types!
1112-
ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
1113-
newElems[ord[1:num]]
1114-
}
1115-
11161111
newRdd <- mapPartitions(x, partitionFunc)
1117-
reduce(newRdd, reduceFunc)
1112+
1113+
resList <- list()
1114+
index <- -1
1115+
jrdd <- getJRDD(newRdd)
1116+
numPartitions <- numPartitions(newRdd)
1117+
serializedModeRDD <- getSerializedMode(newRdd)
1118+
1119+
while (TRUE) {
1120+
index <- index + 1
1121+
1122+
if (index >= numPartitions) {
1123+
ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
1124+
resList <- resList[ord[1:num]]
1125+
break
1126+
}
1127+
1128+
# a JList of byte arrays
1129+
partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
1130+
partition <- partitionArr[[1]]
1131+
1132+
# elems is capped to have at most `num` elements
1133+
elems <- convertJListToRList(partition,
1134+
flatten = TRUE,
1135+
logicalUpperBound = num,
1136+
serializedMode = serializedModeRDD)
1137+
1138+
resList <- append(resList, elems)
1139+
}
1140+
resList
11181141
}
11191142

11201143
#' Returns the first N elements from an RDD in ascending order.

0 commit comments

Comments
 (0)