diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index d2df77658ccdd..b2a4e22724922 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -132,14 +132,7 @@ public void fetchBlocks( logger.info("This clientFactory was closed. Skipping further block fetch retries."); } }; - - if (maxRetries > 0) { - // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's - // a bug in this code. We should remove the if statement once we're sure of the stability. - new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start(); - } else { - blockFetchStarter.createAndStart(blockIds, listener); - } + new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start(); } catch (Exception e) { logger.error("Exception while beginning fetchBlocks", e); for (String blockId : blockIds) { diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java index 1b44b061f3d81..27b8f1e482c97 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java @@ -50,6 +50,28 @@ public class RetryingBlockTransferorSuite { private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19])); + @Test + public void testFailureWithoutRetry() throws IOException, InterruptedException { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + + List> interactions = Arrays.asList( + // b0 failed with IOException, no retry + ImmutableMap.builder() + .put("b0", new IOException()) + .build(), + // This is not reached -- b0 has failed. + ImmutableMap.builder() + .put("b0", block0) + .build() + ); + + performInteractions(interactions, listener, 0); + + verify(listener).onBlockTransferFailure(eq("b0"), any()); + verify(listener, atLeastOnce()).getTransferType(); + verifyNoMoreInteractions(listener); + } + @Test public void testNoFailures() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); @@ -62,7 +84,7 @@ public void testNoFailures() throws IOException, InterruptedException { .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener).onBlockTransferSuccess("b0", block0); verify(listener).onBlockTransferSuccess("b1", block1); @@ -81,7 +103,7 @@ public void testUnrecoverableFailure() throws IOException, InterruptedException .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener).onBlockTransferFailure(eq("b0"), any()); verify(listener).onBlockTransferSuccess("b1", block1); @@ -105,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException, InterruptedExcept .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); @@ -128,7 +150,7 @@ public void testSingleIOExceptionOnSecond() throws IOException, InterruptedExcep .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); @@ -157,7 +179,7 @@ public void testTwoIOExceptions() throws IOException, InterruptedException { .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); @@ -190,7 +212,7 @@ public void testThreeIOExceptions() throws IOException, InterruptedException { .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any()); @@ -221,7 +243,7 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException .build() ); - performInteractions(interactions, listener); + performInteractions(interactions, listener, 2); verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any()); @@ -242,11 +264,12 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException */ @SuppressWarnings("unchecked") private static void performInteractions(List> interactions, - BlockFetchingListener listener) + BlockFetchingListener listener, + int maxRetries) throws IOException, InterruptedException { MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of( - "spark.shuffle.io.maxRetries", "2", + "spark.shuffle.io.maxRetries", String.valueOf(maxRetries), "spark.shuffle.io.retryWait", "0")); TransportConf conf = new TransportConf("shuffle", provider); BlockTransferStarter fetchStarter = mock(BlockTransferStarter.class); @@ -263,7 +286,7 @@ private static void performInteractions(List> inte try { // Verify that the RetryingBlockFetcher requested the expected blocks. String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0]; - String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]); + String[] desiredBlockIds = interaction.keySet().toArray(new String[0]); assertArrayEquals(desiredBlockIds, requestedBlockIds); // Now actually invoke the success/failure callbacks on each block. @@ -298,7 +321,7 @@ private static void performInteractions(List> inte assertNotNull(stub); stub.when(fetchStarter).createAndStart(any(), any()); - String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); + String[] blockIdArray = blockIds.toArray(new String[0]); new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, listener).start(); } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 6da0cb439db1a..0f55da1c8ab1d 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -139,14 +139,7 @@ private[spark] class NettyBlockTransferService( } } } - - if (maxRetries > 0) { - // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's - // a bug in this code. We should remove the if statement once we're sure of the stability. - new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start() - } else { - blockFetchStarter.createAndStart(blockIds, listener) - } + new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start() } catch { case e: Exception => logger.error("Exception while beginning fetchBlocks", e)