diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index d06b1816dc56..2c30abd3d539 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -574,7 +574,20 @@ private boolean shouldRetryRead(IOException cause) throws IOException { } catch (Exception e) { throw new IOException(e); } - return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY; + if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { + if (retryAction.delayMillis > 0) { + try { + LOG.debug("Retry read after {}ms", retryAction.delayMillis); + Thread.sleep(retryAction.delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy; + throw new IOException(msg, e); + } + } + return true; + } + return false; } private void handleReadError(IOException cause) throws IOException { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 4db569b7c07a..f755817816b9 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.StatusException; import org.junit.jupiter.api.BeforeEach; @@ -42,6 +43,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.stubbing.OngoingStubbing; +import org.slf4j.event.Level; import java.io.EOFException; import java.io.IOException; @@ -58,6 +60,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -257,6 +260,9 @@ public void testSeekAndRead() throws Exception { @Test public void testRefreshPipelineFunction() throws Exception { + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(BlockInputStream.LOG); + GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG); BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); AtomicBoolean isRefreshed = new AtomicBoolean(); createChunkList(5); @@ -269,6 +275,7 @@ public void testRefreshPipelineFunction() throws Exception { seekAndVerify(50); byte[] b = new byte[200]; blockInputStreamWithRetry.read(b, 0, 200); + assertThat(logCapturer.getOutput()).contains("Retry read after"); assertTrue(isRefreshed.get()); } }