@@ -168,7 +168,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
-     rpcHandler);
+     rpcHandler, conf.maxChunksBeingTransferred());
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }
@@ -25,6 +25,8 @@

  import com.google.common.base.Preconditions;
  import io.netty.channel.Channel;
+ import org.apache.commons.lang3.tuple.ImmutablePair;
+ import org.apache.commons.lang3.tuple.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

@@ -53,9 +55,13 @@ private static class StreamState {
  // that the caller only requests each chunk one at a time, in order.
  int curChunk = 0;

+ // Used to keep track of the number of chunks being transferred and not finished yet.
+ AtomicLong chunksBeingTransferred;

cloud-fan (Contributor), Jul 19, 2017:
According to the usage, it seems a volatile long chunksBeingTransferred would be enough?

Author:
Really? We do chunksBeingTransferred++ and chunksBeingTransferred--; I think that's not safe.

Contributor:
You are right.

zsxwing (Member), Jul 21, 2017:
@jinxing64 chunksBeingTransferred is modified in the same thread. Not a big deal though.
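
An aside illustrating the point (a minimal sketch, not part of the patch): volatile only guarantees visibility, while ++ is a read-modify-write that two threads can interleave, so updates get lost; AtomicLong.incrementAndGet() performs the update atomically.

    import java.util.concurrent.atomic.AtomicLong;

    class CounterDemo {
      static volatile long unsafeCount = 0;            // visible to all threads, but ++ can lose updates
      static final AtomicLong safeCount = new AtomicLong();

      public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
          for (int i = 0; i < 100_000; i++) {
            unsafeCount++;                 // read, add, write: not atomic
            safeCount.incrementAndGet();   // atomic read-modify-write
          }
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // unsafeCount is typically below 200000; safeCount.get() is always 200000.
        System.out.println(unsafeCount + " vs " + safeCount.get());
      }
    }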


  StreamState(String appId, Iterator<ManagedBuffer> buffers) {
    this.appId = appId;
    this.buffers = Preconditions.checkNotNull(buffers);
+   this.chunksBeingTransferred = new AtomicLong(0L);
  }
}

@@ -84,6 +90,7 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
      "Requested chunk index beyond end %s", chunkIndex));
  }
  state.curChunk += 1;
+ state.chunksBeingTransferred.incrementAndGet();
  ManagedBuffer nextChunk = state.buffers.next();

  if (!state.buffers.hasNext()) {
@@ -96,18 +103,23 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {

  @Override
  public ManagedBuffer openStream(String streamChunkId) {
-   String[] array = streamChunkId.split("_");
-   assert array.length == 2:
-     "Stream id and chunk index should be specified when open stream for fetching block.";
-   long streamId = Long.valueOf(array[0]);
-   int chunkIndex = Integer.valueOf(array[1]);
-   return getChunk(streamId, chunkIndex);
+   Pair<Long, Integer> streamChunkIdPair = parseStreamChunkId(streamChunkId);
+   return getChunk(streamChunkIdPair.getLeft(), streamChunkIdPair.getRight());
  }

public static String genStreamChunkId(long streamId, int chunkId) {
return String.format("%d_%d", streamId, chunkId);
}

+ public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {

Member:
nit: please document the meaning of the return value for this public method.

+   String[] array = streamChunkId.split("_");
+   assert array.length == 2:
+     "Stream id and chunk index should be specified.";
+   long streamId = Long.valueOf(array[0]);
+   int chunkIndex = Integer.valueOf(array[1]);
+   return ImmutablePair.of(streamId, chunkIndex);
+ }

@Override
public void connectionTerminated(Channel channel) {
// Close all streams which have been associated with the channel.
@@ -122,6 +134,7 @@ public void connectionTerminated(Channel channel) {
        }
      }
    }
+
Member:
nit: extra empty line

}

@Override
@@ -139,6 +152,23 @@ public void checkAuthorization(TransportClient client, long streamId) {
}
}

+ @Override
+ public void chunkSent(long streamId) {
+   StreamState streamState = streams.get(streamId);
+   if (streamState != null) {
+     streamState.chunksBeingTransferred.decrementAndGet();
+   }
+ }
+
+ @Override
+ public long chunksBeingTransferred() {
+   long sum = 0L;
+   for (StreamState streamState : streams.values()) {
+     sum += streamState.chunksBeingTransferred.get();
+   }
+   return sum;
+ }

/**
* Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
* callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
@@ -83,4 +83,16 @@ public void connectionTerminated(Channel channel) { }
*/
public void checkAuthorization(TransportClient client, long streamId) { }

+ /**
+  * Return the number of chunks being transferred and not finished yet in this StreamManager.
+  */
+ public long chunksBeingTransferred() {
+   return 0;
+ }
+
+ /**
+  * Called when a chunk is successfully sent.
+  */
+ public void chunkSent(long streamId) { }

Contributor:
I think it makes more sense to have 2 methods:

  chunkSent(long streamId);
  streamSent(String streamId);

Author:
Yes, it would be much better.


}
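
A sketch of the two-method shape being proposed (hypothetical javadoc; in this revision only chunkSent exists):

    /** Called when a single chunk of a chunked stream has been sent. */
    public void chunkSent(long streamId) { }

    /** Called when an entire stream, e.g. a file stream, has been sent. */
    public void streamSent(String streamId) { }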
@@ -22,6 +22,7 @@

  import com.google.common.base.Throwables;
  import io.netty.channel.Channel;
+ import io.netty.channel.ChannelFuture;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

@@ -65,14 +66,19 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
  /** Returns each chunk part of a stream. */
  private final StreamManager streamManager;

+ /** The max number of chunks being transferred and not finished yet. */
+ private final long maxChunksBeingTransferred;
+
  public TransportRequestHandler(
      Channel channel,
      TransportClient reverseClient,
-     RpcHandler rpcHandler) {
+     RpcHandler rpcHandler,
+     Long maxChunksBeingTransferred) {
    this.channel = channel;
    this.reverseClient = reverseClient;
    this.rpcHandler = rpcHandler;
    this.streamManager = rpcHandler.getStreamManager();
+   this.maxChunksBeingTransferred = maxChunksBeingTransferred;
  }

@Override
@@ -118,6 +124,13 @@ private void processFetchRequest(final ChunkFetchRequest req) {
        req.streamChunkId);
    }

+   long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+   if (chunksBeingTransferred > maxChunksBeingTransferred) {
+     logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+       chunksBeingTransferred, maxChunksBeingTransferred);
+     channel.close();
+   }

Member:
missing return.
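
A sketch of the requested fix: without a return, the handler falls through and still serves the fetch on a channel it has just closed.

    long chunksBeingTransferred = streamManager.chunksBeingTransferred();
    if (chunksBeingTransferred > maxChunksBeingTransferred) {
      logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
        chunksBeingTransferred, maxChunksBeingTransferred);
      channel.close();
      return;  // stop processing this request after closing the channel
    }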


ManagedBuffer buf;
try {
streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
@@ -130,11 +143,25 @@ private void processFetchRequest(final ChunkFetchRequest req) {
return;
}

-   respond(new ChunkFetchSuccess(req.streamChunkId, buf));
+   respond(new ChunkFetchSuccess(req.streamChunkId, buf)).addListener(future -> {
+     streamManager.chunkSent(req.streamChunkId.streamId);
+   });
}

  private void processStreamRequest(final StreamRequest req) {
+   if (logger.isTraceEnabled()) {
+     logger.trace("Received req from {} to fetch stream {}", getRemoteAddress(channel),
+       req.streamId);
+   }
+
+   long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+   if (chunksBeingTransferred > maxChunksBeingTransferred) {
+     logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+       chunksBeingTransferred, maxChunksBeingTransferred);
+     channel.close();
+   }

Member:
missing return

zsxwing (Member), Jul 21, 2017:
Also please decrease chunksBeingTransferred when sending ChunkFetchFailure and StreamFailure, if chunksBeingTransferred was increased.

Member:
To make the error handling simple, you can increase chunksBeingTransferred just before writing the chunk to the channel, and decrease it in the future returned by the write.
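
A sketch of that pattern with a hypothetical increment hook (chunkBeingSent is illustrative, not in this revision): the counter goes up just before the write and comes back down in the write's future, so failed sends are balanced too.

    streamManager.chunkBeingSent(req.streamChunkId.streamId);  // hypothetical increment hook
    channel.writeAndFlush(new ChunkFetchSuccess(req.streamChunkId, buf))
      .addListener(future -> {
        // runs whether the write succeeded or failed
        streamManager.chunkSent(req.streamChunkId.streamId);
      });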

ManagedBuffer buf;

Member:
nit: extra empty line

try {
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
@@ -145,7 +172,12 @@ private void processStreamRequest(final StreamRequest req) {
}

if (buf != null) {
-   respond(new StreamResponse(req.streamId, buf.size(), buf));
+   respond(new StreamResponse(req.streamId, buf.size(), buf)).addListener(future -> {
+     if (streamManager instanceof OneForOneStreamManager) {

Contributor:
In which case may the streamManager not be a OneForOneStreamManager?

Author:
When an executor fetches a file from the driver, it sends a StreamRequest; the NettyStreamManager on the driver serves the stream.

Contributor:
NettyStreamManager.chunkSent is a no-op; it seems we don't need this if?

Author:
The streamId sent to NettyStreamManager is not of the form "{streamId}_{chunkIndex}", so parsing it would fail.

Contributor:
Oh, I see.

+       streamManager.chunkSent(OneForOneStreamManager.parseStreamChunkId(req.streamId)
+         .getLeft());
+     }
+   });
} else {
respond(new StreamFailure(req.streamId, String.format(
"Stream '%s' was not found.", req.streamId)));
@@ -187,9 +219,9 @@ private void processOneWayMessage(OneWayMessage req) {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
- private void respond(Encodable result) {
+ private ChannelFuture respond(Encodable result) {
    SocketAddress remoteAddress = channel.remoteAddress();
-   channel.writeAndFlush(result).addListener(future -> {
+   return channel.writeAndFlush(result).addListener(future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
@@ -257,4 +257,11 @@ public Properties cryptoConf() {
return CryptoUtils.toCryptoConf("spark.network.crypto.config.", conf.getAll());
}

+ /**
+  * The max number of chunks being transferred at the same time. This config helps avoid OOM on
+  * shuffle server.
+  */
+ public long maxChunksBeingTransferred() {
+   return conf.getLong("spark.network.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);

Contributor:
Can we have a reasonable default value?
Author:
It depends on the memory config of the NodeManager. In our cluster we will set it to 3 GB (memory overhead for Netty) / 1 KB (size of ChannelOutboundBuffer$Entry) = 3,000,000.
I think it's OK to leave it at MAX_VALUE (by default it's disabled).
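
Spelling out that sizing arithmetic (illustrative values; the right cap depends on the NodeManager's memory config):

    long nettyOverheadBytes = 3L * 1024 * 1024 * 1024;     // ~3 GB reserved for Netty
    long entrySizeBytes = 1024;                            // ~1 KB per ChannelOutboundBuffer$Entry
    long maxChunks = nettyOverheadBytes / entrySizeBytes;  // 3,145,728, rounded to 3,000,000 above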

Contributor:
I feel there are too many "by default disabled" optimizations, which can't benefit most Spark users. I'm OK with leaving it disabled for now, but we should definitely revisit it and think about reasonable default values. Also cc @tgravescs @zsxwing.

BTW, please document this config in configuration.md.

Member:
This default value totally depends on the JVM heap size, so it seems hard to pick a reasonable one. If it's too small, users with a large heap size may see their shuffle service start to fail when they upgrade. If it's too large, it's effectively the same as MAX_VALUE.

Member:
spark.network.shuffle -> spark.shuffle

+ }
}
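
A hedged usage sketch of the new getter (assuming MapConfigProvider from org.apache.spark.network.util is available; the key name is this revision's, which a reviewer above suggests renaming to spark.shuffle.*):

    import com.google.common.collect.ImmutableMap;
    import org.apache.spark.network.util.MapConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    class MaxChunksConfDemo {
      public static void main(String[] args) {
        // Configure the cap, then read it back the way TransportRequestHandler would.
        TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(
            ImmutableMap.of("spark.network.shuffle.maxChunksBeingTransferred", "3000000")));
        System.out.println(conf.maxChunksBeingTransferred());  // 3000000; Long.MAX_VALUE when unset
      }
    }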
docs/configuration.md (8 additions, 0 deletions)
@@ -1808,6 +1808,14 @@ Apart from these, the following properties are also available, and may be useful
Disable unencrypted connections for services that support SASL authentication.
</td>
</tr>
+ <tr>
+   <td><code>spark.network.shuffle.maxChunksBeingTransferred</code></td>
+   <td>Long.MAX_VALUE</td>
+   <td>
+     The max number of chunks being transferred at the same time. This config helps avoid OOM on

Member:
nit: "The max number of chunks allowed to be transferred at the same time on shuffle service."

Member:
Please also move this to the Shuffle Behavior section.

+     shuffle server.
+   </td>
+ </tr>
<tr>
<td><code>spark.core.connection.ack.wait.timeout</code></td>
<td><code>spark.network.timeout</code></td>