HDDS-12233. Atomically import a container #7934
Changes from 8 commits
TarContainerPacker.java

@@ -42,19 +42,25 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compress/uncompress KeyValueContainer data to a tar archive.
 */
public class TarContainerPacker
    implements ContainerPacker<KeyValueContainerData> {
  private static final Logger LOG = LoggerFactory.getLogger(TarContainerPacker.class);

  static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;

@@ -95,9 +101,19 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
      Files.createDirectories(destContainerDir);
    }
    if (FileUtils.isEmptyDirectory(destContainerDir.toFile())) {
      // Before the atomic move, the destination dir is empty and doesn't have a metadata directory.
      // Writing the .container file will fail as the metadata dir doesn't exist.
      Path containerMetadataPath = Paths.get(container.getContainerData().getMetadataPath());
      Path tempContainerMetadataPath = Paths.get(containerUntarDir.toString(),
          containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1).toString());
      container.getContainerData().setMetadataPath(tempContainerMetadataPath.toString());
      persistCustomContainerState(container, descriptorFileContent, State.RECOVERING);
      container.getContainerData().setMetadataPath(containerMetadataPath.toString());
      Files.move(containerUntarDir, destContainerDir,
          StandardCopyOption.ATOMIC_MOVE,
          StandardCopyOption.REPLACE_EXISTING);
      // Persist again to update the metadata path to point to the destination dir
      persistCustomContainerState(container, descriptorFileContent, State.RECOVERING);
    } else {
      String errorMessage = "Container " + containerId +
          " unpack failed because ContainerFile " +
@@ -108,6 +124,20 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
    return descriptorFileContent;
  }

  private void persistCustomContainerState(Container<KeyValueContainerData> container, byte[] descriptorContent,
      ContainerProtos.ContainerDataProto.State state) throws IOException {
    if (descriptorContent == null) {
      LOG.warn("Skipping persisting of custom state. Container descriptor is null for container {}",
          container.getContainerData().getContainerID());
      return;
    }

    KeyValueContainerData originalContainerData =
        (KeyValueContainerData) ContainerDataYaml.readContainer(descriptorContent);
    container.getContainerData().setState(state);
    container.update(originalContainerData.getMetadata(), true);
  }

  private void extractEntry(ArchiveEntry entry, InputStream input, long size,
      Path ancestor, Path path) throws IOException {
    HddsUtils.validatePath(path, ancestor);
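For context, the change above makes the import crash-safe by staging everything under the untar directory first — including the .container descriptor, written there in RECOVERING state — and only then renaming the whole directory into place, after which the descriptor is persisted again so its metadata path points at the final location. A minimal, standalone sketch of that stage-then-rename pattern (all names below are illustrative and not taken from this PR; it assumes staging and final directories live on the same volume):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class AtomicDirImportSketch {

  /**
   * Stages content under stagingDir, then publishes it to finalDir with one rename.
   * ATOMIC_MOVE requires both paths to be on the same file system; otherwise
   * Files.move throws AtomicMoveNotSupportedException.
   */
  public static void importAtomically(Path stagingDir, Path finalDir) throws IOException {
    // 1. Write all data and metadata into the staging directory first.
    Path metadataDir = Files.createDirectories(stagingDir.resolve("metadata"));
    Files.write(metadataDir.resolve("1.container"),
        "state: RECOVERING".getBytes(StandardCharsets.UTF_8));

    // 2. Publish: readers see either nothing at finalDir or the fully populated directory.
    Files.move(stagingDir, finalDir,
        StandardCopyOption.ATOMIC_MOVE,
        StandardCopyOption.REPLACE_EXISTING);
  }
}

If the process dies before the move, the destination directory stays empty; if it dies after, the directory is complete — no half-populated state is ever visible at the final path.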
DatanodeStoreSchemaThreeImpl.java

@@ -31,17 +31,25 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
import org.apache.hadoop.hdds.utils.db.LongCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDBException;

/**
 * Constructs a datanode store in accordance with schema version 3, which uses

@@ -132,17 +140,52 @@ public void dumpKVContainerData(long containerID, File dumpDir)
  }

  public void loadKVContainerData(File dumpDir)
-     throws IOException {
-   getMetadataTable().loadFromFile(
-       getTableDumpFile(getMetadataTable(), dumpDir));
-   getBlockDataTable().loadFromFile(
-       getTableDumpFile(getBlockDataTable(), dumpDir));
-   if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
-     getLastChunkInfoTable().loadFromFile(
-         getTableDumpFile(getLastChunkInfoTable(), dumpDir));
-   getDeleteTransactionTable().loadFromFile(
-       getTableDumpFile(getDeleteTransactionTable(), dumpDir));
      throws IOException, RocksDBException {
    try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
      processTable(batch, getTableDumpFile(getMetadataTable(), dumpDir),
          FixedLengthStringCodec.get(), LongCodec.get(), getMetadataTable());
      processTable(batch, getTableDumpFile(getBlockDataTable(), dumpDir),
          FixedLengthStringCodec.get(), BlockData.getCodec(), getBlockDataTable());
      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
        processTable(batch, getTableDumpFile(getLastChunkInfoTable(), dumpDir),
            FixedLengthStringCodec.get(), BlockData.getCodec(), getLastChunkInfoTable());
      }
      processTable(batch, getTableDumpFile(getDeleteTransactionTable(), dumpDir), FixedLengthStringCodec.get(),
          Proto2Codec.get(DeletedBlocksTransaction.getDefaultInstance()), getDeleteTransactionTable());

      getStore().commitBatchOperation(batch);
    }
  }
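Because every put goes into one batch that is committed only at the end, the metadata load is all-or-nothing: a failure while reading any of the dump files leaves the container's RocksDB state untouched. The same idea with the plain RocksDB Java API, as a rough analogy to Ozone's BatchOperation (the org.rocksdb classes below are not the ones used in this PR):

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public final class BatchedLoadSketch {
  static {
    RocksDB.loadLibrary();
  }

  /** Applies all key/value pairs in a single atomic write; nothing is visible until write(). */
  public static void loadAtomically(RocksDB db, byte[][] keys, byte[][] values) throws RocksDBException {
    try (WriteBatch batch = new WriteBatch();
         WriteOptions writeOptions = new WriteOptions()) {
      for (int i = 0; i < keys.length; i++) {
        batch.put(keys[i], values[i]);   // buffered in memory, not yet applied
      }
      db.write(writeOptions, batch);     // single atomic commit
    }
  }
}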

  private <K, V> void processTable(BatchOperation batch, File tableDumpFile,
      Codec<K> keyCodec, Codec<V> valueCodec, Table<K, V> table) throws IOException, RocksDBException {
    if (isFileEmpty(tableDumpFile)) {
      LOG.warn("SST File {} is empty. Skipping processing.", tableDumpFile.getAbsolutePath());
      return;
    }

    try (ManagedOptions managedOptions = new ManagedOptions();
        ManagedSstFileReader sstFileReader = new ManagedSstFileReader(managedOptions)) {
      sstFileReader.open(tableDumpFile.getAbsolutePath());
      try (ManagedReadOptions managedReadOptions = new ManagedReadOptions();
          ManagedSstFileReaderIterator iterator =
              ManagedSstFileReaderIterator.managed(sstFileReader.newIterator(managedReadOptions))) {
Contributor: Are we sure there are no tombstones in this SST file?

Contributor: Can you confirm this? How are we generating this SST file?

Author: We dump the files by reading all the keys via an iterator, so there should be no tombstones.
        for (iterator.get().seekToFirst(); iterator.get().isValid(); iterator.get().next()) {
          byte[] key = iterator.get().key();
          byte[] value = iterator.get().value();
          K decodedKey = keyCodec.fromPersistedFormat(key);
          V decodedValue = valueCodec.fromPersistedFormat(value);
          table.putWithBatch(batch, decodedKey, decodedValue);
        }
      }
    }
  }
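On the tombstone question above: the dump files are written by iterating a table's live keys and emitting a put for each, so deletions never end up in the SST. A rough sketch of that kind of dump using the plain RocksDB Java API (illustrative only; the PR's dumpKVContainerData goes through Ozone's own table and Managed* wrappers):

import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.SstFileWriter;

public final class SstDumpWriterSketch {
  static {
    RocksDB.loadLibrary();
  }

  /** Writes every live key/value of db into a standalone SST file; deletions are never emitted. */
  public static void dump(RocksDB db, String sstFilePath) throws RocksDBException {
    try (EnvOptions envOptions = new EnvOptions();
         Options options = new Options();
         SstFileWriter writer = new SstFileWriter(envOptions, options);
         RocksIterator it = db.newIterator()) {
      it.seekToFirst();
      if (!it.isValid()) {
        return; // nothing to dump; finish() fails on an SST with no entries
      }
      writer.open(sstFilePath);
      for (; it.isValid(); it.next()) {
        writer.put(it.key(), it.value()); // iterator yields keys in sorted order, as SstFileWriter requires
      }
      writer.finish();
    }
  }
}

An empty table produces no (or an empty) dump file, which is what the isFileEmpty guard below handles on the read side.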
  boolean isFileEmpty(File file) {
    if (!file.exists()) {
      return true;
    }
    return file.length() == 0;
  }

  public static File getTableDumpFile(Table<String, ?> table,
TestTarContainerPacker.java

@@ -72,7 +72,20 @@ public class TestTarContainerPacker {
  private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk";

- private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
  private static final String TEST_DESCRIPTOR_FILE_CONTENT = "!<KeyValueContainerData>\n" +
      "checksum: 2215d39f2ae1de89fec837d18dc6387d8cba22fb5943cf4616f80c4b34e2edfe\n" +
      "chunksPath: target/test-dir/MiniOzoneClusterImpl-23c1bb30-d86a-4f79-88dc-574d8259a5b3/ozone-meta/datanode-4/data-0/hdds/23c1bb30-d86a-4f79-88dc-574d8259a5b3/current/containerDir0/1/chunks\n" +
      "containerDBType: RocksDB\n" +
      "containerID: 1\n" +
      "containerType: KeyValueContainer\n" +
      "layOutVersion: 2\n" +
      "maxSize: 5368709120\n" +
      "metadata: {}\n" +
      "metadataPath: target/test-dir/MiniOzoneClusterImpl-23c1bb30-d86a-4f79-88dc-574d8259a5b3/ozone-meta/datanode-4/data-0/hdds/23c1bb30-d86a-4f79-88dc-574d8259a5b3/current/containerDir0/1/metadata\n" +
      "originNodeId: 25a48afa-f8d8-44ff-b268-642167e5354b\n" +
      "originPipelineId: d7faca81-407f-4a50-a399-bd478c9795e5\n" +
      "schemaVersion: '3'\n" +
      "state: CLOSED";
Author: The dummy content for the descriptor file resulted in a test failure, as we now parse the descriptor file content and save it to disk when performing the unpack.

  private TarContainerPacker packer;
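As the author's note above says, the descriptor constant now has to be valid KeyValueContainerData YAML, because the unpack path feeds it to ContainerDataYaml.readContainer before writing the .container file. A rough sketch of that parse step, reusing only the calls visible in this diff (the wrapper class and method here are hypothetical):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

public final class DescriptorParseSketch {

  /** Parses descriptor YAML bytes; a dummy value like the old "descriptor" string fails here. */
  public static KeyValueContainerData parse(String descriptorYaml) throws IOException {
    byte[] bytes = descriptorYaml.getBytes(StandardCharsets.UTF_8);
    // Same call the packer now makes in persistCustomContainerState
    return (KeyValueContainerData) ContainerDataYaml.readContainer(bytes);
  }
}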
@@ -142,9 +155,9 @@ private KeyValueContainerData createContainer(Path dir, boolean createDir)
    long id = CONTAINER_ID.getAndIncrement();

    Path containerDir = dir.resolve(String.valueOf(id));
-   Path dbDir = containerDir.resolve("db");
    Path dataDir = containerDir.resolve("chunks");
    Path metaDir = containerDir.resolve("metadata");
    Path dbDir = metaDir.resolve("db");

    if (createDir) {
      Files.createDirectories(metaDir);
      Files.createDirectories(dbDir);
@@ -245,9 +258,10 @@ public void pack(ContainerTestVersionInfo versionInfo,
    assertExampleChunkFileIsGood(
        Paths.get(destinationContainerData.getChunksPath()),
        TEST_CHUNK_FILE_NAME);
-   assertFalse(destinationContainer.getContainerFile().exists(),
-       "Descriptor file should not have been extracted by the "
-       + "unpackContainerData Call");

    String containerFileData = new String(Files.readAllBytes(destinationContainer.getContainerFile().toPath()), UTF_8);
    assertTrue(containerFileData.contains("RECOVERING"),
        "The state of the container is not 'RECOVERING' in the container file");
    assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
    inputForUnpackData.assertClosedExactlyOnce();
  }
Container replication integration test

@@ -26,12 +26,14 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.container.TestHelper.isContainerClosed;
import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
import static org.apache.ozone.test.GenericTestUtils.setLogLevel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.any;
@@ -284,6 +286,43 @@ private static void deleteContainer(MiniOzoneCluster cluster, DatanodeDetails dn
  }

  @Test
  public void testImportedContainerIsClosed() throws Exception {

Contributor: Can we have a MiniOzoneCluster test case for the container cleanup on DN restart?

    OzoneConfiguration conf = createConfiguration(false);
    // create a 4 node cluster
    try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).build()) {
      cluster.waitForClusterToBeReady();

      try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
        List<DatanodeDetails> allNodes =
            cluster.getHddsDatanodes().stream()
                .map(HddsDatanodeService::getDatanodeDetails)
                .collect(Collectors.toList());
        // shutdown the 4th node (node 3 is down now)
        cluster.shutdownHddsDatanode(allNodes.get(allNodes.size() - 1));

        createTestData(client);
        final OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster);
        long containerID = keyLocation.getContainerID();
        waitForContainerClose(cluster, containerID);

        // shutdown nodes 0 and 1. only node 2 is up now
        for (int i = 0; i < 2; i++) {
          cluster.shutdownHddsDatanode(allNodes.get(i));
        }
        waitForReplicaCount(containerID, 1, cluster);

        // bring back up the 4th node
        cluster.restartHddsDatanode(allNodes.get(allNodes.size() - 1), false);

        // the container should have been imported on the 4th node
        waitForReplicaCount(containerID, 2, cluster);
        assertTrue(isContainerClosed(cluster, containerID, allNodes.get(allNodes.size() - 1)));
      }
    }
  }

  @Test
  @Flaky("HDDS-11087")
  public void testECContainerReplication() throws Exception {