@@ -150,11 +150,21 @@ public long[] getChecksums() {
* Sorts the in-memory records and writes the sorted records to an on-disk file.
* This method does not free the sort data structures.
*
- * @param isLastFile if true, this indicates that we're writing the final output file and that the
- *                   bytes written should be counted towards shuffle spill metrics rather than
- *                   shuffle write metrics.
+ * @param isFinalFile if true, this indicates that we're writing the final output file and that

Contributor Author: Rename the flag to be more accurate. It doesn't mean the last spill file; it must be the only spill file, so that it can be used as the final shuffle output file.

+ * the bytes written should be counted towards shuffle write metrics rather

Contributor Author: The comment was wrong before. If this flag is true, we are writing the final shuffle output file and should increase the shuffle write metrics rather than the spill metrics.

+ * than shuffle spill metrics.
*/
-  private void writeSortedFile(boolean isLastFile) {
+  private void writeSortedFile(boolean isFinalFile) {
+    // Only emit the log if this is an actual spilling.
+    if (!isFinalFile) {
+      logger.info(
+        "Task {} on Thread {} spilling sort data of {} to disk ({} {} so far)",
+        taskContext.taskAttemptId(),
+        Thread.currentThread().getId(),
+        Utils.bytesToString(getMemoryUsage()),
+        spills.size(),
+        spills.size() != 1 ? " times" : " time");
+    }

Contributor Author: Move the logging here so that it applies to the last spill as well.

// This call performs the actual sort.
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
@@ -167,13 +177,14 @@ private void writeSortedFile(boolean isLastFile) {

final ShuffleWriteMetricsReporter writeMetricsToUse;

-    if (isLastFile) {
+    if (isFinalFile) {
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
writeMetricsToUse = writeMetrics;
} else {
// We're spilling, so bytes written should be counted towards spill rather than write.
// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
// them towards shuffle bytes written.
+      // The actual shuffle bytes written will be counted when we merge the spill files.
writeMetricsToUse = new ShuffleWriteMetrics();
}
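
The two branches above encode the accounting rule this PR cares about: bytes for the one-and-only output file count as shuffle writes immediately, while spill bytes go to a throwaway reporter and only become shuffle writes when the spills are merged. A minimal sketch of that rule, with hypothetical stand-in names rather than Spark's real reporter classes:

// Sketch only: WriteMetrics is a stand-in for Spark's ShuffleWriteMetricsReporter.
final class WriteMetrics {
  long bytesWritten;
  void incBytesWritten(long n) { bytesWritten += n; }
}

final class MetricsRoutingSketch {
  final WriteMetrics writeMetrics = new WriteMetrics(); // shuffle write metrics
  long diskBytesSpilled;                                // spill metrics

  void onFileWritten(boolean isFinalFile, long fileLength) {
    if (isFinalFile) {
      // The single output file: count it as shuffle bytes written right away.
      writeMetrics.incBytesWritten(fileLength);
    } else {
      // A spill file: count it as spilled bytes. The merged output produced
      // from all spills is what later lands in shuffle write metrics.
      diskBytesSpilled += fileLength;
    }
  }
}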

@@ -246,7 +257,7 @@ private void writeSortedFile(boolean isLastFile) {
spills.add(spillInfo);
}

-    if (!isLastFile) { // i.e. this is a spill file
+    if (!isFinalFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
// relies on its `recordWritten()` method being called in order to trigger periodic updates to
@@ -281,12 +292,6 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
return 0L;
}

logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
spills.size(),
spills.size() > 1 ? " times" : " time");

writeSortedFile(false);
final long spillSize = freeMemory();
inMemSorter.reset();
@@ -440,8 +445,9 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
*/
public SpillInfo[] closeAndGetSpills() throws IOException {
if (inMemSorter != null) {
-      // Do not count the final file towards the spill count.
-      writeSortedFile(true);
+      // Here we are spilling the remaining data in the buffer. If there was no spill before,
+      // this final spill file will be the final shuffle output file.
+      writeSortedFile(/* isFinalFile = */ spills.isEmpty());

Contributor Author: This is the actual change of this PR. We should only set this flag if we have not spilled before.

freeMemory();
inMemSorter.free();
inMemSorter = null;
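
The one-line change above is the heart of the PR. A condensed sketch of how the flag now flows through spill() and closeAndGetSpills(), with the class stripped down to the pieces relevant here (names follow the diff, bodies are simplified):

import java.util.ArrayList;
import java.util.List;

final class SorterFlagSketch {
  private final List<String> spills = new ArrayList<>();

  // Simplified stand-in for the real writeSortedFile(boolean isFinalFile).
  private void writeSortedFile(boolean isFinalFile) {
    if (!isFinalFile) {
      spills.add("spill-" + spills.size()); // an intermediate spill file
    }
  }

  void spill() {
    writeSortedFile(false); // triggered by memory pressure: never the final file
  }

  void closeAndGetSpills() {
    // Before the fix this was writeSortedFile(true) unconditionally, so a task
    // that had already spilled still tagged its last flush as "final" and
    // counted those bytes as shuffle writes. Now the flush is final only when
    // it is the sole file ever written.
    writeSortedFile(/* isFinalFile = */ spills.isEmpty());
  }
}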
@@ -327,12 +327,6 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep
logger.debug("Using slow merge");
mergeSpillsWithFileStream(spills, mapWriter, compressionCodec);
}
-        // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
-        // in-memory records, we write out the in-memory records to a file but do not count that
-        // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
-        // to be counted as shuffle write, but this will lead to double-counting of the final
-        // SpillInfo's bytes.
-        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());

Contributor Author: This hack is not needed anymore.

partitionLengths = mapWriter.commitAllPartitions(sorter.getChecksums()).getPartitionLengths();
} catch (Exception e) {
try {
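
Why the deleted compensation was ever needed: under the old flag semantics, a sorter that had spilled and still held in-memory records wrote that last file against shuffle write metrics, and the merged output was then counted a second time at commit, so the writer subtracted the last file's length to undo the overlap. With the new semantics the last flush is a plain spill and nothing is counted twice. A small worked example with made-up sizes:

public class DoubleCountSketch {
  public static void main(String[] args) {
    // Made-up sizes: one earlier 100-byte spill, a final 100-byte flush of
    // in-memory records, and a 200-byte merged shuffle output.
    long finalFlush = 100;
    long mergedOutput = 200;

    // Old behavior: the final flush hit shuffle write metrics at flush time,
    // and its bytes were included again when the merged output committed.
    long oldBytesWritten = finalFlush + mergedOutput;     // 300, over-counted
    long oldAfterHack = oldBytesWritten - finalFlush;     // 200, after decBytesWritten

    // New behavior: the flush counts as a spill, so only the merged output
    // reaches shuffle write metrics and no compensation is required.
    long newBytesWritten = mergedOutput;                  // 200

    System.out.println(oldAfterHack == newBytesWritten);  // true
  }
}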
@@ -69,6 +69,7 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
File tempDir;
long[] partitionSizesInMergedFile;
final LinkedList<File> spillFilesCreated = new LinkedList<>();
+  long totalSpilledDiskBytes = 0;
SparkConf conf;
final Serializer serializer =
new KryoSerializer(new SparkConf().set("spark.kryo.unsafe", "false"));
@@ -96,6 +97,7 @@ public void setUp() throws Exception {
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
partitionSizesInMergedFile = null;
spillFilesCreated.clear();
+    totalSpilledDiskBytes = 0;
conf = new SparkConf()
.set(package$.MODULE$.BUFFER_PAGESIZE().key(), "1m")
.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)
@@ -160,7 +162,11 @@ public void setUp() throws Exception {

when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> {
TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
-      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      File file = spy(File.createTempFile("spillFile", ".spill", tempDir));
+      when(file.delete()).thenAnswer(inv -> {
+        totalSpilledDiskBytes += file.length();
+        return inv.callRealMethod();
+      });
spillFilesCreated.add(file);
return Tuple2$.MODULE$.apply(blockId, file);
});
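
The spy has to capture each spill file's size at deletion time, because file.length() returns 0 once the file is gone and the writer deletes its spills during cleanup. A standalone sketch of the same idea using Mockito's doAnswer form, which avoids invoking the real delete() while the stub is being installed (the helper name and directory handling here are illustrative):

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.File;

public class SpillTrackingSketch {
  static long totalSpilledDiskBytes = 0;

  static File trackedSpillFile(File dir) throws Exception {
    File file = spy(File.createTempFile("spillFile", ".spill", dir));
    // Record the size just before the real delete() removes the file.
    doAnswer(inv -> {
      totalSpilledDiskBytes += file.length();
      return inv.callRealMethod();
    }).when(file).delete();
    return file;
  }
}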
@@ -284,6 +290,9 @@ public void writeWithoutSpilling() throws Exception {
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
+    // Even if there is no spill, the sorter still writes its data to a spill file at the end,
+    // which will become the final shuffle file.
+    assertEquals(1, spillFilesCreated.size());

long sumOfPartitionSizes = 0;
for (long size: partitionSizesInMergedFile) {
@@ -425,9 +434,8 @@ private void testMergingSpills(
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
-    assertTrue(taskMetrics.diskBytesSpilled() > 0L);
-    assertTrue(taskMetrics.diskBytesSpilled() < mergedOutputFile.length());

Contributor Author: The previous test was simply too relaxed: any positive spill size smaller than the merged output would pass.

assertTrue(taskMetrics.memoryBytesSpilled() > 0L);
+    assertEquals(totalSpilledDiskBytes, taskMetrics.diskBytesSpilled());
assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}
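
With totalSpilledDiskBytes accumulated in the delete() spy, the suite can assert the metrics exactly instead of merely bounding them. The tightened invariant, written out as a hypothetical helper rather than the suite's own code: every spilled byte shows up in diskBytesSpilled exactly once, and bytesWritten equals exactly the merged output's size.

import static org.junit.Assert.assertEquals;

final class MetricsContractSketch {
  static void check(long[] spillFileSizes, long mergedOutputLength,
                    long diskBytesSpilled, long shuffleBytesWritten) {
    long totalSpilled = 0;
    for (long size : spillFileSizes) {
      totalSpilled += size;
    }
    // Every spill file's bytes are counted once, no more and no less...
    assertEquals(totalSpilled, diskBytesSpilled);
    // ...and shuffle write metrics match the final output exactly.
    assertEquals(mergedOutputLength, shuffleBytesWritten);
  }
}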

@@ -517,9 +525,8 @@ public void writeEnoughDataToTriggerSpill() throws Exception {
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
-    assertTrue(taskMetrics.diskBytesSpilled() > 0L);
-    assertTrue(taskMetrics.diskBytesSpilled() < mergedOutputFile.length());
-    assertTrue(taskMetrics.memoryBytesSpilled()> 0L);
+    assertEquals(totalSpilledDiskBytes, taskMetrics.diskBytesSpilled());
assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}

@@ -550,9 +557,8 @@ private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exc
assertSpillFilesWereCleanedUp();
ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
-    assertTrue(taskMetrics.diskBytesSpilled() > 0L);
-    assertTrue(taskMetrics.diskBytesSpilled() < mergedOutputFile.length());
-    assertTrue(taskMetrics.memoryBytesSpilled()> 0L);
+    assertEquals(totalSpilledDiskBytes, taskMetrics.diskBytesSpilled());
assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten());
}
