Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.security.token.Token;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.io.grpc.Status;
Comment thread
kerneltime marked this conversation as resolved.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -140,18 +141,22 @@ public synchronized void initialize() throws IOException {
IOException catchEx = null;
do {
try {
// If refresh returns new pipeline, retry with it.
// If we get IOException due to connectivity issue,
// retry according to retry policy.
chunks = getChunkInfos();
break;
// If we get a StorageContainerException or an IOException due to
// datanodes are not reachable, refresh to get the latest pipeline
// info and retry.
// Otherwise, just retry according to the retry policy.
} catch (SCMSecurityException ex) {
throw ex;
} catch (StorageContainerException ex) {
refreshPipeline(ex);
catchEx = ex;
} catch (IOException ex) {
LOG.debug("Retry to get chunk info fail", ex);
if (isConnectivityIssue(ex)) {
refreshPipeline(ex);
}
catchEx = ex;
}
} while (shouldRetryRead(catchEx));
Expand Down Expand Up @@ -187,19 +192,19 @@ public synchronized void initialize() throws IOException {
}
}

/**
* Check if this exception is because datanodes are not reachable.
*/
private boolean isConnectivityIssue(IOException ex) {
return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
}

private void refreshPipeline(IOException cause) throws IOException {
LOG.info("Unable to read information for block {} from pipeline {}: {}",
blockID, pipeline.getId(), cause.getMessage());
if (refreshPipelineFunction != null) {
LOG.debug("Re-fetching pipeline for block {}", blockID);
Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
if (newPipeline == null || newPipeline.sameDatanodes(pipeline)) {
LOG.warn("No new pipeline for block {}", blockID);
throw cause;
} else {
LOG.debug("New pipeline got for block {}", blockID);
this.pipeline = newPipeline;
}
Comment thread
adoroszlai marked this conversation as resolved.
this.pipeline = refreshPipelineFunction.apply(blockID);
} else {
throw cause;
}
Expand Down Expand Up @@ -301,21 +306,27 @@ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
int numBytesRead;
try {
numBytesRead = strategy.readFromBlock(current, numBytesToRead);
retries = 0; // reset retries after successful read
retries = 0;
// If we get a StorageContainerException or an IOException due to
// datanodes are not reachable, refresh to get the latest pipeline
// info and retry.
// Otherwise, just retry according to the retry policy.
} catch (SCMSecurityException ex) {
Comment thread
kerneltime marked this conversation as resolved.
Outdated
throw ex;
} catch (StorageContainerException e) {
if (shouldRetryRead(e)) {
handleReadError(e);
continue;
} else {
throw e;
}
} catch (SCMSecurityException ex) {
throw ex;
} catch (IOException ex) {
// We got a IOException which might be due
// to DN down or connectivity issue.
if (shouldRetryRead(ex)) {
current.releaseClient();
if (isConnectivityIssue(ex)) {
Comment thread
kerneltime marked this conversation as resolved.
Outdated
handleReadError(ex);
} else {
current.releaseClient();
}
continue;
} else {
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
Expand Down Expand Up @@ -425,20 +424,12 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {
ReadChunkResponseProto readChunkResponse;

try {
List<CheckedBiFunction> validators =
ContainerProtocolCalls.getValidatorList();
validators.add(validator);
List<CheckedBiFunction> validators =
ContainerProtocolCalls.getValidatorList();
validators.add(validator);

readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, token);

} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, token);

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.ozone.common.Checksum;

import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -52,11 +53,12 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
Expand All @@ -68,7 +70,6 @@
/**
* Tests for {@link BlockInputStream}'s functionality.
*/
@RunWith(MockitoJUnitRunner.class)
Comment thread
kerneltime marked this conversation as resolved.
Outdated
public class TestBlockInputStream {

private static final int CHUNK_SIZE = 100;
Expand All @@ -80,11 +81,12 @@ public class TestBlockInputStream {
private List<ChunkInfo> chunks;
private Map<String, byte[]> chunkDataMap;

@Mock
private Function<BlockID, Pipeline> refreshPipeline;

@Before
@BeforeEach
@SuppressWarnings("unchecked")
public void setup() throws Exception {
refreshPipeline = Mockito.mock(Function.class);
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
createChunkList(5);
Expand Down Expand Up @@ -280,35 +282,9 @@ public void testRefreshPipelineFunction() throws Exception {
}
}

@Test
public void testGetBlockInfoFailWithIOException() throws Exception {
GenericTestUtils.setLogLevel(BlockInputStream.getLog(), Level.DEBUG);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(
LoggerFactory.getLogger(BlockInputStream.class));
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
AtomicBoolean isRefreshed = new AtomicBoolean();
createChunkList(5);
BlockInputStream blockInputStreamWithRetry =
new DummyBlockInputStreamWithRetry(blockID, blockSize,
MockPipeline.createSingleNodePipeline(), null,
false, null, chunks, chunkDataMap, isRefreshed,
new IOException("unavailable"));
try {
Assert.assertFalse(isRefreshed.get());
byte[] b = new byte[200];
blockInputStreamWithRetry.read(b, 0, 200);
// As in case of IOException we do not do do refresh.
Assert.assertFalse(isRefreshed.get());
Assert.assertTrue(logCapturer.getOutput().contains(
"Retry to get chunk info fail"));
Comment thread
kerneltime marked this conversation as resolved.
Outdated
} finally {
blockInputStreamWithRetry.close();
}
}

@Test
public void testRefreshOnReadFailure() throws Exception {
@ParameterizedTest
@MethodSource("exceptionsTriggersRefresh")
public void testRefreshOnReadFailure(IOException ex) throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
Expand All @@ -317,7 +293,7 @@ public void testRefreshOnReadFailure() throws Exception {
final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new StorageContainerException("test", CONTAINER_NOT_FOUND))
.thenThrow(ex)
.thenReturn(len);
when(stream.getRemaining())
.thenReturn((long) len);
Expand Down Expand Up @@ -347,55 +323,25 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
}
}

@Test
public void testRefreshExitsIfPipelineHasSameNodes() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();

final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new StorageContainerException("test", CONTAINER_UNHEALTHY));
when(stream.getRemaining())
.thenReturn((long) len);

when(refreshPipeline.apply(blockID))
.thenAnswer(invocation -> samePipelineWithNewId(pipeline));

BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
pipeline, null, false, null, refreshPipeline, chunks, null) {
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
}
};

