diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 9eddfc924404e..97f34bf460495 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -548,6 +548,10 @@ private CommittedPartition[] mergeSpillsWithPluggableWriter( ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(partition); try { try (OutputStream partitionOutput = writer.openPartitionStream()) { + OutputStream partitionOutputStream = partitionOutput; + if (compressionCodec != null) { + partitionOutputStream = compressionCodec.compressedOutputStream(partitionOutput); + } for (int i = 0; i < spills.length; i++) { final long partitionLengthInSpill = spills[i].partitionLengths[partition]; if (partitionLengthInSpill > 0) { @@ -560,7 +564,7 @@ private CommittedPartition[] mergeSpillsWithPluggableWriter( partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); } - Utils.copyStream(partitionInputStream, partitionOutput, false, false); + Utils.copyStream(partitionInputStream, partitionOutputStream, false, false); } finally { partitionInputStream.close(); } @@ -621,7 +625,11 @@ private CommittedPartition[] writeSingleSpillFileUsingPluggableWriter( partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); } try (OutputStream partitionOutput = writer.openPartitionStream()) { - Utils.copyStream(partitionInputStream, partitionOutput, false, false); + OutputStream partitionOutputStream = partitionOutput; + if (compressionCodec != null) { + partitionOutputStream = compressionCodec.compressedOutputStream(partitionOutput); + } + Utils.copyStream(partitionInputStream, partitionOutputStream, false, false); } } catch (Exception e) { try { @@ -637,6 +645,7 @@ private CommittedPartition[] writeSingleSpillFileUsingPluggableWriter( writeMetrics.incBytesWritten(committedPartitions[partition].length()); } threwException = false; + mapOutputWriter.commitAllPartitions(); } catch (Exception e) { try { mapOutputWriter.abort(e);