Skip to content

Commit 413d060

Browse files
Sital Kediadavies
authored andcommitted
[SPARK-14363] Fix executor OOM due to memory leak in the Sorter
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization. This is a regression partially introduced in PR apache#9241 Tested by running a job and observed around 30% speedup after this change. Author: Sital Kedia <[email protected]> Closes apache#12285 from sitalkedia/executor_oom. (cherry picked from commit d187e7d) Signed-off-by: Davies Liu <[email protected]> Conflicts: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
1 parent 582ed8a commit 413d060

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
215215
}
216216
}
217217

218-
inMemSorter.reset();
219-
220218
if (!isLastFile) { // i.e. this is a spill file
221219
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
222220
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
255253

256254
writeSortedFile(false);
257255
final long spillSize = freeMemory();
256+
inMemSorter.reset();
257+
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
258+
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
259+
// we might not be able to get memory for the pointer array.
258260
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
259261
return spillSize;
260262
}

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
4949
*/
5050
private int pos = 0;
5151

52+
private int initialSize;
53+
5254
public ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
5355
this.consumer = consumer;
5456
assert (initialSize > 0);
57+
this.initialSize = initialSize;
5558
this.array = consumer.allocateArray(initialSize);
5659
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
5760
}
@@ -68,6 +71,10 @@ public int numRecords() {
6871
}
6972

7073
public void reset() {
74+
if (consumer != null) {
75+
consumer.freeArray(array);
76+
this.array = consumer.allocateArray(initialSize);
77+
}
7178
pos = 0;
7279
}
7380

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
192192
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
193193
}
194194
spillWriter.close();
195-
196-
inMemSorter.reset();
197195
}
198196

199197
final long spillSize = freeMemory();
200198
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
201199
// pages will currently be counted as memory spilled even though that space isn't actually
202200
// written to disk. This also counts the space needed to store the sorter's pointer array.
201+
inMemSorter.reset();
202+
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
203+
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
204+
// we might not be able to get memory for the pointer array.
205+
203206
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
204207

205208
return spillSize;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
8080
*/
8181
private int pos = 0;
8282

83+
private long initialSize;
84+
8385
public UnsafeInMemorySorter(
8486
final MemoryConsumer consumer,
8587
final TaskMemoryManager memoryManager,
@@ -98,6 +100,7 @@ public UnsafeInMemorySorter(
98100
LongArray array) {
99101
this.consumer = consumer;
100102
this.memoryManager = memoryManager;
103+
this.initialSize = array.size();
101104
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
102105
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
103106
this.array = array;
@@ -114,6 +117,10 @@ public void free() {
114117
}
115118

116119
public void reset() {
120+
if (consumer != null) {
121+
consumer.freeArray(array);
122+
this.array = consumer.allocateArray(initialSize);
123+
}
117124
pos = 0;
118125
}
119126

0 commit comments

Comments
 (0)