Skip to content

Commit d187e7d

Browse files
Sital Kedia authored and davies committed
[SPARK-14363] Fix executor OOM due to memory leak in the Sorter
## What changes were proposed in this pull request?
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOMs and also memory underutilization. This is a regression partially introduced in PR apache#9241.
## How was this patch tested?
Tested by running a job; observed around a 30% speedup after this change.
Author: Sital Kedia <[email protected]>
Closes apache#12285 from sitalkedia/executor_oom.
1 parent c439d88 commit d187e7d

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
215215
}
216216
}
217217

218-
inMemSorter.reset();
219-
220218
if (!isLastFile) { // i.e. this is a spill file
221219
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
222220
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
255253

256254
writeSortedFile(false);
257255
final long spillSize = freeMemory();
256+
inMemSorter.reset();
257+
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
258+
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
259+
// we might not be able to get memory for the pointer array.
258260
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
259261
return spillSize;
260262
}

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,12 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
5151
*/
5252
private int pos = 0;
5353

54+
private int initialSize;
55+
5456
ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
5557
this.consumer = consumer;
5658
assert (initialSize > 0);
59+
this.initialSize = initialSize;
5760
this.array = consumer.allocateArray(initialSize);
5861
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
5962
}
@@ -70,6 +73,10 @@ public int numRecords() {
7073
}
7174

7275
public void reset() {
76+
if (consumer != null) {
77+
consumer.freeArray(array);
78+
this.array = consumer.allocateArray(initialSize);
79+
}
7380
pos = 0;
7481
}
7582

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
200200
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
201201
}
202202
spillWriter.close();
203-
204-
inMemSorter.reset();
205203
}
206204

207205
final long spillSize = freeMemory();
208206
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
209207
// pages will currently be counted as memory spilled even though that space isn't actually
210208
// written to disk. This also counts the space needed to store the sorter's pointer array.
209+
inMemSorter.reset();
210+
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
211+
// records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
212+
// we might not be able to get memory for the pointer array.
213+
211214
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
212215

213216
return spillSize;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
8484
*/
8585
private int pos = 0;
8686

87+
private long initialSize;
88+
8789
public UnsafeInMemorySorter(
8890
final MemoryConsumer consumer,
8991
final TaskMemoryManager memoryManager,
@@ -102,6 +104,7 @@ public UnsafeInMemorySorter(
102104
LongArray array) {
103105
this.consumer = consumer;
104106
this.memoryManager = memoryManager;
107+
this.initialSize = array.size();
105108
if (recordComparator != null) {
106109
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
107110
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
@@ -123,6 +126,10 @@ public void free() {
123126
}
124127

125128
public void reset() {
129+
if (consumer != null) {
130+
consumer.freeArray(array);
131+
this.array = consumer.allocateArray(initialSize);
132+
}
126133
pos = 0;
127134
}
128135

0 commit comments

Comments
 (0)