try {
subject.initialize();

// WHEN
byte[] b = new byte[len];
LambdaTestUtils.intercept(StorageContainerException.class,
() -> subject.read(b, 0, len));

// THEN
verify(refreshPipeline).apply(blockID);
} finally {
subject.close();
}
private static Stream<Arguments> exceptionsNotTriggerRefresh() {
return Stream.of(
Arguments.of(new SCMSecurityException("Security problem")),
Arguments.of(new OzoneChecksumException("checksum missing")),
Arguments.of(new IOException("Some random exception."))
);
}

@Test
public void testReadNotRetriedOnOtherException() throws Exception {
@ParameterizedTest
@MethodSource("exceptionsNotTriggerRefresh")
public void testReadNotRetriedOnOtherException(IOException ex)
throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();

final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new OzoneChecksumException("checksum missing"));
.thenThrow(ex);
when(stream.getRemaining())
.thenReturn((long) len);

Expand All @@ -412,9 +358,8 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {

// WHEN
byte[] b = new byte[len];
LambdaTestUtils.intercept(OzoneChecksumException.class,
Assertions.assertThrows(ex.getClass(),
() -> subject.read(b, 0, len));

// THEN
verify(refreshPipeline, never()).apply(blockID);
} finally {
Expand All @@ -428,8 +373,10 @@ private Pipeline samePipelineWithNewId(Pipeline pipeline) {
return MockPipeline.createPipeline(reverseOrder);
}

@Test
public void testRefreshOnReadFailureAfterUnbuffer() throws Exception {
@ParameterizedTest
@MethodSource("exceptionsTriggersRefresh")
public void testRefreshOnReadFailureAfterUnbuffer(IOException ex)
throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
Expand All @@ -442,7 +389,7 @@ public void testRefreshOnReadFailureAfterUnbuffer() throws Exception {
final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new StorageContainerException("test", CONTAINER_NOT_FOUND))
.thenThrow(ex)
.thenReturn(len);
when(stream.getRemaining())
.thenReturn((long) len);
Expand Down Expand Up @@ -481,4 +428,12 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
subject.close();
}
}

private static Stream<Arguments> exceptionsTriggersRefresh() {
return Stream.of(
Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)),
Arguments.of(new IOException(new ExecutionException(
new StatusException(Status.UNAVAILABLE))))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public enum OzoneManagerVersion implements ComponentVersion {
"New S3G persistent connection support is present in OM."),
ERASURE_CODED_STORAGE_SUPPORT(2, "OzoneManager version that supports"
+ "ECReplicationConfig"),
OPTIMIZED_GET_KEY_INFO(3, "OzoneManager version that supports optimized"
+ " key lookups using cached container locations."),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
Expand Down
Loading