1717
1818package org .apache .spark .util .collection .unsafe .sort ;
1919
20+ import java .io .IOException ;
21+ import java .util .LinkedList ;
22+
2023import com .google .common .annotations .VisibleForTesting ;
24+ import org .slf4j .Logger ;
25+ import org .slf4j .LoggerFactory ;
26+
2127import org .apache .spark .SparkConf ;
2228import org .apache .spark .TaskContext ;
2329import org .apache .spark .executor .ShuffleWriteMetrics ;
2733import org .apache .spark .unsafe .memory .MemoryBlock ;
2834import org .apache .spark .unsafe .memory .TaskMemoryManager ;
2935import org .apache .spark .util .Utils ;
30- import org .slf4j .Logger ;
31- import org .slf4j .LoggerFactory ;
32-
33- import java .io .IOException ;
34- import java .util .Iterator ;
35- import java .util .LinkedList ;
3636
3737/**
3838 * External sorter based on {@link UnsafeInMemorySorter}.
@@ -42,28 +42,36 @@ public final class UnsafeExternalSorter {
4242 private final Logger logger = LoggerFactory .getLogger (UnsafeExternalSorter .class );
4343
4444 private static final int PAGE_SIZE = 1 << 27 ; // 128 megabytes
45+ @ VisibleForTesting
46+ static final int MAX_RECORD_SIZE = PAGE_SIZE - 4 ;
4547
4648 private final PrefixComparator prefixComparator ;
4749 private final RecordComparator recordComparator ;
4850 private final int initialSize ;
49- private int numSpills = 0 ;
50- private UnsafeInMemorySorter sorter ;
51-
5251 private final TaskMemoryManager memoryManager ;
5352 private final ShuffleMemoryManager shuffleMemoryManager ;
5453 private final BlockManager blockManager ;
5554 private final TaskContext taskContext ;
56- private final LinkedList <MemoryBlock > allocatedPages = new LinkedList <MemoryBlock >();
57- private final boolean spillingEnabled ;
58- private final int fileBufferSize ;
5955 private ShuffleWriteMetrics writeMetrics ;
6056
57+ /** The buffer size to use when writing spills using DiskBlockObjectWriter */
58+ private final int fileBufferSizeBytes ;
59+
60+ /**
61+ * Memory pages that hold the records being sorted. The pages in this list are freed when
62+ * spilling, although in principle we could recycle these pages across spills (on the other hand,
63+ * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
64+ * itself).
65+ */
66+ private final LinkedList <MemoryBlock > allocatedPages = new LinkedList <MemoryBlock >();
6167
68+ // These variables are reset after spilling:
69+ private UnsafeInMemorySorter sorter ;
6270 private MemoryBlock currentPage = null ;
6371 private long currentPagePosition = -1 ;
72+ private long freeSpaceInCurrentPage = 0 ;
6473
65- private final LinkedList <UnsafeSorterSpillWriter > spillWriters =
66- new LinkedList <UnsafeSorterSpillWriter >();
74+ private final LinkedList <UnsafeSorterSpillWriter > spillWriters = new LinkedList <>();
6775
6876 public UnsafeExternalSorter (
6977 TaskMemoryManager memoryManager ,
@@ -81,41 +89,44 @@ public UnsafeExternalSorter(
8189 this .recordComparator = recordComparator ;
8290 this .prefixComparator = prefixComparator ;
8391 this .initialSize = initialSize ;
84- this .spillingEnabled = conf .getBoolean ("spark.shuffle.spill" , true );
8592 // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
86- this .fileBufferSize = (int ) conf .getSizeAsKb ("spark.shuffle.file.buffer" , "32k" ) * 1024 ;
87- openSorter ();
93+ this .fileBufferSizeBytes = (int ) conf .getSizeAsKb ("spark.shuffle.file.buffer" , "32k" ) * 1024 ;
94+ initializeForWriting ();
8895 }
8996
9097 // TODO: metrics tracking + integration with shuffle write metrics
98+ // need to connect the write metrics to task metrics so we count the spill IO somewhere.
9199
92- private void openSorter () throws IOException {
100+ /**
101+ * Allocates new sort data structures. Called when creating the sorter and after each spill.
102+ */
103+ private void initializeForWriting () throws IOException {
93104 this .writeMetrics = new ShuffleWriteMetrics ();
94- // TODO: connect write metrics to task metrics?
95105 // TODO: move this sizing calculation logic into a static method of sorter:
96106 final long memoryRequested = initialSize * 8L * 2 ;
97- if (spillingEnabled ) {
98- final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryRequested );
99- if (memoryAcquired != memoryRequested ) {
100- shuffleMemoryManager .release (memoryAcquired );
101- throw new IOException ("Could not acquire memory!" );
102- }
107+ final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryRequested );
108+ if (memoryAcquired != memoryRequested ) {
109+ shuffleMemoryManager .release (memoryAcquired );
110+ throw new IOException ("Could not acquire " + memoryRequested + " bytes of memory" );
103111 }
104112
105113 this .sorter =
106114 new UnsafeInMemorySorter (memoryManager , recordComparator , prefixComparator , initialSize );
107115 }
108116
117+ /**
118+ * Sort and spill the current records in response to memory pressure.
119+ */
109120 @ VisibleForTesting
110- public void spill () throws IOException {
121+ void spill () throws IOException {
111122 logger .info ("Thread {} spilling sort data of {} to disk ({} {} so far)" ,
112123 Thread .currentThread ().getId (),
113124 Utils .bytesToString (getMemoryUsage ()),
114- numSpills ,
115- numSpills > 1 ? " times" : " time" );
125+ spillWriters . size () ,
126+ spillWriters . size () > 1 ? " times" : " time" );
116127
117128 final UnsafeSorterSpillWriter spillWriter =
118- new UnsafeSorterSpillWriter (blockManager , fileBufferSize , writeMetrics );
129+ new UnsafeSorterSpillWriter (blockManager , fileBufferSizeBytes , writeMetrics );
119130 spillWriters .add (spillWriter );
120131 final UnsafeSorterIterator sortedRecords = sorter .getSortedIterator ();
121132 while (sortedRecords .hasNext ()) {
@@ -134,9 +145,7 @@ public void spill() throws IOException {
134145 shuffleMemoryManager .release (sorterMemoryUsage );
135146 final long spillSize = freeMemory ();
136147 taskContext .taskMetrics ().incMemoryBytesSpilled (spillSize );
137- taskContext .taskMetrics ().incDiskBytesSpilled (spillWriter .numberOfSpilledBytes ());
138- numSpills ++;
139- openSorter ();
148+ initializeForWriting ();
140149 }
141150
142151 private long getMemoryUsage () {
@@ -145,72 +154,98 @@ private long getMemoryUsage() {
145154
146155 public long freeMemory () {
147156 long memoryFreed = 0 ;
148- final Iterator <MemoryBlock > iter = allocatedPages .iterator ();
149- while (iter .hasNext ()) {
150- memoryManager .freePage (iter .next ());
151- shuffleMemoryManager .release (PAGE_SIZE );
152- memoryFreed += PAGE_SIZE ;
153- iter .remove ();
157+ for (MemoryBlock block : allocatedPages ) {
158+ memoryManager .freePage (block );
159+ shuffleMemoryManager .release (block .size ());
160+ memoryFreed += block .size ();
154161 }
162+ allocatedPages .clear ();
155163 currentPage = null ;
156164 currentPagePosition = -1 ;
165+ freeSpaceInCurrentPage = 0 ;
157166 return memoryFreed ;
158167 }
159168
160- private void ensureSpaceInDataPage (int requiredSpace ) throws IOException {
169+ /**
170+ * Checks whether there is enough space to insert a new record into the sorter.
171+ *
172+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
173+ * the record size.
174+
175+ * @return true if the record can be inserted without requiring more allocations, false otherwise.
176+ */
177+ private boolean haveSpaceForRecord (int requiredSpace ) {
178+ assert (requiredSpace > 0 );
179+ return (sorter .hasSpaceForAnotherRecord () && (requiredSpace <= freeSpaceInCurrentPage ));
180+ }
181+
182+ /**
183+ * Allocates more memory in order to insert an additional record. This will request additional
184+ * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
185+ * obtained.
186+ *
187+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
188+ * the record size.
189+ */
190+ private void allocateSpaceForRecord (int requiredSpace ) throws IOException {
161191 // TODO: merge these steps to first calculate total memory requirements for this insert,
162192 // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
163193 // data page.
164- if (!sorter .hasSpaceForAnotherRecord () && spillingEnabled ) {
165- final long oldSortBufferMemoryUsage = sorter .getMemoryUsage ();
166- final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2 ;
167- final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryToGrowSortBuffer );
168- if (memoryAcquired < memoryToGrowSortBuffer ) {
194+ if (!sorter .hasSpaceForAnotherRecord ()) {
195+ logger .debug ("Attempting to expand sort pointer array" );
196+ final long oldPointerArrayMemoryUsage = sorter .getMemoryUsage ();
197+ final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2 ;
198+ final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryToGrowPointerArray );
199+ if (memoryAcquired < memoryToGrowPointerArray ) {
169200 shuffleMemoryManager .release (memoryAcquired );
170201 spill ();
171202 } else {
172- sorter .expandSortBuffer ();
173- shuffleMemoryManager .release (oldSortBufferMemoryUsage );
203+ sorter .expandPointerArray ();
204+ shuffleMemoryManager .release (oldPointerArrayMemoryUsage );
174205 }
175206 }
176207
177- final long spaceInCurrentPage ;
178- if (currentPage != null ) {
179- spaceInCurrentPage = PAGE_SIZE - (currentPagePosition - currentPage .getBaseOffset ());
180- } else {
181- spaceInCurrentPage = 0 ;
182- }
183- if (requiredSpace > PAGE_SIZE ) {
184- // TODO: throw a more specific exception?
185- throw new IOException ("Required space " + requiredSpace + " is greater than page size (" +
186- PAGE_SIZE + ")" );
187- } else if (requiredSpace > spaceInCurrentPage ) {
188- if (spillingEnabled ) {
208+ if (requiredSpace > freeSpaceInCurrentPage ) {
209+ logger .trace ("Required space {} is less than free space in current page ({})" , requiredSpace ,
210+ freeSpaceInCurrentPage );
211+ // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
212+ // without using the free space at the end of the current page. We should also do this for
213+ // BytesToBytesMap.
214+ if (requiredSpace > PAGE_SIZE ) {
215+ throw new IOException ("Required space " + requiredSpace + " is greater than page size (" +
216+ PAGE_SIZE + ")" );
217+ } else {
189218 final long memoryAcquired = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
190219 if (memoryAcquired < PAGE_SIZE ) {
191220 shuffleMemoryManager .release (memoryAcquired );
192221 spill ();
193- final long memoryAcquiredAfterSpill = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
194- if (memoryAcquiredAfterSpill != PAGE_SIZE ) {
195- shuffleMemoryManager .release (memoryAcquiredAfterSpill );
196- throw new IOException ("Can't allocate memory! " );
222+ final long memoryAcquiredAfterSpilling = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
223+ if (memoryAcquiredAfterSpilling != PAGE_SIZE ) {
224+ shuffleMemoryManager .release (memoryAcquiredAfterSpilling );
225+ throw new IOException ("Unable to acquire " + PAGE_SIZE + " bytes of memory" );
197226 }
198227 }
228+ currentPage = memoryManager .allocatePage (PAGE_SIZE );
229+ currentPagePosition = currentPage .getBaseOffset ();
230+ freeSpaceInCurrentPage = PAGE_SIZE ;
231+ allocatedPages .add (currentPage );
199232 }
200- currentPage = memoryManager .allocatePage (PAGE_SIZE );
201- currentPagePosition = currentPage .getBaseOffset ();
202- allocatedPages .add (currentPage );
203- logger .info ("Acquired new page! " + allocatedPages .size () * PAGE_SIZE );
204233 }
205234 }
206235
236+ /**
237+ * Write a record to the sorter.
238+ */
207239 public void insertRecord (
208240 Object recordBaseObject ,
209241 long recordBaseOffset ,
210242 int lengthInBytes ,
211243 long prefix ) throws IOException {
212244 // Need 4 bytes to store the record length.
213- ensureSpaceInDataPage (lengthInBytes + 4 );
245+ final int totalSpaceRequired = lengthInBytes + 4 ;
246+ if (!haveSpaceForRecord (totalSpaceRequired )) {
247+ allocateSpaceForRecord (totalSpaceRequired );
248+ }
214249
215250 final long recordAddress =
216251 memoryManager .encodePageNumberAndOffset (currentPage , currentPagePosition );
0 commit comments