Commit 401d650

Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"
This reverts commit 5fd0294.
1 parent 66a7d6b commit 401d650

File tree

8 files changed: +37 additions, -49 deletions

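Every hunk below follows the same pattern: an NIO-based open (Files.newInputStream, Files.newOutputStream, or FileChannel.open) introduced by SPARK-21475 is put back to its stream-based equivalent (new FileInputStream / new FileOutputStream). For orientation, here is a minimal stand-alone sketch of the paired idioms; the class and method names are illustrative only, not Spark code.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class OpenIdioms {
  static InputStream openForRead(File file, boolean useNio) throws IOException {
    // NIO path: throws NoSuchFileException (an IOException subclass) if the file is missing.
    // Stream path: throws FileNotFoundException instead.
    return useNio ? Files.newInputStream(file.toPath()) : new FileInputStream(file);
  }

  static OutputStream openForWrite(File file, boolean useNio) throws IOException {
    return useNio ? Files.newOutputStream(file.toPath()) : new FileOutputStream(file);
  }

  static FileChannel readChannel(File file, boolean useNio) throws IOException {
    // Both return a readable FileChannel positioned at offset 0.
    return useNio
        ? FileChannel.open(file.toPath(), StandardOpenOption.READ)
        : new FileInputStream(file).getChannel();
  }
}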
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 4 additions & 5 deletions

@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public ByteBuffer nioByteBuffer() throws IOException {
 
   @Override
   public InputStream createInputStream() throws IOException {
-    InputStream is = null;
+    FileInputStream is = null;
     try {
-      is = Files.newInputStream(file.toPath());
+      is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -133,7 +132,7 @@ public Object convertToNetty() throws IOException {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 2 additions & 2 deletions

@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback {
 
     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 2 additions & 2 deletions

@@ -19,10 +19,10 @@
 
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
-import java.nio.file.Files;
 
 /**
  * Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+      dis = new DataInputStream(new FileInputStream(indexFile));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 7 additions & 9 deletions

@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;
 
 import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
+  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -186,21 +188,17 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       return lengths;
     }
 
-    // This file needs to opened in append mode in order to work around a Linux kernel bug that
-    // affects transferTo; see SPARK-3948 for more details.
-    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
    try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), READ);
+          final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
           try {
-            long size = in.size();
-            Utils.copyFileStreamNIO(in, out, 0, size);
-            lengths[i] = size;
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);

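In the restored writePartitionedFile above, the output is opened with new FileOutputStream(outputFile, true) and each partition file is copied via Utils.copyStream(in, out, false, transferToEnabled), where transferToEnabled comes from spark.file.transferTo. A rough, self-contained sketch of what such a flag-gated copy can look like follows; this is an assumption about the shape of the helper, not Spark's Utils implementation.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

final class CopySketch {
  /** Copies all of `in` to `out`, returning the number of bytes copied. */
  static long copy(FileInputStream in, FileOutputStream out, boolean transferToEnabled)
      throws IOException {
    if (transferToEnabled) {
      // Channel-to-channel path: hand the copy to the kernel via transferTo.
      FileChannel inCh = in.getChannel();
      FileChannel outCh = out.getChannel();
      long size = inCh.size();
      long written = 0;
      while (written < size) {
        written += inCh.transferTo(written, size - written, outCh);
      }
      return written;
    } else {
      // Plain buffered copy through user space.
      byte[] buf = new byte[64 * 1024];
      long total = 0;
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
        total += n;
      }
      return total;
    }
  }
}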
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 4 additions & 5 deletions

@@ -20,7 +20,6 @@
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;
 
 import scala.Option;
@@ -291,7 +290,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+        new FileOutputStream(outputFile).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@ private long[] mergeSpillsWithFileStream(
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
     final OutputStream bos = new BufferedOutputStream(
-        java.nio.file.Files.newOutputStream(outputFile.toPath()),
+        new FileOutputStream(outputFile),
         outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -443,11 +442,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 2 additions & 4 deletions

@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle
 
 import java.io._
-import java.nio.file.Files
 
 import com.google.common.io.ByteStreams
 
@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(
-        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 9 additions & 12 deletions

@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@ class ExternalAppendOnlyMap[K, V, C](
     )
 
     private var batchIndex = 0  // Which batch we're in
-    private var fileChannel: FileChannel = null
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,23 +476,22 @@
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchIndex)
-        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
         batchIndex += 1
 
         val end = batchOffsets(batchIndex)
 
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -554,9 +551,9 @@
         ds.close()
         deserializeStream = null
       }
-      if (fileChannel != null) {
-        fileChannel.close()
-        fileChannel = null
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
       }
       if (file.exists()) {
         if (!file.delete()) {

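The spill readers here and in ExternalSorter below both return to the same batch-reading pattern: open a FileInputStream, move its underlying channel to the batch's start offset, and cap reads at the batch length with Guava's ByteStreams.limit before buffering. A small stand-alone sketch of that pattern, in Java for consistency with the other examples on this page (the class and method names are placeholders):

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import com.google.common.io.ByteStreams;

final class BatchReadSketch {
  /** Returns a buffered stream over bytes [start, end) of the spill file. */
  static InputStream openBatch(File spillFile, long start, long end) throws IOException {
    FileInputStream fileStream = new FileInputStream(spillFile);
    // Seek the underlying channel to the batch start; the FileInputStream shares that position.
    fileStream.getChannel().position(start);
    // Stop reads at the batch boundary, then buffer the limited stream.
    return new BufferedInputStream(ByteStreams.limit(fileStream, end - start));
  }
}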
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 7 additions & 10 deletions

@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection
 
 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator
 
 import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileChannel: FileChannel = null
+    var fileStream: FileInputStream = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream
 
     var nextItem: (K, C) = null
@@ -507,23 +505,22 @@
       if (batchId < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }
 
         val start = batchOffsets(batchId)
-        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
         batchId += 1
 
         val end = batchOffsets(batchId)
 
         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 
         val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
         serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileChannel = null
+      fileStream = null
       if (ds != null) {
         ds.close()
       }
