@@ -84,6 +84,7 @@ public final class UnsafeShuffleExternalSorter {
8484 private UnsafeShuffleSorter sorter ;
8585 private MemoryBlock currentPage = null ;
8686 private long currentPagePosition = -1 ;
87+ private long freeSpaceInCurrentPage = 0 ;
8788
8889 public UnsafeShuffleExternalSorter (
8990 TaskMemoryManager memoryManager ,
@@ -245,22 +246,40 @@ private long freeMemory() {
245246 allocatedPages .clear ();
246247 currentPage = null ;
247248 currentPagePosition = -1 ;
249+ freeSpaceInCurrentPage = 0 ;
248250 return memoryFreed ;
249251 }
250252
251253 /**
252- * Checks whether there is enough space to insert a new record into the sorter. If there is
253- * insufficient space, either allocate more memory or spill the current sort data (if spilling
254- * is enabled), then insert the record.
254+ * Checks whether there is enough space to insert a new record into the sorter.
255+ *
256+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
257+ * the record size.
258+
259+ * @return true if the record can be inserted without requiring more allocations, false otherwise.
260+ */
261+ private boolean haveSpaceForRecord (int requiredSpace ) {
262+ logger .warn ("Seeing if there's space for the record" );
263+ assert (requiredSpace > 0 );
264+ // The sort array will automatically expand when inserting a new record, so we only need to
265+ // worry about it having free space when spilling is enabled.
266+ final boolean sortBufferHasSpace = !spillingEnabled || sorter .hasSpaceForAnotherRecord ();
267+ final boolean dataPageHasSpace = requiredSpace <= freeSpaceInCurrentPage ;
268+ return (sortBufferHasSpace && dataPageHasSpace );
269+ }
270+
271+ /**
272+ * Allocates more memory in order to insert an additional record. If spilling is enabled, this
273+ * will request additional memory from the {@link ShuffleMemoryManager} and spill if the requested
274+ * memory can not be obtained. If spilling is disabled, then this will allocate memory without
275+ * coordinating with the ShuffleMemoryManager.
276+ *
277+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
278+ * the record size.
255279 */
256- private void ensureSpaceInDataPage (int requiredSpace ) throws IOException {
257- // TODO: we should re-order the `if` cases in this function so that the most common case (there
258- // is enough space) appears first.
259-
260- // TODO: merge these steps to first calculate total memory requirements for this insert,
261- // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
262- // data page.
263- if (!sorter .hasSpaceForAnotherRecord () && spillingEnabled ) {
280+ private void allocateSpaceForRecord (int requiredSpace ) throws IOException {
281+ if (spillingEnabled && !sorter .hasSpaceForAnotherRecord ()) {
282+ logger .debug ("Attempting to expand sort buffer" );
264283 final long oldSortBufferMemoryUsage = sorter .getMemoryUsage ();
265284 final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2 ;
266285 final long memoryAcquired = shuffleMemoryManager .tryToAcquire (memoryToGrowSortBuffer );
@@ -272,33 +291,33 @@ private void ensureSpaceInDataPage(int requiredSpace) throws IOException {
272291 shuffleMemoryManager .release (oldSortBufferMemoryUsage );
273292 }
274293 }
275-
276- final long spaceInCurrentPage ;
277- if (currentPage != null ) {
278- spaceInCurrentPage = PAGE_SIZE - (currentPagePosition - currentPage .getBaseOffset ());
279- } else {
280- spaceInCurrentPage = 0 ;
281- }
282- if (requiredSpace > PAGE_SIZE ) {
283- // TODO: throw a more specific exception?
284- throw new IOException ("Required space " + requiredSpace + " is greater than page size (" +
285- PAGE_SIZE + ")" );
286- } else if (requiredSpace > spaceInCurrentPage ) {
287- if (spillingEnabled ) {
288- final long memoryAcquired = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
289- if (memoryAcquired < PAGE_SIZE ) {
290- shuffleMemoryManager .release (memoryAcquired );
291- spill ();
292- final long memoryAcquiredAfterSpill = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
293- if (memoryAcquiredAfterSpill != PAGE_SIZE ) {
294- shuffleMemoryManager .release (memoryAcquiredAfterSpill );
295- throw new IOException ("Can't allocate memory!" );
294+ if (requiredSpace > freeSpaceInCurrentPage ) {
295+ logger .debug ("Required space {} is less than free space in current page ({}}" , requiredSpace ,
296+ freeSpaceInCurrentPage );
297+ // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
298+ // without using the free space at the end of the current page. We should also do this for
299+ // BytesToBytesMap.
300+ if (requiredSpace > PAGE_SIZE ) {
301+ throw new IOException ("Required space " + requiredSpace + " is greater than page size (" +
302+ PAGE_SIZE + ")" );
303+ } else {
304+ if (spillingEnabled ) {
305+ final long memoryAcquired = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
306+ if (memoryAcquired < PAGE_SIZE ) {
307+ shuffleMemoryManager .release (memoryAcquired );
308+ spill ();
309+ final long memoryAcquiredAfterSpilling = shuffleMemoryManager .tryToAcquire (PAGE_SIZE );
310+ if (memoryAcquiredAfterSpilling != PAGE_SIZE ) {
311+ shuffleMemoryManager .release (memoryAcquiredAfterSpilling );
312+ throw new IOException ("Can't allocate memory!" );
313+ }
296314 }
297315 }
316+ currentPage = memoryManager .allocatePage (PAGE_SIZE );
317+ currentPagePosition = currentPage .getBaseOffset ();
318+ freeSpaceInCurrentPage = PAGE_SIZE ;
319+ allocatedPages .add (currentPage );
298320 }
299- currentPage = memoryManager .allocatePage (PAGE_SIZE );
300- currentPagePosition = currentPage .getBaseOffset ();
301- allocatedPages .add (currentPage );
302321 }
303322 }
304323
@@ -311,21 +330,25 @@ public void insertRecord(
311330 int lengthInBytes ,
312331 int partitionId ) throws IOException {
313332 // Need 4 bytes to store the record length.
314- ensureSpaceInDataPage (lengthInBytes + 4 );
333+ final int totalSpaceRequired = lengthInBytes + 4 ;
334+ if (!haveSpaceForRecord (totalSpaceRequired )) {
335+ allocateSpaceForRecord (totalSpaceRequired );
336+ }
315337
316338 final long recordAddress =
317339 memoryManager .encodePageNumberAndOffset (currentPage , currentPagePosition );
318340 final Object dataPageBaseObject = currentPage .getBaseObject ();
319341 PlatformDependent .UNSAFE .putInt (dataPageBaseObject , currentPagePosition , lengthInBytes );
320342 currentPagePosition += 4 ;
343+ freeSpaceInCurrentPage -= 4 ;
321344 PlatformDependent .copyMemory (
322345 recordBaseObject ,
323346 recordBaseOffset ,
324347 dataPageBaseObject ,
325348 currentPagePosition ,
326349 lengthInBytes );
327350 currentPagePosition += lengthInBytes ;
328-
351+ freeSpaceInCurrentPage -= lengthInBytes ;
329352 sorter .insertRecord (recordAddress , partitionId );
330353 }
331354
0 commit comments