Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,7 @@ public void fetchBlocks(
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
};

if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start();
} else {
blockFetchStarter.createAndStart(blockIds, listener);
}
new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start();
} catch (Exception e) {
logger.error("Exception while beginning fetchBlocks", e);
for (String blockId : blockIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@ public class RetryingBlockTransferorSuite {
private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));

/**
 * Checks that, when performInteractions is given a retry limit of 0, a block whose fetch
 * fails with an {@link IOException} is reported to the listener as a failure and the
 * scripted follow-up attempt is never used.
 */
@Test
public void testFailureWithoutRetry() throws IOException, InterruptedException {
  BlockFetchingListener listener = mock(BlockFetchingListener.class);

  // The only attempt expected to run: b0 fails with an IOException and is not retried.
  Map<String, Object> failedAttempt = ImmutableMap.<String, Object>of("b0", new IOException());
  // Scripted second attempt; not reached because b0 has already failed.
  Map<String, Object> unreachedAttempt = ImmutableMap.<String, Object>of("b0", block0);
  List<? extends Map<String, Object>> interactions =
    Arrays.asList(failedAttempt, unreachedAttempt);

  final int maxRetries = 0;
  performInteractions(interactions, listener, maxRetries);

  // Exactly one failure callback for b0, and nothing else besides transfer-type lookups.
  verify(listener).onBlockTransferFailure(eq("b0"), any());
  verify(listener, atLeastOnce()).getTransferType();
  verifyNoMoreInteractions(listener);
}

@Test
public void testNoFailures() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
Expand All @@ -62,7 +84,7 @@ public void testNoFailures() throws IOException, InterruptedException {
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener).onBlockTransferSuccess("b0", block0);
verify(listener).onBlockTransferSuccess("b1", block1);
Expand All @@ -81,7 +103,7 @@ public void testUnrecoverableFailure() throws IOException, InterruptedException
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener).onBlockTransferFailure(eq("b0"), any());
verify(listener).onBlockTransferSuccess("b1", block1);
Expand All @@ -105,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException, InterruptedExcept
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
Expand All @@ -128,7 +150,7 @@ public void testSingleIOExceptionOnSecond() throws IOException, InterruptedExcep
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
Expand Down Expand Up @@ -157,7 +179,7 @@ public void testTwoIOExceptions() throws IOException, InterruptedException {
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
Expand Down Expand Up @@ -190,7 +212,7 @@ public void testThreeIOExceptions() throws IOException, InterruptedException {
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any());
Expand Down Expand Up @@ -221,7 +243,7 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
.build()
);

performInteractions(interactions, listener);
performInteractions(interactions, listener, 2);

verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any());
Expand All @@ -242,11 +264,12 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
*/
@SuppressWarnings("unchecked")
private static void performInteractions(List<? extends Map<String, Object>> interactions,
BlockFetchingListener listener)
BlockFetchingListener listener,
int maxRetries)
throws IOException, InterruptedException {

MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
"spark.shuffle.io.maxRetries", "2",
"spark.shuffle.io.maxRetries", String.valueOf(maxRetries),
"spark.shuffle.io.retryWait", "0"));
TransportConf conf = new TransportConf("shuffle", provider);
BlockTransferStarter fetchStarter = mock(BlockTransferStarter.class);
Expand All @@ -263,7 +286,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
try {
// Verify that the RetryingBlockFetcher requested the expected blocks.
String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
String[] desiredBlockIds = interaction.keySet().toArray(new String[0]);
assertArrayEquals(desiredBlockIds, requestedBlockIds);

// Now actually invoke the success/failure callbacks on each block.
Expand Down Expand Up @@ -298,7 +321,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte

assertNotNull(stub);
stub.when(fetchStarter).createAndStart(any(), any());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
String[] blockIdArray = blockIds.toArray(new String[0]);
new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, listener).start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,7 @@ private[spark] class NettyBlockTransferService(
}
}
}

if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how you can prove this, since maxRetries defaults to 3 rather than 0.

Copy link
Member Author

@pan3793 pan3793 Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several places that override the conf to 0; please search for ("spark.shuffle.io.maxRetries", "0") in ExternalShuffleIntegrationSuite and BlockManagerSuite.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When spark.shuffle.io.maxRetries=0, it tests OneForOneBlockFetcher rather than RetryingBlockTransferor, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like adding unit test cases to RetryingBlockTransferorSuite is more reasonable; I will update soon.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the unit tests (UT); please take another look.

new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start()
} else {
blockFetchStarter.createAndStart(blockIds, listener)
}
new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start()
} catch {
case e: Exception =>
logger.error("Exception while beginning fetchBlocks", e)
Expand Down