diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index ee4edecb3c4..0724d614cce 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -66,6 +66,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory { LoggerFactory.getLogger(XceiverClientManager.class); //TODO : change this to SCM configuration class private final ConfigurationSource conf; + private final ScmClientConfig clientConfig; private final Cache clientCache; private List caCerts; @@ -88,6 +89,7 @@ public XceiverClientManager(ConfigurationSource conf, List caCerts) throws IOException { Preconditions.checkNotNull(clientConf); Preconditions.checkNotNull(conf); + this.clientConfig = clientConf; long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS); this.conf = conf; this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf); @@ -347,6 +349,37 @@ public void setMaxSize(int maxSize) { this.maxSize = maxSize; } + public void setStaleThreshold(long threshold) { + this.staleThreshold = threshold; + } + + } + + /** + * Builder of ScmClientConfig. + */ + public static class XceiverClientManagerConfigBuilder { + + private int maxCacheSize; + private long staleThresholdMs; + + public XceiverClientManagerConfigBuilder setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + return this; + } + + public XceiverClientManagerConfigBuilder setStaleThresholdMs( + long staleThresholdMs) { + this.staleThresholdMs = staleThresholdMs; + return this; + } + + public ScmClientConfig build() { + ScmClientConfig clientConfig = new ScmClientConfig(); + clientConfig.setMaxSize(this.maxCacheSize); + clientConfig.setStaleThreshold(this.staleThresholdMs); + return clientConfig; + } } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 3f3fc12f2a8..174e507829b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -80,6 +80,11 @@ public void write(byte[] b, int off, int len) throws IOException { writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len))); } + public CompletableFuture write( + ByteBuffer buff) throws IOException { + return writeChunkToContainer(ChunkBuffer.wrap(buff)); + } + public CompletableFuture executePutBlock(boolean close, boolean force, long blockGroupLength) throws IOException { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index b5365820e3d..16bef697121 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -425,13 +425,15 @@ public static PutSmallFileResponseProto writeSmallFile( * @param client - client * @param containerID - ID of container * @param encodedToken - encodedToken if security is enabled + * @param replicaIndex - index position of the container replica * 
@throws IOException */ @InterfaceStability.Evolving public static void createRecoveringContainer(XceiverClientSpi client, - long containerID, String encodedToken) throws IOException { + long containerID, String encodedToken, int replicaIndex) + throws IOException { createContainerInternal(client, containerID, encodedToken, - ContainerProtos.ContainerDataProto.State.RECOVERING); + ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex); } /** @@ -443,7 +445,7 @@ public static void createRecoveringContainer(XceiverClientSpi client, */ public static void createContainer(XceiverClientSpi client, long containerID, String encodedToken) throws IOException { - createContainerInternal(client, containerID, encodedToken, null); + createContainerInternal(client, containerID, encodedToken, null, 0); } /** * createContainer call that creates a container on the datanode. @@ -451,18 +453,23 @@ public static void createContainer(XceiverClientSpi client, long containerID, * @param containerID - ID of container * @param encodedToken - encodedToken if security is enabled * @param state - state of the container + * @param replicaIndex - index position of the container replica * @throws IOException */ private static void createContainerInternal(XceiverClientSpi client, long containerID, String encodedToken, - ContainerProtos.ContainerDataProto.State state) throws IOException { + ContainerProtos.ContainerDataProto.State state, int replicaIndex) + throws IOException { ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto.newBuilder(); - createRequest.setContainerType(ContainerProtos.ContainerType - .KeyValueContainer); + createRequest + .setContainerType(ContainerProtos.ContainerType.KeyValueContainer); if (state != null) { createRequest.setState(state); } + if (replicaIndex > 0) { + createRequest.setReplicaIndex(replicaIndex); + } String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index ba21e5795e3..cc05511b58a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -181,9 +182,12 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, replicationSupervisorMetrics = ReplicationSupervisorMetrics.create(supervisor); + ECReconstructionCoordinator ecReconstructionCoordinator = + new ECReconstructionCoordinator(conf, certClient); ecReconstructionSupervisor = 
new ECReconstructionSupervisor(container.getContainerSet(), context, - replicationConfig.getReplicationMaxStreams()); + replicationConfig.getReplicationMaxStreams(), + ecReconstructionCoordinator); // When we add new handlers just adding a new handler here should do the @@ -389,6 +393,10 @@ public void close() throws IOException { Thread.currentThread().interrupt(); } + if (ecReconstructionSupervisor != null) { + ecReconstructionSupervisor.close(); + } + if (connectionManager != null) { connectionManager.close(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index f4ec45f6008..009d47d7afb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -53,8 +53,9 @@ public void handle(SCMCommand command, OzoneContainer container, ecContainersCommand.getMissingContainerIndexes(), ecContainersCommand.getSources(), ecContainersCommand.getTargetDatanodes()); - this.supervisor.addTask( - new ECReconstructionCoordinatorTask(reconstructionCommandInfo)); + this.supervisor.addTask(new ECReconstructionCoordinatorTask( + this.supervisor.getReconstructionCoordinator(), + reconstructionCommandInfo)); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java new file mode 100644 index 00000000000..adebe117769 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ec.reconstruction; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; +import org.apache.hadoop.hdds.utils.HAUtils; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * This class wraps necessary container-level rpc calls + * during ec offline reconstruction. + * - ListBlock + * - CloseContainer + */ +public class ECContainerOperationClient implements Closeable { + private static final Logger LOG = + LoggerFactory.getLogger(ECContainerOperationClient.class); + private final XceiverClientManager xceiverClientManager; + + public ECContainerOperationClient(XceiverClientManager clientManager) { + this.xceiverClientManager = clientManager; + } + + public ECContainerOperationClient(ConfigurationSource conf, + CertificateClient certificateClient) throws IOException { + this(createClientManager(conf, certificateClient)); + } + + @NotNull + private static XceiverClientManager createClientManager( + ConfigurationSource conf, CertificateClient certificateClient) + throws IOException { + return new XceiverClientManager(conf, + new XceiverClientManager.XceiverClientManagerConfigBuilder() + .setMaxCacheSize(256).setStaleThresholdMs(10 * 1000).build(), + certificateClient != null ? + HAUtils.buildCAX509List(certificateClient, conf) : + null); + } + + public BlockData[] listBlock(long containerId, DatanodeDetails dn, + ECReplicationConfig repConfig, Token token) + throws IOException { + XceiverClientSpi xceiverClient = this.xceiverClientManager + .acquireClient(singleNodePipeline(dn, repConfig)); + try { + List blockDataList = ContainerProtocolCalls + .listBlock(xceiverClient, containerId, null, Integer.MAX_VALUE, token) + .getBlockDataList(); + return blockDataList.stream().map(i -> { + try { + return BlockData.getFromProtoBuf(i); + } catch (IOException e) { + LOG.debug("Failed while converting to protobuf BlockData. Returning" + + " null for listBlock from DN: " + dn, + e); + // TODO: revisit here. 
+ return null; + } + }).collect(Collectors.toList()) + .toArray(new BlockData[blockDataList.size()]); + } finally { + this.xceiverClientManager.releaseClient(xceiverClient, false); + } + } + + public void closeContainer(long containerID, DatanodeDetails dn, + ECReplicationConfig repConfig, String encodedToken) throws IOException { + XceiverClientSpi xceiverClient = this.xceiverClientManager + .acquireClient(singleNodePipeline(dn, repConfig)); + try { + ContainerProtocolCalls + .closeContainer(xceiverClient, containerID, encodedToken); + } finally { + this.xceiverClientManager.releaseClient(xceiverClient, false); + } + } + + public void createRecoveringContainer(long containerID, DatanodeDetails dn, + ECReplicationConfig repConfig, String encodedToken, int replicaIndex) + throws IOException { + XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient( + singleNodePipeline(dn, repConfig)); + try { + ContainerProtocolCalls + .createRecoveringContainer(xceiverClient, containerID, encodedToken, + replicaIndex); + } finally { + this.xceiverClientManager.releaseClient(xceiverClient, false); + } + } + + Pipeline singleNodePipeline(DatanodeDetails dn, + ECReplicationConfig repConfig) { + // To get the same client from cache, we try to use the DN UUID as + // pipelineID for uniqueness. Please note, pipeline does not have any + // significance after it's close. So, we are ok to use any ID. + return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid())) + .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn)) + .setState(Pipeline.PipelineState.CLOSED).build(); + } + + public XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + @Override + public void close() throws IOException { + if (xceiverClientManager != null) { + xceiverClientManager.close(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java new file mode 100644 index 00000000000..780c520d723 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ec.reconstruction; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; +import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; +import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy; +import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * The Coordinator implements the main flow of reconstructing + * missing container replicas. + *

+ * For a container reconstruction task, the main flow is: + * - ListBlock from all healthy replicas + * - calculate effective block group len for all blocks + * - create RECOVERING containers in TargetDNs + * - for each block + * - build a ECReconstructedStripedInputStream to read healthy chunks + * - build a ECBlockOutputStream to write out decoded chunks + * - for each stripe + * - use ECReconstructedStripedInputStream.recoverChunks to decode chunks + * - use ECBlockOutputStream.write to write decoded chunks to TargetDNs + * - PutBlock + * - Close RECOVERING containers in TargetDNs + */ +public class ECReconstructionCoordinator implements Closeable { + + static final Logger LOG = + LoggerFactory.getLogger(ECReconstructionCoordinator.class); + + private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; + + private final ECContainerOperationClient containerOperationClient; + + private final ConfigurationSource config; + + private final ByteBufferPool byteBufferPool; + + private ExecutorService ecReconstructExecutor; + + private BlockInputStreamFactory blockInputStreamFactory; + + public ECReconstructionCoordinator(ECContainerOperationClient containerClient, + ConfigurationSource conf, ByteBufferPool byteBufferPool, + ExecutorService reconstructExecutor, + BlockInputStreamFactory streamFactory) { + this.containerOperationClient = containerClient; + this.config = conf; + this.byteBufferPool = byteBufferPool; + this.blockInputStreamFactory = streamFactory; + this.ecReconstructExecutor = reconstructExecutor; + } + + public ECReconstructionCoordinator(ConfigurationSource conf, + CertificateClient certificateClient) throws IOException { + this(new ECContainerOperationClient(conf, certificateClient), conf, + new ElasticByteBufferPool(), null, null); + this.ecReconstructExecutor = + new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, + config.getObject(OzoneClientConfig.class) + .getEcReconstructStripeReadPoolLimit(), 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), new ThreadFactoryBuilder() + .setNameFormat("ec-reconstruct-reader-TID-%d").build(), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.blockInputStreamFactory = BlockInputStreamFactoryImpl + .getInstance(byteBufferPool, () -> ecReconstructExecutor); + } + + public void reconstructECContainerGroup(long containerID, + ECReplicationConfig repConfig, + SortedMap sourceNodeMap, + SortedMap targetNodeMap) throws IOException { + + Pipeline pipeline = rebuildInputPipeline(repConfig, sourceNodeMap); + + SortedMap blockDataMap = + getBlockDataMap(containerID, repConfig, sourceNodeMap); + + SortedMap blockLocationInfoMap = + calcBlockLocationInfoMap(containerID, blockDataMap, pipeline); + + // 1. create target recovering containers. + for (Map.Entry indexDnPair : targetNodeMap + .entrySet()) { + this.containerOperationClient + .createRecoveringContainer(containerID, indexDnPair.getValue(), + repConfig, null, indexDnPair.getKey()); + } + + // 2. Reconstruct and transfer to targets + for (BlockLocationInfo blockLocationInfo : blockLocationInfoMap.values()) { + reconstructECBlockGroup(blockLocationInfo, repConfig, targetNodeMap); + } + + // 3. 
Close containers + for (Map.Entry indexDnPair : targetNodeMap + .entrySet()) { + DatanodeDetails dn = indexDnPair.getValue(); + this.containerOperationClient + .closeContainer(containerID, dn, repConfig, null); + } + + } + + void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, + ECReplicationConfig repConfig, + SortedMap targetMap) + throws IOException { + long safeBlockGroupLength = blockLocationInfo.getLength(); + List missingContainerIndexes = new ArrayList<>(targetMap.keySet()); + + // calculate the real missing block indexes + int dataLocs = ECBlockInputStreamProxy + .expectedDataLocations(repConfig, safeBlockGroupLength); + List toReconstructIndexes = new ArrayList<>(); + for (Integer index : missingContainerIndexes) { + if (index <= dataLocs || index > repConfig.getData()) { + toReconstructIndexes.add(index); + } + // else padded indexes. + } + + // Looks like we don't need to reconstruct any missing blocks in this block + // group. The reason for this should be block group had only padding blocks + // in the missing locations. + if (toReconstructIndexes.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping the reconstruction for the block: " + + blockLocationInfo.getBlockID() + ". In the missing locations: " + + missingContainerIndexes + + ", this block group has only padded blocks."); + } + return; + } + + try (ECBlockReconstructedStripeInputStream sis + = new ECBlockReconstructedStripeInputStream( + repConfig, blockLocationInfo, true, + this.containerOperationClient.getXceiverClientManager(), null, + this.blockInputStreamFactory, byteBufferPool, + this.ecReconstructExecutor)) { + + ECBlockOutputStream[] targetBlockStreams = + new ECBlockOutputStream[toReconstructIndexes.size()]; + ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()]; + OzoneClientConfig configuration = new OzoneClientConfig(); + // TODO: Let's avoid unnecessary bufferPool creation. This pool actually + // not used in EC flows, but there are some dependencies on buffer pool. + BufferPool bufferPool = + new BufferPool(configuration.getStreamBufferSize(), + (int) (configuration.getStreamBufferMaxSize() / configuration + .getStreamBufferSize()), + ByteStringConversion.createByteBufferConversion(false)); + for (int i = 0; i < toReconstructIndexes.size(); i++) { + DatanodeDetails datanodeDetails = + targetMap.get(toReconstructIndexes.get(i)); + targetBlockStreams[i] = + new ECBlockOutputStream(blockLocationInfo.getBlockID(), + this.containerOperationClient.getXceiverClientManager(), + this.containerOperationClient + .singleNodePipeline(datanodeDetails, repConfig), bufferPool, + configuration, blockLocationInfo.getToken()); + bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize()); + // Make sure it's clean. Don't want to reuse the erroneously returned + // buffers from the pool. 
+ bufs[i].clear(); + } + + sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1)) + .collect(Collectors.toSet())); + long length = safeBlockGroupLength; + while (length > 0) { + int readLen = sis.recoverChunks(bufs); + // TODO: can be submitted in parallel + for (int i = 0; i < bufs.length; i++) { + CompletableFuture + future = targetBlockStreams[i].write(bufs[i]); + checkFailures(targetBlockStreams[i], future); + bufs[i].clear(); + } + length -= readLen; + } + + try { + for (ECBlockOutputStream targetStream : targetBlockStreams) { + targetStream + .executePutBlock(true, true, blockLocationInfo.getLength()); + checkFailures(targetStream, + targetStream.getCurrentPutBlkResponseFuture()); + } + } finally { + for (ByteBuffer buf : bufs) { + byteBufferPool.putBuffer(buf); + } + IOUtils.cleanupWithLogger(LOG, targetBlockStreams); + } + } + } + + private void checkFailures(ECBlockOutputStream targetBlockStream, + CompletableFuture + currentPutBlkResponseFuture) + throws IOException { + if (isFailed(targetBlockStream, currentPutBlkResponseFuture)) { + // If one chunk response failed, we should retry. + // Even after retries if it failed, we should declare the + // reconstruction as failed. + // For now, let's throw the exception. + throw new IOException( + "Chunk write failed at the new target node: " + targetBlockStream + .getDatanodeDetails() + ". Aborting the reconstruction process."); + } + } + + private boolean isFailed(ECBlockOutputStream outputStream, + CompletableFuture chunkWriteResponseFuture) { + if (chunkWriteResponseFuture == null) { + return true; + } + + ContainerProtos.ContainerCommandResponseProto + containerCommandResponseProto = null; + try { + containerCommandResponseProto = chunkWriteResponseFuture.get(); + } catch (InterruptedException e) { + outputStream.setIoException(e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + outputStream.setIoException(e); + } + + if (outputStream.getIoException() != null) { + return true; + } + + if (containerCommandResponseProto == null) { + return true; + } + + return false; + } + + SortedMap calcBlockLocationInfoMap(long containerID, + SortedMap blockDataMap, Pipeline pipeline) { + + SortedMap blockInfoMap = new TreeMap<>(); + + for (Map.Entry entry : blockDataMap.entrySet()) { + Long localID = entry.getKey(); + BlockData[] blockGroup = entry.getValue(); + + long blockGroupLen = calcEffectiveBlockGroupLen(blockGroup, + pipeline.getReplicationConfig().getRequiredNodes()); + if (blockGroupLen > 0) { + BlockLocationInfo blockLocationInfo = new BlockLocationInfo.Builder() + .setBlockID(new BlockID(containerID, localID)) + .setLength(blockGroupLen).setPipeline(pipeline).build(); + blockInfoMap.put(localID, blockLocationInfo); + } + } + return blockInfoMap; + } + + @Override + public void close() throws IOException { + if (containerOperationClient != null) { + containerOperationClient.close(); + } + } + + private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig, + SortedMap sourceNodeMap) { + + List nodes = new ArrayList<>(sourceNodeMap.values()); + Map dnVsIndex = new HashMap<>(); + + Iterator> iterator = + sourceNodeMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + Integer key = next.getKey(); + DatanodeDetails value = next.getValue(); + dnVsIndex.put(value, key); + } + + return Pipeline.newBuilder().setId(PipelineID.randomId()) + .setReplicationConfig(repConfig).setNodes(nodes) + 
.setReplicaIndexes(dnVsIndex).setState(Pipeline.PipelineState.CLOSED) + .build(); + } + + private SortedMap getBlockDataMap(long containerID, + ECReplicationConfig repConfig, + Map sourceNodeMap) throws IOException { + + SortedMap resultMap = new TreeMap<>(); + + Iterator> iterator = + sourceNodeMap.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + Integer index = next.getKey(); + DatanodeDetails dn = next.getValue(); + + BlockData[] blockDataArr = + containerOperationClient.listBlock(containerID, dn, repConfig, null); + + for (BlockData blockData : blockDataArr) { + BlockID blockID = blockData.getBlockID(); + BlockData[] blkDataArr = resultMap.getOrDefault(blockData.getLocalID(), + new BlockData[repConfig.getRequiredNodes()]); + blkDataArr[index - 1] = blockData; + resultMap.put(blockID.getLocalID(), blkDataArr); + } + } + return resultMap; + } + + /** + * Get the effective length of each block group. + * We can not be absolutely accurate when there is a failed stripe + * in this block since the failed cells could be missing, and + * we can not tell from the healthy cells whether the last stripe + * is failed or not. But in such case we at most recover one extra + * stripe for this block which does not confuse the client data view. + * + * @param blockGroup + * @param replicaCount + * @return + */ + private long calcEffectiveBlockGroupLen(BlockData[] blockGroup, + int replicaCount) { + Preconditions.checkState(blockGroup.length == replicaCount); + + long blockGroupLen = Long.MAX_VALUE; + + for (int i = 0; i < replicaCount; i++) { + if (blockGroup[i] == null) { + continue; + } + + String putBlockLenStr = blockGroup[i].getMetadata() + .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK); + long putBlockLen = (putBlockLenStr == null) ? + Long.MAX_VALUE : + Long.parseLong(putBlockLenStr); + // Use the min to be conservative + blockGroupLen = Math.min(putBlockLen, blockGroupLen); + } + return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen; + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index 24168e5f69f..af4a87e2723 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -17,14 +17,30 @@ */ package org.apache.hadoop.ozone.container.ec.reconstruction; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + /** * This is the actual EC reconstruction coordination task. 
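 * It maps the source and target replica indexes carried by the
 * {@link ECReconstructionCommandInfo} to their datanodes and delegates the
 * actual recovery work to the given {@link ECReconstructionCoordinator}.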
*/ public class ECReconstructionCoordinatorTask implements Runnable { + static final Logger LOG = + LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class); + private ECReconstructionCoordinator reconstructionCoordinator; private ECReconstructionCommandInfo reconstructionCommandInfo; public ECReconstructionCoordinatorTask( + ECReconstructionCoordinator coordinator, ECReconstructionCommandInfo reconstructionCommandInfo) { + this.reconstructionCoordinator = coordinator; this.reconstructionCommandInfo = reconstructionCommandInfo; } @@ -42,6 +58,29 @@ public void run() { // 4. Write the recovered chunks to given targets/write locally to // respective container. HDDS-6582 // 5. Close/finalize the recovered containers. + + SortedMap sourceNodeMap = + reconstructionCommandInfo.getSources().stream().collect(Collectors + .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex, + DatanodeDetailsAndReplicaIndex::getDnDetails, (v1, v2) -> v1, + TreeMap::new)); + SortedMap targetNodeMap = IntStream + .range(0, reconstructionCommandInfo.getTargetDatanodes().size()).boxed() + .collect(Collectors.toMap(i -> (int) reconstructionCommandInfo + .getMissingContainerIndexes()[i], + i -> reconstructionCommandInfo.getTargetDatanodes().get(i), + (v1, v2) -> v1, TreeMap::new)); + + try { + reconstructionCoordinator.reconstructECContainerGroup( + reconstructionCommandInfo.getContainerID(), + reconstructionCommandInfo.getEcReplicationConfig(), sourceNodeMap, + targetNodeMap); + } catch (IOException e) { + LOG.warn( + "Failed to complete the reconstruction task for the container: " + + reconstructionCommandInfo.getContainerID(), e); + } } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java index e2c930a8c2a..2af04f65007 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java @@ -21,6 +21,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -30,28 +32,33 @@ * This class is to handle all the EC reconstruction tasks to be scheduled as * they arrive. 
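 * It also holds the {@link ECReconstructionCoordinator} used by the scheduled
 * tasks; the coordinator is closed when the supervisor is closed during
 * datanode shutdown.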
*/ -public class ECReconstructionSupervisor { +public class ECReconstructionSupervisor implements Closeable { private final ContainerSet containerSet; private final StateContext context; private final ExecutorService executor; + private final ECReconstructionCoordinator reconstructionCoordinator; public ECReconstructionSupervisor(ContainerSet containerSet, - StateContext context, ExecutorService executor) { + StateContext context, ExecutorService executor, + ECReconstructionCoordinator coordinator) { this.containerSet = containerSet; this.context = context; this.executor = executor; + this.reconstructionCoordinator = coordinator; } public ECReconstructionSupervisor(ContainerSet containerSet, - StateContext context, int poolSize) { + StateContext context, int poolSize, + ECReconstructionCoordinator coordinator) { // TODO: ReplicationSupervisor and this class can be refactored to have a // common interface. this(containerSet, context, new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("ECContainerReconstructionThread-%d").build())); + .setNameFormat("ECContainerReconstructionThread-%d").build()), + coordinator); } public void stop() { @@ -69,4 +76,16 @@ public void stop() { public void addTask(ECReconstructionCoordinatorTask task) { executor.execute(task); } + + @Override + public void close() throws IOException { + if (reconstructionCoordinator != null) { + reconstructionCoordinator.close(); + } + stop(); + } + + public ECReconstructionCoordinator getReconstructionCoordinator() { + return reconstructionCoordinator; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 7fcbdb3e7f4..95907116506 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -283,6 +283,7 @@ ContainerCommandResponseProto handleCreateContainer( } newContainerData.setReplicaIndex(request.getCreateContainer() .getReplicaIndex()); + // TODO: Add support to add metadataList to ContainerData. Add metadata // to container during creation. 
KeyValueContainer newContainer = new KeyValueContainer( diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java index e86be82a091..2403364f4cf 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java @@ -28,7 +28,7 @@ public class TestECReconstructionSupervisor { private final ECReconstructionSupervisor supervisor = - new ECReconstructionSupervisor(null, null, 5); + new ECReconstructionSupervisor(null, null, 5, null); @Test public void testAddTaskShouldExecuteTheGivenTask() @@ -42,7 +42,7 @@ static class FakeTask extends ECReconstructionCoordinatorTask { private boolean isExecuted = false; FakeTask(ECReconstructionCommandInfo reconstructionCommandInfo) { - super(reconstructionCommandInfo); + super(null, reconstructionCommandInfo); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 76cfb7b289a..3047d18d7e3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -47,10 +48,16 @@ import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.InsufficientLocationsException; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator; import org.junit.Assert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -58,6 +65,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.io.OutputStream; @@ -65,10 +74,16 @@ import java.util.ArrayList; import java.util.Arrays; import 
java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.nio.charset.StandardCharsets.UTF_8; @@ -79,7 +94,7 @@ public class TestContainerCommandsEC { private static MiniOzoneCluster cluster; private static StorageContainerManager scm; - private static OzoneClient client; + private static OzoneClient rpcClient; private static ObjectStore store; private static StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient; @@ -90,17 +105,16 @@ public class TestContainerCommandsEC { private static final EcCodec EC_CODEC = EcCodec.RS; private static final int EC_CHUNK_SIZE = 1024; private static final int STRIPE_DATA_SIZE = EC_DATA * EC_CHUNK_SIZE; - private static final int NUM_DN = EC_DATA + EC_PARITY; + private static final int NUM_DN = EC_DATA + EC_PARITY + 3; + private static byte[][] inputChunks = new byte[EC_DATA][EC_CHUNK_SIZE]; // Each key size will be in range [min, max), min inclusive, max exclusive - private static final int[][] KEY_SIZE_RANGES = new int[][]{ - {1, EC_CHUNK_SIZE}, - {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1}, - {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE}, - {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1}, - {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE}, - {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2}, - }; + private static final int[][] KEY_SIZE_RANGES = + new int[][] {{1, EC_CHUNK_SIZE}, {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1}, + {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE}, + {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1}, + {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE}, + {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2}}; private static byte[][] values; private static long containerID; private static Pipeline pipeline; @@ -117,6 +131,7 @@ public static void init() throws Exception { OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE); startCluster(config); prepareData(KEY_SIZE_RANGES); + rpcClient = OzoneClientFactory.getRpcClient(config); } @AfterAll @@ -130,13 +145,11 @@ private Pipeline createSingleNodePipeline(Pipeline ecPipeline, Map indicesForSinglePipeline = new HashMap<>(); indicesForSinglePipeline.put(node, replicaIndex); - return Pipeline.newBuilder() - .setId(ecPipeline.getId()) + return Pipeline.newBuilder().setId(ecPipeline.getId()) .setReplicationConfig(ecPipeline.getReplicationConfig()) .setState(ecPipeline.getPipelineState()) .setNodes(ImmutableList.of(node)) - .setReplicaIndexes(indicesForSinglePipeline) - .build(); + .setReplicaIndexes(indicesForSinglePipeline).build(); } @BeforeEach @@ -175,22 +188,27 @@ private Function chunksInReplicaFunc(int i) { public void testListBlock() throws Exception { for (int i = 0; i < datanodeDetails.size(); i++) { final int minKeySize = i < EC_DATA ? 
i * EC_CHUNK_SIZE : 0; - final int numExpectedBlocks = (int) Arrays.stream(values) - .mapToInt(v -> v.length).filter(s -> s > minKeySize).count(); + final int numExpectedBlocks = + (int) Arrays.stream(values).mapToInt(v -> v.length) + .filter(s -> s > minKeySize).count(); Function expectedChunksFunc = chunksInReplicaFunc(i); - final int numExpectedChunks = Arrays.stream(values) - .mapToInt(v -> v.length).map(expectedChunksFunc::apply).sum(); + final int numExpectedChunks = + Arrays.stream(values).mapToInt(v -> v.length) + .map(expectedChunksFunc::apply).sum(); if (numExpectedBlocks == 0) { final int j = i; Throwable t = Assertions.assertThrows(StorageContainerException.class, - () -> ContainerProtocolCalls.listBlock(clients.get(j), - containerID, null, numExpectedBlocks + 1, null)); - Assertions.assertEquals( - "ContainerID " + containerID + " does not exist", t.getMessage()); + () -> ContainerProtocolCalls + .listBlock(clients.get(j), containerID, null, + numExpectedBlocks + 1, null)); + Assertions + .assertEquals("ContainerID " + containerID + " does not exist", + t.getMessage()); continue; } - ListBlockResponseProto response = ContainerProtocolCalls.listBlock( - clients.get(i), containerID, null, numExpectedBlocks + 1, null); + ListBlockResponseProto response = ContainerProtocolCalls + .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1, + null); Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(), "blocks count doesn't match on DN " + i); Assertions.assertEquals(numExpectedChunks, @@ -228,7 +246,7 @@ public void testCreateRecoveryContainer() throws Exception { //Create the recovering container in DN. ContainerProtocolCalls.createRecoveringContainer(dnClient, - container.containerID().getProtobuf().getId(), null); + container.containerID().getProtobuf().getId(), null, 4); BlockID blockID = ContainerTestHelper .getTestBlockID(container.containerID().getProtobuf().getId()); @@ -278,6 +296,207 @@ public void testCreateRecoveryContainer() throws Exception { } } + private static byte[] getBytesWith(int singleDigitNumber, int total) { + StringBuilder builder = new StringBuilder(singleDigitNumber); + for (int i = 1; i <= total; i++) { + builder.append(singleDigitNumber); + } + return builder.toString().getBytes(UTF_8); + } + + @ParameterizedTest + @MethodSource("recoverableMissingIndexes") + void testECReconstructionCoordinatorWith(List missingIndexes) + throws Exception { + testECReconstructionCoordinator(missingIndexes); + } + + static Stream> recoverableMissingIndexes() { + return Stream + .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream + .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4), + ImmutableList.of(3, 5), ImmutableList.of(4, 5))); + } + + /** + * Tests the reconstruction of data when more than parity blocks missed. + * Test should throw InsufficientLocationsException. 
+ */ + @Test + public void testECReconstructionCoordinatorWithMissingIndexes135() { + InsufficientLocationsException exception = + Assert.assertThrows(InsufficientLocationsException.class, () -> { + testECReconstructionCoordinator(ImmutableList.of(1, 3, 5)); + }); + + String expectedMessage = + "There are insufficient datanodes to read the EC block"; + String actualMessage = exception.getMessage(); + + Assert.assertEquals(expectedMessage, actualMessage); + } + + private void testECReconstructionCoordinator(List missingIndexes) + throws Exception { + ObjectStore objectStore = rpcClient.getObjectStore(); + String keyString = UUID.randomUUID().toString(); + String volumeName = UUID.randomUUID().toString(); + String bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + OzoneVolume volume = objectStore.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + for (int i = 0; i < EC_DATA; i++) { + inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE); + } + XceiverClientManager xceiverClientManager = + new XceiverClientManager(config); + try (OzoneOutputStream out = bucket.createKey(keyString, 4096, + new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024), + new HashMap<>())) { + Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream); + for (int i = 0; i < inputChunks.length; i++) { + out.write(inputChunks[i]); + } + } + + ECReconstructionCoordinator coordinator = + new ECReconstructionCoordinator(config, null); + + OzoneKeyDetails key = bucket.getKey(keyString); + long conID = key.getOzoneKeyLocations().get(0).getContainerID(); + + //Close the container first. + scm.getContainerManager().getContainerStateManager().updateContainerState( + HddsProtos.ContainerID.newBuilder().setId(conID).build(), + HddsProtos.LifeCycleEvent.FINALIZE); + scm.getContainerManager().getContainerStateManager().updateContainerState( + HddsProtos.ContainerID.newBuilder().setId(conID).build(), + HddsProtos.LifeCycleEvent.CLOSE); + + Pipeline containerPipeline = scm.getPipelineManager().getPipeline( + scm.getContainerManager().getContainer(ContainerID.valueOf(conID)) + .getPipelineID()); + + SortedMap sourceNodeMap = new TreeMap<>(); + + List nodeSet = containerPipeline.getNodes(); + List containerToDeletePipeline = new ArrayList<>(); + for (DatanodeDetails srcDn : nodeSet) { + int replIndex = containerPipeline.getReplicaIndex(srcDn); + if (missingIndexes.contains(replIndex)) { + containerToDeletePipeline + .add(createSingleNodePipeline(containerPipeline, srcDn, replIndex)); + continue; + } + sourceNodeMap.put(replIndex, srcDn); + } + + //Find nodes outside of pipeline + List clusterDnsList = + cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails()) + .collect(Collectors.toList()); + List targetNodes = new ArrayList<>(); + for (DatanodeDetails clusterDN : clusterDnsList) { + if (!nodeSet.contains(clusterDN)) { + targetNodes.add(clusterDN); + if (targetNodes.size() == missingIndexes.size()) { + break; + } + } + } + + Assert.assertEquals(missingIndexes.size(), targetNodes.size()); + + List + blockDataArrList = new ArrayList<>(); + for (int j = 0; j < containerToDeletePipeline.size(); j++) { + org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData = + new ECContainerOperationClient(new OzoneConfiguration(), null) + .listBlock(conID, containerToDeletePipeline.get(j).getFirstNode(), + (ECReplicationConfig) containerToDeletePipeline.get(j) + .getReplicationConfig(), null); + 
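+ // Remember the block listing of each replica that is about to be deleted;
+ // it is compared against the reconstructed replica at the end of the test.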
blockDataArrList.add(blockData); + // Delete the first index container + ContainerProtocolCalls.deleteContainer( + xceiverClientManager.acquireClient(containerToDeletePipeline.get(j)), + conID, true, null); + } + + //Give the new target to reconstruct the container + SortedMap targetNodeMap = new TreeMap<>(); + for (int k = 0; k < missingIndexes.size(); k++) { + targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k)); + } + + coordinator.reconstructECContainerGroup(conID, + (ECReplicationConfig) containerPipeline.getReplicationConfig(), + sourceNodeMap, targetNodeMap); + + // Assert the original container metadata with the new recovered container. + Iterator> iterator = + targetNodeMap.entrySet().iterator(); + int i = 0; + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + DatanodeDetails targetDN = next.getValue(); + Map indexes = new HashMap<>(); + indexes.put(targetNodeMap.entrySet().iterator().next().getValue(), + targetNodeMap.entrySet().iterator().next().getKey()); + Pipeline newTargetPipeline = + Pipeline.newBuilder().setId(PipelineID.randomId()) + .setReplicationConfig(containerPipeline.getReplicationConfig()) + .setReplicaIndexes(indexes) + .setState(Pipeline.PipelineState.CLOSED) + .setNodes(ImmutableList.of(targetDN)).build(); + + org.apache.hadoop.ozone.container.common.helpers.BlockData[] + reconstructedBlockData = + new ECContainerOperationClient(new OzoneConfiguration(), null) + .listBlock(conID, newTargetPipeline.getFirstNode(), + (ECReplicationConfig) newTargetPipeline + .getReplicationConfig(), null); + Assert.assertEquals(blockDataArrList.get(i).length, + reconstructedBlockData.length); + checkBlockData(blockDataArrList.get(i), reconstructedBlockData); + ContainerProtos.ReadContainerResponseProto readContainerResponseProto = + ContainerProtocolCalls.readContainer( + xceiverClientManager.acquireClient(newTargetPipeline), conID, + null); + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + readContainerResponseProto.getContainerData().getState()); + i++; + } + + } + + private void checkBlockData( + org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData, + org.apache.hadoop.ozone.container.common.helpers.BlockData[] + reconstructedBlockData) { + + for (int i = 0; i < blockData.length; i++) { + List oldBlockDataChunks = + blockData[i].getChunks(); + List newBlockDataChunks = + reconstructedBlockData[i].getChunks(); + for (int j = 0; j < oldBlockDataChunks.size(); j++) { + ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j); + if (chunkInfo.getLen() == 0) { + // let's ignore the empty chunks + continue; + } + Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j)); + } + } + + /* Assert.assertEquals( + Arrays.stream(blockData).map(b -> b.getProtoBufMessage()) + .collect(Collectors.toList()), + Arrays.stream(reconstructedBlockData).map(b -> b.getProtoBufMessage()) + .collect(Collectors.toList()));*/ + } + public static void startCluster(OzoneConfiguration conf) throws Exception { // Set minimum pipeline to 1 to ensure all data is written to @@ -287,15 +506,12 @@ public static void startCluster(OzoneConfiguration conf) throws Exception { writableECContainerProviderConfig.setMinimumPipelines(1); conf.setFromObject(writableECContainerProviderConfig); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(NUM_DN) - .setScmId(SCM_ID) - .setClusterId(CLUSTER_ID) - .build(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(NUM_DN) + .setScmId(SCM_ID).setClusterId(CLUSTER_ID).build(); 
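+ // NUM_DN is EC_DATA + EC_PARITY + 3, so the cluster has spare datanodes
+ // that can serve as reconstruction targets outside the write pipeline.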
cluster.waitForClusterToBeReady(); scm = cluster.getStorageContainerManager(); - client = OzoneClientFactory.getRpcClient(conf); - store = client.getObjectStore(); + rpcClient = OzoneClientFactory.getRpcClient(conf); + store = rpcClient.getObjectStore(); storageContainerLocationClient = cluster.getStorageContainerLocationClient(); } @@ -314,8 +530,8 @@ public static void prepareData(int[][] ranges) throws Exception { int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]); values[i] = RandomUtils.nextBytes(keySize); final String keyName = UUID.randomUUID().toString(); - try (OutputStream out = bucket.createKey( - keyName, values[i].length, repConfig, new HashMap<>())) { + try (OutputStream out = bucket + .createKey(keyName, values[i].length, repConfig, new HashMap<>())) { out.write(values[i]); } } @@ -330,8 +546,8 @@ public static void prepareData(int[][] ranges) throws Exception { } public static void stopCluster() throws IOException { - if (client != null) { - client.close(); + if (rpcClient != null) { + rpcClient.close(); } if (storageContainerLocationClient != null) {
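Reviewer note: the sketch below is a minimal, self-contained illustration of how the APIs introduced in this patch fit together on the datanode side. The class and method names (ECReconstructionCoordinator, reconstructECContainerGroup) come from the diff itself; the driver class, its parameters, and the way the index-to-datanode maps are obtained are illustrative assumptions only, not part of the patch.

import java.util.SortedMap;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;

/** Hypothetical driver, for illustration only; not part of this patch. */
public final class EcReconstructionSketch {

  private EcReconstructionSketch() {
  }

  // In the real flow these arguments are derived from a
  // ReconstructECContainersCommand by ECReconstructionCoordinatorTask.
  public static void reconstruct(ConfigurationSource conf,
      CertificateClient certClient, long containerID,
      ECReplicationConfig repConfig,
      SortedMap<Integer, DatanodeDetails> sourceNodeMap,
      SortedMap<Integer, DatanodeDetails> targetNodeMap) throws Exception {
    // The coordinator creates RECOVERING containers on the targets, decodes
    // the missing chunks from the sources, writes them out, and finally
    // closes the recovered containers.
    try (ECReconstructionCoordinator coordinator =
        new ECReconstructionCoordinator(conf, certClient)) {
      coordinator.reconstructECContainerGroup(containerID, repConfig,
          sourceNodeMap, targetNodeMap);
    }
  }
}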