
Commit 28ce67b

ash211 authored and mateiz committed
SPARK-3211 .take() is OOM-prone with empty partitions

Instead of jumping straight from 1 partition to all partitions, do exponential growth and double the number of partitions to attempt each time instead.

Fix proposed by Paul Nepywoda

Author: Andrew Ash <[email protected]>

Closes #2117 from ash211/SPARK-3211 and squashes the following commits:

8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions

(cherry picked from commit ba5bcad)
Signed-off-by: Matei Zaharia <[email protected]>
1 parent: 6b128be

2 files changed: +7 −8 lines


core/src/main/scala/org/apache/spark/rdd/RDD.scala (3 additions, 4 deletions)

@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
+        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+        // interpolate the number of partitions we need to try, but overestimate it by 50%.
         if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+          numPartsToTry = partsScanned * 4
         } else {
           numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
         }

python/pyspark/rdd.py (4 additions, 4 deletions)

@@ -1132,11 +1132,11 @@ def take(self, num):
             # we actually cap it at totalParts in runJob.
             numPartsToTry = 1
             if partsScanned > 0:
-                # If we didn't find any rows after the first iteration, just
-                # try all partitions next. Otherwise, interpolate the number
-                # of partitions we need to try, but overestimate it by 50%.
+                # If we didn't find any rows after the previous iteration,
+                # quadruple and retry. Otherwise, interpolate the number of
+                # partitions we need to try, but overestimate it by 50%.
                 if len(items) == 0:
-                    numPartsToTry = totalParts - 1
+                    numPartsToTry = partsScanned * 4
                 else:
                     numPartsToTry = int(1.5 * num * partsScanned / len(items))
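The two hunks above make the same change on the Scala and Python sides: when an iteration of take() finds no rows, scan quadruple the partitions already scanned rather than jumping to all partitions at once. A minimal, hypothetical Python sketch of that growth rule (not Spark's actual driver code; the function name and parameters are invented for illustration, and Spark itself caps the attempt count at totalParts inside runJob):

```python
def partitions_to_scan(num, total_parts, rows_found, parts_scanned):
    """How many partitions the next take() iteration should attempt.

    num          -- how many rows the caller asked for
    total_parts  -- total partitions in the RDD
    rows_found   -- rows collected by all iterations so far
    parts_scanned -- partitions scanned by all iterations so far
    """
    num_parts_to_try = 1  # first iteration: try a single partition
    if parts_scanned > 0:
        if rows_found == 0:
            # No rows yet: quadruple and retry. The pre-patch behavior
            # (try every remaining partition at once) is what made take()
            # OOM-prone on RDDs with many empty partitions.
            num_parts_to_try = parts_scanned * 4
        else:
            # Interpolate from the hit rate so far, overestimating by 50%.
            num_parts_to_try = int(1.5 * num * parts_scanned / rows_found)
    # Never exceed the partitions that remain (Spark does this cap in runJob).
    return min(max(num_parts_to_try, 1), total_parts - parts_scanned)
```

On an RDD whose early partitions are empty, successive iterations attempt 1, 4, 20, 100, ... partitions, so memory use grows geometrically with the data actually scanned instead of spiking to the whole RDD in one step.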
