1919
2020import com .google .common .annotations .VisibleForTesting ;
2121import org .apache .spark .SparkConf ;
22+ import org .apache .spark .TaskContext ;
2223import org .apache .spark .executor .ShuffleWriteMetrics ;
2324import org .apache .spark .shuffle .ShuffleMemoryManager ;
2425import org .apache .spark .storage .BlockManager ;
2526import org .apache .spark .unsafe .PlatformDependent ;
2627import org .apache .spark .unsafe .memory .MemoryBlock ;
2728import org .apache .spark .unsafe .memory .TaskMemoryManager ;
29+ import org .slf4j .Logger ;
30+ import org .slf4j .LoggerFactory ;
2831
2932import java .io .IOException ;
3033import java .util .Iterator ;
3740 */
3841public final class UnsafeExternalSorter {
3942
43+ private final Logger logger = LoggerFactory .getLogger (UnsafeExternalSorter .class );
44+
4045 private static final int PAGE_SIZE = 1024 * 1024 ; // TODO: tune this
4146
4247 private final PrefixComparator prefixComparator ;
4348 private final RecordComparator recordComparator ;
4449 private final int initialSize ;
50+ private int numSpills = 0 ;
4551 private UnsafeSorter sorter ;
4652
4753 private final TaskMemoryManager memoryManager ;
4854 private final ShuffleMemoryManager shuffleMemoryManager ;
4955 private final BlockManager blockManager ;
56+ private final TaskContext taskContext ;
5057 private final LinkedList <MemoryBlock > allocatedPages = new LinkedList <MemoryBlock >();
5158 private final boolean spillingEnabled ;
5259 private final int fileBufferSize ;
@@ -63,13 +70,15 @@ public UnsafeExternalSorter(
6370 TaskMemoryManager memoryManager ,
6471 ShuffleMemoryManager shuffleMemoryManager ,
6572 BlockManager blockManager ,
73+ TaskContext taskContext ,
6674 RecordComparator recordComparator ,
6775 PrefixComparator prefixComparator ,
6876 int initialSize ,
69- SparkConf conf ) {
77+ SparkConf conf ) throws IOException {
7078 this .memoryManager = memoryManager ;
7179 this .shuffleMemoryManager = shuffleMemoryManager ;
7280 this .blockManager = blockManager ;
81+ this .taskContext = taskContext ;
7382 this .recordComparator = recordComparator ;
7483 this .prefixComparator = prefixComparator ;
7584 this .initialSize = initialSize ;
@@ -81,9 +90,19 @@ public UnsafeExternalSorter(
8190
8291 // TODO: metrics tracking + integration with shuffle write metrics
8392
/**
 * Starts a fresh sorting run: replaces {@code writeMetrics} with a new
 * {@link ShuffleWriteMetrics} and replaces {@code sorter} with a new
 * {@link UnsafeSorter} sized for {@code initialSize} records.
 *
 * When {@code spillingEnabled} is true, {@code initialSize * 8 * 2} bytes are
 * first requested from {@code shuffleMemoryManager}; when it is false no
 * reservation is made at all.
 *
 * @throws IOException if spilling is enabled and the shuffle memory manager
 *         grants anything other than the full amount requested (the partial
 *         grant is released before throwing).
 */
84- private void openSorter () {
93+ private void openSorter () throws IOException {
8594 this .writeMetrics = new ShuffleWriteMetrics ();
8695 // TODO: connect write metrics to task metrics?
96+ // TODO: move this sizing calculation logic into a static method of sorter:
// The 8L forces the multiplication to be done in long arithmetic.
// NOTE(review): presumably 8 bytes x 2 slots per record in the sort buffer
// (e.g. pointer + key prefix) -- confirm against UnsafeSorter.
97+ final long memoryRequested = initialSize * 8L * 2 ;
98+ if (spillingEnabled ) {
99+ final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryRequested );
// All-or-nothing: a partial grant is handed back before failing, so this
// method does not keep a reservation it cannot use.
100+ if (memoryAcquired != memoryRequested ) {
101+ shuffleMemoryManager .release (memoryAcquired );
102+ throw new IOException ("Could not acquire memory!" );
103+ }
104+ }
105+
87106 this .sorter = new UnsafeSorter (memoryManager , recordComparator , prefixComparator , initialSize );
88107 }
89108
@@ -101,23 +120,52 @@ public void spill() throws IOException {
101120 spillWriter .write (baseObject , baseOffset , recordLength , recordPointer .keyPrefix );
102121 }
103122 spillWriter .close ();
123+ final long sorterMemoryUsage = sorter .getMemoryUsage ();
104124 sorter = null ;
105- freeMemory ();
125+ shuffleMemoryManager .release (sorterMemoryUsage );
126+ final long spillSize = freeMemory ();
127+ taskContext .taskMetrics ().incMemoryBytesSpilled (spillSize );
128+ taskContext .taskMetrics ().incDiskBytesSpilled (spillWriter .numberOfSpilledBytes ());
129+ numSpills ++;
130+ final long threadId = Thread .currentThread ().getId ();
131+ // TODO: messy; log _before_ spill
132+ logger .info ("Thread " + threadId + " spilling in-memory map of " +
133+ org .apache .spark .util .Utils .bytesToString (spillSize ) + " to disk (" +
134+ (numSpills + ((numSpills > 1 ) ? " times" : " time" )) + " so far)" );
106135 openSorter ();
107136 }
108137
/**
 * Frees every data page currently tracked in {@code allocatedPages}.
 *
 * Each page is returned to {@code memoryManager}, {@code PAGE_SIZE} bytes are
 * released back to {@code shuffleMemoryManager} for it, and the page is removed
 * from the list. Afterwards the current-page cursor is reset
 * ({@code currentPage = null}, {@code currentPagePosition = -1}).
 *
 * @return the number of bytes freed, i.e. {@code PAGE_SIZE} times the number of
 *         pages that were in {@code allocatedPages}; 0 if the list was empty.
 */
109- private void freeMemory () {
138+ private long freeMemory () {
139+ long memoryFreed = 0 ;
110140 final Iterator <MemoryBlock > iter = allocatedPages .iterator ();
111141 while (iter .hasNext ()) {
112142 memoryManager .freePage (iter .next ());
// NOTE(review): release happens unconditionally here, while
// ensureSpaceInDataPage only calls tryToAcquire(PAGE_SIZE) when
// spillingEnabled is true -- confirm this cannot over-release when
// spilling is disabled.
113143 shuffleMemoryManager .release (PAGE_SIZE );
144+ memoryFreed += PAGE_SIZE ;
// Remove through the iterator so the list can be mutated while iterating.
114145 iter .remove ();
115146 }
116147 currentPage = null ;
117148 currentPagePosition = -1 ;
149+ return memoryFreed ;
118150 }
119151
120152 private void ensureSpaceInDataPage (int requiredSpace ) throws Exception {
153+ // TODO: merge these steps to first calculate total memory requirements for this insert,
154+ // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
155+ // data page.
156+ if (!sorter .hasSpaceForAnotherRecord () && spillingEnabled ) {
157+ final long oldSortBufferMemoryUsage = sorter .getMemoryUsage ();
158+ final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2 ;
159+ final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryToGrowSortBuffer );
160+ if (memoryAcquired < memoryToGrowSortBuffer ) {
161+ shuffleMemoryManager .release (memoryAcquired );
162+ spill ();
163+ } else {
164+ sorter .expandSortBuffer ();
165+ shuffleMemoryManager .release (oldSortBufferMemoryUsage );
166+ }
167+ }
168+
121169 final long spaceInCurrentPage ;
122170 if (currentPage != null ) {
123171 spaceInCurrentPage = PAGE_SIZE - (currentPagePosition - currentPage .getBaseOffset ());
@@ -129,12 +177,22 @@ private void ensureSpaceInDataPage(int requiredSpace) throws Exception {
129177 throw new Exception ("Required space " + requiredSpace + " is greater than page size (" +
130178 PAGE_SIZE + ")" );
131179 } else if (requiredSpace > spaceInCurrentPage ) {
132- if (spillingEnabled && shuffleMemoryManager .tryToAcquire (PAGE_SIZE ) < PAGE_SIZE ) {
133- spill ();
180+ if (spillingEnabled ) {
181+ final long memoryAcquired = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
182+ if (memoryAcquired < PAGE_SIZE ) {
183+ shuffleMemoryManager .release (memoryAcquired );
184+ spill ();
185+ final long memoryAcquiredAfterSpill = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
186+ if (memoryAcquiredAfterSpill != PAGE_SIZE ) {
187+ shuffleMemoryManager .release (memoryAcquiredAfterSpill );
188+ throw new Exception ("Can't allocate memory!" );
189+ }
190+ }
134191 }
135192 currentPage = memoryManager .allocatePage (PAGE_SIZE );
136193 currentPagePosition = currentPage .getBaseOffset ();
137194 allocatedPages .add (currentPage );
195+ logger .info ("Acquired new page! " + allocatedPages .size () * PAGE_SIZE );
138196 }
139197 }
140198
@@ -162,9 +220,9 @@ public void insertRecord(
162220 sorter .insertRecord (recordAddress , prefix );
163221 }
164222
165- public Iterator < UnsafeExternalSortSpillMerger . RecordAddressAndKeyPrefix > getSortedIterator () throws IOException {
166- final UnsafeExternalSortSpillMerger spillMerger =
167- new UnsafeExternalSortSpillMerger (recordComparator , prefixComparator );
223+ public ExternalSorterIterator getSortedIterator () throws IOException {
224+ final UnsafeSorterSpillMerger spillMerger =
225+ new UnsafeSorterSpillMerger (recordComparator , prefixComparator );
168226 for (UnsafeSorterSpillWriter spillWriter : spillWriters ) {
169227 spillMerger .addSpill (spillWriter .getReader (blockManager ));
170228 }
0 commit comments