
Commit ef29a9a

rxin authored and aarondav committed
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have an open file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time.

Author: Reynold Xin <[email protected]>
Author: Reynold Xin <[email protected]>

Closes #3172 from rxin/lazyFD and squashes the following commits:

0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion
d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager.
6ed369e [Reynold Xin] Code review feedback.
04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
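
In outline, the fix defers opening the file until the first transfer. A minimal Scala sketch of that idea (the class name LazyRegionSketch is invented here for illustration only; the actual change is the Java LazyFileRegion shown in full in the diff below):

import java.io.{File, FileInputStream}
import java.nio.channels.{FileChannel, WritableByteChannel}

// Sketch: no file descriptor is created at construction time; the channel is
// opened on the first call to transferTo, so idle regions hold no open file.
class LazyRegionSketch(file: File, position: Long, count: Long) {
  private var channel: FileChannel = null

  def transferTo(target: WritableByteChannel, offset: Long): Long = {
    if (channel == null) {
      channel = new FileInputStream(file).getChannel // FD opened lazily here
    }
    channel.transferTo(position + offset, count - offset, target)
  }
}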
1 parent 65083e9 commit ef29a9a

16 files changed: +191 additions, −40 deletions

core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
   private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
-  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
     new TransportContext(transportConf, handler)

core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala

Lines changed: 6 additions & 2 deletions
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
 import org.apache.spark.storage._
@@ -68,6 +69,8 @@ private[spark]
 class FileShuffleBlockManager(conf: SparkConf)
   extends ShuffleBlockManager with Logging {
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
   private lazy val blockManager = SparkEnv.get.blockManager
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +185,14 @@ class FileShuffleBlockManager(conf: SparkConf)
         val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
         if (segmentOpt.isDefined) {
           val segment = segmentOpt.get
-          return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
+          return new FileSegmentManagedBuffer(
+            transportConf, segment.file, segment.offset, segment.length)
         }
       }
       throw new IllegalStateException("Failed to find shuffle block: " + blockId)
     } else {
       val file = blockManager.diskBlockManager.getFile(blockId)
-      new FileSegmentManagedBuffer(file, 0, file.length)
+      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
     }
   }
 

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala

Lines changed: 6 additions & 2 deletions
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
 
 /**
@@ -38,10 +39,12 @@ import org.apache.spark.storage._
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
 private[spark]
-class IndexShuffleBlockManager extends ShuffleBlockManager {
+class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
   /**
    * Mapping to a single shuffleBlockId with reduce ID 0.
    * */
@@ -109,6 +112,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
       val offset = in.readLong()
       val nextOffset = in.readLong()
       new FileSegmentManagedBuffer(
+        transportConf,
         getDataFile(blockId.shuffleId, blockId.mapId),
         offset,
         nextOffset - offset)

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
 
 private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
 
-  private val indexShuffleBlockManager = new IndexShuffleBlockManager()
+  private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
 
   /**

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     val transportConf = SparkTransportConf.fromSparkConf(conf)
-    rpcHandler = new ExternalShuffleBlockHandler()
+    rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
 

network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 11 additions & 12 deletions
@@ -31,24 +31,19 @@
 
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-
-  /**
-   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
-   * Avoid unless there's a good reason not to.
-   */
-  // TODO: Make this configurable
-  private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
-
+  private final TransportConf conf;
   private final File file;
   private final long offset;
   private final long length;
 
-  public FileSegmentManagedBuffer(File file, long offset, long length) {
+  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+    this.conf = conf;
     this.file = file;
     this.offset = offset;
     this.length = length;
@@ -65,7 +60,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
     try {
       channel = new RandomAccessFile(file, "r").getChannel();
       // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < MIN_MEMORY_MAP_BYTES) {
+      if (length < conf.memoryMapBytes()) {
         ByteBuffer buf = ByteBuffer.allocate((int) length);
         channel.position(offset);
         while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public ManagedBuffer release() {
 
   @Override
   public Object convertToNetty() throws IOException {
-    FileChannel fileChannel = new FileInputStream(file).getChannel();
-    return new DefaultFileRegion(fileChannel, offset, length);
+    if (conf.lazyFileDescriptor()) {
+      return new LazyFileRegion(file, offset, length);
+    } else {
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      return new DefaultFileRegion(fileChannel, offset, length);
+    }
   }
 
   public File getFile() { return file; }
network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java (new file)

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that only creates the file descriptor when the region is being
+ * transferred. This cannot be used with Epoll because there is no native support for it.
+ *
+ * This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so the native Epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+  private final File file;
+  private final long position;
+  private final long count;
+
+  private FileChannel channel;
+
+  private long numBytesTransferred = 0L;
+
+  /**
+   * @param file file to transfer.
+   * @param position start position for the transfer.
+   * @param count number of bytes to transfer starting from position.
+   */
+  public LazyFileRegion(File file, long position, long count) {
+    this.file = file;
+    this.position = position;
+    this.count = count;
+  }
+
+  @Override
+  protected void deallocate() {
+    JavaUtils.closeQuietly(channel);
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long transfered() {
+    return numBytesTransferred;
+  }
+
+  @Override
+  public long count() {
+    return count;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position) throws IOException {
+    if (channel == null) {
+      channel = new FileInputStream(file).getChannel();
+    }
+
+    long count = this.count - position;
+    if (count < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
+    }
+
+    if (count == 0) {
+      return 0L;
+    }
+
+    long written = channel.transferTo(this.position + position, count, target);
+    if (written > 0) {
+      numBytesTransferred += written;
+    }
+    return written;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("file", file)
+        .add("position", position)
+        .add("count", count)
+        .toString();
+  }
+}

network/common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 17 additions & 0 deletions
@@ -75,4 +75,21 @@ public int connectionTimeoutMs() {
    * Only relevant if maxIORetries > 0.
    */
   public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+
+  /**
+   * Minimum size of a block that we should start using memory map rather than reading in through
+   * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
+   * memory mapping has high overhead for blocks close to or below the page size of the OS.
+   */
+  public int memoryMapBytes() {
+    return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
+  }
+
+  /**
+   * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
+   * created only when data is going to be transferred. This can reduce the number of open files.
+   */
+  public boolean lazyFileDescriptor() {
+    return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+  }
 }
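
For context (not part of this diff): once SparkTransportConf.fromSparkConf wraps a SparkConf, the two new options can be set like any other Spark setting. A small, hypothetical tuning sketch in Scala, with illustrative values rather than recommendations:

import org.apache.spark.SparkConf

// Fall back to eager file descriptors (Netty's DefaultFileRegion path) and
// raise the memory-map threshold to 8 MB; the keys and defaults are the ones
// added to TransportConf above.
val sparkConf = new SparkConf()
  .set("spark.shuffle.io.lazyFD", "false")
  .set("spark.storage.memoryMapThreshold", (8 * 1024 * 1024).toString)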

network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java

Lines changed: 6 additions & 3 deletions
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
   static ManagedBuffer bufferChunk;
   static ManagedBuffer fileChunk;
 
+  private TransportConf transportConf;
+
   @BeforeClass
   public static void setUp() throws Exception {
     int bufSize = 100000;
@@ -80,17 +82,18 @@ public static void setUp() throws Exception {
     new Random().nextBytes(fileContent);
     fp.write(fileContent);
     fp.close();
-    fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
 
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
     streamManager = new StreamManager() {
       @Override
       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
         assertEquals(STREAM_ID, streamId);
         if (chunkIndex == BUFFER_CHUNK_INDEX) {
           return new NioManagedBuffer(buf);
         } else if (chunkIndex == FILE_CHUNK_INDEX) {
-          return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
         } else {
           throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
         }

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 3 additions & 2 deletions
@@ -21,6 +21,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.spark.network.util.TransportConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   private final ExternalShuffleBlockManager blockManager;
   private final OneForOneStreamManager streamManager;
 
-  public ExternalShuffleBlockHandler() {
-    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+  public ExternalShuffleBlockHandler(TransportConf conf) {
+    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
   }
 
   /** Enables mocking out the StreamManager and BlockManager. */
