Skip to content

Commit e943e74

Browse files
author
Davies Liu
committed
fix tests
1 parent 4ee1f42 commit e943e74

File tree

3 files changed

+23
-20
lines changed

3 files changed

+23
-20
lines changed

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.slf4j.LoggerFactory;
2929

3030
import org.apache.spark.SparkEnv;
31-
import org.apache.spark.TaskContext;
3231
import org.apache.spark.executor.ShuffleWriteMetrics;
3332
import org.apache.spark.memory.MemoryConsumer;
3433
import org.apache.spark.memory.TaskMemoryManager;
@@ -40,7 +39,6 @@
4039
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
4140
import org.apache.spark.unsafe.memory.MemoryBlock;
4241
import org.apache.spark.unsafe.memory.MemoryLocation;
43-
import org.apache.spark.util.TaskCompletionListener;
4442
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader;
4543
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
4644

@@ -175,6 +173,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
175173

176174
private final BlockManager blockManager;
177175
private volatile MapIterator destructiveIterator = null;
176+
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
178177

179178
public BytesToBytesMap(
180179
TaskMemoryManager taskMemoryManager,
@@ -243,7 +242,6 @@ public final class MapIterator implements Iterator<Location> {
243242
// If this iterator destructive or not. When it is true, it frees each page as it moves onto
244243
// next one.
245244
private boolean destructive = false;
246-
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
247245
private UnsafeSorterSpillReader reader = null;
248246

249247
private MapIterator(int numRecords, Location loc, boolean destructive) {
@@ -293,6 +291,17 @@ private void advanceToNextPage() {
293291

294292
@Override
295293
public boolean hasNext() {
294+
if (numRecords == 0) {
295+
if (reader != null) {
296+
// remove the spill file from disk
297+
File file = spillWriters.removeFirst().getFile();
298+
if (file != null && file.exists()) {
299+
if (!file.delete()) {
300+
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
301+
}
302+
}
303+
}
304+
}
296305
return numRecords > 0;
297306
}
298307

@@ -355,21 +364,6 @@ public long spill(long numBytes) throws IOException {
355364
}
356365
writer.close();
357366
spillWriters.add(writer);
358-
if (TaskContext.get() != null) {
359-
TaskContext.get().addTaskCompletionListener(
360-
new TaskCompletionListener() {
361-
@Override
362-
public void onTaskCompletion(TaskContext context) {
363-
File file = writer.getFile();
364-
if (file != null && file.exists()) {
365-
if (!file.delete()) {
366-
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
367-
}
368-
}
369-
}
370-
}
371-
);
372-
}
373367

374368
dataPages.removeLast();
375369
released += block.size();
@@ -774,6 +768,15 @@ public void free() {
774768
freePage(dataPage);
775769
}
776770
assert(dataPages.isEmpty());
771+
772+
while (!spillWriters.isEmpty()) {
773+
File file = spillWriters.removeFirst().getFile();
774+
if (file != null && file.exists()) {
775+
if (!file.delete()) {
776+
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
777+
}
778+
}
779+
}
777780
}
778781

779782
public TaskMemoryManager getTaskMemoryManager() {

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private UnsafeExternalSorter(
131131
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
132132
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
133133
// does not fully consume the sorter's output (e.g. sort followed by limit).
134-
TaskContext.get().addTaskCompletionListener(
134+
taskContext.addTaskCompletionListener(
135135
new TaskCompletionListener() {
136136
@Override
137137
public void onTaskCompletion(TaskContext context) {

core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ public void spillInIterator() throws IOException {
556556
for (i = 0; i < 100; i++) {
557557
iter2.next();
558558
}
559-
Assert.assertTrue(iter2.spill(1024) > 1024);
559+
Assert.assertTrue(iter2.spill(1024) >= 1024);
560560
for (i = 100; i < 1024; i++) {
561561
iter2.next();
562562
}

0 commit comments

Comments
 (0)