From 3317f2c4c857eeb93029658213e207bd0d7565fe Mon Sep 17 00:00:00 2001 From: markgui Date: Fri, 15 Apr 2022 14:11:35 -0600 Subject: [PATCH] HDDS-6582. ContainerRecoveryStore for ec containers under recovery. --- .../container/common/interfaces/Handler.java | 13 + .../ec/ContainerRecoveryMetaCache.java | 69 ++++ .../container/ec/ContainerRecoveryStore.java | 64 ++++ .../ec/ContainerRecoveryStoreImpl.java | 327 ++++++++++++++++++ .../ozone/container/ec/package-info.java | 21 ++ .../container/keyvalue/KeyValueContainer.java | 2 +- .../container/keyvalue/KeyValueHandler.java | 10 + .../ozoneimpl/ContainerController.java | 9 + .../ec/TestContainerRecoveryMetaCache.java | 86 +++++ .../ec/TestContainerRecoveryStoreImpl.java | 305 ++++++++++++++++ 10 files changed, 905 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryMetaCache.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStore.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStoreImpl.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/package-info.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryMetaCache.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryStoreImpl.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 1dbd588d33cb..04b213ccdad0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.ec.ContainerRecoveryStore; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; @@ -121,6 +122,18 @@ public abstract void exportContainer( TarContainerPacker packer) throws IOException; + /** + * Consolidate a container from a temp store. + * @param container + * @param recoveryStore + * @return + * @throws IOException + */ + public abstract Container consolidateContainer( + Container container, + ContainerRecoveryStore recoveryStore) + throws IOException; + /** * Stop the Handler. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryMetaCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryMetaCache.java new file mode 100644 index 000000000000..44cddcd8657f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryMetaCache.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A cache for chunk-level and block-level metadata,
+ * prepared for consolidating a container.
+ */
+public final class ContainerRecoveryMetaCache {
+
+  private static ContainerRecoveryMetaCache cache;
+
+  private final Map<Long, Map<BlockID, BlockData>> containerBlockDataMap;
+
+  private ContainerRecoveryMetaCache() {
+    this.containerBlockDataMap = new ConcurrentHashMap<>();
+  }
+
+  public static synchronized ContainerRecoveryMetaCache getInstance() {
+    if (cache == null) {
+      cache = new ContainerRecoveryMetaCache();
+    }
+    return cache;
+  }
+
+  void addChunkToBlock(BlockID blockID, ChunkInfo chunkInfo) {
+    long containerID = blockID.getContainerID();
+
+    containerBlockDataMap.putIfAbsent(containerID, new HashMap<>());
+    containerBlockDataMap.get(containerID)
+        .putIfAbsent(blockID, new BlockData(blockID));
+    containerBlockDataMap.get(containerID).get(blockID)
+        .addChunk(chunkInfo.getProtoBufMessage());
+  }
+
+  Iterator<BlockData> getBlockIterator(long containerID) {
+    return containerBlockDataMap.getOrDefault(containerID,
+        Collections.emptyMap()).values().iterator();
+  }
+
+  void dropContainerAll(long containerID) {
+    containerBlockDataMap.remove(containerID);
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStore.java
new file mode 100644
index 000000000000..d57cd0a99e2b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStore.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.ozone.container.ec; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; + +import java.io.IOException; + +/** + * Interface for a temp store for containers under recovery, + * NOTE: Specially for EC containers for now, but it could also + * be used for common containers in the future. + */ +public interface ContainerRecoveryStore { + + /** + * Write a recovered chunk to some temp location. + * @param container + * @param blockID + * @param chunkInfo + * @param data + * @param checksum + * @param last + * @throws IOException + */ + void writeChunk(KeyValueContainer container, BlockID blockID, + ChunkInfo chunkInfo, ChunkBuffer data, Checksum checksum, + boolean last) throws IOException; + + /** + * Reconstruct a container from chunk files and metadata. + * @param container + * @throws IOException + */ + void consolidateContainer(KeyValueContainer container) + throws IOException; + + /** + * Cleanup in-memory metadata and on-disk files for a container including + * all the replicas. + * Called on the CoordinatorDN. + * @param container + */ + void cleanupContainerAll(KeyValueContainer container); +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStoreImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStoreImpl.java new file mode 100644 index 000000000000..04de971d0e1b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/ContainerRecoveryStoreImpl.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.ec; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; +import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; +import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; +import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; +import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.Iterator; + +import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR_CHUNKS; + +/** + * Temp store implementation for ec containers under recovery. + * For each ec container under recovery, we use an "all-or-nothing" policy + * that each single failed operation will clean up all for this container. + */ +public class ContainerRecoveryStoreImpl implements ContainerRecoveryStore { + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerRecoveryStoreImpl.class); + + public static final String RECOVER_DIR = "container-recover"; + public static final String CHUNK_DIR = STORAGE_DIR_CHUNKS; + + private final MutableVolumeSet hddsVolumeSet; + private final VolumeChoosingPolicy volumeChoosingPolicy; + private final ConfigurationSource config; + + private BlockManager blockManager; + private ChunkManager chunkManager; + private DispatcherContext dispatcherContext; + + // TODO(markgui): We may choose to persist meta into db so as to + // control memory usage, but with certain throttling on the number of + // ec containers under recovery concurrently it may not be a big problem. + // For now we could cache meta of containers under recovery + // in memory for simplicity and it gives us "Schema Independent" + // container metadata management. 
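+  // Expected call sequence (a sketch inferred from the interface contract,
+  // not an exhaustive spec): writeChunk() is called once per recovered
+  // chunk, consolidateContainer() once the last chunk of the container has
+  // landed, and cleanupContainerAll() if recovery fails or is aborted.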
+ private final ContainerRecoveryMetaCache metaCache; + + public ContainerRecoveryStoreImpl(MutableVolumeSet hddsVolumeSet, + ConfigurationSource conf) throws IOException { + this.hddsVolumeSet = hddsVolumeSet; + this.config = conf; + this.volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy(); + this.blockManager = new BlockManagerImpl(config); + this.chunkManager = ChunkManagerFactory.createChunkManager(conf, + blockManager, hddsVolumeSet); + this.dispatcherContext = new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.COMBINED).build(); + this.metaCache = ContainerRecoveryMetaCache.getInstance(); + + initialize(); + } + + @Override + public void writeChunk(KeyValueContainer container, BlockID blockID, + ChunkInfo chunkInfo, ChunkBuffer data, Checksum checksum, + boolean last) throws IOException { + KeyValueContainerData containerData = container.getContainerData(); + + Preconditions.checkState(!containerData.getState().equals(State.OPEN)); + Preconditions.checkNotNull(containerData.getSchemaVersion()); + Preconditions.checkState(containerData.getReplicaIndex() != 0); + + try { + // choose a volume for new container + chooseVolumeForContainer(container); + + // write chunk data to disk + writeToChunkFile(container, blockID, chunkInfo, data, last); + + if (checksum != null) { + computeChecksumForChunk(chunkInfo, checksum, data); + } + metaCache.addChunkToBlock(blockID, chunkInfo); + } catch (DiskOutOfSpaceException e) { + LOG.error("No volume with enough space to recover container {}", + containerData.getContainerID()); + cleanupContainerAll(container); + throw e; + } catch (IOException e) { + LOG.error("Write recovered chunk {} for block {} failed", + chunkInfo.getChunkName(), blockID.getContainerBlockID()); + cleanupContainerAll(container); + throw e; + } + } + + @Override + public void consolidateContainer(KeyValueContainer container) + throws IOException { + KeyValueContainerData containerData = container.getContainerData(); + + Preconditions.checkState(!containerData.getState().equals(State.OPEN)); + Preconditions.checkNotNull(containerData.getSchemaVersion()); + Preconditions.checkState(containerData.getReplicaIndex() != 0); + + long containerID = containerData.getContainerID(); + HddsVolume hddsVolume = containerData.getVolume(); + if (hddsVolume == null) { + throw new IOException("Container chunk files not recovered completely."); + } + + try { + initContainerLayout(container); + populateContainerMeta(container); + + // move the block data files under the container directory + File chunksSrc = getRecoverChunksDir(hddsVolume, containerID); + File chunksDst = new File(containerData.getChunksPath()); + Files.move(chunksSrc.toPath(), chunksDst.toPath(), + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Consolidate container {} failed, cleaning it up", + containerID); + container.delete(); + throw e; + } finally { + cleanupContainerAll(container); + } + } + + @Override + public void cleanupContainerAll(KeyValueContainer container) { + KeyValueContainerData containerData = container.getContainerData(); + long containerID = containerData.getContainerID(); + HddsVolume hddsVolume = containerData.getVolume(); + if (hddsVolume == null) { + return; + } + + metaCache.dropContainerAll(containerID); + try { + FileUtils.deleteDirectory(getRecoverContainerDir( + container.getContainerData().getVolume(), containerID)); + } catch (IOException e) { + LOG.warn("Failed to cleanup for container {} on volume 
{}", + containerID, hddsVolume.getStorageDir(), e); + } + } + + private void chooseVolumeForContainer(KeyValueContainer container) + throws IOException { + KeyValueContainerData containerData = container.getContainerData(); + if (containerData.getVolume() != null) { + return; + } + + hddsVolumeSet.readLock(); + try { + HddsVolume hddsVolume = volumeChoosingPolicy.chooseVolume( + StorageVolumeUtil.getHddsVolumesList(hddsVolumeSet.getVolumesList()), + containerData.getMaxSize()); + containerData.setVolume(hddsVolume); + } catch (IOException e) { + LOG.error("No volume chosen for new container {}", + containerData.getContainerID(), e); + throw e; + } finally { + hddsVolumeSet.readUnlock(); + } + } + + private void initialize() throws IOException { + for (HddsVolume hddsVolume : StorageVolumeUtil + .getHddsVolumesList(hddsVolumeSet.getVolumesList())) { + File recovDir = getRecoverDir(hddsVolume); + if (recovDir.exists()) { + try { + FileUtils.deleteDirectory(recovDir); + } catch (IOException e) { + LOG.warn("Failed to cleanup ec recover dir on volume {}", + hddsVolume.getStorageDir(), e); + throw e; + } + } + } + } + + private void initContainerLayout(KeyValueContainer container) + throws IOException { + KeyValueContainerData containerData = container.getContainerData(); + HddsVolume hddsVolume = containerData.getVolume(); + String hddsVolumeDir = hddsVolume.getStorageDir().getAbsolutePath(); + String clusterId = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID( + hddsVolume, hddsVolume.getClusterID()); + + container.populatePathFields(clusterId, hddsVolume, hddsVolumeDir); + + // create container meta structure(directories and db) + KeyValueContainerUtil.createContainerMetaData( + containerData.getContainerID(), + new File(containerData.getMetadataPath()), + new File(containerData.getChunksPath()), + containerData.getDbFile(), containerData.getSchemaVersion(), config); + + // create container meta file(.container file) + container.createContainerFile(container.getContainerFile()); + } + + private void populateContainerMeta(KeyValueContainer container) + throws IOException { + populateContainerMetaFromCache(container); + } + + private void populateContainerMetaFromCache(KeyValueContainer container) + throws IOException { + Iterator iter = metaCache.getBlockIterator( + container.getContainerData().getContainerID()); + while (iter.hasNext()) { + blockManager.putBlock(container, iter.next()); + } + } + + private void computeChecksumForChunk(ChunkInfo chunkInfo, + Checksum checksum, ChunkBuffer data) + throws IOException { + try { + chunkInfo.setChecksumData(checksum.computeChecksum(data)); + } catch (OzoneChecksumException e) { + LOG.error("Failed to checksum chunk {}", chunkInfo.getChunkName()); + throw e; + } + } + + private File getRecoverChunksDir(HddsVolume hddsVolume, long containerID) { + // e.g. ../container-recover//chunks + return new File(getRecoverContainerDir(hddsVolume, containerID), + CHUNK_DIR); + } + + private File getRecoverContainerDir(HddsVolume hddsVolume, + long containerID) { + // e.g. ../container-recover/ + return new File(getRecoverDir(hddsVolume), Long.toString(containerID)); + } + + private File getRecoverDir(HddsVolume hddsVolume) { + // e.g. 
/data1/hdds/CID-/container-recover + return new File(new File(hddsVolume.getStorageDir(), + hddsVolume.getClusterID()), RECOVER_DIR); + } + + private void writeToChunkFile(KeyValueContainer container, + BlockID blockID, ChunkInfo chunkInfo, ChunkBuffer data, boolean last) + throws IOException { + KeyValueContainerData containerData = container.getContainerData(); + File chunksDir = getRecoverChunksDir(containerData.getVolume(), + containerData.getContainerID()); + + ensureDirs(chunksDir); + containerData.setChunksPath(chunksDir.getAbsolutePath()); + + chunkManager.writeChunk(container, blockID, chunkInfo, data, + dispatcherContext); + if (last) { + chunkManager.finishWriteChunks(container, new BlockData(blockID)); + } + } + + private void ensureDirs(File dir) throws IOException { + if (!dir.mkdirs() && !dir.exists()) { + throw new IOException("Unable to create directories along " + + dir.getAbsolutePath()); + } + } + + @VisibleForTesting + void setBlockManager(BlockManager manager) { + this.blockManager = manager; + } + + @VisibleForTesting + void setChunkManager(ChunkManager manager) { + this.chunkManager = manager; + } + + public static String getChunkName(BlockID blockID, int chunkIndex) { + return blockID.getLocalID() + "_chunk_" + chunkIndex; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/package-info.java new file mode 100644 index 000000000000..37ab41d1e1c7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ec; +/** + Classes to recover ec container data between datanodes. 
+ **/ \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index fdf44edea96a..ea721ddb856f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -266,7 +266,7 @@ private void writeToContainerFile(File containerFile, boolean isCreate) } } - private void createContainerFile(File containerFile) + public void createContainerFile(File containerFile) throws StorageContainerException { writeToContainerFile(containerFile, true); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index d0a865fde385..6ac11de54deb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.ec.ContainerRecoveryStore; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; @@ -964,6 +965,15 @@ public void exportContainer(final Container container, kvc.exportContainerData(outputStream, packer); } + @Override + public Container consolidateContainer(Container container, + ContainerRecoveryStore recoveryStore) + throws IOException { + recoveryStore.consolidateContainer((KeyValueContainer) container); + sendICR(container); + return container; + } + @Override public void markContainerForClose(Container container) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index 171303dc0b4d..bc436731458d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.ec.ContainerRecoveryStore; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +170,14 @@ public void exportContainer(final ContainerType type, containerSet.getContainer(containerId), outputStream, packer); } + public Container consolidateContainer( + final Container container, + final ContainerRecoveryStore recoveryStore) + throws IOException { + return 
handlers.get(container.getContainerData().getContainerType()) + .consolidateContainer(container, recoveryStore); + } + /** * Deletes a container given its Id. * @param containerId Id of the container to be deleted diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryMetaCache.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryMetaCache.java new file mode 100644 index 000000000000..a62877b61db4 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryMetaCache.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ec; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.junit.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.hadoop.ozone.container.ec.ContainerRecoveryStoreImpl.getChunkName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Test for {@link ContainerRecoveryMetaCache}. 
+ */
+public class TestContainerRecoveryMetaCache {
+  @Test
+  public void testBasicOperations() throws InterruptedException {
+    ContainerRecoveryMetaCache metaCache =
+        ContainerRecoveryMetaCache.getInstance();
+
+    // 3 concurrent adds
+    int numContainers = 3;
+    List<Thread> threads = IntStream.range(0, numContainers)
+        .mapToObj(t -> new Thread(() -> {
+          // write 5 blocks, 5 chunks each
+          for (int i = 0; i < 5; i++) {
+            BlockID block = new BlockID(t, i);
+            long offset = 0L;
+            long len = 1024L;
+            for (int j = 0; j < 5; j++) {
+              ChunkInfo chunk = new ChunkInfo(
+                  getChunkName(block, j), offset, len);
+              offset += len;
+              metaCache.addChunkToBlock(block, chunk);
+            }
+          }
+        }))
+        .collect(Collectors.toList());
+
+    threads.forEach(Thread::start);
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    // check if all chunks are added correctly
+    for (int t = 0; t < numContainers; t++) {
+      // check through all the chunks in each block
+      Iterator<BlockData> iter = metaCache.getBlockIterator(t);
+      int blockCount = 0;
+
+      while (iter.hasNext()) {
+        BlockData blockData = iter.next();
+        assertEquals(5, blockData.getChunks().size());
+        blockCount++;
+      }
+      assertEquals(5, blockCount);
+    }
+
+    metaCache.dropContainerAll(0);
+    // check that all metadata of this container is gone
+    Iterator<BlockData> iter = metaCache.getBlockIterator(0);
+    assertFalse(iter.hasNext());
+  }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryStoreImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryStoreImpl.java
new file mode 100644
index 000000000000..f05e74cda75a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/TestContainerRecoveryStoreImpl.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.ozone.container.ec; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK; +import static org.apache.hadoop.ozone.container.ec.ContainerRecoveryStoreImpl.CHUNK_DIR; +import static org.apache.hadoop.ozone.container.ec.ContainerRecoveryStoreImpl.RECOVER_DIR; +import static org.apache.hadoop.ozone.container.ec.ContainerRecoveryStoreImpl.getChunkName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +/** + * Test for {@link ContainerRecoveryStoreImpl}. 
+ */ +@RunWith(Parameterized.class) +public class TestContainerRecoveryStoreImpl { + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + + private static final int NUM_VOLUMES = 3; + private static final int NUM_BLOCKS_PER_CONTAINER = 5; + private static final int NUM_CHUNKS_PER_BLOCK = 5; + private static final int CHUNK_SIZE = 1024; + private static final int CONTAINER_MAX_SIZE = 102400; + private static final String CHUNK_FAIL_MSG = "Write chunk failed."; + private static final String BLOCK_MGR_FAIL_MSG = "Put block failed."; + + private static final byte[] CHUNK_DATA = + RandomStringUtils.randomAscii(CHUNK_SIZE).getBytes(UTF_8); + + private final String datanodeId = UUID.randomUUID().toString(); + private final String pipelineId = UUID.randomUUID().toString(); + private final String clusterId = UUID.randomUUID().toString(); + + private OzoneConfiguration conf; + + private MutableVolumeSet hddsVolumeSet; + + private long containerID; + private int replicaIndex; + private ContainerLayoutVersion layout; + private KeyValueContainer container; + + private ContainerRecoveryStoreImpl store; + + public TestContainerRecoveryStoreImpl(ContainerLayoutVersion layout) { + this.layout = layout; + } + + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {FILE_PER_CHUNK}, + {FILE_PER_BLOCK} + }); + } + + @Before + public void setup() throws IOException { + this.conf = new OzoneConfiguration(); + + StringBuilder datanodeDirs = new StringBuilder(); + File[] volumeDirs = new File[NUM_VOLUMES]; + for (int i = 0; i < NUM_VOLUMES; i++) { + volumeDirs[i] = tempDir.newFolder(); + datanodeDirs.append(volumeDirs[i]).append(","); + } + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, datanodeDirs.toString()); + hddsVolumeSet = new MutableVolumeSet(datanodeId, clusterId, conf, null, + VolumeType.DATA_VOLUME, null); + + store = new ContainerRecoveryStoreImpl(hddsVolumeSet, conf); + + containerID = 1L; + replicaIndex = 1; + KeyValueContainerData containerData = new KeyValueContainerData( + containerID, layout, CONTAINER_MAX_SIZE, pipelineId, datanodeId); + container = new KeyValueContainer(containerData, conf); + // necessary fields from the caller + containerData.setState(ContainerProtos.ContainerDataProto.State.CLOSED); + containerData.setReplicaIndex(replicaIndex); + containerData.setSchemaVersion(OzoneConsts.SCHEMA_V2); + } + + @Test + public void testWriteChunkNormal() throws IOException { + populateContainer(); + + // check directory structure for the container under recovery + int containerCount = 0; + for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList( + hddsVolumeSet.getVolumesList())) { + File clusterIDDir = new File(hddsVolume.getStorageDir(), + hddsVolume.getClusterID()); + File recovDir = new File(clusterIDDir, RECOVER_DIR); + + // This is the chosen volume for container + if (recovDir.exists()) { + File containerDir = new File(recovDir, Long.toString(containerID)); + assertTrue(containerDir.exists()); + + File chunksDir = new File(containerDir, CHUNK_DIR); + assertTrue(chunksDir.exists()); + + for (int b = 0; b < NUM_BLOCKS_PER_CONTAINER; b++) { + BlockID blockID = new BlockID(containerID, b); + long offset = 0L; + for (int c = 0; c < NUM_CHUNKS_PER_BLOCK; c++) { + ChunkInfo chunkInfo = new ChunkInfo( + getChunkName(blockID, c), offset, CHUNK_SIZE); + File chunkFile = layout.getChunkFile(chunksDir, blockID, chunkInfo); + + // check local file exist + assertTrue(chunkFile.exists()); + // check local file size + 
long expectedSize = layout == FILE_PER_CHUNK ? CHUNK_SIZE + : NUM_CHUNKS_PER_BLOCK * CHUNK_SIZE; + assertEquals(expectedSize, Files.size(chunkFile.toPath())); + + offset += CHUNK_SIZE; + } + } + containerCount++; + } + } + assertEquals(1, containerCount); + } + + @Test + public void testWriteChunkFail() throws IOException { + mockBadChunkManager(); + + try { + populateContainer(); + fail("Layout is bad, writeChunk should fail."); + } catch (IOException e) { + assertEquals(CHUNK_FAIL_MSG, e.getMessage()); + } + + for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList( + hddsVolumeSet.getVolumesList())) { + File clusterIDDir = new File(hddsVolume.getStorageDir(), + hddsVolume.getClusterID()); + File recovDir = new File(clusterIDDir, RECOVER_DIR); + + // This is the chosen volume for container + if (recovDir.exists()) { + File containerDir = new File(recovDir, Long.toString(containerID)); + // check the container dir is cleaned up + assertFalse(containerDir.exists()); + } + } + } + + @Test + public void testConsolidateContainerFromCache() throws IOException { + populateContainer(); + + store.consolidateContainer(container); + assertNotNull(container); + + // check created on-disk container + DataTransferThrottler throttler = mock(DataTransferThrottler.class); + doNothing().when(throttler).throttle(anyLong(), any()); + + assertTrue(container.scanMetaData()); + assertTrue(container.scanData(throttler, null)); + + // check recover directory is cleaned up + for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList( + hddsVolumeSet.getVolumesList())) { + File clusterIDDir = new File(hddsVolume.getStorageDir(), + hddsVolume.getClusterID()); + File recovDir = new File(clusterIDDir, RECOVER_DIR); + + if (recovDir.exists()) { + File containerDir = new File(recovDir, Long.toString(containerID)); + assertFalse(containerDir.exists()); + } + } + } + + @Test + public void testConsolidateContainerFail() throws IOException { + populateContainer(); + + mockBadBlockManager(); + + try { + store.consolidateContainer(container); + fail("BlockManager is bad, consolidateContainer should fail."); + } catch (IOException e) { + assertEquals(BLOCK_MGR_FAIL_MSG, e.getMessage()); + } + + // check container is cleaned up + assertFalse(new File(container.getContainerData().getMetadataPath()) + .exists()); + + // check recover directory is cleaned up + for (HddsVolume hddsVolume : StorageVolumeUtil.getHddsVolumesList( + hddsVolumeSet.getVolumesList())) { + File clusterIDDir = new File(hddsVolume.getStorageDir(), + hddsVolume.getClusterID()); + File recovDir = new File(clusterIDDir, RECOVER_DIR); + + if (recovDir.exists()) { + File containerDir = new File(recovDir, Long.toString(containerID)); + assertFalse(containerDir.exists()); + } + } + } + + private void populateContainer() throws IOException { + for (int b = 0; b < NUM_BLOCKS_PER_CONTAINER; b++) { + BlockID blockID = new BlockID(containerID, b); + long offset = 0L; + + for (int c = 0; c < NUM_CHUNKS_PER_BLOCK; c++) { + ChunkInfo chunkInfo = new ChunkInfo( + getChunkName(blockID, c), offset, CHUNK_SIZE); + ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.wrap(CHUNK_DATA)); + + store.writeChunk(container, blockID, chunkInfo, data, + null, c == NUM_CHUNKS_PER_BLOCK - 1); + + offset += CHUNK_SIZE; + } + } + } + + private void mockBadChunkManager() throws IOException { + ChunkManager chunkManager = mock(ChunkManager.class); + doThrow(new StorageContainerException(CHUNK_FAIL_MSG, null)) + .when(chunkManager) + .writeChunk(any(), any(), any(), 
any(ChunkBuffer.class), any()); + store.setChunkManager(chunkManager); + } + + private void mockBadBlockManager() throws IOException { + BlockManager blockManager = mock(BlockManager.class); + doThrow(new IOException(BLOCK_MGR_FAIL_MSG)) + .when(blockManager).putBlock(any(), any()); + store.setBlockManager(blockManager); + } +}