From 67ba98797e3d3bd30c9fe93842eccfd58f6f0205 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 28 Mar 2019 16:16:14 -0700 Subject: [PATCH 01/13] initial push --- .../shuffle/sort/UnsafeShuffleWriter.java | 226 ++++++++++-------- .../shuffle/sort/SortShuffleManager.scala | 3 +- .../sort/UnsafeShuffleWriterSuite.java | 24 +- ...ypassMergeSortShuffleWriterBenchmark.scala | 4 +- .../sort/UnsafeShuffleWriterBenchmark.scala | 5 +- 5 files changed, 153 insertions(+), 109 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 36081069b0e75..ec736e3daaad5 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -20,6 +20,7 @@ import javax.annotation.Nullable; import java.io.*; import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; import java.util.Iterator; import scala.Option; @@ -31,18 +32,19 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; -import com.google.common.io.Files; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.*; import org.apache.spark.annotation.Private; +import org.apache.spark.api.shuffle.ShuffleMapOutputWriter; +import org.apache.spark.api.shuffle.ShufflePartitionWriter; +import org.apache.spark.api.shuffle.ShuffleWriteSupport; import org.apache.spark.internal.config.package$; import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.NioBufferedFileInputStream; import org.apache.commons.io.output.CloseShieldOutputStream; -import org.apache.commons.io.output.CountingOutputStream; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.network.util.LimitedInputStream; import org.apache.spark.scheduler.MapStatus; @@ -74,6 +76,7 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private final SerializerInstance serializer; private final Partitioner partitioner; private final ShuffleWriteMetricsReporter writeMetrics; + private final ShuffleWriteSupport shuffleWriteSupport; private final int shuffleId; private final int mapId; private final TaskContext taskContext; @@ -123,7 +126,8 @@ public UnsafeShuffleWriter( int mapId, TaskContext taskContext, SparkConf sparkConf, - ShuffleWriteMetricsReporter writeMetrics) throws IOException { + ShuffleWriteMetricsReporter writeMetrics, + ShuffleWriteSupport shuffleWriteSupport) throws IOException { final int numPartitions = handle.dependency().partitioner().numPartitions(); if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) { throw new IllegalArgumentException( @@ -140,6 +144,7 @@ public UnsafeShuffleWriter( this.serializer = dep.serializer().newInstance(); this.partitioner = dep.partitioner(); this.writeMetrics = writeMetrics; + this.shuffleWriteSupport = shuffleWriteSupport; this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); @@ -230,24 +235,27 @@ void closeAndWriteOutput() throws IOException { serOutputStream = null; final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; + final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport + .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); final long[] partitionLengths; - final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); - final File tmp = Utils.tempFileWith(output); try { try { - partitionLengths = mergeSpills(spills, tmp); + partitionLengths = mergeSpills(spills, mapWriter); } finally { for (SpillInfo spill : spills) { - if (spill.file.exists() && ! spill.file.delete()) { + if (spill.file.exists() && !spill.file.delete()) { logger.error("Error while deleting spill file {}", spill.file.getPath()); } } } - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); - } finally { - if (tmp.exists() && !tmp.delete()) { - logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); + mapWriter.commitAllPartitions(); + } catch (Exception e) { + try { + mapWriter.abort(e); + } catch (Exception innerE) { + logger.error("Failed to abort the Map Output Writer", innerE); } + throw e; } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -281,7 +289,8 @@ void forceSorterToSpill() throws IOException { * * @return the partition lengths in the merged file. */ - private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException { + private long[] mergeSpills(SpillInfo[] spills, + ShuffleMapOutputWriter mapWriter) throws IOException { final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS()); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = @@ -289,17 +298,41 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti final boolean fastMergeIsSupported = !compressionEnabled || CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); + final int numPartitions = partitioner.numPartitions(); + long[] partitionLengths = new long[numPartitions]; try { if (spills.length == 0) { - new FileOutputStream(outputFile).close(); // Create an empty file - return new long[partitioner.numPartitions()]; + return partitionLengths; } else if (spills.length == 1) { // Here, we don't need to perform any metrics updates because the bytes written to this // output file would have already been counted as shuffle bytes written. - Files.move(spills[0].file, outputFile); - return spills[0].partitionLengths; + try (FileInputStream in = new FileInputStream(spills[0].file)) { + for (int pId = 0; pId < spills[0].partitionLengths.length; pId++) { + boolean copyThrewExecption = true; + ShufflePartitionWriter writer = null; + try { + writer = mapWriter.getNextPartitionWriter(); + if (transferToEnabled) { + WritableByteChannel channel = writer.toChannel(); + try (FileChannel input = in.getChannel()) { + Utils.copyFileStreamNIO(input, channel, 0, input.size()); + copyThrewExecption = false; + } finally { + Closeables.close(in, copyThrewExecption); + } + } else { + OutputStream stream = writer.toStream(); + Utils.copyStream(in, stream, false, false); + copyThrewExecption = false; + } + } finally { + Closeables.close(writer, copyThrewExecption); + } + partitionLengths[pId] = writer.getNumBytesWritten(); + } + return partitionLengths; + } } else { - final long[] partitionLengths; // There are multiple spills to merge, so none of these spill files' lengths were counted // towards our shuffle write count or shuffle write time. If we use the slow merge path, // then the final output file's size won't necessarily be equal to the sum of the spill @@ -316,14 +349,14 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti // that doesn't need to interpret the spilled bytes. if (transferToEnabled && !encryptionEnabled) { logger.debug("Using transferTo-based fast merge"); - partitionLengths = mergeSpillsWithTransferTo(spills, outputFile); + partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); } else { logger.debug("Using fileStream-based fast merge"); - partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null); + partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); } } else { logger.debug("Using slow merge"); - partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec); + partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); } // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has // in-memory records, we write out the in-memory records to a file but do not count that @@ -331,13 +364,10 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti // to be counted as shuffle write, but this will lead to double-counting of the final // SpillInfo's bytes. writeMetrics.decBytesWritten(spills[spills.length - 1].file.length()); - writeMetrics.incBytesWritten(outputFile.length()); return partitionLengths; } } catch (IOException e) { - if (outputFile.exists() && !outputFile.delete()) { - logger.error("Unable to delete output file {}", outputFile.getPath()); - } + throw e; } } @@ -345,73 +375,76 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti /** * Merges spill files using Java FileStreams. This code path is typically slower than * the NIO-based merge, {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], - * File)}, and it's mostly used in cases where the IO compression codec does not support - * concatenation of compressed data, when encryption is enabled, or when users have - * explicitly disabled use of {@code transferTo} in order to work around kernel bugs. + * ShuffleMapOutputWriter)}, and it's mostly used in cases where the IO compression codec + * does not support concatenation of compressed data, when encryption is enabled, or when + * users have explicitly disabled use of {@code transferTo} in order to work around kernel bugs. * This code path might also be faster in cases where individual partition size in a spill * is small and UnsafeShuffleWriter#mergeSpillsWithTransferTo method performs many small * disk ios which is inefficient. In those case, Using large buffers for input and output * files helps reducing the number of disk ios, making the file merging faster. * * @param spills the spills to merge. - * @param outputFile the file to write the merged data to. + * @param mapWriter the map output writer to use for output. * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled. * @return the partition lengths in the merged file. */ private long[] mergeSpillsWithFileStream( SpillInfo[] spills, - File outputFile, + ShuffleMapOutputWriter mapWriter, @Nullable CompressionCodec compressionCodec) throws IOException { assert (spills.length >= 2); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new InputStream[spills.length]; - final OutputStream bos = new BufferedOutputStream( - new FileOutputStream(outputFile), - outputBufferSizeInBytes); - // Use a counting output stream to avoid having to close the underlying file and ask - // the file system for its size after each partition is written. - final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos); - boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { spillInputStreams[i] = new NioBufferedFileInputStream( - spills[i].file, - inputBufferSizeInBytes); + spills[i].file, + inputBufferSizeInBytes); } for (int partition = 0; partition < numPartitions; partition++) { - final long initialFileLength = mergedFileOutputStream.getByteCount(); - // Shield the underlying output stream from close() and flush() calls, so that we can close - // the higher level streams to make sure all data is really flushed and internal state is - // cleaned. - OutputStream partitionOutput = new CloseAndFlushShieldOutputStream( - new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream)); - partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); - if (compressionCodec != null) { - partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); - } - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - if (partitionLengthInSpill > 0) { - InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i], - partitionLengthInSpill, false); - try { - partitionInputStream = blockManager.serializerManager().wrapForEncryption( - partitionInputStream); - if (compressionCodec != null) { - partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); + boolean copyThrewExecption = true; + ShufflePartitionWriter writer = null; + OutputStream partitionOutput = null; + try { + writer = mapWriter.getNextPartitionWriter(); + partitionOutput = writer.toStream(); + partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); + partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); + if (compressionCodec != null) { + partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); + } + // Shield the underlying output stream from close() and flush() calls, so we can close + // the higher level streams to make sure all data is really flushed and internal state is + // cleaned. + for (int i = 0; i < spills.length; i++) { + final long partitionLengthInSpill = spills[i].partitionLengths[partition]; + + if (partitionLengthInSpill > 0) { + InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i], + partitionLengthInSpill, false); + try { + partitionInputStream = blockManager.serializerManager().wrapForEncryption( + partitionInputStream); + if (compressionCodec != null) { + partitionInputStream = compressionCodec.compressedInputStream( + partitionInputStream); + } + ByteStreams.copy(partitionInputStream, partitionOutput); + } finally { + partitionInputStream.close(); } - ByteStreams.copy(partitionInputStream, partitionOutput); - } finally { - partitionInputStream.close(); } + copyThrewExecption = false; } + } finally { + Closeables.close(writer, copyThrewExecption); } - partitionOutput.flush(); - partitionOutput.close(); - partitionLengths[partition] = (mergedFileOutputStream.getByteCount() - initialFileLength); + long numBytesWritten = writer.getNumBytesWritten(); + partitionLengths[partition] = numBytesWritten; + writeMetrics.incBytesWritten(numBytesWritten); } threwException = false; } finally { @@ -420,7 +453,6 @@ private long[] mergeSpillsWithFileStream( for (InputStream stream : spillInputStreams) { Closeables.close(stream, threwException); } - Closeables.close(mergedFileOutputStream, threwException); } return partitionLengths; } @@ -430,54 +462,51 @@ private long[] mergeSpillsWithFileStream( * This is only safe when the IO compression codec and serializer support concatenation of * serialized streams. * + * @param spills the spills to merge. + * @param mapWriter the map output writer to use for output. * @return the partition lengths in the merged file. */ - private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException { + private long[] mergeSpillsWithTransferTo( + SpillInfo[] spills, + ShuffleMapOutputWriter mapWriter) throws IOException { assert (spills.length >= 2); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final FileChannel[] spillInputChannels = new FileChannel[spills.length]; final long[] spillInputChannelPositions = new long[spills.length]; - FileChannel mergedFileOutputChannel = null; boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); } - // This file needs to opened in append mode in order to work around a Linux kernel bug that - // affects transferTo; see SPARK-3948 for more details. - mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel(); - - long bytesWrittenToMergedFile = 0; for (int partition = 0; partition < numPartitions; partition++) { - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - final FileChannel spillInputChannel = spillInputChannels[i]; - final long writeStartTime = System.nanoTime(); - Utils.copyFileStreamNIO( - spillInputChannel, - mergedFileOutputChannel, - spillInputChannelPositions[i], - partitionLengthInSpill); - spillInputChannelPositions[i] += partitionLengthInSpill; - writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); - bytesWrittenToMergedFile += partitionLengthInSpill; - partitionLengths[partition] += partitionLengthInSpill; + boolean copyThrewExecption = true; + ShufflePartitionWriter writer = null; + long partitionLengthInSpill = 0L; + try { + writer = mapWriter.getNextPartitionWriter(); + WritableByteChannel channel = writer.toChannel(); + for (int i = 0; i < spills.length; i++) { + partitionLengthInSpill += spills[i].partitionLengths[partition]; + final FileChannel spillInputChannel = spillInputChannels[i]; + final long writeStartTime = System.nanoTime(); + Utils.copyFileStreamNIO( + spillInputChannel, + channel, + spillInputChannelPositions[i], + partitionLengthInSpill); + copyThrewExecption = false; + spillInputChannelPositions[i] += partitionLengthInSpill; + writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); + } + } finally { + Closeables.close(writer, copyThrewExecption); } - } - // Check the position after transferTo loop to see if it is in the right position and raise an - // exception if it is incorrect. The position will not be increased to the expected length - // after calling transferTo in kernel version 2.6.32. This issue is described at - // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948. - if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) { - throw new IOException( - "Current position " + mergedFileOutputChannel.position() + " does not equal expected " + - "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" + - " version to see if it is 2.6.32, as there is a kernel bug which will lead to " + - "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " + - "to disable this NIO feature." - ); + long numBytes = writer.getNumBytesWritten(); + assert(partitionLengthInSpill == numBytes); + partitionLengths[partition] = writer.getNumBytesWritten(); + writeMetrics.incBytesWritten(numBytes); } threwException = false; } finally { @@ -487,7 +516,6 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th assert(spillInputChannelPositions[i] == spills[i].file.length()); Closeables.close(spillInputChannels[i], threwException); } - Closeables.close(mergedFileOutputChannel, threwException); } return partitionLengths; } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 5da7b5cb35e6d..6925a542a86c1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -146,7 +146,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager mapId, context, env.conf, - metrics) + metrics, + shuffleExecutorComponents.writes()) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 9bf707f783d44..32f6b1036e3f8 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.*; +import org.apache.spark.*; import scala.Option; import scala.Product2; import scala.Tuple2; @@ -35,10 +36,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.apache.spark.HashPartitioner; -import org.apache.spark.ShuffleDependency; -import org.apache.spark.SparkConf; -import org.apache.spark.TaskContext; +import org.apache.spark.api.shuffle.ShuffleWriteSupport; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; @@ -53,6 +51,7 @@ import org.apache.spark.security.CryptoStreamUtils; import org.apache.spark.serializer.*; import org.apache.spark.shuffle.IndexShuffleBlockResolver; +import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport; import org.apache.spark.storage.*; import org.apache.spark.util.Utils; @@ -76,6 +75,7 @@ public class UnsafeShuffleWriterSuite { SparkConf conf; final Serializer serializer = new KryoSerializer(new SparkConf()); TaskMetrics taskMetrics; + ShuffleWriteSupport shuffleWriteSupport; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @Mock(answer = RETURNS_SMART_NULLS) IndexShuffleBlockResolver shuffleBlockResolver; @@ -83,8 +83,11 @@ public class UnsafeShuffleWriterSuite { @Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext; @Mock(answer = RETURNS_SMART_NULLS) ShuffleDependency shuffleDep; + + @After public void tearDown() { + TaskContext$.MODULE$.unset(); Utils.deleteRecursively(tempDir); final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory(); if (leakedMemory != 0) { @@ -151,6 +154,13 @@ public void setUp() throws IOException { when(taskContext.taskMetrics()).thenReturn(taskMetrics); when(shuffleDep.serializer()).thenReturn(serializer); when(shuffleDep.partitioner()).thenReturn(hashPartitioner); + when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); + + Random rand = new Random(); + TaskContext$.MODULE$.setTaskContext(new TaskContextImpl( + 0, 0, 0, rand.nextInt(10000), 0, taskMemoryManager, new Properties(), null, taskMetrics)); + + shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, shuffleBlockResolver); } private UnsafeShuffleWriter createWriter( @@ -164,7 +174,8 @@ private UnsafeShuffleWriter createWriter( 0, // map id taskContext, conf, - taskContext.taskMetrics().shuffleWriteMetrics() + taskContext.taskMetrics().shuffleWriteMetrics(), + shuffleWriteSupport ); } @@ -525,7 +536,8 @@ public void testPeakMemoryUsed() throws Exception { 0, // map id taskContext, conf, - taskContext.taskMetrics().shuffleWriteMetrics()); + taskContext.taskMetrics().shuffleWriteMetrics(), + shuffleWriteSupport); // Peak memory should be monotonically increasing. More specifically, every time // we allocate a new page it should increase by exactly the size of the page. diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala index 7eb867fc29fd2..69fe03e75606f 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.shuffle.sort import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark -import org.apache.spark.shuffle.sort.io.{DefaultShuffleWriteSupport} +import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport /** * Benchmark to measure performance for aggregate primitives. @@ -46,9 +46,9 @@ object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = { val conf = new SparkConf(loadDefaults = false) - val shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, blockResolver) conf.set("spark.file.transferTo", String.valueOf(transferTo)) conf.set("spark.shuffle.file.buffer", "32k") + val shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, blockResolver) val shuffleWriter = new BypassMergeSortShuffleWriter[String, String]( blockManager, diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala index 15a08111f6d54..20bf3eac95d84 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala @@ -18,6 +18,7 @@ package org.apache.spark.shuffle.sort import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.benchmark.Benchmark +import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport /** * Benchmark to measure performance for aggregate primitives. @@ -42,6 +43,7 @@ object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase { def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = { val conf = new SparkConf(loadDefaults = false) conf.set("spark.file.transferTo", String.valueOf(transferTo)) + val shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, blockResolver) TaskContext.setTaskContext(taskContext) new UnsafeShuffleWriter[String, String]( @@ -52,7 +54,8 @@ object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase { 0, taskContext, conf, - taskContext.taskMetrics().shuffleWriteMetrics + taskContext.taskMetrics().shuffleWriteMetrics, + shuffleWriteSupport ) } From 3ee7d732abc935abd1d795f90346808fe1b7bdab Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 1 Apr 2019 17:17:33 -0700 Subject: [PATCH 02/13] closing input channel error --- .../shuffle/sort/UnsafeShuffleWriter.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index ec736e3daaad5..826bbe612d4cc 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -84,7 +84,6 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private final boolean transferToEnabled; private final int initialSortBufferSize; private final int inputBufferSizeInBytes; - private final int outputBufferSizeInBytes; @Nullable private MapStatus mapStatus; @Nullable private ShuffleExternalSorter sorter; @@ -152,8 +151,6 @@ public UnsafeShuffleWriter( (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()); this.inputBufferSizeInBytes = (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; - this.outputBufferSizeInBytes = - (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024; open(); } @@ -257,6 +254,9 @@ void closeAndWriteOutput() throws IOException { } throw e; } + for (int pId = 0; pId < partitioner.numPartitions(); pId++) { + logger.error("PartitionLengths: " + partitionLengths[pId]); + } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -300,35 +300,36 @@ private long[] mergeSpills(SpillInfo[] spills, final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); final int numPartitions = partitioner.numPartitions(); long[] partitionLengths = new long[numPartitions]; + logger.error(mapWriter.toString()); try { if (spills.length == 0) { + logger.error("zero"); return partitionLengths; } else if (spills.length == 1) { + logger.error("one"); // Here, we don't need to perform any metrics updates because the bytes written to this // output file would have already been counted as shuffle bytes written. try (FileInputStream in = new FileInputStream(spills[0].file)) { for (int pId = 0; pId < spills[0].partitionLengths.length; pId++) { + logger.error("parititonID: " + pId); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; try { writer = mapWriter.getNextPartitionWriter(); - if (transferToEnabled) { - WritableByteChannel channel = writer.toChannel(); - try (FileChannel input = in.getChannel()) { - Utils.copyFileStreamNIO(input, channel, 0, input.size()); - copyThrewExecption = false; - } finally { - Closeables.close(in, copyThrewExecption); - } - } else { - OutputStream stream = writer.toStream(); - Utils.copyStream(in, stream, false, false); - copyThrewExecption = false; + OutputStream partitionOutput = writer.toStream(); + partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); + if (compressionCodec != null) { + partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); } + partitionOutput.write(in.read()); + copyThrewExecption = false; } finally { + logger.error("close please"); Closeables.close(writer, copyThrewExecption); } + logger.error("Made it to to setting partitionLengths"); partitionLengths[pId] = writer.getNumBytesWritten(); + logger.error(Long.toString(writer.getNumBytesWritten())); } return partitionLengths; } @@ -367,7 +368,6 @@ private long[] mergeSpills(SpillInfo[] spills, return partitionLengths; } } catch (IOException e) { - throw e; } } From 905b1889a1419d849d5ffe40520b8c0e6936d70b Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 8 Apr 2019 00:04:34 -0700 Subject: [PATCH 03/13] added certain fixes to make all tests except transferTo pass --- .../shuffle/sort/UnsafeShuffleWriter.java | 84 +++++++------------ .../io/DefaultShuffleMapOutputWriter.java | 12 ++- .../scala/org/apache/spark/util/Utils.scala | 6 ++ .../sort/UnsafeShuffleWriterSuite.java | 7 +- 4 files changed, 49 insertions(+), 60 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 826bbe612d4cc..d2ef176f275f0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -21,8 +21,10 @@ import java.io.*; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; +import java.util.Arrays; import java.util.Iterator; +import org.apache.spark.storage.ShuffleBlockId; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -105,18 +107,6 @@ private static final class MyByteArrayOutputStream extends ByteArrayOutputStream */ private boolean stopping = false; - private class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream { - - CloseAndFlushShieldOutputStream(OutputStream outputStream) { - super(outputStream); - } - - @Override - public void flush() { - // do nothing - } - } - public UnsafeShuffleWriter( BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, @@ -231,6 +221,7 @@ void closeAndWriteOutput() throws IOException { serBuffer = null; serOutputStream = null; final SpillInfo[] spills = sorter.closeAndGetSpills(); + logger.error("num spills: " + spills.length); sorter = null; final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); @@ -254,9 +245,7 @@ void closeAndWriteOutput() throws IOException { } throw e; } - for (int pId = 0; pId < partitioner.numPartitions(); pId++) { - logger.error("PartitionLengths: " + partitionLengths[pId]); - } + logger.error("Partition lengths: " + Arrays.toString(partitionLengths)); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -303,36 +292,16 @@ private long[] mergeSpills(SpillInfo[] spills, logger.error(mapWriter.toString()); try { if (spills.length == 0) { - logger.error("zero"); - return partitionLengths; - } else if (spills.length == 1) { - logger.error("one"); - // Here, we don't need to perform any metrics updates because the bytes written to this - // output file would have already been counted as shuffle bytes written. - try (FileInputStream in = new FileInputStream(spills[0].file)) { - for (int pId = 0; pId < spills[0].partitionLengths.length; pId++) { - logger.error("parititonID: " + pId); - boolean copyThrewExecption = true; - ShufflePartitionWriter writer = null; - try { - writer = mapWriter.getNextPartitionWriter(); - OutputStream partitionOutput = writer.toStream(); - partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); - if (compressionCodec != null) { - partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); - } - partitionOutput.write(in.read()); - copyThrewExecption = false; - } finally { - logger.error("close please"); - Closeables.close(writer, copyThrewExecption); - } - logger.error("Made it to to setting partitionLengths"); - partitionLengths[pId] = writer.getNumBytesWritten(); - logger.error(Long.toString(writer.getNumBytesWritten())); + ShufflePartitionWriter writer = null; + try { + writer = mapWriter.getNextPartitionWriter(); + writer.toStream(); + } finally { + if (writer != null) { + writer.close(); } - return partitionLengths; } + return partitionLengths; } else { // There are multiple spills to merge, so none of these spill files' lengths were counted // towards our shuffle write count or shuffle write time. If we use the slow merge path, @@ -349,14 +318,14 @@ private long[] mergeSpills(SpillInfo[] spills, // decompression of concatenated compressed streams, so we can perform a fast spill merge // that doesn't need to interpret the spilled bytes. if (transferToEnabled && !encryptionEnabled) { - logger.debug("Using transferTo-based fast merge"); + logger.error("Using transferTo-based fast merge"); partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); } else { - logger.debug("Using fileStream-based fast merge"); + logger.error("Using fileStream-based fast merge"); partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); } } else { - logger.debug("Using slow merge"); + logger.error("Using slow merge"); partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); } // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has @@ -392,7 +361,6 @@ private long[] mergeSpillsWithFileStream( SpillInfo[] spills, ShuffleMapOutputWriter mapWriter, @Nullable CompressionCodec compressionCodec) throws IOException { - assert (spills.length >= 2); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new InputStream[spills.length]; @@ -407,11 +375,13 @@ private long[] mergeSpillsWithFileStream( for (int partition = 0; partition < numPartitions; partition++) { boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; - OutputStream partitionOutput = null; + OutputStream partitionOutput; try { writer = mapWriter.getNextPartitionWriter(); partitionOutput = writer.toStream(); - partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); + if (spills.length >= 2) { + partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); + } partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); if (compressionCodec != null) { partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); @@ -469,18 +439,19 @@ private long[] mergeSpillsWithFileStream( private long[] mergeSpillsWithTransferTo( SpillInfo[] spills, ShuffleMapOutputWriter mapWriter) throws IOException { - assert (spills.length >= 2); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final FileChannel[] spillInputChannels = new FileChannel[spills.length]; final long[] spillInputChannelPositions = new long[spills.length]; - + logger.error("Num partitions: " + numPartitions); + logger.error("Num spills: " + spills.length); boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); } for (int partition = 0; partition < numPartitions; partition++) { + logger.error("In partition: " + partition); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; long partitionLengthInSpill = 0L; @@ -491,6 +462,7 @@ private long[] mergeSpillsWithTransferTo( partitionLengthInSpill += spills[i].partitionLengths[partition]; final FileChannel spillInputChannel = spillInputChannels[i]; final long writeStartTime = System.nanoTime(); + logger.error("InputChannelPositions: "+ spillInputChannelPositions[i]); Utils.copyFileStreamNIO( spillInputChannel, channel, @@ -499,20 +471,26 @@ private long[] mergeSpillsWithTransferTo( copyThrewExecption = false; spillInputChannelPositions[i] += partitionLengthInSpill; writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); + logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill); } + } catch (IOException e) { + throw e; } finally { + logger.error("Closing the writer"); Closeables.close(writer, copyThrewExecption); } long numBytes = writer.getNumBytesWritten(); - assert(partitionLengthInSpill == numBytes); - partitionLengths[partition] = writer.getNumBytesWritten(); + logger.error("NumBytes: " + numBytes); + partitionLengths[partition] = numBytes; writeMetrics.incBytesWritten(numBytes); } + logger.error("Finish writing"); threwException = false; } finally { // To avoid masking exceptions that caused us to prematurely enter the finally block, only // throw exceptions during cleanup if threwException == false. for (int i = 0; i < spills.length; i++) { + logger.error("Close input channel: " + i); assert(spillInputChannelPositions[i] == spills[i].file.length()); Closeables.close(spillInputChannels[i], threwException); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index 0f7e5ed66bb76..c4042868e8e6d 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -80,7 +80,10 @@ public DefaultShuffleMapOutputWriter( public ShufflePartitionWriter getNextPartitionWriter() throws IOException { if (outputTempFile == null) { outputTempFile = Utils.tempFileWith(outputFile); + log.error("I create an outputTempFile at: " + outputTempFile.getAbsolutePath()); } + log.error("I created a new partition writer at: " + currPartitionId); + log.error("Do I have an outputTempFile: " + outputTempFile.exists()); if (outputFileChannel != null) { currChannelPosition = outputFileChannel.position(); } else { @@ -91,7 +94,9 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException { @Override public void commitAllPartitions() throws IOException { + log.error("Do I have an outputTempFile before cleanup: " + outputTempFile.exists()); cleanUp(); + log.error("Do I have an outputTempFile after cleanup: " + outputTempFile.exists()); blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); } @@ -108,17 +113,19 @@ public void abort(Throwable error) { } private void cleanUp() throws IOException { + log.error("In clenanup"); if (outputBufferedFileStream != null) { outputBufferedFileStream.close(); } - + log.error("In BufferedFiledStream"); if (outputFileChannel != null) { outputFileChannel.close(); } - + log.error("In FileChannel"); if (outputFileStream != null) { outputFileStream.close(); } + log.error("In InFileStream"); } private void initStream() throws IOException { @@ -192,6 +199,7 @@ public long getNumBytesWritten() { @Override public void close() throws IOException { + log.error("Do I have an outputTempFile: " + outputTempFile.exists()); if (stream != null) { stream.close(); } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a9f3c73e90ab8..414f7070de5a8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -340,6 +340,10 @@ private[spark] object Utils extends Logging { output: WritableByteChannel, startPosition: Long, bytesToCopy: Long): Unit = { + log.error("Entering") + log.error(s"Input open: ${input.isOpen}") + log.error(s"Output open: ${output.isOpen}") + log.error(s"start position: $startPosition with $bytesToCopy bytes to copy") val outputInitialState = output match { case outputFileChannel: FileChannel => Some((outputFileChannel.position(), outputFileChannel)) @@ -350,6 +354,7 @@ private[spark] object Utils extends Logging { while (count < bytesToCopy) { count += input.transferTo(count + startPosition, bytesToCopy - count, output) } + log.error(s"Wrote $count when needed to write $bytesToCopy") assert(count == bytesToCopy, s"request to copy $bytesToCopy bytes, but actually copied $count bytes.") @@ -370,6 +375,7 @@ private[spark] object Utils extends Logging { |You can set spark.file.transferTo = false to disable this NIO feature. """.stripMargin) } + log.error(s"Done with $count") } /** diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 32f6b1036e3f8..627a2278e6415 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -75,7 +75,6 @@ public class UnsafeShuffleWriterSuite { SparkConf conf; final Serializer serializer = new KryoSerializer(new SparkConf()); TaskMetrics taskMetrics; - ShuffleWriteSupport shuffleWriteSupport; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @Mock(answer = RETURNS_SMART_NULLS) IndexShuffleBlockResolver shuffleBlockResolver; @@ -159,8 +158,6 @@ public void setUp() throws IOException { Random rand = new Random(); TaskContext$.MODULE$.setTaskContext(new TaskContextImpl( 0, 0, 0, rand.nextInt(10000), 0, taskMemoryManager, new Properties(), null, taskMetrics)); - - shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, shuffleBlockResolver); } private UnsafeShuffleWriter createWriter( @@ -175,7 +172,7 @@ private UnsafeShuffleWriter createWriter( taskContext, conf, taskContext.taskMetrics().shuffleWriteMetrics(), - shuffleWriteSupport + new DefaultShuffleWriteSupport(conf, shuffleBlockResolver) ); } @@ -537,7 +534,7 @@ public void testPeakMemoryUsed() throws Exception { taskContext, conf, taskContext.taskMetrics().shuffleWriteMetrics(), - shuffleWriteSupport); + new DefaultShuffleWriteSupport(conf, shuffleBlockResolver)); // Peak memory should be monotonically increasing. More specifically, every time // we allocate a new page it should increase by exactly the size of the page. From 0bdf0986878f0c35ac17511d5b06d6bdb30fdc86 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 8 Apr 2019 16:47:03 -0700 Subject: [PATCH 04/13] small fix to make transferTo properly work --- .../shuffle/sort/UnsafeShuffleWriter.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index d2ef176f275f0..ba0e208babad1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -364,7 +364,9 @@ private long[] mergeSpillsWithFileStream( final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new InputStream[spills.length]; - + logger.error("Num partitions: " + numPartitions); + logger.error("Num spills: " + spills.length); + logger.error("compression: " + compressionCodec); boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { @@ -373,12 +375,18 @@ private long[] mergeSpillsWithFileStream( inputBufferSizeInBytes); } for (int partition = 0; partition < numPartitions; partition++) { + logger.error("In partition: " + partition); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; OutputStream partitionOutput; try { writer = mapWriter.getNextPartitionWriter(); + // Shield the underlying output stream from close() and flush() calls, so we can close + // the higher level streams to make sure all data is really flushed and internal state is + // cleaned. partitionOutput = writer.toStream(); + // Here, we don't need to perform any metrics updates when spills.length < 2 because the + // bytes written to this output file would have already been counted as shuffle bytes written. if (spills.length >= 2) { partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); } @@ -386,16 +394,15 @@ private long[] mergeSpillsWithFileStream( if (compressionCodec != null) { partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); } - // Shield the underlying output stream from close() and flush() calls, so we can close - // the higher level streams to make sure all data is really flushed and internal state is - // cleaned. for (int i = 0; i < spills.length; i++) { final long partitionLengthInSpill = spills[i].partitionLengths[partition]; + logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill); if (partitionLengthInSpill > 0) { - InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i], - partitionLengthInSpill, false); + InputStream partitionInputStream = null; try { + partitionInputStream = new LimitedInputStream(spillInputStreams[i], + partitionLengthInSpill, false); partitionInputStream = blockManager.serializerManager().wrapForEncryption( partitionInputStream); if (compressionCodec != null) { @@ -410,17 +417,21 @@ private long[] mergeSpillsWithFileStream( copyThrewExecption = false; } } finally { + logger.error("Closing the writer"); Closeables.close(writer, copyThrewExecption); } long numBytesWritten = writer.getNumBytesWritten(); + logger.error("NumBytesWritten: " + numBytesWritten); partitionLengths[partition] = numBytesWritten; writeMetrics.incBytesWritten(numBytesWritten); } + logger.error("Finish writing"); threwException = false; } finally { // To avoid masking exceptions that caused us to prematurely enter the finally block, only // throw exceptions during cleanup if threwException == false. for (InputStream stream : spillInputStreams) { + logger.error("Close input channel"); Closeables.close(stream, threwException); } } @@ -454,11 +465,12 @@ private long[] mergeSpillsWithTransferTo( logger.error("In partition: " + partition); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; - long partitionLengthInSpill = 0L; try { writer = mapWriter.getNextPartitionWriter(); WritableByteChannel channel = writer.toChannel(); for (int i = 0; i < spills.length; i++) { + long partitionLengthInSpill = 0L; + logger.error("In spill: " + i); partitionLengthInSpill += spills[i].partitionLengths[partition]; final FileChannel spillInputChannel = spillInputChannels[i]; final long writeStartTime = System.nanoTime(); From f4f6772934cfc9f6569c71229d63cdb252e7411e Mon Sep 17 00:00:00 2001 From: mccheah Date: Mon, 8 Apr 2019 17:29:41 -0700 Subject: [PATCH 05/13] Always close the serializer stream. (#10) * But don't close the partition stream returned by the partition writer. --- .../shuffle/sort/UnsafeShuffleWriter.java | 64 +++++++++---------- .../io/DefaultShuffleMapOutputWriter.java | 13 +--- 2 files changed, 35 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index ba0e208babad1..d67caf1cbed57 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.Iterator; -import org.apache.spark.storage.ShuffleBlockId; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -378,43 +377,44 @@ private long[] mergeSpillsWithFileStream( logger.error("In partition: " + partition); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; - OutputStream partitionOutput; try { writer = mapWriter.getNextPartitionWriter(); - // Shield the underlying output stream from close() and flush() calls, so we can close - // the higher level streams to make sure all data is really flushed and internal state is - // cleaned. - partitionOutput = writer.toStream(); - // Here, we don't need to perform any metrics updates when spills.length < 2 because the - // bytes written to this output file would have already been counted as shuffle bytes written. - if (spills.length >= 2) { - partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); - } - partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); - if (compressionCodec != null) { - partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); - } - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill); + OutputStream partitionOutput = null; + try { + partitionOutput = new CloseShieldOutputStream(writer.toStream()); + // Here, we don't need to perform any metrics updates when spills.length < 2 because the + // bytes written to this output file would have already been counted as shuffle bytes written. + if (spills.length >= 2) { + partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); + } + partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); + if (compressionCodec != null) { + partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); + } + for (int i = 0; i < spills.length; i++) { + final long partitionLengthInSpill = spills[i].partitionLengths[partition]; + logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill); - if (partitionLengthInSpill > 0) { - InputStream partitionInputStream = null; - try { - partitionInputStream = new LimitedInputStream(spillInputStreams[i], - partitionLengthInSpill, false); - partitionInputStream = blockManager.serializerManager().wrapForEncryption( - partitionInputStream); - if (compressionCodec != null) { - partitionInputStream = compressionCodec.compressedInputStream( - partitionInputStream); + if (partitionLengthInSpill > 0) { + InputStream partitionInputStream = null; + try { + partitionInputStream = new LimitedInputStream(spillInputStreams[i], + partitionLengthInSpill, false); + partitionInputStream = blockManager.serializerManager().wrapForEncryption( + partitionInputStream); + if (compressionCodec != null) { + partitionInputStream = compressionCodec.compressedInputStream( + partitionInputStream); + } + ByteStreams.copy(partitionInputStream, partitionOutput); + } finally { + partitionInputStream.close(); } - ByteStreams.copy(partitionInputStream, partitionOutput); - } finally { - partitionInputStream.close(); } + copyThrewExecption = false; } - copyThrewExecption = false; + } finally { + Closeables.close(partitionOutput, copyThrewExecption); } } finally { logger.error("Closing the writer"); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index c4042868e8e6d..c12a19cf30921 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -198,9 +198,10 @@ public long getNumBytesWritten() { } @Override - public void close() throws IOException { + public void close() { log.error("Do I have an outputTempFile: " + outputTempFile.exists()); if (stream != null) { + // Closing is a no-op. stream.close(); } partitionLengths[partitionId] = getNumBytesWritten(); @@ -230,18 +231,10 @@ public void write(byte[] buf, int pos, int length) throws IOException { } @Override - public void close() throws IOException { - flush(); + public void close() { isClosed = true; } - @Override - public void flush() throws IOException { - if (!isClosed) { - outputBufferedFileStream.flush(); - } - } - private void verifyNotClosed() { if (isClosed) { throw new IllegalStateException("Attempting to write to a closed block output stream."); From a9e0afcdb57484d932fa78b5961a05f195b65446 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 8 Apr 2019 18:18:21 -0700 Subject: [PATCH 06/13] fix issues that are blocking tests --- .../shuffle/sort/UnsafeShuffleWriter.java | 42 +++++-------------- .../io/DefaultShuffleMapOutputWriter.java | 10 ----- .../sort/UnsafeShuffleWriterSuite.java | 6 +-- 3 files changed, 13 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index d67caf1cbed57..18bc9692e2ccd 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -21,7 +21,6 @@ import java.io.*; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; -import java.util.Arrays; import java.util.Iterator; import scala.Option; @@ -56,7 +55,6 @@ import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.storage.BlockManager; -import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.Utils; @@ -68,7 +66,6 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private static final ClassTag OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object(); @VisibleForTesting - static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024; private final BlockManager blockManager; @@ -220,7 +217,6 @@ void closeAndWriteOutput() throws IOException { serBuffer = null; serOutputStream = null; final SpillInfo[] spills = sorter.closeAndGetSpills(); - logger.error("num spills: " + spills.length); sorter = null; final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); @@ -244,7 +240,6 @@ void closeAndWriteOutput() throws IOException { } throw e; } - logger.error("Partition lengths: " + Arrays.toString(partitionLengths)); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -317,14 +312,14 @@ private long[] mergeSpills(SpillInfo[] spills, // decompression of concatenated compressed streams, so we can perform a fast spill merge // that doesn't need to interpret the spilled bytes. if (transferToEnabled && !encryptionEnabled) { - logger.error("Using transferTo-based fast merge"); + logger.debug("Using transferTo-based fast merge"); partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); } else { - logger.error("Using fileStream-based fast merge"); + logger.debug("Using fileStream-based fast merge"); partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); } } else { - logger.error("Using slow merge"); + logger.debug("Using slow merge"); partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); } // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has @@ -363,10 +358,9 @@ private long[] mergeSpillsWithFileStream( final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new InputStream[spills.length]; - logger.error("Num partitions: " + numPartitions); - logger.error("Num spills: " + spills.length); - logger.error("compression: " + compressionCodec); + boolean threwException = true; + try { for (int i = 0; i < spills.length; i++) { spillInputStreams[i] = new NioBufferedFileInputStream( @@ -374,26 +368,22 @@ private long[] mergeSpillsWithFileStream( inputBufferSizeInBytes); } for (int partition = 0; partition < numPartitions; partition++) { - logger.error("In partition: " + partition); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; try { writer = mapWriter.getNextPartitionWriter(); OutputStream partitionOutput = null; try { + // Shield the underlying output stream from close() calls, so that we can close the + // higher level streams to make sure all data is really flushed and internal state + // is cleaned partitionOutput = new CloseShieldOutputStream(writer.toStream()); - // Here, we don't need to perform any metrics updates when spills.length < 2 because the - // bytes written to this output file would have already been counted as shuffle bytes written. - if (spills.length >= 2) { - partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput); - } partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); if (compressionCodec != null) { partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); } for (int i = 0; i < spills.length; i++) { final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill); if (partitionLengthInSpill > 0) { InputStream partitionInputStream = null; @@ -417,21 +407,17 @@ private long[] mergeSpillsWithFileStream( Closeables.close(partitionOutput, copyThrewExecption); } } finally { - logger.error("Closing the writer"); Closeables.close(writer, copyThrewExecption); } long numBytesWritten = writer.getNumBytesWritten(); - logger.error("NumBytesWritten: " + numBytesWritten); partitionLengths[partition] = numBytesWritten; writeMetrics.incBytesWritten(numBytesWritten); } - logger.error("Finish writing"); threwException = false; } finally { // To avoid masking exceptions that caused us to prematurely enter the finally block, only // throw exceptions during cleanup if threwException == false. for (InputStream stream : spillInputStreams) { - logger.error("Close input channel"); Closeables.close(stream, threwException); } } @@ -454,15 +440,14 @@ private long[] mergeSpillsWithTransferTo( final long[] partitionLengths = new long[numPartitions]; final FileChannel[] spillInputChannels = new FileChannel[spills.length]; final long[] spillInputChannelPositions = new long[spills.length]; - logger.error("Num partitions: " + numPartitions); - logger.error("Num spills: " + spills.length); + boolean threwException = true; + try { for (int i = 0; i < spills.length; i++) { spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); } for (int partition = 0; partition < numPartitions; partition++) { - logger.error("In partition: " + partition); boolean copyThrewExecption = true; ShufflePartitionWriter writer = null; try { @@ -470,11 +455,9 @@ private long[] mergeSpillsWithTransferTo( WritableByteChannel channel = writer.toChannel(); for (int i = 0; i < spills.length; i++) { long partitionLengthInSpill = 0L; - logger.error("In spill: " + i); partitionLengthInSpill += spills[i].partitionLengths[partition]; final FileChannel spillInputChannel = spillInputChannels[i]; final long writeStartTime = System.nanoTime(); - logger.error("InputChannelPositions: "+ spillInputChannelPositions[i]); Utils.copyFileStreamNIO( spillInputChannel, channel, @@ -483,26 +466,21 @@ private long[] mergeSpillsWithTransferTo( copyThrewExecption = false; spillInputChannelPositions[i] += partitionLengthInSpill; writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); - logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill); } } catch (IOException e) { throw e; } finally { - logger.error("Closing the writer"); Closeables.close(writer, copyThrewExecption); } long numBytes = writer.getNumBytesWritten(); - logger.error("NumBytes: " + numBytes); partitionLengths[partition] = numBytes; writeMetrics.incBytesWritten(numBytes); } - logger.error("Finish writing"); threwException = false; } finally { // To avoid masking exceptions that caused us to prematurely enter the finally block, only // throw exceptions during cleanup if threwException == false. for (int i = 0; i < spills.length; i++) { - logger.error("Close input channel: " + i); assert(spillInputChannelPositions[i] == spills[i].file.length()); Closeables.close(spillInputChannels[i], threwException); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index c12a19cf30921..0d77223792c1e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -80,10 +80,7 @@ public DefaultShuffleMapOutputWriter( public ShufflePartitionWriter getNextPartitionWriter() throws IOException { if (outputTempFile == null) { outputTempFile = Utils.tempFileWith(outputFile); - log.error("I create an outputTempFile at: " + outputTempFile.getAbsolutePath()); } - log.error("I created a new partition writer at: " + currPartitionId); - log.error("Do I have an outputTempFile: " + outputTempFile.exists()); if (outputFileChannel != null) { currChannelPosition = outputFileChannel.position(); } else { @@ -94,9 +91,7 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException { @Override public void commitAllPartitions() throws IOException { - log.error("Do I have an outputTempFile before cleanup: " + outputTempFile.exists()); cleanUp(); - log.error("Do I have an outputTempFile after cleanup: " + outputTempFile.exists()); blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); } @@ -113,19 +108,15 @@ public void abort(Throwable error) { } private void cleanUp() throws IOException { - log.error("In clenanup"); if (outputBufferedFileStream != null) { outputBufferedFileStream.close(); } - log.error("In BufferedFiledStream"); if (outputFileChannel != null) { outputFileChannel.close(); } - log.error("In FileChannel"); if (outputFileStream != null) { outputFileStream.close(); } - log.error("In InFileStream"); } private void initStream() throws IOException { @@ -199,7 +190,6 @@ public long getNumBytesWritten() { @Override public void close() { - log.error("Do I have an outputTempFile: " + outputTempFile.exists()); if (stream != null) { // Closing is a no-op. stream.close(); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 627a2278e6415..69533045d48f6 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -36,7 +36,6 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.apache.spark.api.shuffle.ShuffleWriteSupport; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; @@ -64,6 +63,7 @@ public class UnsafeShuffleWriterSuite { + static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; static final int NUM_PARTITITONS = 4; TestMemoryManager memoryManager; TaskMemoryManager taskMemoryManager; @@ -452,10 +452,10 @@ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() thro } private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { - memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16); + memoryManager.limit(DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16); final UnsafeShuffleWriter writer = createWriter(false); final ArrayList> dataToWrite = new ArrayList<>(); - for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { + for (int i = 0; i < DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { dataToWrite.add(new Tuple2<>(i, i)); } writer.write(dataToWrite.iterator()); From 6ebb5d81e1a23f51b7a492ae00e867ec6119103b Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 8 Apr 2019 18:22:22 -0700 Subject: [PATCH 07/13] style --- .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 69533045d48f6..8893eab6d7137 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.util.*; -import org.apache.spark.*; import scala.Option; import scala.Product2; import scala.Tuple2; @@ -36,6 +35,12 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.apache.spark.HashPartitioner; +import org.apache.spark.ShuffleDependency; +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import org.apache.spark.TaskContextImpl; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; From c808ba3a5bb3bea20d926895720e0a4bf0d94a40 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 8 Apr 2019 18:25:14 -0700 Subject: [PATCH 08/13] further style --- .../shuffle/sort/io/DefaultShuffleMapOutputWriter.java | 2 ++ core/src/main/scala/org/apache/spark/util/Utils.scala | 6 ------ .../apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 2 -- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index 0d77223792c1e..174b5106a313d 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -111,9 +111,11 @@ private void cleanUp() throws IOException { if (outputBufferedFileStream != null) { outputBufferedFileStream.close(); } + if (outputFileChannel != null) { outputFileChannel.close(); } + if (outputFileStream != null) { outputFileStream.close(); } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 414f7070de5a8..a9f3c73e90ab8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -340,10 +340,6 @@ private[spark] object Utils extends Logging { output: WritableByteChannel, startPosition: Long, bytesToCopy: Long): Unit = { - log.error("Entering") - log.error(s"Input open: ${input.isOpen}") - log.error(s"Output open: ${output.isOpen}") - log.error(s"start position: $startPosition with $bytesToCopy bytes to copy") val outputInitialState = output match { case outputFileChannel: FileChannel => Some((outputFileChannel.position(), outputFileChannel)) @@ -354,7 +350,6 @@ private[spark] object Utils extends Logging { while (count < bytesToCopy) { count += input.transferTo(count + startPosition, bytesToCopy - count, output) } - log.error(s"Wrote $count when needed to write $bytesToCopy") assert(count == bytesToCopy, s"request to copy $bytesToCopy bytes, but actually copied $count bytes.") @@ -375,7 +370,6 @@ private[spark] object Utils extends Logging { |You can set spark.file.transferTo = false to disable this NIO feature. """.stripMargin) } - log.error(s"Done with $count") } /** diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 8893eab6d7137..84afd803d40cc 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -87,8 +87,6 @@ public class UnsafeShuffleWriterSuite { @Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext; @Mock(answer = RETURNS_SMART_NULLS) ShuffleDependency shuffleDep; - - @After public void tearDown() { TaskContext$.MODULE$.unset(); From f74e53efe2970efee08d49641696587e1972e753 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 15 Apr 2019 15:05:14 -0700 Subject: [PATCH 09/13] resolve comments --- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 4 ---- .../sort/io/DefaultShuffleMapOutputWriter.java | 15 +++++++++++++-- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 5 +---- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 18bc9692e2ccd..a03c7a2709823 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -360,7 +360,6 @@ private long[] mergeSpillsWithFileStream( final InputStream[] spillInputStreams = new InputStream[spills.length]; boolean threwException = true; - try { for (int i = 0; i < spills.length; i++) { spillInputStreams[i] = new NioBufferedFileInputStream( @@ -442,7 +441,6 @@ private long[] mergeSpillsWithTransferTo( final long[] spillInputChannelPositions = new long[spills.length]; boolean threwException = true; - try { for (int i = 0; i < spills.length; i++) { spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); @@ -467,8 +465,6 @@ private long[] mergeSpillsWithTransferTo( spillInputChannelPositions[i] += partitionLengthInSpill; writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } - } catch (IOException e) { - throw e; } finally { Closeables.close(writer, copyThrewExecption); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index 174b5106a313d..7ee44fb0657e6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -93,6 +93,19 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException { public void commitAllPartitions() throws IOException { cleanUp(); blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); + if (!outputFile.exists()) { + if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) { + throw new IOException( + String.format( + "Failed to create shuffle file directory at %s.", + outputFile.getParentFile().getAbsolutePath())); + } + if (!outputFile.isFile() && !outputFile.createNewFile()) { + throw new IOException( + String.format( + "Failed to create empty shuffle file at %s.", outputFile.getAbsolutePath())); + } + } } @Override @@ -111,11 +124,9 @@ private void cleanUp() throws IOException { if (outputBufferedFileStream != null) { outputBufferedFileStream.close(); } - if (outputFileChannel != null) { outputFileChannel.close(); } - if (outputFileStream != null) { outputFileStream.close(); } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 84afd803d40cc..49e2d831a1435 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -40,7 +40,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; -import org.apache.spark.TaskContextImpl; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; @@ -158,9 +157,7 @@ public void setUp() throws IOException { when(shuffleDep.partitioner()).thenReturn(hashPartitioner); when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); - Random rand = new Random(); - TaskContext$.MODULE$.setTaskContext(new TaskContextImpl( - 0, 0, 0, rand.nextInt(10000), 0, taskMemoryManager, new Properties(), null, taskMetrics)); + TaskContext$.MODULE$.setTaskContext(taskContext); } private UnsafeShuffleWriter createWriter( From f6dfff3047ea4e65fa6fa337c310f4f676845411 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 15 Apr 2019 15:38:27 -0700 Subject: [PATCH 10/13] remove unecessary file creation --- .../sort/io/DefaultShuffleMapOutputWriter.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index 7ee44fb0657e6..0d77223792c1e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -93,19 +93,6 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException { public void commitAllPartitions() throws IOException { cleanUp(); blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); - if (!outputFile.exists()) { - if (!outputFile.getParentFile().isDirectory() && !outputFile.getParentFile().mkdirs()) { - throw new IOException( - String.format( - "Failed to create shuffle file directory at %s.", - outputFile.getParentFile().getAbsolutePath())); - } - if (!outputFile.isFile() && !outputFile.createNewFile()) { - throw new IOException( - String.format( - "Failed to create empty shuffle file at %s.", outputFile.getAbsolutePath())); - } - } } @Override From c5fffd693462554335c272cc913be5669ebbd910 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 15 Apr 2019 15:46:29 -0700 Subject: [PATCH 11/13] remove .toStream --- .../java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index a03c7a2709823..0f1af55bda29b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -289,7 +289,6 @@ private long[] mergeSpills(SpillInfo[] spills, ShufflePartitionWriter writer = null; try { writer = mapWriter.getNextPartitionWriter(); - writer.toStream(); } finally { if (writer != null) { writer.close(); From 66f3d98303f6b65503c68b2cabb9783fd1b54e6b Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 15 Apr 2019 16:30:17 -0700 Subject: [PATCH 12/13] hot fix to remove log statement --- .../java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 0f1af55bda29b..6a9fd5a0a5298 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -283,7 +283,6 @@ private long[] mergeSpills(SpillInfo[] spills, final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); final int numPartitions = partitioner.numPartitions(); long[] partitionLengths = new long[numPartitions]; - logger.error(mapWriter.toString()); try { if (spills.length == 0) { ShufflePartitionWriter writer = null; From 6267e1b8b083e9185e551cea30385adbde46b9ce Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 15 Apr 2019 17:16:08 -0700 Subject: [PATCH 13/13] fix contract --- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 6a9fd5a0a5298..d7a6d6450ebc0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -285,12 +285,16 @@ private long[] mergeSpills(SpillInfo[] spills, long[] partitionLengths = new long[numPartitions]; try { if (spills.length == 0) { - ShufflePartitionWriter writer = null; - try { - writer = mapWriter.getNextPartitionWriter(); - } finally { - if (writer != null) { - writer.close(); + // The contract we are working under states that we will open a partition writer for + // each partition, regardless of number of spills + for (int i = 0; i < numPartitions; i++) { + ShufflePartitionWriter writer = null; + try { + writer = mapWriter.getNextPartitionWriter(); + } finally { + if (writer != null) { + writer.close(); + } } } return partitionLengths;