HDDS-12233. Atomically import a container #7934
Changes from 14 commits
@@ -42,19 +42,25 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compress/uncompress KeyValueContainer data to a tar archive.
 */
public class TarContainerPacker
    implements ContainerPacker<KeyValueContainerData> {

  private static final Logger LOG = LoggerFactory.getLogger(TarContainerPacker.class);

  static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;

@@ -95,6 +101,13 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
      Files.createDirectories(destContainerDir);
    }
    if (FileUtils.isEmptyDirectory(destContainerDir.toFile())) {
      // Before the atomic move, the destination dir is empty and doesn't have a metadata directory.
      // Writing the .container file would fail because the metadata dir doesn't exist yet,
      // so we save the container file to the containerUntarDir instead.
      Path containerMetadataPath = Paths.get(container.getContainerData().getMetadataPath());
      Path tempContainerMetadataPath = Paths.get(containerUntarDir.toString(),
          containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1).toString());
      persistCustomContainerState(container, descriptorFileContent, State.RECOVERING, tempContainerMetadataPath);
      Files.move(containerUntarDir, destContainerDir,
          StandardCopyOption.ATOMIC_MOVE,
          StandardCopyOption.REPLACE_EXISTING);
@@ -108,6 +121,20 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
    return descriptorFileContent;
  }

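The atomicity of the import comes from staging everything — chunk data, the RocksDB dump, and the .container descriptor (persisted with state RECOVERING) — under the untar directory, and only then renaming that directory onto the final container path with a single atomic Files.move. A crash mid-import therefore leaves either no visible container directory or a complete one. Below is a minimal, self-contained sketch of this stage-then-rename pattern using plain java.nio.file; the class and file names are made up for illustration and are not the Ozone packer API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class AtomicImportSketch {

  /** Publishes a fully staged directory with one atomic rename. */
  static void publish(Path stagingDir, Path finalDir) throws IOException {
    Files.move(stagingDir, finalDir, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("container-import");
    // Stage everything under a temporary directory first (hypothetical layout).
    Path metadataDir = Files.createDirectories(root.resolve("container-123.untar/metadata"));
    Files.writeString(metadataDir.resolve("123.container"), "state: RECOVERING\n");
    // Readers of "root" never observe a half-written "container-123" directory.
    publish(root.resolve("container-123.untar"), root.resolve("container-123"));
  }
}
```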
  private void persistCustomContainerState(Container<KeyValueContainerData> container, byte[] descriptorContent,
      ContainerProtos.ContainerDataProto.State state, Path containerMetadataPath) throws IOException {
    if (descriptorContent == null) {
      LOG.warn("Skipping persisting of custom state. Container descriptor is null for container {}",
          container.getContainerData().getContainerID());
      return;
    }

    KeyValueContainerData originalContainerData =
        (KeyValueContainerData) ContainerDataYaml.readContainer(descriptorContent);
    container.getContainerData().setState(state);
    container.update(originalContainerData.getMetadata(), true, containerMetadataPath.toString());
  }

  private void extractEntry(ArchiveEntry entry, InputStream input, long size,
      Path ancestor, Path path) throws IOException {
    HddsUtils.validatePath(path, ancestor);
@@ -31,17 +31,23 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDBException;
/**
 * Constructs a datanode store in accordance with schema version 3, which uses

@@ -132,17 +138,61 @@ public void dumpKVContainerData(long containerID, File dumpDir)
  }

  public void loadKVContainerData(File dumpDir)
-     throws IOException {
-   getMetadataTable().loadFromFile(
-       getTableDumpFile(getMetadataTable(), dumpDir));
-   getBlockDataTable().loadFromFile(
-       getTableDumpFile(getBlockDataTable(), dumpDir));
-   if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
-     getLastChunkInfoTable().loadFromFile(
-         getTableDumpFile(getLastChunkInfoTable(), dumpDir));
      throws IOException, RocksDBException {
    try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
      processTable(batch, getTableDumpFile(getMetadataTable(), dumpDir),
          getDbDef().getMetadataColumnFamily().getKeyCodec(),
          getDbDef().getMetadataColumnFamily().getValueCodec(),
          getMetadataTable());
      processTable(batch, getTableDumpFile(getBlockDataTable(), dumpDir),
          getDbDef().getBlockDataColumnFamily().getKeyCodec(),
          getDbDef().getBlockDataColumnFamily().getValueCodec(),
          getBlockDataTable());
      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
        processTable(batch, getTableDumpFile(getLastChunkInfoTable(), dumpDir),
            getDbDef().getLastChunkInfoColumnFamily().getKeyCodec(),
            getDbDef().getLastChunkInfoColumnFamily().getValueCodec(),
            getLastChunkInfoTable());
      }
      processTable(batch, getTableDumpFile(getDeleteTransactionTable(), dumpDir),
          ((DatanodeSchemaThreeDBDefinition) getDbDef()).getDeleteTransactionsColumnFamily().getKeyCodec(),
          ((DatanodeSchemaThreeDBDefinition) getDbDef()).getDeleteTransactionsColumnFamily().getValueCodec(),
          getDeleteTransactionTable());

      getStore().commitBatchOperation(batch);
    }
  }

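The database side of the import is made all-or-nothing in the same spirit: every table dump is applied through one BatchOperation and committed with a single commitBatchOperation call, so either all imported keys become visible or none do. A rough sketch of that write pattern using the raw RocksDB Java API (the path and keys are hypothetical; Ozone hides this behind its Table/BatchOperation abstractions):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public final class AtomicBatchWriteSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/batch-write-demo");  // hypothetical path
         WriteBatch batch = new WriteBatch();
         WriteOptions writeOptions = new WriteOptions()) {
      // Stage key/value pairs in the batch; nothing is visible in the DB yet.
      batch.put("block#1".getBytes(), "chunk-list-1".getBytes());
      batch.put("block#2".getBytes(), "chunk-list-2".getBytes());
      // One write() applies the whole batch atomically.
      db.write(writeOptions, batch);
    }
  }
}
```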
  private <K, V> void processTable(BatchOperation batch, File tableDumpFile,
      Codec<K> keyCodec, Codec<V> valueCodec, Table<K, V> table) throws IOException, RocksDBException {
    if (isFileEmpty(tableDumpFile)) {
      LOG.debug("SST File {} is empty. Skipping processing.", tableDumpFile.getAbsolutePath());
      return;
    }

    try (ManagedOptions managedOptions = new ManagedOptions();
        ManagedSstFileReader sstFileReader = new ManagedSstFileReader(managedOptions)) {
      sstFileReader.open(tableDumpFile.getAbsolutePath());
      try (ManagedReadOptions managedReadOptions = new ManagedReadOptions();
          ManagedSstFileReaderIterator iterator =
              ManagedSstFileReaderIterator.managed(sstFileReader.newIterator(managedReadOptions))) {
Contributor: Are we sure there are no tombstones in this sst file?
Contributor: Can you confirm this? How are we generating this sst file?
Contributor (Author): We dump the files by reading all the keys via an iterator, so there should be no tombstones.
        for (iterator.get().seekToFirst(); iterator.get().isValid(); iterator.get().next()) {
          byte[] key = iterator.get().key();
          byte[] value = iterator.get().value();
          K decodedKey = keyCodec.fromPersistedFormat(key);
          V decodedValue = valueCodec.fromPersistedFormat(value);
          table.putWithBatch(batch, decodedKey, decodedValue);
        }
      }
    }
  }

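Regarding the tombstone question above: the dump files are written by iterating the live keys and emitting each one as a put, so they carry only data entries, and the loader can simply re-insert everything the iterator yields. A rough, hypothetical round trip using the plain RocksDB SstFileWriter/SstFileReader classes (not Ozone's Managed* wrappers) that mirrors the dump-then-load flow:

```java
import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.rocksdb.SstFileWriter;

public final class SstDumpRoundTripSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    String sstPath = "/tmp/block_data_dump.sst";  // hypothetical dump file

    // Dump side: only put() calls are issued, so the file holds no tombstones.
    // SstFileWriter requires keys to be added in ascending order.
    try (Options options = new Options();
         EnvOptions envOptions = new EnvOptions();
         SstFileWriter writer = new SstFileWriter(envOptions, options)) {
      writer.open(sstPath);
      writer.put("block#1".getBytes(), "v1".getBytes());
      writer.put("block#2".getBytes(), "v2".getBytes());
      writer.finish();
    }

    // Load side: iterate every entry and re-insert it into the target table/batch.
    try (Options options = new Options();
         SstFileReader reader = new SstFileReader(options)) {
      reader.open(sstPath);
      try (ReadOptions readOptions = new ReadOptions();
           SstFileReaderIterator it = reader.newIterator(readOptions)) {
        for (it.seekToFirst(); it.isValid(); it.next()) {
          System.out.println(new String(it.key()) + " -> " + new String(it.value()));
        }
      }
    }
  }
}
```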
  boolean isFileEmpty(File file) {
    if (!file.exists()) {
      return true;
    }
-   getDeleteTransactionTable().loadFromFile(
-       getTableDumpFile(getDeleteTransactionTable(), dumpDir));
    return file.length() == 0;
  }

  public static File getTableDumpFile(Table<String, ?> table,
@@ -26,12 +26,14 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.container.TestHelper.isContainerClosed;
import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
import static org.apache.ozone.test.GenericTestUtils.setLogLevel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.any;

@@ -282,6 +284,43 @@ private static void deleteContainer(MiniOzoneCluster cluster, DatanodeDetails dn
  }

  @Test
  public void testImportedContainerIsClosed() throws Exception {
Contributor: Can we have a miniOzoneCluster test case for the container cleanup on DN restart?
    OzoneConfiguration conf = createConfiguration(false);
    // create a 4 node cluster
    try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).build()) {
      cluster.waitForClusterToBeReady();

      try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
        List<DatanodeDetails> allNodes =
            cluster.getHddsDatanodes().stream()
                .map(HddsDatanodeService::getDatanodeDetails)
                .collect(Collectors.toList());
        // shutdown 4th node (node 3 is down now)
        cluster.shutdownHddsDatanode(allNodes.get(allNodes.size() - 1));

        createTestData(client);
        final OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster);
        long containerID = keyLocation.getContainerID();
        waitForContainerClose(cluster, containerID);

        // shutdown nodes 0 and 1. only node 2 is up now
        for (int i = 0; i < 2; i++) {
          cluster.shutdownHddsDatanode(allNodes.get(i));
        }
        waitForReplicaCount(containerID, 1, cluster);

        // bring back up the 4th node
        cluster.restartHddsDatanode(allNodes.get(allNodes.size() - 1), false);

        // the container should have been imported on the 4th node
        waitForReplicaCount(containerID, 2, cluster);
        assertTrue(isContainerClosed(cluster, containerID, allNodes.get(allNodes.size() - 1)));
      }
    }
  }

  @Test
  @Flaky("HDDS-11087")
  public void testECContainerReplication() throws Exception {