Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportConf;


/**
* RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
*
Expand Down Expand Up @@ -91,26 +90,8 @@ protected void handleMessage(
try {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);

Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
private int index = 0;

@Override
public boolean hasNext() {
return index < msg.blockIds.length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
msg.blockIds[index]);
index++;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
};

long streamId = streamManager.registerStream(client.getClientId(), iter);
long streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds));
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
Expand Down Expand Up @@ -209,4 +190,52 @@ public Map<String, Metric> getMetrics() {
}
}

private class ManagedBufferIterator implements Iterator<ManagedBuffer> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why break this out -- it's not necessary for the change right? just for clarity?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the iterator is becoming a little bit complicated. So I break this out and give a constructor.


private int index = 0;
private String appId;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final, same for all fields except index.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will refine.

private String execId;
private String shuffleId;
// An array containing mapId and reduceId pairs.
private int[][] mapIdAndReduceIds;

@vanzin vanzin Jun 8, 2017

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I mean a single array. e.g.

int[] mapIdAndReduceIds;

mapIdAndReduceIds = new int[blockIds.length * 2];
mapIdAndReduceIds[0] = mapId1;
mapIdAndReduceIds[1] = reduceId1;
mapIdAndReduceIds[2] = mapId2;
mapIdAndReduceIds[3] = reduceId2;
etc etc etc

Reason being that if you really have millions of these, each "child" array in your two-dimensional array wastes 16 (or 20?) bytes (16 bytes of object overhead + 4 bytes for the array length). Looking in jvisualvm, an empty array actually consumes 24 bytes, so it seems the JVM is aligning things and wasting an extra 4 bytes per array...


ManagedBufferIterator(String appId, String execId, String[] blockIds) {
this.appId = appId;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if you see a lot of these in your heap dump too? You could potentially intern appId and execId for some extra memory savings, if you see a lot of those.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vanzin
There's one appId and execId per stream. I don't see a lot in my heap dump. Do you have any thoughts for interning this? :)

this.execId = execId;
String[] blockId0Parts = blockIds[0].split("_");
if (blockId0Parts.length < 4) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use require(blockId0Parts.length < 4, "Unexpected block id format: " + blockIds[0]) instead?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to throw the IllegalArgumentException.
Pardon, I'm not sure how to use require in java.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, didn't notice they are java code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we be more strict and use blockId0Parts.length != 4?

throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]);
} else if (!blockId0Parts[0].equals("shuffle")) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need the 'else' here

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some kinds of BlockId, I guess it's better to have a check here and we can parse the blockId correctly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Sean means that since you're throwing in the previous block, else is redundant.

throw new IllegalArgumentException("Expected shuffle block id, got: " + blockIds[0]);
}
this.shuffleId = blockId0Parts[1];
mapIdAndReduceIds = new int[blockIds.length][2];
if (blockIds.length > 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is superfluous

for (int i = 0; i< blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
if (!blockIdParts[1].equals(shuffleId)) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockIds[i]);
}
mapIdAndReduceIds[i][0] = Integer.parseInt(blockIdParts[2]);
mapIdAndReduceIds[i][1] = Integer.parseInt(blockIdParts[3]);
}
}
}

@Override
public boolean hasNext() {
return index < mapIdAndReduceIds.length;
}

@Override
public ManagedBuffer next() {
String blockId = "shuffle_" + shuffleId + "_" + mapIdAndReduceIds[index][0] + "_" + mapIdAndReduceIds[index][1];
final ManagedBuffer block = blockManager.getBlockData(appId, execId, blockId);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it too big a change to make getBlockData expect the broken down shuffle/map/reduce id, since all it does is parse the string again into integers?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this. I added a new getBlockData in ExternalShuffleBlockResolver`. But not sure if it is appropriate.

index++;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void onBlockFetchFailure(String blockId, Throwable t) {
}
};

String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
String[] blockIds = { "shuffle_0_1_2", "shuffle_0_3_4" };

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this change?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change ,we cannot shuffle blocks with multiple shuffleIds

OneForOneBlockFetcher fetcher =
new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null);
fetcher.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ public void testOpenShuffleBlocks() {

ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
when(blockResolver.getBlockData("app0", "exec1", "shuffle_0_0_0")).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", "shuffle_0_0_1")).thenReturn(block1Marker);
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
.toByteBuffer();
handler.receive(client, openBlocks, callback);

Expand All @@ -105,8 +106,8 @@ public void testOpenShuffleBlocks() {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "shuffle_0_0_0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "shuffle_0_0_1");

// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ public void testFetchNonexistent() throws Exception {
@Test
public void testFetchWrongExecutor() throws Exception {
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
FetchResult execFetch = fetchBlocks("exec-0",
new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
FetchResult execFetch0 = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" /* right */});
FetchResult execFetch1 = fetchBlocks("exec-0", new String[] { "shuffle_1_0_0" /* wrong */ });
assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch0.successBlocks);
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch1.failedBlocks);
}

@Test
Expand Down