Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,24 @@
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {
private final TransportConf conf;
private final boolean lazyFileDescriptor;
private final int memoryMapBytes;
private final File file;
private final long offset;
private final long length;

public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
this.conf = conf;
this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
}

public FileSegmentManagedBuffer(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes sense then but I don't think you need a new public constructor for this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, do you have a better idea?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just don't add a new constructor. The existing one can set the new fields.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will change a lot of code, right?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, why? I mean keep exactly the same constructors, no more no less. No code would change. You just set your two new fields in the current constructor. It actually means you don't need some of the changes you made here.

Copy link
Copy Markdown
Contributor Author

@witgo witgo Mar 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, I probably did not make it clear.
Suppose there are E executors in the cluster, and a shuffle stage has M map tasks and R reduce tasks. In the master branch, the following will be created:

  1. Up to M * R FileSegmentManagedBuffer instances
  2. Up to 2 * M * R NoSuchElementException instances

in this PR will be created:

  1. Up to M * R FileSegmentManagedBuffer instances
  2. Up to 2 NoSuchElementException instances (ExternalShuffleBlockResolver and IndexShuffleBlockResolver are created once when the executor starts, and they call the new constructor to create FileSegmentManagedBuffer instances)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still doesn't address the point. What you say is true even if you make the change I suggest, which is to remove the superfluous constructor. The performance is exactly the same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't get your idea. Could you write some code to illustrate it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm simply describing what you proposed above at #17329 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping @witgo to update or close

boolean lazyFileDescriptor,
int memoryMapBytes,
File file,
long offset,
long length) {
this.lazyFileDescriptor = lazyFileDescriptor;
this.memoryMapBytes = memoryMapBytes;
this.file = file;
this.offset = offset;
this.length = length;
Expand All @@ -60,7 +71,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < conf.memoryMapBytes()) {
if (length < memoryMapBytes) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
Expand Down Expand Up @@ -129,7 +140,7 @@ public ManagedBuffer release() {

@Override
public Object convertToNetty() throws IOException {
if (conf.lazyFileDescriptor()) {
if (lazyFileDescriptor) {
return new DefaultFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public class ExternalShuffleBlockResolver {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;

private final TransportConf conf;
private final boolean lazyFileDescriptor;
private final int memoryMapBytes;

@VisibleForTesting
final File registeredExecutorFile;
Expand All @@ -102,7 +103,8 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
TransportConf conf,
File registeredExecutorFile,
Executor directoryCleaner) throws IOException {
this.conf = conf;
this.lazyFileDescriptor = conf.lazyFileDescriptor();
this.memoryMapBytes = conf.memoryMapBytes();
this.registeredExecutorFile = registeredExecutorFile;
int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
Expand Down Expand Up @@ -245,7 +247,8 @@ private ManagedBuffer getSortBasedShuffleBlockData(
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
lazyFileDescriptor,
memoryMapBytes,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
private val files = new ConcurrentHashMap[String, File]()
private val jars = new ConcurrentHashMap[String, File]()
private val dirs = new ConcurrentHashMap[String, File]()

private val lazyFileDescriptor = rpcEnv.transportConf.lazyFileDescriptor()
private val memoryMapBytes = rpcEnv.transportConf.memoryMapBytes()
override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
throw new UnsupportedOperationException()
}
Expand All @@ -59,7 +60,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
}

if (file != null && file.isFile()) {
new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
new FileSegmentManagedBuffer(lazyFileDescriptor, memoryMapBytes, file, 0, file.length())
} else {
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ private[spark] class IndexShuffleBlockResolver(
private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)

private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
private val lazyFileDescriptor = transportConf.lazyFileDescriptor()
private val memoryMapBytes = transportConf.memoryMapBytes()

def getDataFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
Expand Down Expand Up @@ -202,7 +204,8 @@ private[spark] class IndexShuffleBlockResolver(
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
lazyFileDescriptor,
memoryMapBytes,
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
nextOffset - offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
val corruptBuffer = mock(classOf[ManagedBuffer])
when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
val corruptLocalBuffer = new FileSegmentManagedBuffer(true, 1024, new File("a"), 0, 100)

val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
Expand Down