1717
1818package org .apache .spark .shuffle .unsafe ;
1919
20- import com .google .common .annotations .VisibleForTesting ;
20+ import java .io .File ;
21+ import java .io .IOException ;
22+ import java .util .Iterator ;
23+ import java .util .LinkedList ;
24+
25+ import org .apache .spark .storage .*;
26+ import scala .Tuple2 ;
27+
28+ import org .slf4j .Logger ;
29+ import org .slf4j .LoggerFactory ;
30+
2131import org .apache .spark .SparkConf ;
2232import org .apache .spark .TaskContext ;
2333import org .apache .spark .executor .ShuffleWriteMetrics ;
2434import org .apache .spark .serializer .SerializerInstance ;
2535import org .apache .spark .shuffle .ShuffleMemoryManager ;
26- import org .apache .spark .storage .BlockId ;
27- import org .apache .spark .storage .BlockManager ;
28- import org .apache .spark .storage .BlockObjectWriter ;
29- import org .apache .spark .storage .TempLocalBlockId ;
3036import org .apache .spark .unsafe .PlatformDependent ;
3137import org .apache .spark .unsafe .memory .MemoryBlock ;
3238import org .apache .spark .unsafe .memory .TaskMemoryManager ;
33- import org .slf4j .Logger ;
34- import org .slf4j .LoggerFactory ;
35- import scala .Tuple2 ;
36-
37- import java .io .File ;
38- import java .io .IOException ;
39- import java .util .Iterator ;
40- import java .util .LinkedList ;
4139
4240/**
43- * External sorter based on {@link UnsafeShuffleSorter}.
41+ * An external sorter that is specialized for sort-based shuffle.
42+ * <p>
43+ * Incoming records are appended to data pages. When all records have been inserted (or when the
44+ * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
45+ * their partition ids (using a {@link UnsafeShuffleSorter}). The sorted records are then written
46+ * to a single output file (or multiple files, if we've spilled). The format of the output files is
47+ * the same as the format of the final output file written by
48+ * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
49+ * written as a single serialized, compressed stream that can be read with a new decompression and
50+ * deserialization stream.
51+ * <p>
52+ * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
53+ * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
54+ * specialized merge procedure that avoids extra serialization/deserialization.
4455 */
4556public final class UnsafeShuffleSpillWriter {
4657
@@ -51,23 +62,31 @@ public final class UnsafeShuffleSpillWriter {
5162
5263 private final int initialSize ;
5364 private final int numPartitions ;
54- private UnsafeShuffleSorter sorter ;
55-
5665 private final TaskMemoryManager memoryManager ;
5766 private final ShuffleMemoryManager shuffleMemoryManager ;
5867 private final BlockManager blockManager ;
5968 private final TaskContext taskContext ;
60- private final LinkedList <MemoryBlock > allocatedPages = new LinkedList <MemoryBlock >();
6169 private final boolean spillingEnabled ;
62- private final int fileBufferSize ;
6370 private ShuffleWriteMetrics writeMetrics ;
6471
72+ /** The buffer size to use when writing spills using DiskBlockObjectWriter */
73+ private final int fileBufferSize ;
6574
66- private MemoryBlock currentPage = null ;
67- private long currentPagePosition = -1 ;
75+ /**
76+ * Memory pages that hold the records being sorted. The pages in this list are freed when
77+ * spilling, although in principle we could recycle these pages across spills (on the other hand,
78+ * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
79+ * itself).
80+ */
81+ private final LinkedList <MemoryBlock > allocatedPages = new LinkedList <MemoryBlock >();
6882
6983 private final LinkedList <SpillInfo > spills = new LinkedList <SpillInfo >();
7084
85+ // All three of these variables are reset after spilling:
86+ private UnsafeShuffleSorter sorter ;
87+ private MemoryBlock currentPage = null ;
88+ private long currentPagePosition = -1 ;
89+
7190 public UnsafeShuffleSpillWriter (
7291 TaskMemoryManager memoryManager ,
7392 ShuffleMemoryManager shuffleMemoryManager ,
@@ -90,6 +109,10 @@ public UnsafeShuffleSpillWriter(
90109
91110 // TODO: metrics tracking + integration with shuffle write metrics
92111
112+ /**
113+ * Allocates a new sorter. Called when opening the spill writer for the first time and after
114+ * each spill.
115+ */
93116 private void openSorter () throws IOException {
94117 this .writeMetrics = new ShuffleWriteMetrics ();
95118 // TODO: connect write metrics to task metrics?
@@ -106,22 +129,41 @@ private void openSorter() throws IOException {
106129 this .sorter = new UnsafeShuffleSorter (initialSize );
107130 }
108131
132+ /**
133+ * Sorts the in-memory records, writes the sorted records to a spill file, and frees the in-memory
134+ * data structures associated with this sort. New data structures are not automatically allocated.
135+ */
109136 private SpillInfo writeSpillFile () throws IOException {
110- final UnsafeShuffleSorter .UnsafeShuffleSorterIterator sortedRecords = sorter .getSortedIterator ();
137+ // This call performs the actual sort.
138+ final UnsafeShuffleSorter .UnsafeShuffleSorterIterator sortedRecords =
139+ sorter .getSortedIterator ();
111140
112- int currentPartition = -1 ;
141+ // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
142+ // after SPARK-5581 is fixed.
113143 BlockObjectWriter writer = null ;
144+
145+ // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
146+ // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
147+ // records in a byte array. This array only needs to be big enough to hold a single record.
114148 final byte [] arr = new byte [SER_BUFFER_SIZE ];
115149
116- final Tuple2 <TempLocalBlockId , File > spilledFileInfo =
117- blockManager .diskBlockManager ().createTempLocalBlock ();
150+ // Because this output will be read during shuffle, its compression codec must be controlled by
151+ // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
152+ // createTempShuffleBlock here; see SPARK-3426 for more details.
153+ final Tuple2 <TempShuffleBlockId , File > spilledFileInfo =
154+ blockManager .diskBlockManager ().createTempShuffleBlock ();
118155 final File file = spilledFileInfo ._2 ();
119156 final BlockId blockId = spilledFileInfo ._1 ();
120157 final SpillInfo spillInfo = new SpillInfo (numPartitions , file , blockId );
121158
159+ // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
160+ // Our write path doesn't actually use this serializer (since we end up calling the `write()`
161+ // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
162+ // around this, we pass a dummy no-op serializer.
122163 final SerializerInstance ser = new DummySerializerInstance ();
123164 writer = blockManager .getDiskWriter (blockId , file , ser , fileBufferSize , writeMetrics );
124165
166+ int currentPartition = -1 ;
125167 while (sortedRecords .hasNext ()) {
126168 sortedRecords .loadNext ();
127169 final int partition = sortedRecords .packedRecordPointer .getPartitionId ();
@@ -153,7 +195,9 @@ private SpillInfo writeSpillFile() throws IOException {
153195
154196 if (writer != null ) {
155197 writer .commitAndClose ();
156- // TODO: comment and explain why our handling of empty spills, etc.
198+ // If `writeSpillFile()` was called from `closeAndGetSpills()` and no records were inserted,
199+ // then the spill file might be empty. Note that it might be better to avoid calling
200+ // writeSpillFile() in that case.
157201 if (currentPartition != -1 ) {
158202 spillInfo .partitionLengths [currentPartition ] = writer .fileSegment ().length ();
159203 spills .add (spillInfo );
@@ -162,24 +206,30 @@ private SpillInfo writeSpillFile() throws IOException {
162206 return spillInfo ;
163207 }
164208
165- @ VisibleForTesting
166- public void spill () throws IOException {
167- final SpillInfo spillInfo = writeSpillFile ();
209+ /**
210+ * Sort and spill the current records in response to memory pressure.
211+ */
212+ private void spill () throws IOException {
213+ final long threadId = Thread .currentThread ().getId ();
214+ logger .info ("Thread " + threadId + " spilling sort data of " +
215+ org .apache .spark .util .Utils .bytesToString (getMemoryUsage ()) + " to disk (" +
216+ (spills .size () + (spills .size () > 1 ? " times" : " time" )) + " so far)" );
168217
218+ final SpillInfo spillInfo = writeSpillFile ();
169219 final long sorterMemoryUsage = sorter .getMemoryUsage ();
170220 sorter = null ;
171221 shuffleMemoryManager .release (sorterMemoryUsage );
172222 final long spillSize = freeMemory ();
173223 taskContext .taskMetrics ().incMemoryBytesSpilled (spillSize );
174224 taskContext .taskMetrics ().incDiskBytesSpilled (spillInfo .file .length ());
175- final long threadId = Thread .currentThread ().getId ();
176- // TODO: messy; log _before_ spill
177- logger .info ("Thread " + threadId + " spilling in-memory map of " +
178- org .apache .spark .util .Utils .bytesToString (spillSize ) + " to disk (" +
179- (spills .size () + ((spills .size () > 1 ) ? " times" : " time" )) + " so far)" );
225+
180226 openSorter ();
181227 }
182228
229+ private long getMemoryUsage () {
230+ return sorter .getMemoryUsage () + (allocatedPages .size () * PAGE_SIZE );
231+ }
232+
183233 private long freeMemory () {
184234 long memoryFreed = 0 ;
185235 final Iterator <MemoryBlock > iter = allocatedPages .iterator ();
@@ -194,7 +244,15 @@ private long freeMemory() {
194244 return memoryFreed ;
195245 }
196246
197- private void ensureSpaceInDataPage (int requiredSpace ) throws Exception {
247+ /**
248+ * Checks whether there is enough space to insert a new record into the sorter. If there is
249+ * insufficient space, either allocate more memory or spill the current sort data (if spilling
250+ * is enabled). The record itself is not inserted here; the caller does that afterwards.
251+ */
252+ private void ensureSpaceInDataPage (int requiredSpace ) throws IOException {
253+ // TODO: we should re-order the `if` cases in this function so that the most common case (there
254+ // is enough space) appears first.
255+
198256 // TODO: merge these steps to first calculate total memory requirements for this insert,
199257 // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
200258 // data page.
@@ -219,7 +277,7 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
219277 }
220278 if (requiredSpace > PAGE_SIZE ) {
221279 // TODO: throw a more specific exception?
222- throw new Exception ("Required space " + requiredSpace + " is greater than page size (" +
280+ throw new IOException ("Required space " + requiredSpace + " is greater than page size (" +
223281 PAGE_SIZE + ")" );
224282 } else if (requiredSpace > spaceInCurrentPage ) {
225283 if (spillingEnabled ) {
@@ -230,7 +288,7 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
230288 final long memoryAcquiredAfterSpill = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
231289 if (memoryAcquiredAfterSpill != PAGE_SIZE ) {
232290 shuffleMemoryManager .release (memoryAcquiredAfterSpill );
233- throw new Exception ("Can't allocate memory!" );
291+ throw new IOException ("Can't allocate memory!" );
234292 }
235293 }
236294 }
@@ -241,11 +299,14 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
241299 }
242300 }
243301
302+ /**
303+ * Write a record to the shuffle sorter.
304+ */
244305 public void insertRecord (
245306 Object recordBaseObject ,
246307 long recordBaseOffset ,
247308 int lengthInBytes ,
248- int prefix ) throws Exception {
309+ int partitionId ) throws IOException {
249310 // Need 4 bytes to store the record length.
250311 ensureSpaceInDataPage (lengthInBytes + 4 );
251312
@@ -262,12 +323,20 @@ public void insertRecord(
262323 lengthInBytes );
263324 currentPagePosition += lengthInBytes ;
264325
265- sorter .insertRecord (recordAddress , prefix );
326+ sorter .insertRecord (recordAddress , partitionId );
266327 }
267328
329+ /**
330+ * Close the sorter, causing any buffered data to be sorted and written out to disk.
331+ *
332+ * @return metadata for the spill files written by this sorter. If no records were ever inserted
333+ * into this sorter, then this will return an empty array.
334+ * @throws IOException
335+ */
268336 public SpillInfo [] closeAndGetSpills () throws IOException {
269337 if (sorter != null ) {
270338 writeSpillFile ();
339+ freeMemory ();
271340 }
272341 return spills .toArray (new SpillInfo [0 ]);
273342 }
0 commit comments