Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,20 @@ private boolean shouldRetryRead(IOException cause) throws IOException {
} catch (Exception e) {
throw new IOException(e);
}
return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: It might be nice if we can have a general RetryAction handler so that we don't need to duplicate the sleeping logic (similar codes is in AbstractDataStreamOutput#shouldRetry, GrpcOmTranport#shouldRetry, and KeyOutputStream#handleRetry)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes sounds good may be we can refactor in other Jira.

if (retryAction.delayMillis > 0) {
try {
LOG.debug("Retry read after {}ms", retryAction.delayMillis);
Thread.sleep(retryAction.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy;
throw new IOException(msg, e);
}
}
return true;
}
return false;
}

private void handleReadError(IOException cause) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.ozone.common.Checksum;

import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusException;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -42,6 +43,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.stubbing.OngoingStubbing;
import org.slf4j.event.Level;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -58,6 +60,7 @@

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -257,6 +260,9 @@ public void testSeekAndRead() throws Exception {

@Test
public void testRefreshPipelineFunction() throws Exception {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(BlockInputStream.LOG);
GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
AtomicBoolean isRefreshed = new AtomicBoolean();
createChunkList(5);
Expand All @@ -269,6 +275,7 @@ public void testRefreshPipelineFunction() throws Exception {
seekAndVerify(50);
byte[] b = new byte[200];
blockInputStreamWithRetry.read(b, 0, 200);
assertThat(logCapturer.getOutput()).contains("Retry read after");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verifying debug log message is going to be brittle. But writing a test to verify time is always tricky. What I usually do is to introduce a timer class that wraps around Thread.sleep(), and we can verify the time does advance (artifically). That's going to introduce big code churn with little benefit so I'm okay with what we have now.

assertTrue(isRefreshed.get());
}
}
Expand Down