3636import org .apache .spark .unsafe .PlatformDependent ;
3737import org .apache .spark .unsafe .memory .MemoryBlock ;
3838import org .apache .spark .unsafe .memory .TaskMemoryManager ;
39+ import org .apache .spark .util .Utils ;
3940
4041/**
4142 * An external sorter that is specialized for sort-based shuffle.
@@ -85,7 +86,7 @@ final class UnsafeShuffleExternalSorter {
8586
8687 private final LinkedList <SpillInfo > spills = new LinkedList <SpillInfo >();
8788
88- // All three of these variables are reset after spilling:
89+ // These variables are reset after spilling:
8990 private UnsafeShuffleInMemorySorter sorter ;
9091 private MemoryBlock currentPage = null ;
9192 private long currentPagePosition = -1 ;
@@ -110,46 +111,46 @@ public UnsafeShuffleExternalSorter(
110111 // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
111112 this .fileBufferSize = (int ) conf .getSizeAsKb ("spark.shuffle.file.buffer" , "32k" ) * 1024 ;
112113 this .writeMetrics = writeMetrics ;
113- openSorter ();
114+ initializeForWriting ();
114115 }
115116
116117 /**
117- * Allocates a new sorter. Called when opening the spill writer for the first time and after
118- * each spill.
118+ * Allocates new sort data structures. Called when creating the sorter and after each spill.
119119 */
120- private void openSorter () throws IOException {
120+ private void initializeForWriting () throws IOException {
121121 // TODO: move this sizing calculation logic into a static method of sorter:
122122 final long memoryRequested = initialSize * 8L ;
123123 if (spillingEnabled ) {
124124 final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryRequested );
125125 if (memoryAcquired != memoryRequested ) {
126126 shuffleMemoryManager .release (memoryAcquired );
127- throw new IOException ("Could not acquire memory! " );
127+ throw new IOException ("Could not acquire " + memoryRequested + " bytes of memory" );
128128 }
129129 }
130130
131131 this .sorter = new UnsafeShuffleInMemorySorter (initialSize );
132132 }
133133
134134 /**
135- * Sorts the in-memory records and writes the sorted records to a spill file.
135+ * Sorts the in-memory records and writes the sorted records to an on-disk file.
136136 * This method does not free the sort data structures.
137137 *
138- * @param isSpill if true, this indicates that we're writing a spill and that bytes written should
139- * be counted towards shuffle spill metrics rather than shuffle write metrics.
138+ * @param isLastFile if true, this indicates that we're writing the final output file and that the
139+ * bytes written should be counted towards shuffle spill metrics rather than
140+ * shuffle write metrics.
140141 */
141- private void writeSpillFile (boolean isSpill ) throws IOException {
142+ private void writeSortedFile (boolean isLastFile ) throws IOException {
142143
143144 final ShuffleWriteMetrics writeMetricsToUse ;
144145
145- if (isSpill ) {
146+ if (isLastFile ) {
147+ // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
148+ writeMetricsToUse = writeMetrics ;
149+ } else {
146150 // We're spilling, so bytes written should be counted towards spill rather than write.
147151 // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
148152 // them towards shuffle bytes written.
149153 writeMetricsToUse = new ShuffleWriteMetrics ();
150- } else {
151- // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
152- writeMetricsToUse = writeMetrics ;
153154 }
154155
155156 // This call performs the actual sort.
@@ -221,16 +222,16 @@ private void writeSpillFile(boolean isSpill) throws IOException {
221222
222223 if (writer != null ) {
223224 writer .commitAndClose ();
224- // If `writeSpillFile ()` was called from `closeAndGetSpills()` and no records were inserted,
225- // then the spill file might be empty. Note that it might be better to avoid calling
226- // writeSpillFile () in that case.
225+ // If `writeSortedFile ()` was called from `closeAndGetSpills()` and no records were inserted,
226+ // then the file might be empty. Note that it might be better to avoid calling
227+ // writeSortedFile () in that case.
227228 if (currentPartition != -1 ) {
228229 spillInfo .partitionLengths [currentPartition ] = writer .fileSegment ().length ();
229230 spills .add (spillInfo );
230231 }
231232 }
232233
233- if (isSpill ) {
234+ if (! isLastFile ) {
234235 writeMetrics .incShuffleRecordsWritten (writeMetricsToUse .shuffleRecordsWritten ());
235236 // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
236237 // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
@@ -244,19 +245,20 @@ private void writeSpillFile(boolean isSpill) throws IOException {
244245 */
245246 @ VisibleForTesting
246247 void spill () throws IOException {
247- final long threadId = Thread .currentThread ().getId ();
248- logger .info ("Thread " + threadId + " spilling sort data of " +
249- org .apache .spark .util .Utils .bytesToString (getMemoryUsage ()) + " to disk (" +
250- (spills .size () + (spills .size () > 1 ? " times" : " time" )) + " so far)" );
248+ logger .info ("Thread {} spilling sort data of {} to disk ({} {} so far)" ,
249+ Thread .currentThread ().getId (),
250+ Utils .bytesToString (getMemoryUsage ()),
251+ spills .size (),
252+ spills .size () > 1 ? " times" : " time" );
251253
252- writeSpillFile ( true );
254+ writeSortedFile ( false );
253255 final long sorterMemoryUsage = sorter .getMemoryUsage ();
254256 sorter = null ;
255257 shuffleMemoryManager .release (sorterMemoryUsage );
256258 final long spillSize = freeMemory ();
257259 taskContext .taskMetrics ().incMemoryBytesSpilled (spillSize );
258260
259- openSorter ();
261+ initializeForWriting ();
260262 }
261263
262264 private long getMemoryUsage () {
@@ -405,7 +407,7 @@ public SpillInfo[] closeAndGetSpills() throws IOException {
405407 try {
406408 if (sorter != null ) {
407409 // Do not count the final file towards the spill count.
408- writeSpillFile ( false );
410+ writeSortedFile ( true );
409411 freeMemory ();
410412 }
411413 return spills .toArray (new SpillInfo [spills .size ()]);
0 commit comments