From b9dad5ac976261359623fafbbfa9389310272238 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 19 Jul 2017 11:30:15 -0700 Subject: [PATCH 1/3] Use NIO's file API to replace FileInputStream/FileOutputStream Change-Id: I0f11b9e0cbe62ca3d0bac7bfe0e2df838da80b48 --- .../buffer/FileSegmentManagedBuffer.java | 11 ++++++---- .../shuffle/OneForOneBlockFetcher.java | 4 ++-- .../shuffle/ShuffleIndexInformation.java | 4 ++-- .../sort/BypassMergeSortShuffleWriter.java | 14 +++++++----- .../shuffle/sort/UnsafeShuffleWriter.java | 11 ++++++---- .../shuffle/IndexShuffleBlockResolver.scala | 6 +++-- .../collection/ExternalAppendOnlyMap.scala | 22 +++++++++++-------- .../util/collection/ExternalSorter.scala | 18 +++++++++------ 8 files changed, 55 insertions(+), 35 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index c20fab83c346..9dc935a43f4b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -18,14 +18,16 @@ package org.apache.spark.network.buffer; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; import io.netty.channel.DefaultFileRegion; @@ -93,9 +95,9 @@ public ByteBuffer nioByteBuffer() throws IOException { @Override public InputStream createInputStream() throws IOException { - FileInputStream is = null; + InputStream is = null; try { - is = new FileInputStream(file); + is = Files.newInputStream(file.toPath()); ByteStreams.skipFully(is, offset); return new LimitedInputStream(is, length); } catch (IOException e) { @@ -132,7 +134,8 @@ public Object convertToNetty() throws IOException { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { - FileChannel fileChannel = new FileInputStream(file).getChannel(); + FileChannel fileChannel = FileChannel.open(file.toPath(), + ImmutableSet.of(StandardOpenOption.READ)); return new DefaultFileRegion(fileChannel, offset, length); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index 2f160d12af22..66b67e282c80 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -18,11 +18,11 @@ package org.apache.spark.network.shuffle; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; import java.util.Arrays; import org.slf4j.Logger; @@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback { DownloadCallback(int chunkIndex) throws IOException { this.targetFile = tempShuffleFileManager.createTempShuffleFile(); - this.channel = 
Channels.newChannel(new FileOutputStream(targetFile)); + this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath())); this.chunkIndex = chunkIndex; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java index ec57f0259d55..39ca9ba57485 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java @@ -19,10 +19,10 @@ import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.LongBuffer; +import java.nio.file.Files; /** * Keeps the index information for a particular map output @@ -38,7 +38,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException { offsets = buffer.asLongBuffer(); DataInputStream dis = null; try { - dis = new DataInputStream(new FileInputStream(indexFile)); + dis = new DataInputStream(Files.newInputStream(indexFile.toPath())); dis.readFully(buffer.array()); } finally { if (dis != null) { diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 323a5d3c5283..a84bfe19eb0b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -18,9 +18,9 @@ package org.apache.spark.shuffle.sort; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.FileChannel; +import static java.nio.file.StandardOpenOption.*; import javax.annotation.Nullable; import scala.None$; @@ -30,6 +30,7 @@ import scala.collection.Iterator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.io.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,17 +189,20 @@ private long[] writePartitionedFile(File outputFile) throws IOException { return lengths; } - final FileOutputStream out = new FileOutputStream(outputFile, true); + final FileChannel out = FileChannel.open(outputFile.toPath(), + ImmutableSet.of(WRITE, APPEND, CREATE)); final long writeStartTime = System.nanoTime(); boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { - final FileInputStream in = new FileInputStream(file); + final FileChannel in = FileChannel.open(file.toPath(), ImmutableSet.of(READ)); boolean copyThrewException = true; try { - lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); + final long size = in.size(); + Utils.copyFileStreamNIO(in, out, 0, size); + lengths[i] = size; copyThrewException = false; } finally { Closeables.close(in, copyThrewException); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 1b578491b81d..5acffde874b4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -20,6 +20,7 @@ import 
javax.annotation.Nullable; import java.io.*; import java.nio.channels.FileChannel; +import static java.nio.file.StandardOpenOption.*; import java.util.Iterator; import scala.Option; @@ -29,6 +30,7 @@ import scala.reflect.ClassTag$; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.Files; @@ -290,7 +292,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); try { if (spills.length == 0) { - new FileOutputStream(outputFile).close(); // Create an empty file + java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file return new long[partitioner.numPartitions()]; } else if (spills.length == 1) { // Here, we don't need to perform any metrics updates because the bytes written to this @@ -367,7 +369,7 @@ private long[] mergeSpillsWithFileStream( final InputStream[] spillInputStreams = new InputStream[spills.length]; final OutputStream bos = new BufferedOutputStream( - new FileOutputStream(outputFile), + java.nio.file.Files.newOutputStream(outputFile.toPath()), outputBufferSizeInBytes); // Use a counting output stream to avoid having to close the underlying file and ask // the file system for its size after each partition is written. @@ -442,11 +444,12 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { - spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); + spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), ImmutableSet.of(READ)); } // This file needs to opened in append mode in order to work around a Linux kernel bug that // affects transferTo; see SPARK-3948 for more details. - mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel(); + mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), + ImmutableSet.of(WRITE, CREATE, APPEND)); long bytesWrittenToMergedFile = 0; for (int partition = 0; partition < numPartitions; partition++) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 15540485170d..94a3a78e9416 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -18,6 +18,7 @@ package org.apache.spark.shuffle import java.io._ +import java.nio.file.Files import com.google.common.io.ByteStreams @@ -141,7 +142,8 @@ private[spark] class IndexShuffleBlockResolver( val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) try { - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) + val out = new DataOutputStream( + new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath))) Utils.tryWithSafeFinally { // We take in lengths of each block, need to convert it to offsets. 
var offset = 0L @@ -196,7 +198,7 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) - val in = new DataInputStream(new FileInputStream(indexFile)) + val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) try { ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8aafda5e45d5..c5178571efbd 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -18,12 +18,15 @@ package org.apache.spark.util.collection import java.io._ +import java.nio.channels.{Channels, FileChannel} +import java.nio.file.StandardOpenOption import java.util.Comparator import scala.collection.BufferedIterator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import com.google.common.collect.ImmutableSet import com.google.common.io.ByteStreams import org.apache.spark.{SparkEnv, TaskContext} @@ -460,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C]( ) private var batchIndex = 0 // Which batch we're in - private var fileStream: FileInputStream = null + private var fileChannel: FileChannel = null // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams @@ -477,14 +480,14 @@ class ExternalAppendOnlyMap[K, V, C]( if (batchIndex < batchOffsets.length - 1) { if (deserializeStream != null) { deserializeStream.close() - fileStream.close() + fileChannel.close() deserializeStream = null - fileStream = null + fileChannel = null } val start = batchOffsets(batchIndex) - fileStream = new FileInputStream(file) - fileStream.getChannel.position(start) + fileChannel = FileChannel.open(file.toPath, ImmutableSet.of(StandardOpenOption.READ)) + fileChannel.position(start) batchIndex += 1 val end = batchOffsets(batchIndex) @@ -492,7 +495,8 @@ class ExternalAppendOnlyMap[K, V, C]( assert(end >= start, "start = " + start + ", end = " + end + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) - val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) + val bufferedStream = new BufferedInputStream( + ByteStreams.limit(Channels.newInputStream(fileChannel), end - start)) val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream) ser.deserializeStream(wrappedStream) } else { @@ -552,9 +556,9 @@ class ExternalAppendOnlyMap[K, V, C]( ds.close() deserializeStream = null } - if (fileStream != null) { - fileStream.close() - fileStream = null + if (fileChannel != null) { + fileChannel.close() + fileChannel = null } if (file.exists()) { if (!file.delete()) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 176f84fa2a0d..ee9e53f017bd 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -18,11 +18,14 @@ package org.apache.spark.util.collection import java.io._ +import java.nio.channels.{Channels, FileChannel} +import java.nio.file.StandardOpenOption import java.util.Comparator 
import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import com.google.common.collect.ImmutableSet import com.google.common.io.ByteStreams import org.apache.spark._ @@ -492,7 +495,7 @@ private[spark] class ExternalSorter[K, V, C]( // Intermediate file and deserializer streams that read from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams - var fileStream: FileInputStream = null + var fileChannel: FileChannel = null var deserializeStream = nextBatchStream() // Also sets fileStream var nextItem: (K, C) = null @@ -505,14 +508,14 @@ private[spark] class ExternalSorter[K, V, C]( if (batchId < batchOffsets.length - 1) { if (deserializeStream != null) { deserializeStream.close() - fileStream.close() + fileChannel.close() deserializeStream = null - fileStream = null + fileChannel = null } val start = batchOffsets(batchId) - fileStream = new FileInputStream(spill.file) - fileStream.getChannel.position(start) + fileChannel = FileChannel.open(spill.file.toPath, ImmutableSet.of(StandardOpenOption.READ)) + fileChannel.position(start) batchId += 1 val end = batchOffsets(batchId) @@ -520,7 +523,8 @@ private[spark] class ExternalSorter[K, V, C]( assert(end >= start, "start = " + start + ", end = " + end + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) - val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) + val bufferedStream = new BufferedInputStream( + ByteStreams.limit(Channels.newInputStream(fileChannel), end - start)) val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream) serInstance.deserializeStream(wrappedStream) @@ -610,7 +614,7 @@ private[spark] class ExternalSorter[K, V, C]( batchId = batchOffsets.length // Prevent reading any other batch val ds = deserializeStream deserializeStream = null - fileStream = null + fileChannel = null if (ds != null) { ds.close() } From f2d534a1693c31138b464ed1094dc05888cdc3d0 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 19 Jul 2017 13:10:39 -0700 Subject: [PATCH 2/3] Style changes Change-Id: I883b36420471a20615dbb1e7e10421884fa0b690 --- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index a84bfe19eb0b..8914b223bafc 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -190,7 +190,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException { } final FileChannel out = FileChannel.open(outputFile.toPath(), - ImmutableSet.of(WRITE, APPEND, CREATE)); + ImmutableSet.of(WRITE, APPEND, CREATE)); final long writeStartTime = System.nanoTime(); boolean threwException = true; try { @@ -200,7 +200,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException { final FileChannel in = FileChannel.open(file.toPath(), ImmutableSet.of(READ)); boolean copyThrewException = true; try { - final long size = in.size(); + long size = in.size(); Utils.copyFileStreamNIO(in, out, 0, size); lengths[i] = size; copyThrewException = false; From 6d7224c0b01beacde28d806e595328b970f354a8 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 20 Jul 2017 10:35:21 -0700 Subject: [PATCH 3/3] Simplify the usage of FileChannel Change-Id: 
Ibb0036d0ac88c01310cba817da0bb40535c12351 --- .../spark/network/buffer/FileSegmentManagedBuffer.java | 4 +--- .../shuffle/sort/BypassMergeSortShuffleWriter.java | 10 ++++------ .../apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 6 ++---- .../spark/util/collection/ExternalAppendOnlyMap.scala | 3 +-- .../apache/spark/util/collection/ExternalSorter.scala | 3 +-- 5 files changed, 9 insertions(+), 17 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 9dc935a43f4b..ea9b3ce4e352 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -27,7 +27,6 @@ import java.nio.file.StandardOpenOption; import com.google.common.base.Objects; -import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; import io.netty.channel.DefaultFileRegion; @@ -134,8 +133,7 @@ public Object convertToNetty() throws IOException { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { - FileChannel fileChannel = FileChannel.open(file.toPath(), - ImmutableSet.of(StandardOpenOption.READ)); + FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); return new DefaultFileRegion(fileChannel, offset, length); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 8914b223bafc..a9b5236ab817 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -30,7 +30,6 @@ import scala.collection.Iterator; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.io.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +75,6 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class); private final int fileBufferSize; - private final boolean transferToEnabled; private final int numPartitions; private final BlockManager blockManager; private final Partitioner partitioner; @@ -108,7 +106,6 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { SparkConf conf) { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; - this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); this.blockManager = blockManager; final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; @@ -189,15 +186,16 @@ private long[] writePartitionedFile(File outputFile) throws IOException { return lengths; } - final FileChannel out = FileChannel.open(outputFile.toPath(), - ImmutableSet.of(WRITE, APPEND, CREATE)); + // This file needs to opened in append mode in order to work around a Linux kernel bug that + // affects transferTo; see SPARK-3948 for more details. 
+ final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE); final long writeStartTime = System.nanoTime(); boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { - final FileChannel in = FileChannel.open(file.toPath(), ImmutableSet.of(READ)); + final FileChannel in = FileChannel.open(file.toPath(), READ); boolean copyThrewException = true; try { long size = in.size(); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 5acffde874b4..c0ebe3cc9b79 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -30,7 +30,6 @@ import scala.reflect.ClassTag$; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.Files; @@ -444,12 +443,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { - spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), ImmutableSet.of(READ)); + spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ); } // This file needs to opened in append mode in order to work around a Linux kernel bug that // affects transferTo; see SPARK-3948 for more details. - mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), - ImmutableSet.of(WRITE, CREATE, APPEND)); + mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND); long bytesWrittenToMergedFile = 0; for (int partition = 0; partition < numPartitions; partition++) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index c5178571efbd..a08563562b87 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -26,7 +26,6 @@ import scala.collection.BufferedIterator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.collect.ImmutableSet import com.google.common.io.ByteStreams import org.apache.spark.{SparkEnv, TaskContext} @@ -486,7 +485,7 @@ class ExternalAppendOnlyMap[K, V, C]( } val start = batchOffsets(batchIndex) - fileChannel = FileChannel.open(file.toPath, ImmutableSet.of(StandardOpenOption.READ)) + fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ) fileChannel.position(start) batchIndex += 1 diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index ee9e53f017bd..3593cfd50778 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -25,7 +25,6 @@ import java.util.Comparator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.collect.ImmutableSet import com.google.common.io.ByteStreams import org.apache.spark._ @@ -514,7 +513,7 @@ private[spark] class ExternalSorter[K, V, C]( 
} val start = batchOffsets(batchId) - fileChannel = FileChannel.open(spill.file.toPath, ImmutableSet.of(StandardOpenOption.READ)) + fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ) fileChannel.position(start) batchId += 1
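
The three commits above all apply the same substitution: where the code only needs a stream, new FileInputStream(file) / new FileOutputStream(file) become Files.newInputStream(file.toPath()) / Files.newOutputStream(file.toPath()); where it needs a channel, new FileInputStream(file).getChannel() becomes FileChannel.open(file.toPath(), ...) with the open options spelled out (READ for inputs, WRITE/APPEND/CREATE for the merged outputs). One common motivation for this kind of swap is that FileInputStream and FileOutputStream define finalize() methods, which add GC pressure, while the NIO factories do not, and the NIO calls fail with more specific exceptions such as NoSuchFileException. The sketch below is not part of the patch series; it is a minimal, self-contained Java illustration of the idioms the patch adopts, using invented file names (spill-0.data, merged-output.data, empty.file) and an invented class name, and it mirrors rather than reproduces the changed call sites.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;

public class NioFileApiSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical file names, used only for this illustration.
    Path spillFile = Paths.get("spill-0.data");
    Path mergedFile = Paths.get("merged-output.data");
    Files.write(spillFile, new byte[]{1, 2, 3, 4});   // seed some input bytes

    // Channel-to-channel copy, analogous to writePartitionedFile(): the output is
    // opened WRITE+APPEND+CREATE (append mode is the SPARK-3948 transferTo
    // workaround mentioned in the patch), the input is opened READ.
    try (FileChannel in = FileChannel.open(spillFile, READ);
         FileChannel out = FileChannel.open(mergedFile, WRITE, APPEND, CREATE)) {
      long size = in.size();
      long pos = 0;
      while (pos < size) {
        pos += in.transferTo(pos, size - pos, out);
      }
    }

    // Channel positioned at a batch offset and then wrapped as an InputStream,
    // analogous to the ExternalAppendOnlyMap / ExternalSorter spill readers.
    try (FileChannel channel = FileChannel.open(mergedFile, READ)) {
      channel.position(0);                            // a real batch offset would go here
      InputStream batch = Channels.newInputStream(channel);
      System.out.println("first byte of batch: " + batch.read());
    }

    // Plain stream factories, analogous to the Files.newInputStream /
    // Files.newOutputStream call sites; opening and closing an output stream
    // is enough to create an empty file, as in the zero-spill case.
    try (OutputStream empty = Files.newOutputStream(Paths.get("empty.file"))) {
      // intentionally write nothing
    }
    try (InputStream check = Files.newInputStream(Paths.get("empty.file"))) {
      System.out.println("empty file read() returns: " + check.read());  // -1, end of stream
    }
  }
}

Note that patch 3's simplification relies on FileChannel.open accepting OpenOption varargs directly, which is why the Guava ImmutableSet wrapper introduced in patch 1 can be dropped.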