
Commit cfe0ec4

Address a number of minor review comments:
1 parent 8a6fe52 commit cfe0ec4

6 files changed (+37 -39 lines)

core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java

Lines changed: 19 additions & 12 deletions

@@ -26,44 +26,51 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-class DummySerializerInstance extends SerializerInstance {
+/**
+ * Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
+ * Our shuffle write path doesn't actually use this serializer (since we end up calling the
+ * `write()` OutputStream methods), but DiskBlockObjectWriter still calls some methods on it.
+ * To work around this, we pass a dummy no-op serializer.
+ */
+final class DummySerializerInstance extends SerializerInstance {
+
+  public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();
+
+  private DummySerializerInstance() { }
 
   @Override
   public SerializationStream serializeStream(OutputStream s) {
     return new SerializationStream() {
       @Override
-      public void flush() {
-
-      }
+      public void flush() { }
 
       @Override
       public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
-        return null;
+        throw new UnsupportedOperationException();
       }
 
       @Override
-      public void close() {
-
-      }
+      public void close() { }
     };
   }
 
   @Override
   public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public DeserializationStream deserializeStream(InputStream s) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
-    return null;
+    throw new UnsupportedOperationException();
  }
 }
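
Making the class final with a private constructor and a shared INSTANCE field is the classic eagerly-initialized singleton: the serializer is stateless, so one instance can safely be reused everywhere instead of being allocated per spill. A minimal sketch of the same pattern, using an illustrative class that is not part of the patch:

// Eagerly-initialized, stateless singleton: safe to share across threads
// because it holds no mutable state. (Illustrative class, not from the patch.)
final class NoOpCloser implements AutoCloseable {
  public static final NoOpCloser INSTANCE = new NoOpCloser();

  private NoOpCloser() { }  // prevent outside instantiation

  @Override
  public void close() { }   // deliberately does nothing
}

class SingletonDemo {
  public static void main(String[] args) {
    // Both references point at the same shared object.
    System.out.println(NoOpCloser.INSTANCE == NoOpCloser.INSTANCE);  // true
  }
}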

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 6 additions & 7 deletions

@@ -155,7 +155,7 @@ private SpillInfo writeSpillFile() throws IOException {
     // Our write path doesn't actually use this serializer (since we end up calling the `write()`
     // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
     // around this, we pass a dummy no-op serializer.
-    final SerializerInstance ser = new DummySerializerInstance();
+    final SerializerInstance ser = DummySerializerInstance.INSTANCE;
     // TODO: audit the metrics-related code and ensure proper metrics integration:
     // It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
     // doesn't report IO time spent writing spill files (see SPARK-7413). This method,
@@ -238,13 +238,12 @@ private long getMemoryUsage() {
 
   private long freeMemory() {
     long memoryFreed = 0;
-    final Iterator<MemoryBlock> iter = allocatedPages.iterator();
-    while (iter.hasNext()) {
-      memoryManager.freePage(iter.next());
-      shuffleMemoryManager.release(PAGE_SIZE);
-      memoryFreed += PAGE_SIZE;
-      iter.remove();
+    for (MemoryBlock block : allocatedPages) {
+      memoryManager.freePage(block);
+      shuffleMemoryManager.release(block.size());
+      memoryFreed += block.size();
     }
+    allocatedPages.clear();
     currentPage = null;
     currentPagePosition = -1;
     return memoryFreed;
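
Two things change in freeMemory(): each page is now released for its actual block.size() rather than the fixed PAGE_SIZE constant, so the accounting stays correct even if pages ever vary in size, and the explicit iterator with iter.remove() is replaced by a for-each loop followed by a single clear(). A sketch of the same drain pattern with stand-in types (not Spark's classes):

import java.util.ArrayList;
import java.util.List;

// Illustrative "drain" pattern: release every element, account for its actual
// size, then clear the whole list at once instead of removing per-element
// through an iterator. Page/allocator types are stand-ins, not Spark's.
class PageDrain {
  static long freeAll(List<long[]> pages) {
    long freed = 0;
    for (long[] page : pages) {
      freed += page.length * 8L;  // account per-page size, not a fixed constant
      // (a real implementation would return the page to its allocator here)
    }
    pages.clear();  // one bulk removal after the loop; no iterator.remove() needed
    return freed;
  }

  public static void main(String[] args) {
    List<long[]> pages = new ArrayList<>();
    pages.add(new long[1024]);
    pages.add(new long[2048]);
    System.out.println(freeAll(pages) + " bytes freed; pages left: " + pages.size());
  }
}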

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java

Lines changed: 0 additions & 1 deletion

@@ -61,7 +61,6 @@ public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length
 
   @Override
   public long[] allocate(int length) {
-    assert (length < Integer.MAX_VALUE) : "Length " + length + " is too large";
     return new long[length];
   }
 
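
The deleted assertion is vacuous for an int parameter, which is presumably why review asked for its removal: length < Integer.MAX_VALUE can only fail for the single value Integer.MAX_VALUE, and an array allocation that large would fail on its own. A quick illustration of that reasoning (my inference, not stated in the commit):

class AssertDemo {
  public static void main(String[] args) {
    int length = Integer.MAX_VALUE;                  // the only int the assert rejects
    System.out.println(length < Integer.MAX_VALUE);  // false; every other int prints true
    // new long[Integer.MAX_VALUE] would throw OutOfMemoryError on typical JVMs anyway.
  }
}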

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java

Lines changed: 10 additions & 12 deletions

@@ -24,7 +24,13 @@
 public final class UnsafeShuffleSorter {
 
   private final Sorter<PackedRecordPointer, long[]> sorter;
-  private final Comparator<PackedRecordPointer> sortComparator;
+  private static final class SortComparator implements Comparator<PackedRecordPointer> {
+    @Override
+    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
+      return left.getPartitionId() - right.getPartitionId();
+    }
+  }
+  private static final SortComparator SORT_COMPARATOR = new SortComparator();
 
   private long[] sortBuffer;
 
@@ -36,14 +42,7 @@ public final class UnsafeShuffleSorter {
   public UnsafeShuffleSorter(int initialSize) {
     assert (initialSize > 0);
     this.sortBuffer = new long[initialSize];
-    this.sorter =
-      new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
-    this.sortComparator = new Comparator<PackedRecordPointer>() {
-      @Override
-      public int compare(PackedRecordPointer left, PackedRecordPointer right) {
-        return left.getPartitionId() - right.getPartitionId();
-      }
-    };
+    this.sorter = new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
   }
 
   public void expandSortBuffer() {
@@ -81,11 +80,10 @@ public static abstract class UnsafeShuffleSorterIterator {
   }
 
   /**
-   * Return an iterator over record pointers in sorted order. For efficiency, all calls to
-   * {@code next()} will return the same mutable object.
+   * Return an iterator over record pointers in sorted order.
    */
   public UnsafeShuffleSorterIterator getSortedIterator() {
-    sorter.sort(sortBuffer, 0, sortBufferInsertPosition, sortComparator);
+    sorter.sort(sortBuffer, 0, sortBufferInsertPosition, SORT_COMPARATOR);
     return new UnsafeShuffleSorterIterator() {
 
       private int position = 0;
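
Promoting the comparator from a per-instance anonymous class to a static nested class with a shared SORT_COMPARATOR constant avoids one allocation per sorter, which is safe because the comparator is stateless. One general caveat about subtraction-based comparators, harmless here since partition IDs are small and non-negative, shown with a standalone sketch (my note, not the commit's):

import java.util.Comparator;

class ComparatorCaveat {
  public static void main(String[] args) {
    // Subtraction overflows for operands of opposite sign and large magnitude:
    int bad = Integer.MIN_VALUE - 1;  // wraps around to Integer.MAX_VALUE
    System.out.println(bad > 0);      // true: the "smaller" value compares as larger

    // Integer.compare avoids the overflow and is the robust general-purpose form:
    Comparator<Integer> safe = (a, b) -> Integer.compare(a, b);
    System.out.println(safe.compare(Integer.MIN_VALUE, 1));  // -1, as expected
  }
}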

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 1 addition & 7 deletions

@@ -218,13 +218,7 @@ private[spark] class DiskBlockObjectWriter(
     recordWritten()
   }
 
-  override def write(b: Int): Unit = {
-    if (!initialized) {
-      open()
-    }
-
-    bs.write(b)
-  }
+  override def write(b: Int): Unit = throw new UnsupportedOperationException()
 
   override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
     if (!initialized) {
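
The single-byte write(b: Int) override now fails fast instead of quietly supporting a slow path, steering every caller to the bulk write(kvBytes, offs, len) method. An illustrative Java stream in the same spirit (a stand-in class, not Spark's DiskBlockObjectWriter):

import java.io.IOException;
import java.io.OutputStream;

// A stream that disallows per-byte writes so callers must use the efficient
// bulk path. (Illustrative class, not from the patch.)
class BulkOnlyOutputStream extends OutputStream {
  private final OutputStream underlying;

  BulkOnlyOutputStream(OutputStream underlying) {
    this.underlying = underlying;
  }

  @Override
  public void write(int b) {
    // Single-byte writes would bypass the bulk path, so fail fast.
    throw new UnsupportedOperationException("use write(byte[], int, int)");
  }

  @Override
  public void write(byte[] bytes, int off, int len) throws IOException {
    underlying.write(bytes, off, len);
  }
}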

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 1 addition & 0 deletions

@@ -118,6 +118,7 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void basicShuffleWriting() throws Exception {
 
     final ShuffleDependency<Object, Object, Object> dep = mock(ShuffleDependency.class);
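
The @SuppressWarnings("unchecked") annotation is needed because Mockito's mock(Class) cannot carry generic type arguments, so assigning the raw mock to a parameterized ShuffleDependency<Object, Object, Object> is an unchecked conversion. A minimal sketch of the same situation, assuming Mockito on the classpath:

import java.util.List;

import static org.mockito.Mockito.mock;

class MockGenericsDemo {
  // mock(List.class) can only return the raw type List; assigning that to a
  // parameterized List<String> is an unchecked conversion, hence the annotation.
  @SuppressWarnings("unchecked")
  static List<String> makeMock() {
    return mock(List.class);
  }

  public static void main(String[] args) {
    System.out.println(makeMock().size());  // 0: mocks return default values
  }
}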
