
Commit b3b1924

Properly implement close() and flush() in DummySerializerInstance.
It turns out that we actually rely on these methods flushing and closing the underlying stream in order to properly close streams in DiskBlockObjectWriter; it was silly of me to not implement them. This should fix a failing LZ4 test in UnsafeShuffleWriterSuite.
1 parent: 1ef56c7 · commit: b3b1924
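For context, a minimal, self-contained sketch of the failure mode this commit addresses. The OutputStream handed to serializeStream() is the compression stream that DiskBlockObjectWriter layers over the file, and the writer relies on the SerializationStream's flush()/close() to reach it. The class below is hypothetical and java.util.zip.GZIPOutputStream merely stands in for the LZ4 codec used by the failing test; it only illustrates what happens when nothing ever closes the compression stream.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; GZIPOutputStream stands in for the LZ4 stream in the failing test.
public final class NoOpCloseDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    OutputStream compressed = new GZIPOutputStream(sink);

    // Writes are buffered inside the compression stream until it is flushed or closed.
    compressed.write("some shuffle bytes".getBytes(StandardCharsets.UTF_8));

    // If the SerializationStream wrapper's close() is a no-op, the equivalent of the
    // following call never happens, so the compressed payload and trailer are never emitted:
    // compressed.close();

    // Only the fixed-size gzip header has reached the sink; the data is unreadable.
    System.out.println("bytes in sink without close(): " + sink.size());
  }
}

With the delegation added in this commit, closing the SerializationStream reaches the underlying compression stream, so its buffered bytes and trailer are written out before DiskBlockObjectWriter closes the file.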

2 files changed: 31 additions, 15 deletions


core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java

Lines changed: 25 additions & 8 deletions
@@ -17,15 +17,18 @@
 
 package org.apache.spark.shuffle.unsafe;
 
-import org.apache.spark.serializer.DeserializationStream;
-import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.SerializerInstance;
-import scala.reflect.ClassTag;
-
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import scala.reflect.ClassTag;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.unsafe.PlatformDependent;
+
 /**
  * Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  * Our shuffle write path doesn't actually use this serializer (since we end up calling the
@@ -39,18 +42,32 @@ final class DummySerializerInstance extends SerializerInstance {
   private DummySerializerInstance() { }
 
   @Override
-  public SerializationStream serializeStream(OutputStream s) {
+  public SerializationStream serializeStream(final OutputStream s) {
     return new SerializationStream() {
       @Override
-      public void flush() { }
+      public void flush() {
+        // Need to implement this because DiskObjectWriter uses it to flush the compression stream
+        try {
+          s.flush();
+        } catch (IOException e) {
+          PlatformDependent.throwException(e);
+        }
+      }
 
       @Override
       public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
         throw new UnsupportedOperationException();
      }
 
       @Override
-      public void close() { }
+      public void close() {
+        // Need to implement this because DiskObjectWriter uses it to close the compression stream
+        try {
+          s.close();
+        } catch (IOException e) {
+          PlatformDependent.throwException(e);
+        }
+      }
     };
   }
 
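A note on the error handling above: the flush() and close() overrides apparently cannot declare the checked IOException, so the patch rethrows it via PlatformDependent.throwException rather than wrapping it. As a hedged illustration of the general technique only, not Spark's actual implementation (which may instead go through sun.misc.Unsafe), a helper like the hypothetical one below lets a checked exception escape from a method that does not declare it:

// Hypothetical sketch; "SneakyRethrow" and its methods are not part of Spark.
public final class SneakyRethrow {
  @SuppressWarnings("unchecked")
  private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
    // Generic erasure turns the cast into a no-op, so any Throwable propagates unchecked.
    throw (T) t;
  }

  // Example: a close() that declares no checked exceptions can still surface an IOException.
  public static void closeAndPropagate(java.io.Closeable c) {
    try {
      c.close();
    } catch (java.io.IOException e) {
      SneakyRethrow.<RuntimeException>sneakyThrow(e);
    }
  }
}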

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 6 additions & 7 deletions
@@ -311,13 +311,12 @@ private void testMergingSpills(
     Assert.assertTrue(mergedOutputFile.exists());
     Assert.assertEquals(2, spillFilesCreated.size());
 
-    // This assertion only holds for the fast merging path:
-    // long sumOfPartitionSizes = 0;
-    // for (long size: partitionSizesInMergedFile) {
-    //   sumOfPartitionSizes += size;
-    // }
-    // Assert.assertEquals(sumOfPartitionSizes, mergedOutputFile.length());
-    Assert.assertTrue(mergedOutputFile.length() > 0);
+    long sumOfPartitionSizes = 0;
+    for (long size: partitionSizesInMergedFile) {
+      sumOfPartitionSizes += size;
+    }
+    Assert.assertEquals(sumOfPartitionSizes, mergedOutputFile.length());
+
     Assert.assertEquals(
       HashMultiset.create(dataToWrite),
       HashMultiset.create(readRecordsFromFile()));
