diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 2ae69dc3c96f..ee5c6c667f22 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -89,8 +91,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -145,10 +150,17 @@ public class TestOmContainerLocationCache {
   private static ObjectStore objectStore;
   private static XceiverClientGrpc mockDn1Protocol;
   private static XceiverClientGrpc mockDn2Protocol;
+  private static XceiverClientGrpc mockDnEcProtocol;
   private static final DatanodeDetails DN1 =
       MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
   private static final DatanodeDetails DN2 =
       MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+  private static final DatanodeDetails DN3 =
+      MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+  private static final DatanodeDetails DN4 =
+      MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
+  private static final DatanodeDetails DN5 =
+      MockDatanodeDetails.createDatanodeDetails(UUID.randomUUID());
 
   private static final AtomicLong CONTAINER_ID = new AtomicLong(1);
 
@@ -200,6 +212,8 @@ private static XceiverClientManager mockDataNodeClientFactory()
       throws IOException {
     mockDn1Protocol = spy(new XceiverClientGrpc(createPipeline(DN1), conf));
     mockDn2Protocol = spy(new XceiverClientGrpc(createPipeline(DN2), conf));
+    mockDnEcProtocol = spy(new XceiverClientGrpc(createEcPipeline(
+        ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)), conf));
     XceiverClientManager manager = mock(XceiverClientManager.class);
     when(manager.acquireClient(argThat(matchEmptyPipeline())))
         .thenCallRealMethod();
@@ -217,6 +231,11 @@ private static XceiverClientManager mockDataNodeClientFactory()
         .thenReturn(mockDn2Protocol);
     when(manager.acquireClientForReadData(argThat(matchPipeline(DN2))))
         .thenReturn(mockDn2Protocol);
+
+    when(manager.acquireClient(argThat(matchEcPipeline())))
+        .thenReturn(mockDnEcProtocol);
+    when(manager.acquireClientForReadData(argThat(matchEcPipeline())))
+        .thenReturn(mockDnEcProtocol);
     return manager;
   }
 
@@ -231,6 +250,11 @@ private static ArgumentMatcher<Pipeline> matchPipeline(DatanodeDetails dn) {
         && argument.getNodes().get(0).getUuid().equals(dn.getUuid());
   }
 
+  private static ArgumentMatcher<Pipeline> matchEcPipeline() {
+    return argument -> argument != null && !argument.getNodes().isEmpty()
+        && argument.getReplicationConfig() instanceof ECReplicationConfig;
+  }
+
   private static void createBucket(String volumeName, String bucketName,
                                    boolean isVersionEnabled)
       throws IOException {
@@ -256,12 +280,14 @@ private static void createVolume(String volumeName) throws IOException {
   public void beforeEach() throws IOException {
     CONTAINER_ID.getAndIncrement();
     reset(mockScmBlockLocationProtocol, mockScmContainerClient,
-        mockDn1Protocol, mockDn2Protocol);
+        mockDn1Protocol, mockDn2Protocol, mockDnEcProtocol);
     InnerNode.Factory factory = InnerNodeImpl.FACTORY;
     when(mockScmBlockLocationProtocol.getNetworkTopology()).thenReturn(
         factory.newInnerNode("", "", null, NetConstants.ROOT_LEVEL, 1));
     when(mockDn1Protocol.getPipeline()).thenReturn(createPipeline(DN1));
     when(mockDn2Protocol.getPipeline()).thenReturn(createPipeline(DN2));
+    when(mockDnEcProtocol.getPipeline()).thenReturn(createEcPipeline(
+        ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)));
   }
 
   /**
@@ -575,6 +601,48 @@ public void containerRefreshedOnEmptyPipelines() throws Exception {
         .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
   }
 
+  @Test
+  public void containerRefreshedOnInsufficientEcPipelines() throws Exception {
+    int chunkSize = 1024 * 1024;
+    int dataBlocks = 3;
+    int parityBlocks = 2;
+    int inputSize = chunkSize * dataBlocks;
+    byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+
+    mockScmAllocationEcPipeline(CONTAINER_ID.get(), 1L);
+    mockWriteChunkResponse(mockDnEcProtocol);
+    mockPutBlockResponse(mockDnEcProtocol, CONTAINER_ID.get(), 1L, null);
+
+    OzoneBucket bucket = objectStore.getVolume(VOLUME_NAME).getBucket(BUCKET_NAME);
+
+    String keyName = "ecKey";
+    try (OzoneOutputStream os = bucket.createKey(keyName, inputSize,
+        new ECReplicationConfig(dataBlocks, parityBlocks, ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      for (int i = 0; i < dataBlocks; i++) {
+        os.write(inputChunks[i]);
+      }
+    }
+
+    // case1: pipeline replicaIndexes missing some data indexes, should not cache
+    mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN4, 4));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(1))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(2))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+
+    // case2: pipeline replicaIndexes contain all data indexes, should cache
+    mockScmGetContainerEcPipeline(CONTAINER_ID.get(), ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(3))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+    bucket.getKey(keyName);
+    verify(mockScmContainerClient, times(3))
+        .getContainerWithPipelineBatch(newHashSet(CONTAINER_ID.get()));
+  }
+
   private void mockPutBlockResponse(XceiverClientSpi mockDnProtocol,
                                     long containerId, long localId,
                                     byte[] data)
@@ -639,6 +707,22 @@ private void mockScmAllocationOnDn1(long containerID,
         .thenReturn(Collections.singletonList(block));
   }
 
+  private void mockScmAllocationEcPipeline(long containerID, long localId)
+      throws IOException {
+    ContainerBlockID blockId = new ContainerBlockID(containerID, localId);
+    AllocatedBlock block = new AllocatedBlock.Builder()
+        .setPipeline(createEcPipeline(ImmutableMap.of(DN1, 1, DN2, 2, DN3, 3, DN4, 4, DN5, 5)))
+        .setContainerBlockID(blockId)
+        .build();
+    when(mockScmBlockLocationProtocol
+        .allocateBlock(anyLong(), anyInt(),
+            any(ECReplicationConfig.class),
+            anyString(),
+            any(ExcludeList.class),
+            anyString()))
+        .thenReturn(Collections.singletonList(block));
+  }
+
   private void mockScmGetContainerPipeline(long containerId,
                                            DatanodeDetails dn)
       throws IOException {
@@ -668,6 +752,20 @@ private void mockScmGetContainerPipelineEmpty(long containerId)
         newHashSet(containerId))).thenReturn(containerWithPipelines);
   }
 
+  private void mockScmGetContainerEcPipeline(long containerId,
+      Map<DatanodeDetails, Integer> indexes) throws IOException {
+    Pipeline pipeline = createEcPipeline(indexes);
+    ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setContainerID(containerId)
+        .setPipelineID(pipeline.getId()).build();
+    List<ContainerWithPipeline> containerWithPipelines =
+        Collections.singletonList(
+            new ContainerWithPipeline(containerInfo, pipeline));
+
+    when(mockScmContainerClient.getContainerWithPipelineBatch(
+        newHashSet(containerId))).thenReturn(containerWithPipelines);
+  }
+
   private void mockGetBlock(XceiverClientGrpc mockDnProtocol,
                             long containerId, long localId,
                             byte[] data,
@@ -766,4 +864,14 @@ private static Pipeline createPipeline(List<DatanodeDetails> nodes) {
         .setNodes(nodes)
         .build();
   }
+
+  private static Pipeline createEcPipeline(Map<DatanodeDetails, Integer> indexes) {
+    return Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .setReplicationConfig(new ECReplicationConfig(3, 2))
+        .setReplicaIndexes(indexes)
+        .setNodes(new ArrayList<>(indexes.keySet()))
+        .build();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index 77ee0d5851f2..e64a32a45e56 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -21,6 +21,8 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -113,12 +115,29 @@ public Map<Long, Pipeline> getContainerLocations(Iterable<Long> containerIds,
     }
     try {
       Map<Long, Pipeline> result = containerLocationCache.getAll(containerIds);
-      // Don't keep empty pipelines in the cache.
-      List<Long> emptyPipelines = result.entrySet().stream()
-          .filter(e -> e.getValue().isEmpty())
+      // Don't keep empty pipelines or insufficient EC pipelines in the cache.
+      List<Long> uncachePipelines = result.entrySet().stream()
+          .filter(e -> {
+            Pipeline pipeline = e.getValue();
+            // filter empty pipelines
+            if (pipeline.isEmpty()) {
+              return true;
+            }
+            // filter insufficient EC pipelines that are missing any data index
+            ReplicationConfig repConfig = pipeline.getReplicationConfig();
+            if (repConfig instanceof ECReplicationConfig) {
+              int d = ((ECReplicationConfig) repConfig).getData();
+              for (int i = 1; i <= d; i++) {
+                if (!pipeline.getReplicaIndexes().containsValue(i)) {
+                  return true;
+                }
+              }
+            }
+            return false;
+          })
           .map(Map.Entry::getKey)
           .collect(Collectors.toList());
-      containerLocationCache.invalidateAll(emptyPipelines);
+      containerLocationCache.invalidateAll(uncachePipelines);
       return result;
     } catch (ExecutionException e) {
       return handleCacheExecutionException(e);
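
Reviewer note: the rule the ScmClient change enforces can be summarized as "cache an EC pipeline only if its replica-index map covers every data index 1..d"; parity indexes are not required, and an incomplete pipeline is invalidated so the next read refreshes it from SCM. The following standalone sketch is not part of the patch: it uses plain String keys in place of DatanodeDetails and a hypothetical class name, and it expresses the same predicate with a Set lookup instead of repeated containsValue() scans.

// Standalone sketch, not part of the patch: the EC cache-eligibility predicate.
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class EcPipelineCacheCheckSketch {

  // True when every data index 1..dataCount appears among the replica
  // indexes reported for the pipeline; parity indexes are ignored.
  static boolean hasAllDataIndexes(Map<String, Integer> replicaIndexes, int dataCount) {
    Set<Integer> present = new HashSet<>(replicaIndexes.values());
    for (int i = 1; i <= dataCount; i++) {
      if (!present.contains(i)) {
        return false; // a data index is missing -> do not cache, force SCM refresh
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // RS-3-2: indexes 1..3 are data, 4 and 5 are parity.
    Map<String, Integer> missingData = Map.of("dn1", 1, "dn2", 2, "dn4", 4);
    Map<String, Integer> allData = Map.of("dn1", 1, "dn2", 2, "dn3", 3, "dn4", 4);
    System.out.println(hasAllDataIndexes(missingData, 3)); // false -> refresh from SCM
    System.out.println(hasAllDataIndexes(allData, 3));     // true  -> keep in cache
  }
}

Both forms do the same work for the handful of replicas in a pipeline; the patch's containsValue() loop matches the test expectations above (case1 keeps refreshing, case2 caches after the first lookup).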