diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index a6af1ba3f784..4626f580f5e4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -46,6 +46,7 @@ import org.apache.hadoop.security.token.Token; import com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.thirdparty.io.grpc.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,11 +141,12 @@ public synchronized void initialize() throws IOException { IOException catchEx = null; do { try { - // If refresh returns new pipeline, retry with it. - // If we get IOException due to connectivity issue, - // retry according to retry policy. chunks = getChunkInfos(); break; + // If we get a StorageContainerException or an IOException due to + // datanodes are not reachable, refresh to get the latest pipeline + // info and retry. + // Otherwise, just retry according to the retry policy. } catch (SCMSecurityException ex) { throw ex; } catch (StorageContainerException ex) { @@ -152,6 +154,9 @@ public synchronized void initialize() throws IOException { catchEx = ex; } catch (IOException ex) { LOG.debug("Retry to get chunk info fail", ex); + if (isConnectivityIssue(ex)) { + refreshPipeline(ex); + } catchEx = ex; } } while (shouldRetryRead(catchEx)); @@ -187,19 +192,19 @@ public synchronized void initialize() throws IOException { } } + /** + * Check if this exception is because datanodes are not reachable. + */ + private boolean isConnectivityIssue(IOException ex) { + return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); + } + private void refreshPipeline(IOException cause) throws IOException { LOG.info("Unable to read information for block {} from pipeline {}: {}", blockID, pipeline.getId(), cause.getMessage()); if (refreshPipelineFunction != null) { LOG.debug("Re-fetching pipeline for block {}", blockID); - Pipeline newPipeline = refreshPipelineFunction.apply(blockID); - if (newPipeline == null || newPipeline.sameDatanodes(pipeline)) { - LOG.warn("No new pipeline for block {}", blockID); - throw cause; - } else { - LOG.debug("New pipeline got for block {}", blockID); - this.pipeline = newPipeline; - } + this.pipeline = refreshPipelineFunction.apply(blockID); } else { throw cause; } @@ -301,7 +306,13 @@ protected synchronized int readWithStrategy(ByteReaderStrategy strategy) int numBytesRead; try { numBytesRead = strategy.readFromBlock(current, numBytesToRead); - retries = 0; // reset retries after successful read + retries = 0; + // If we get a StorageContainerException or an IOException due to + // datanodes are not reachable, refresh to get the latest pipeline + // info and retry. + // Otherwise, just retry according to the retry policy. + } catch (SCMSecurityException ex) { + throw ex; } catch (StorageContainerException e) { if (shouldRetryRead(e)) { handleReadError(e); @@ -309,13 +320,13 @@ protected synchronized int readWithStrategy(ByteReaderStrategy strategy) } else { throw e; } - } catch (SCMSecurityException ex) { - throw ex; } catch (IOException ex) { - // We got a IOException which might be due - // to DN down or connectivity issue. if (shouldRetryRead(ex)) { - current.releaseClient(); + if (isConnectivityIssue(ex)) { + handleReadError(ex); + } else { + current.releaseClient(); + } continue; } else { throw ex; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index b3d3a7e389f7..e30df34b85d9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; @@ -425,20 +424,12 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) throws IOException { ReadChunkResponseProto readChunkResponse; - try { - List validators = - ContainerProtocolCalls.getValidatorList(); - validators.add(validator); + List validators = + ContainerProtocolCalls.getValidatorList(); + validators.add(validator); - readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, - readChunkInfo, blockID, validators, token); - - } catch (IOException e) { - if (e instanceof StorageContainerException) { - throw e; - } - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } + readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, + readChunkInfo, blockID, validators, token); if (readChunkResponse.hasData()) { return readChunkResponse.getData().asReadOnlyByteBufferList() diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 7f0121ea6e9b..0bba24d81921 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -29,19 +29,20 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.OzoneChecksumException; -import org.apache.ozone.test.GenericTestUtils; -import org.apache.ozone.test.LambdaTestUtils; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusException; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.slf4j.LoggerFactory; -import org.slf4j.event.Level; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import java.io.EOFException; import java.io.IOException; @@ -52,11 +53,12 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Stream; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY; import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -68,7 +70,6 @@ /** * Tests for {@link BlockInputStream}'s functionality. */ -@RunWith(MockitoJUnitRunner.class) public class TestBlockInputStream { private static final int CHUNK_SIZE = 100; @@ -80,11 +81,12 @@ public class TestBlockInputStream { private List chunks; private Map chunkDataMap; - @Mock private Function refreshPipeline; - @Before + @BeforeEach + @SuppressWarnings("unchecked") public void setup() throws Exception { + refreshPipeline = Mockito.mock(Function.class); BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE); createChunkList(5); @@ -280,35 +282,9 @@ public void testRefreshPipelineFunction() throws Exception { } } - @Test - public void testGetBlockInfoFailWithIOException() throws Exception { - GenericTestUtils.setLogLevel(BlockInputStream.getLog(), Level.DEBUG); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs( - LoggerFactory.getLogger(BlockInputStream.class)); - BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); - AtomicBoolean isRefreshed = new AtomicBoolean(); - createChunkList(5); - BlockInputStream blockInputStreamWithRetry = - new DummyBlockInputStreamWithRetry(blockID, blockSize, - MockPipeline.createSingleNodePipeline(), null, - false, null, chunks, chunkDataMap, isRefreshed, - new IOException("unavailable")); - try { - Assert.assertFalse(isRefreshed.get()); - byte[] b = new byte[200]; - blockInputStreamWithRetry.read(b, 0, 200); - // As in case of IOException we do not do do refresh. - Assert.assertFalse(isRefreshed.get()); - Assert.assertTrue(logCapturer.getOutput().contains( - "Retry to get chunk info fail")); - } finally { - blockInputStreamWithRetry.close(); - } - } - - @Test - public void testRefreshOnReadFailure() throws Exception { + @ParameterizedTest + @MethodSource("exceptionsTriggersRefresh") + public void testRefreshOnReadFailure(IOException ex) throws Exception { // GIVEN BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); Pipeline pipeline = MockPipeline.createSingleNodePipeline(); @@ -317,7 +293,7 @@ public void testRefreshOnReadFailure() throws Exception { final int len = 200; final ChunkInputStream stream = mock(ChunkInputStream.class); when(stream.read(any(), anyInt(), anyInt())) - .thenThrow(new StorageContainerException("test", CONTAINER_NOT_FOUND)) + .thenThrow(ex) .thenReturn(len); when(stream.getRemaining()) .thenReturn((long) len); @@ -347,47 +323,17 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { } } - @Test - public void testRefreshExitsIfPipelineHasSameNodes() throws Exception { - // GIVEN - BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); - Pipeline pipeline = MockPipeline.createSingleNodePipeline(); - - final int len = 200; - final ChunkInputStream stream = mock(ChunkInputStream.class); - when(stream.read(any(), anyInt(), anyInt())) - .thenThrow(new StorageContainerException("test", CONTAINER_UNHEALTHY)); - when(stream.getRemaining()) - .thenReturn((long) len); - - when(refreshPipeline.apply(blockID)) - .thenAnswer(invocation -> samePipelineWithNewId(pipeline)); - - BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize, - pipeline, null, false, null, refreshPipeline, chunks, null) { - @Override - protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { - return stream; - } - }; - - try { - subject.initialize(); - - // WHEN - byte[] b = new byte[len]; - LambdaTestUtils.intercept(StorageContainerException.class, - () -> subject.read(b, 0, len)); - - // THEN - verify(refreshPipeline).apply(blockID); - } finally { - subject.close(); - } + private static Stream exceptionsNotTriggerRefresh() { + return Stream.of( + Arguments.of(new SCMSecurityException("Security problem")), + Arguments.of(new OzoneChecksumException("checksum missing")), + Arguments.of(new IOException("Some random exception.")) + ); } - - @Test - public void testReadNotRetriedOnOtherException() throws Exception { + @ParameterizedTest + @MethodSource("exceptionsNotTriggerRefresh") + public void testReadNotRetriedOnOtherException(IOException ex) + throws Exception { // GIVEN BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); Pipeline pipeline = MockPipeline.createSingleNodePipeline(); @@ -395,7 +341,7 @@ public void testReadNotRetriedOnOtherException() throws Exception { final int len = 200; final ChunkInputStream stream = mock(ChunkInputStream.class); when(stream.read(any(), anyInt(), anyInt())) - .thenThrow(new OzoneChecksumException("checksum missing")); + .thenThrow(ex); when(stream.getRemaining()) .thenReturn((long) len); @@ -412,9 +358,8 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { // WHEN byte[] b = new byte[len]; - LambdaTestUtils.intercept(OzoneChecksumException.class, + Assertions.assertThrows(ex.getClass(), () -> subject.read(b, 0, len)); - // THEN verify(refreshPipeline, never()).apply(blockID); } finally { @@ -428,8 +373,10 @@ private Pipeline samePipelineWithNewId(Pipeline pipeline) { return MockPipeline.createPipeline(reverseOrder); } - @Test - public void testRefreshOnReadFailureAfterUnbuffer() throws Exception { + @ParameterizedTest + @MethodSource("exceptionsTriggersRefresh") + public void testRefreshOnReadFailureAfterUnbuffer(IOException ex) + throws Exception { // GIVEN BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); Pipeline pipeline = MockPipeline.createSingleNodePipeline(); @@ -442,7 +389,7 @@ public void testRefreshOnReadFailureAfterUnbuffer() throws Exception { final int len = 200; final ChunkInputStream stream = mock(ChunkInputStream.class); when(stream.read(any(), anyInt(), anyInt())) - .thenThrow(new StorageContainerException("test", CONTAINER_NOT_FOUND)) + .thenThrow(ex) .thenReturn(len); when(stream.getRemaining()) .thenReturn((long) len); @@ -481,4 +428,12 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { subject.close(); } } + + private static Stream exceptionsTriggersRefresh() { + return Stream.of( + Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)), + Arguments.of(new IOException(new ExecutionException( + new StatusException(Status.UNAVAILABLE)))) + ); + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java index c6872f286afe..a91885e25dde 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java @@ -34,6 +34,8 @@ public enum OzoneManagerVersion implements ComponentVersion { "New S3G persistent connection support is present in OM."), ERASURE_CODED_STORAGE_SUPPORT(2, "OzoneManager version that supports" + "ECReplicationConfig"), + OPTIMIZED_GET_KEY_INFO(3, "OzoneManager version that supports optimized" + + " key lookups using cached container locations."), FUTURE_VERSION(-1, "Used internally in the client when the server side is " + " newer and an unknown server version has arrived to the client."); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 973f98ff3b9a..1adb29abfba2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1238,19 +1238,12 @@ public OzoneInputStream getKey( verifyVolumeName(volumeName); verifyBucketName(bucketName); Preconditions.checkNotNull(keyName); - OmKeyArgs keyArgs = new OmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .setSortDatanodesInPipeline(topologyAwareReadEnabled) - .setLatestVersionLocation(getLatestVersionLocation) - .build(); - OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); + OmKeyInfo keyInfo = getKeyInfo(volumeName, bucketName, keyName, false); return getInputStreamWithRetryFunction(keyInfo); } @Override - public Map< OmKeyLocationInfo, Map > + public Map > getKeysEveryReplicas(String volumeName, String bucketName, String keyName) throws IOException { @@ -1260,22 +1253,15 @@ public OzoneInputStream getKey( verifyVolumeName(volumeName); verifyBucketName(bucketName); - Preconditions.checkNotNull(keyName); - OmKeyArgs keyArgs = new OmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .setSortDatanodesInPipeline(topologyAwareReadEnabled) - .build(); - - OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); + OmKeyInfo keyInfo = getKeyInfo(volumeName, bucketName, keyName, true); List keyLocationInfos = keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); - for (OmKeyLocationInfo keyLocationInfo : keyLocationInfos) { + for (OmKeyLocationInfo locationInfo : keyLocationInfos) { Map blocks = new HashMap<>(); - Pipeline pipelineBefore = keyLocationInfo.getPipeline(); + + Pipeline pipelineBefore = locationInfo.getPipeline(); List datanodes = pipelineBefore.getNodes(); for (DatanodeDetails dn : datanodes) { @@ -1284,22 +1270,47 @@ public OzoneInputStream getKey( Pipeline pipeline = new Pipeline.Builder(pipelineBefore).setNodes(nodes) .setId(PipelineID.randomId()).build(); - keyLocationInfo.setPipeline(pipeline); + OmKeyLocationInfo dnKeyLocation = new OmKeyLocationInfo.Builder() + .setBlockID(locationInfo.getBlockID()) + .setLength(locationInfo.getLength()) + .setOffset(locationInfo.getOffset()) + .setToken(locationInfo.getToken()) + .setPartNumber(locationInfo.getPartNumber()) + .setCreateVersion(locationInfo.getCreateVersion()) + .setPipeline(pipeline) + .build(); - List keyLocationInfoList = new ArrayList<>(); - keyLocationInfoList.add(keyLocationInfo); + List keyLocationInfoList = + Collections.singletonList(dnKeyLocation); OmKeyLocationInfoGroup keyLocationInfoGroup = new OmKeyLocationInfoGroup(0, keyLocationInfoList); - List keyLocationInfoGroups = new ArrayList<>(); - keyLocationInfoGroups.add(keyLocationInfoGroup); + List keyLocationInfoGroups = + Collections.singletonList(keyLocationInfoGroup); keyInfo.setKeyLocationVersions(keyLocationInfoGroups); - OzoneInputStream is = createInputStream(keyInfo, Function.identity()); + OmKeyInfo dnKeyInfo = new OmKeyInfo.Builder() + .setVolumeName(keyInfo.getVolumeName()) + .setBucketName(keyInfo.getBucketName()) + .setKeyName(keyInfo.getKeyName()) + .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions()) + .setDataSize(keyInfo.getDataSize()) + .setCreationTime(keyInfo.getCreationTime()) + .setModificationTime(keyInfo.getModificationTime()) + .setReplicationConfig(keyInfo.getReplicationConfig()) + .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo()) + .setAcls(keyInfo.getAcls()) + .setObjectID(keyInfo.getObjectID()) + .setUpdateID(keyInfo.getUpdateID()) + .setParentObjectID(keyInfo.getParentObjectID()) + .setFileChecksum(keyInfo.getFileChecksum()) + .build(); + dnKeyInfo.setMetadata(keyInfo.getMetadata()); + dnKeyInfo.setKeyLocationVersions(keyLocationInfoGroups); - blocks.put(dn, is); + blocks.put(dn, createInputStream(dnKeyInfo, Function.identity())); } - result.put(keyLocationInfo, blocks); + result.put(locationInfo, blocks); } return result; @@ -1403,17 +1414,8 @@ public boolean recoverTrash(String volumeName, String bucketName, public OzoneKeyDetails getKeyDetails( String volumeName, String bucketName, String keyName) throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(keyName); - OmKeyArgs keyArgs = new OmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .setSortDatanodesInPipeline(topologyAwareReadEnabled) - .setLatestVersionLocation(getLatestVersionLocation) - .build(); - OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); + OmKeyInfo keyInfo = + getKeyInfo(volumeName, bucketName, keyName, false); List ozoneKeyLocations = new ArrayList<>(); long lastKeyOffset = 0L; @@ -1437,6 +1439,34 @@ public OzoneKeyDetails getKeyDetails( getInputStream); } + private OmKeyInfo getKeyInfo( + String volumeName, String bucketName, String keyName, + boolean forceUpdateContainerCache) throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setSortDatanodesInPipeline(topologyAwareReadEnabled) + .setLatestVersionLocation(getLatestVersionLocation) + .setForceUpdateContainerCacheFromSCM(forceUpdateContainerCache) + .build(); + return getKeyInfo(keyArgs); + } + + private OmKeyInfo getKeyInfo(OmKeyArgs keyArgs) throws IOException { + final OmKeyInfo keyInfo; + if (omVersion.compareTo(OzoneManagerVersion.OPTIMIZED_GET_KEY_INFO) >= 0) { + keyInfo = ozoneManagerClient.getKeyInfo(keyArgs, false) + .getKeyInfo(); + } else { + keyInfo = ozoneManagerClient.lookupKey(keyArgs); + } + return keyInfo; + } + @Override public void close() throws IOException { if (ecReconstructExecutor != null) { @@ -1694,14 +1724,8 @@ private OzoneInputStream getInputStreamWithRetryFunction( OmKeyInfo keyInfo) throws IOException { return createInputStream(keyInfo, omKeyInfo -> { try { - OmKeyArgs omKeyArgs = new OmKeyArgs.Builder() - .setVolumeName(omKeyInfo.getVolumeName()) - .setBucketName(omKeyInfo.getBucketName()) - .setKeyName(omKeyInfo.getKeyName()) - .setSortDatanodesInPipeline(topologyAwareReadEnabled) - .setLatestVersionLocation(getLatestVersionLocation) - .build(); - return ozoneManagerClient.lookupKey(omKeyArgs); + return getKeyInfo(omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(), + omKeyInfo.getKeyName(), true); } catch (IOException e) { LOG.error("Unable to lookup key {} on retry.", keyInfo.getKeyName(), e); return null; @@ -1997,8 +2021,9 @@ public OzoneKey headObject(String volumeName, String bucketName, .setKeyName(keyName) .setLatestVersionLocation(true) .setHeadOp(true) + .setForceUpdateContainerCacheFromSCM(false) .build(); - OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); + OmKeyInfo keyInfo = getKeyInfo(keyArgs); return new OzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(), keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(), diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java index 90ed563b98ab..3eae9d1d7d0c 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java @@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetKeyInfoRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetKeyInfoResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVolumeRequest; @@ -129,6 +131,9 @@ public OMResponse submitRequest(OMRequest payload) throws IOException { case AllocateBlock: return response(payload, r -> r.setAllocateBlockResponse( allocateBlock(payload.getAllocateBlockRequest()))); + case GetKeyInfo: + return response(payload, r -> r.setGetKeyInfoResponse( + getKeyInfo(payload.getGetKeyInfoRequest()))); default: throw new IllegalArgumentException( "Mock version of om call " + payload.getCmdType() @@ -168,6 +173,15 @@ private LookupKeyResponse lookupKey(LookupKeyRequest lookupKeyRequest) { .build(); } + private GetKeyInfoResponse getKeyInfo(GetKeyInfoRequest request) { + final KeyArgs keyArgs = request.getKeyArgs(); + return GetKeyInfoResponse.newBuilder() + .setKeyInfo( + keys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName()) + .get(keyArgs.getKeyName())) + .build(); + } + private CommitKeyResponse commitKey(CommitKeyRequest commitKeyRequest) { final KeyArgs keyArgs = commitKeyRequest.getKeyArgs(); final KeyInfo openKey = diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 40f515adc448..2bd30eee721c 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -270,7 +270,9 @@ default OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, * @param args the args of the key. * @return OmKeyInfo instance that client uses to talk to container. * @throws IOException + * @deprecated use {@link OzoneManagerProtocol#getKeyInfo} instead. */ + @Deprecated OmKeyInfo lookupKey(OmKeyArgs args) throws IOException; /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java index 442cbfe7604f..f0c684486b17 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java @@ -410,7 +410,9 @@ private void testReadAfterReplication(boolean doUnbuffer) throws Exception { .setKeyName(keyName) .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) .build(); - OmKeyInfo keyInfo = getCluster().getOzoneManager().lookupKey(keyArgs); + OmKeyInfo keyInfo = getCluster().getOzoneManager() + .getKeyInfo(keyArgs, false) + .getKeyInfo(); OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); Assert.assertNotNull(locations); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java new file mode 100644 index 000000000000..50aa6ff7b612 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java @@ -0,0 +1,674 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.ContainerBlockID; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.client.rpc.RpcClient; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusException; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; +import org.apache.ratis.util.ExitUtils; +import org.jetbrains.annotations.NotNull; +import org.junit.Rule; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.rules.Timeout; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; + +import static com.google.common.collect.Sets.newHashSet; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * This class includes the integration test-cases to verify the integration + * between client and OM to keep container location cache eventually + * consistent. For example, when clients facing particular errors reading data + * from datanodes, they should inform OM to refresh location cache and OM + * should in turn contact SCM to get the updated container location. + * + * This integration verifies clients and OM using mocked Datanode and SCM + * protocols. + */ +public class TestOmContainerLocationCache { + + /** + * Set a timeout for each test. + */ + @Rule + public Timeout timeout = Timeout.seconds(300); + private static ScmBlockLocationProtocol mockScmBlockLocationProtocol; + private static StorageContainerLocationProtocol mockScmContainerClient; + private static OzoneConfiguration conf; + private static OMMetadataManager metadataManager; + private static File dir; + private static final String BUCKET_NAME = "bucket1"; + private static final String VERSIONED_BUCKET_NAME = "versionedBucket1"; + private static final String VOLUME_NAME = "vol1"; + private static OzoneManager om; + private static RpcClient rpcClient; + private static ObjectStore objectStore; + private static XceiverClientGrpc mockDn1Protocol; + private static XceiverClientGrpc mockDn2Protocol; + private static final DatanodeDetails DN1 = + MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); + private static final DatanodeDetails DN2 = + MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID()); + private static long testContainerId = 1L; + + + @BeforeAll + public static void setUp() throws Exception { + ExitUtils.disableSystemExit(); + + conf = new OzoneConfiguration(); + conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, "127.0.0.1:0"); + dir = GenericTestUtils.getRandomizedTestDir(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString()); + conf.set(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, "true"); + conf.setLong(OZONE_KEY_PREALLOCATION_BLOCKS_MAX, 10); + + mockScmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class); + mockScmContainerClient = + Mockito.mock(StorageContainerLocationProtocol.class); + + OmTestManagers omTestManagers = new OmTestManagers(conf, + mockScmBlockLocationProtocol, mockScmContainerClient); + om = omTestManagers.getOzoneManager(); + metadataManager = omTestManagers.getMetadataManager(); + + rpcClient = new RpcClient(conf, null) { + @NotNull + @Override + protected XceiverClientFactory createXceiverClientFactory( + List x509Certificates) throws IOException { + return mockDataNodeClientFactory(); + } + }; + + objectStore = new ObjectStore(conf, rpcClient); + + createVolume(VOLUME_NAME); + createBucket(VOLUME_NAME, BUCKET_NAME, false); + createBucket(VOLUME_NAME, VERSIONED_BUCKET_NAME, true); + } + + @AfterAll + public static void cleanup() throws Exception { + om.stop(); + FileUtils.deleteDirectory(dir); + } + + private static XceiverClientManager mockDataNodeClientFactory() + throws IOException { + mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf)); + mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf)); + XceiverClientManager manager = mock(XceiverClientManager.class); + when(manager.acquireClient(argThat(matchPipeline(DN1)))) + .thenReturn(mockDn1Protocol); + when(manager.acquireClientForReadData(argThat(matchPipeline(DN1)))) + .thenReturn(mockDn1Protocol); + + when(manager.acquireClient(argThat(matchPipeline(DN2)))) + .thenReturn(mockDn2Protocol); + when(manager.acquireClientForReadData(argThat(matchPipeline(DN2)))) + .thenReturn(mockDn2Protocol); + return manager; + } + + private static ArgumentMatcher matchPipeline(DatanodeDetails dn) { + return argument -> argument != null + && argument.getNodes().get(0).getUuid().equals(dn.getUuid()); + } + + private static void createBucket(String volumeName, String bucketName, + boolean isVersionEnabled) + throws IOException { + OmBucketInfo bucketInfo = OmBucketInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setIsVersionEnabled(isVersionEnabled) + .build(); + + OMRequestTestUtils.addBucketToOM(metadataManager, bucketInfo); + } + + private static void createVolume(String volumeName) throws IOException { + OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder() + .setVolume(volumeName) + .setAdminName("bilbo") + .setOwnerName("bilbo") + .build(); + OMRequestTestUtils.addVolumeToOM(metadataManager, volumeArgs); + } + + @BeforeEach + @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") + public void beforeEach() { + testContainerId++; + Mockito.reset(mockScmBlockLocationProtocol, mockScmContainerClient, + mockDn1Protocol, mockDn2Protocol); + when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1)); + when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2)); + } + + /** + * Verify that in a happy case, container location is cached and reused + * in OM. + */ + @Test + public void containerCachedInHappyCase() throws Exception { + byte[] data = "Test content".getBytes(UTF_8); + + mockScmAllocationOnDn1(testContainerId, 1L); + mockWriteChunkResponse(mockDn1Protocol); + mockPutBlockResponse(mockDn1Protocol, testContainerId, 1L, data); + + OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME) + .getBucket(BUCKET_NAME); + + // Create keyName1. + String keyName1 = "key1"; + try (OzoneOutputStream os = bucket.createKey(keyName1, data.length)) { + IOUtils.write(data, os); + } + + mockScmGetContainerPipeline(testContainerId, DN1); + + // Read keyName1. + OzoneKeyDetails key1 = bucket.getKey(keyName1); + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + + mockGetBlock(mockDn1Protocol, testContainerId, 1L, data, null, null); + mockReadChunk(mockDn1Protocol, testContainerId, 1L, data, null, null); + try (InputStream is = key1.getContent()) { + byte[] read = new byte[(int) key1.getDataSize()]; + IOUtils.read(is, read); + Assertions.assertArrayEquals(data, read); + } + + // Create keyName2 in the same container to reuse the cache + String keyName2 = "key2"; + try (OzoneOutputStream os = bucket.createKey(keyName2, data.length)) { + IOUtils.write(data, os); + } + // Read keyName2. + OzoneKeyDetails key2 = bucket.getKey(keyName2); + try (InputStream is = key2.getContent()) { + byte[] read = new byte[(int) key2.getDataSize()]; + IOUtils.read(is, read); + Assertions.assertArrayEquals(data, read); + } + // Ensure SCM is not called once again. + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + } + + private static Stream errorsTriggerRefresh() { + return Stream.of( + Arguments.of(null, Result.CLOSED_CONTAINER_IO), + Arguments.of(null, Result.CONTAINER_NOT_FOUND), + Arguments.of(new StatusException(Status.UNAVAILABLE), null), + Arguments.of(new StatusRuntimeException(Status.UNAVAILABLE), null) + ); + } + + private static Stream errorsNotTriggerRefresh() { + return Stream.of( + Arguments.of(new StatusException(Status.UNAUTHENTICATED), null, + SCMSecurityException.class), + Arguments.of(new IOException("Any random IO exception."), null, + IOException.class) + ); + } + + /** + * Verify that in case a client got errors calling datanodes GetBlock, + * the client correctly requests OM to refresh relevant container location + * from SCM. + */ + @ParameterizedTest + @MethodSource("errorsTriggerRefresh") + public void containerRefreshedAfterDatanodeGetBlockError( + Exception dnException, Result dnResponseCode) throws Exception { + byte[] data = "Test content".getBytes(UTF_8); + + mockScmAllocationOnDn1(testContainerId, 1L); + mockWriteChunkResponse(mockDn1Protocol); + mockPutBlockResponse(mockDn1Protocol, testContainerId, 1L, data); + + OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME) + .getBucket(BUCKET_NAME); + + String keyName = "key"; + try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) { + IOUtils.write(data, os); + } + + mockScmGetContainerPipeline(testContainerId, DN1); + + OzoneKeyDetails key1 = bucket.getKey(keyName); + + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + + try (InputStream is = key1.getContent()) { + // Simulate dn1 got errors, and the container's moved to dn2. + mockGetBlock(mockDn1Protocol, testContainerId, 1L, null, + dnException, dnResponseCode); + mockScmGetContainerPipeline(testContainerId, DN2); + mockGetBlock(mockDn2Protocol, testContainerId, 1L, data, null, null); + mockReadChunk(mockDn2Protocol, testContainerId, 1L, data, null, null); + + byte[] read = new byte[(int) key1.getDataSize()]; + IOUtils.read(is, read); + Assertions.assertArrayEquals(data, read); + } + + // verify SCM is called one more time to refresh. + verify(mockScmContainerClient, times(2)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + } + + /** + * Verify that in case a client got errors datanodes ReadChunk,the client + * correctly requests OM to refresh relevant container location from + * SCM. + */ + @ParameterizedTest + @MethodSource("errorsTriggerRefresh") + public void containerRefreshedAfterDatanodeReadChunkError( + Exception dnException, Result dnResponseCode) throws Exception { + byte[] data = "Test content".getBytes(UTF_8); + + mockScmAllocationOnDn1(testContainerId, 1L); + mockWriteChunkResponse(mockDn1Protocol); + mockPutBlockResponse(mockDn1Protocol, testContainerId, 1L, data); + + OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME) + .getBucket(BUCKET_NAME); + + String keyName = "key"; + try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) { + IOUtils.write(data, os); + } + + mockScmGetContainerPipeline(testContainerId, DN1); + + OzoneKeyDetails key1 = bucket.getKey(keyName); + + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + + try (InputStream is = key1.getContent()) { + // simulate dn1 goes down, the container's to dn2. + mockGetBlock(mockDn1Protocol, testContainerId, 1L, data, null, null); + mockReadChunk(mockDn1Protocol, testContainerId, 1L, null, + dnException, dnResponseCode); + mockScmGetContainerPipeline(testContainerId, DN2); + mockGetBlock(mockDn2Protocol, testContainerId, 1L, data, null, null); + mockReadChunk(mockDn2Protocol, testContainerId, 1L, data, null, null); + + byte[] read = new byte[(int) key1.getDataSize()]; + IOUtils.read(is, read); + Assertions.assertArrayEquals(data, read); + } + + // verify SCM is called one more time to refresh. + verify(mockScmContainerClient, times(2)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + } + + /** + * Verify that in case a client got particular errors datanodes GetBlock, + * the client fails correctly fast and don't invoke cache refresh. + */ + @ParameterizedTest + @MethodSource("errorsNotTriggerRefresh") + public void containerNotRefreshedAfterDatanodeGetBlockError( + Exception ex, Result errorCode, Class expectedEx) + throws Exception { + byte[] data = "Test content".getBytes(UTF_8); + + mockScmAllocationOnDn1(testContainerId, 1L); + mockWriteChunkResponse(mockDn1Protocol); + mockPutBlockResponse(mockDn1Protocol, testContainerId, 1L, data); + + OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME) + .getBucket(BUCKET_NAME); + + String keyName = "key"; + try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) { + IOUtils.write(data, os); + } + + mockScmGetContainerPipeline(testContainerId, DN1); + + OzoneKeyDetails key1 = bucket.getKey(keyName); + + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + + try (InputStream is = key1.getContent()) { + // simulate dn1 got errors, and the container's moved to dn2. + mockGetBlock(mockDn1Protocol, testContainerId, 1L, null, ex, errorCode); + + assertThrows(expectedEx, + () -> IOUtils.read(is, new byte[(int) key1.getDataSize()])); + } + + // verify SCM is called one more time to refresh. + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + } + + /** + * Verify that in case a client got particular errors datanodes ReadChunk, + * the client fails correctly fast and don't invoke cache refresh. + */ + @ParameterizedTest + @MethodSource("errorsNotTriggerRefresh") + public void containerNotRefreshedAfterDatanodeReadChunkError( + Exception dnException, Result dnResponseCode, + Class expectedEx) throws Exception { + byte[] data = "Test content".getBytes(UTF_8); + + mockScmAllocationOnDn1(testContainerId, 1L); + mockWriteChunkResponse(mockDn1Protocol); + mockPutBlockResponse(mockDn1Protocol, testContainerId, 1L, data); + + OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME) + .getBucket(BUCKET_NAME); + + String keyName = "key"; + try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) { + IOUtils.write(data, os); + } + + mockScmGetContainerPipeline(testContainerId, DN1); + + OzoneKeyDetails key1 = bucket.getKey(keyName); + + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + + try (InputStream is = key1.getContent()) { + // simulate dn1 got errors, and the container's moved to dn2. + mockGetBlock(mockDn1Protocol, testContainerId, 1L, data, null, null); + mockReadChunk(mockDn1Protocol, testContainerId, 1L, null, + dnException, dnResponseCode); + + assertThrows(expectedEx, + () -> IOUtils.read(is, new byte[(int) key1.getDataSize()])); + } + + // verify SCM is called one more time to refresh. + verify(mockScmContainerClient, times(1)) + .getContainerWithPipelineBatch(newHashSet(testContainerId)); + } + + private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol, + long containerId, long localId, + byte[] data) + throws IOException, ExecutionException, InterruptedException { + GetCommittedBlockLengthResponseProto build = + GetCommittedBlockLengthResponseProto.newBuilder() + .setBlockLength(8) + .setBlockID(createBlockId(containerId, localId)) + .build(); + ContainerCommandResponseProto putResponse = + ContainerCommandResponseProto.newBuilder() + .setPutBlock(PutBlockResponseProto.newBuilder() + .setCommittedBlockLength(build).build()) + .setResult(Result.SUCCESS) + .setCmdType(Type.PutBlock) + .build(); + doAnswer(invocation -> + new XceiverClientReply(completedFuture(putResponse))) + .when(mockDnProtocol) + .sendCommandAsync(argThat(matchCmd(Type.PutBlock))); + } + + @NotNull + private ContainerProtos.DatanodeBlockID createBlockId(long containerId, + long localId) { + return ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(containerId) + .setLocalID(localId).build(); + } + + private void mockWriteChunkResponse(XceiverClientSpi mockDnProtocol) + throws IOException, ExecutionException, InterruptedException { + ContainerCommandResponseProto writeResponse = + ContainerCommandResponseProto.newBuilder() + .setWriteChunk(WriteChunkResponseProto.newBuilder().build()) + .setResult(Result.SUCCESS) + .setCmdType(Type.WriteChunk) + .build(); + doAnswer(invocation -> + new XceiverClientReply(completedFuture(writeResponse))) + .when(mockDnProtocol) + .sendCommandAsync(argThat(matchCmd(Type.WriteChunk))); + } + + private ArgumentMatcher matchCmd(Type type) { + return argument -> argument != null && argument.getCmdType() == type; + } + + private void mockScmAllocationOnDn1(long containerID, + long localId) throws IOException { + ContainerBlockID blockId = new ContainerBlockID(containerID, localId); + AllocatedBlock block = new AllocatedBlock.Builder() + .setPipeline(createPipeline(DN1)) + .setContainerBlockID(blockId) + .build(); + when(mockScmBlockLocationProtocol + .allocateBlock(Mockito.anyLong(), Mockito.anyInt(), + any(ReplicationConfig.class), + Mockito.anyString(), + any(ExcludeList.class))) + .thenReturn(Collections.singletonList(block)); + } + + private void mockScmGetContainerPipeline(long containerId, + DatanodeDetails dn) + throws IOException { + Pipeline pipeline = createPipeline(dn); + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(containerId) + .setPipelineID(pipeline.getId()).build(); + List containerWithPipelines = + Collections.singletonList( + new ContainerWithPipeline(containerInfo, pipeline)); + + when(mockScmContainerClient.getContainerWithPipelineBatch( + newHashSet(containerId))).thenReturn(containerWithPipelines); + } + + private void mockGetBlock(XceiverClientGrpc mockDnProtocol, + long containerId, long localId, + byte[] data, + Exception exception, + Result errorCode) throws Exception { + + final CompletableFuture response; + if (exception != null) { + response = new CompletableFuture<>(); + response.completeExceptionally(exception); + } else if (errorCode != null) { + ContainerCommandResponseProto getBlockResp = + ContainerCommandResponseProto.newBuilder() + .setResult(errorCode) + .setCmdType(Type.GetBlock) + .build(); + response = completedFuture(getBlockResp); + } else { + ContainerCommandResponseProto getBlockResp = + ContainerCommandResponseProto.newBuilder() + .setGetBlock(GetBlockResponseProto.newBuilder() + .setBlockData(BlockData.newBuilder() + .addChunks(createChunkInfo(data)) + .setBlockID(createBlockId(containerId, localId)) + .build()) + .build() + ) + .setResult(Result.SUCCESS) + .setCmdType(Type.GetBlock) + .build(); + response = completedFuture(getBlockResp); + } + doAnswer(invocation -> new XceiverClientReply(response)) + .when(mockDnProtocol) + .sendCommandAsync(argThat(matchCmd(Type.GetBlock)), any()); + } + + @NotNull + private ChunkInfo createChunkInfo(byte[] data) throws Exception { + Checksum checksum = new Checksum(ChecksumType.CRC32, 4); + return ChunkInfo.newBuilder() + .setOffset(0) + .setLen(data.length) + .setChunkName("chunk1") + .setChecksumData(checksum.computeChecksum(data).getProtoBufMessage()) + .build(); + } + + private void mockReadChunk(XceiverClientGrpc mockDnProtocol, + long containerId, long localId, + byte[] data, + Exception exception, + Result errorCode) throws Exception { + final CompletableFuture response; + if (exception != null) { + response = new CompletableFuture<>(); + response.completeExceptionally(exception); + } else if (errorCode != null) { + ContainerCommandResponseProto readChunkResp = + ContainerCommandResponseProto.newBuilder() + .setResult(errorCode) + .setCmdType(Type.ReadChunk) + .build(); + response = completedFuture(readChunkResp); + } else { + ContainerCommandResponseProto readChunkResp = + ContainerCommandResponseProto.newBuilder() + .setReadChunk(ReadChunkResponseProto.newBuilder() + .setBlockID(createBlockId(containerId, localId)) + .setChunkData(createChunkInfo(data)) + .setData(ByteString.copyFrom(data)) + .build() + ) + .setResult(Result.SUCCESS) + .setCmdType(Type.ReadChunk) + .build(); + response = completedFuture(readChunkResp); + } + + doAnswer(invocation -> new XceiverClientReply(response)) + .when(mockDnProtocol) + .sendCommandAsync(argThat(matchCmd(Type.ReadChunk)), any()); + + } + + private static Pipeline createPipeline(DatanodeDetails dn) { + return Pipeline.newBuilder() + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setReplicationConfig( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE)) + .setNodes(Collections.singletonList(dn)) + .build(); + } +} \ No newline at end of file