@@ -128,6 +128,9 @@ void create(VolumeSet volumeSet, VolumeChoosingPolicy volumeChoosingPolicy,
void update(Map<String, String> metaData, boolean forceUpdate)
throws StorageContainerException;

void update(Map<String, String> metaData, boolean forceUpdate, String containerMetadataPath)
throws StorageContainerException;

void updateDataScanTimestamp(Instant timestamp)
throws StorageContainerException;
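
For orientation, the relationship between the two overloads (as implemented by KeyValueContainer further down) is: the existing two-argument form delegates to the new one using the container's own metadata path, while the new three-argument form lets the caller choose where the refreshed .container file is written, for example a temporary import directory. A minimal call-pattern sketch; the metadata map and the custom path below are illustrative, not taken from the PR:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
    import org.apache.hadoop.ozone.container.common.interfaces.Container;

    final class UpdateOverloadSketch {
      private UpdateOverloadSketch() { }

      static void updateBothWays(Container<?> container) throws StorageContainerException {
        Map<String, String> metadata = Collections.singletonMap("key", "value"); // illustrative metadata
        // Existing overload: rewrites the .container file under the container's own metadata path.
        container.update(metadata, true);
        // New overload: rewrites the .container file under a caller-supplied directory,
        // e.g. a temporary directory used while importing a replica.
        container.update(metadata, true, "/tmp/container-import/1/metadata"); // illustrative path
      }
    }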

@@ -21,7 +21,9 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;

/**
* Service to pack/unpack ContainerData container data to/from a single byte
@@ -54,4 +56,17 @@ void pack(Container<CONTAINERDATA> container, OutputStream destination)
*/
byte[] unpackContainerDescriptor(InputStream inputStream)
throws IOException;

/**
* Persists the custom state for a container. This method allows saving the container file to a custom location.
*/
default void persistCustomContainerState(Container<? extends ContainerData> container, byte[] descriptorContent,
ContainerProtos.ContainerDataProto.State state, Path containerMetadataPath) throws IOException {
if (descriptorContent == null) {
return;
}
ContainerData originalContainerData = ContainerDataYaml.readContainer(descriptorContent);
container.getContainerData().setState(state);
container.update(originalContainerData.getMetadata(), true, containerMetadataPath.toString());
}
}
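
A minimal usage sketch of the new default method, mirroring how TarContainerPacker uses it below during import: the descriptor is persisted into a temporary metadata directory derived from the container's metadata path, with the replica marked RECOVERING, before the directory is atomically moved into place. The ContainerPacker import location and the helper class are assumptions for illustration:

    import java.io.IOException;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
    import org.apache.hadoop.ozone.container.common.interfaces.Container;
    import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; // assumed package
    import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

    final class PersistCustomStateSketch {
      private PersistCustomStateSketch() { }

      // Write the .container descriptor into a temp metadata dir under untarDir,
      // marking the replica RECOVERING, before the atomic move into place.
      static void persistIntoUntarDir(ContainerPacker<KeyValueContainerData> packer,
          Container<KeyValueContainerData> container, byte[] descriptorBytes,
          Path untarDir) throws IOException {
        Path metadataDirName =
            Paths.get(container.getContainerData().getMetadataPath()).getFileName();
        packer.persistCustomContainerState(container, descriptorBytes,
            State.RECOVERING, untarDir.resolve(metadataDirName));
      }
    }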
@@ -551,13 +551,17 @@ public ContainerType getContainerType() {
}

@Override
- public void update(
- Map<String, String> metadata, boolean forceUpdate)
public void update(Map<String, String> metadata, boolean forceUpdate)
throws StorageContainerException {
update(metadata, forceUpdate, containerData.getMetadataPath());
}

@Override
public void update(Map<String, String> metadata, boolean forceUpdate, String containerMetadataPath)
throws StorageContainerException {
// TODO: Now, when writing the updated data to .container file, we are
// holding lock and writing data to disk. We can have async implementation
// to flush the update container data to disk.
long containerId = containerData.getContainerID();
if (!containerData.isValid()) {
LOG.debug("Invalid container data. ContainerID: {}", containerId);
@@ -577,7 +581,7 @@ public void update(
containerData.addMetadata(entry.getKey(), entry.getValue());
}

- File containerFile = getContainerFile();
File containerFile = getContainerFile(containerMetadataPath, containerData.getContainerID());
// update the new container data to .container File
updateContainerFile(containerFile);
} catch (StorageContainerException ex) {
@@ -665,21 +669,22 @@ public void importContainerData(InputStream input,

public void importContainerData(KeyValueContainerData originalContainerData)
throws IOException {
- containerData.setState(originalContainerData.getState());
containerData
.setContainerDBType(originalContainerData.getContainerDBType());
containerData.setSchemaVersion(originalContainerData.getSchemaVersion());

- //rewriting the yaml file with new checksum calculation.
- update(originalContainerData.getMetadata(), true);

if (containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
// load metadata from received dump files before we try to parse kv
BlockUtils.loadKVContainerDataFromFiles(containerData, config);
}

//fill in memory stat counter (keycount, byte usage)
KeyValueContainerUtil.parseKVContainerData(containerData, config);

// rewriting the yaml file with new checksum calculation
// restore imported container's state to the original state and flush the yaml file
containerData.setState(originalContainerData.getState());
update(originalContainerData.getMetadata(), true);
}

@Override
@@ -39,6 +39,7 @@
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -92,6 +93,13 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
Files.createDirectories(destContainerDir);
}
if (FileUtils.isEmptyDirectory(destContainerDir.toFile())) {
// Before the atomic move, the destination dir is empty and doesn't have a metadata directory.
// Writing the .container file will fail as the metadata dir doesn't exist.
// So we instead save the container file to the containerUntarDir.
Path containerMetadataPath = Paths.get(container.getContainerData().getMetadataPath());
Path tempContainerMetadataPath = Paths.get(containerUntarDir.toString(),
containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1).toString());
persistCustomContainerState(container, descriptorFileContent, State.RECOVERING, tempContainerMetadataPath);
Files.move(containerUntarDir, destContainerDir,
StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING);
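
To make the path handling above concrete, here is an illustrative sketch (paths invented for the example, not from the PR) of how the temporary metadata path is derived and where the descriptor ends up after the atomic move:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    final class TempMetadataPathExample {
      private TempMetadataPathExample() { }

      public static void main(String[] args) {
        Path containerMetadataPath =
            Paths.get("/data/hdds/CID-1234/current/containerDir0/7/metadata"); // final metadata dir (not yet created)
        Path containerUntarDir =
            Paths.get("/data/hdds/CID-1234/tmp/container_copy/7");             // temporary untar dir
        // getName(getNameCount() - 1) is the last path element, i.e. "metadata".
        Path tempContainerMetadataPath = containerUntarDir.resolve(
            containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1));
        System.out.println(tempContainerMetadataPath);
        // -> /data/hdds/CID-1234/tmp/container_copy/7/metadata
        // The .container file is written here first; the ATOMIC_MOVE of containerUntarDir
        // onto the destination then carries it to .../containerDir0/7/metadata.
      }
    }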
@@ -31,17 +31,23 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDBException;

/**
* Constructs a datanode store in accordance with schema version 3, which uses
@@ -133,16 +139,62 @@ public void dumpKVContainerData(long containerID, File dumpDir)

public void loadKVContainerData(File dumpDir)
throws IOException {
- getMetadataTable().loadFromFile(
- getTableDumpFile(getMetadataTable(), dumpDir));
- getBlockDataTable().loadFromFile(
- getTableDumpFile(getBlockDataTable(), dumpDir));
- if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
- getLastChunkInfoTable().loadFromFile(
- getTableDumpFile(getLastChunkInfoTable(), dumpDir));

try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
processTable(batch, getTableDumpFile(getMetadataTable(), dumpDir),
getDbDef().getMetadataColumnFamily().getKeyCodec(),
getDbDef().getMetadataColumnFamily().getValueCodec(),
getMetadataTable());
processTable(batch, getTableDumpFile(getBlockDataTable(), dumpDir),
getDbDef().getBlockDataColumnFamily().getKeyCodec(),
getDbDef().getBlockDataColumnFamily().getValueCodec(),
getBlockDataTable());
if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
processTable(batch, getTableDumpFile(getLastChunkInfoTable(), dumpDir),
getDbDef().getLastChunkInfoColumnFamily().getKeyCodec(),
getDbDef().getLastChunkInfoColumnFamily().getValueCodec(),
getLastChunkInfoTable());
}
processTable(batch, getTableDumpFile(getDeleteTransactionTable(), dumpDir),
((DatanodeSchemaThreeDBDefinition)getDbDef()).getDeleteTransactionsColumnFamily().getKeyCodec(),
((DatanodeSchemaThreeDBDefinition)getDbDef()).getDeleteTransactionsColumnFamily().getValueCodec(),
getDeleteTransactionTable());

getStore().commitBatchOperation(batch);
} catch (RocksDBException e) {
throw new IOException("Failed to load container data from dump file.", e);
}
}

private <K, V> void processTable(BatchOperation batch, File tableDumpFile,
Codec<K> keyCodec, Codec<V> valueCodec, Table<K, V> table) throws IOException, RocksDBException {
if (isFileEmpty(tableDumpFile)) {
LOG.debug("SST File {} is empty. Skipping processing.", tableDumpFile.getAbsolutePath());
return;
}

try (ManagedOptions managedOptions = new ManagedOptions();
ManagedSstFileReader sstFileReader = new ManagedSstFileReader(managedOptions)) {
sstFileReader.open(tableDumpFile.getAbsolutePath());
try (ManagedReadOptions managedReadOptions = new ManagedReadOptions();
ManagedSstFileReaderIterator iterator =
ManagedSstFileReaderIterator.managed(sstFileReader.newIterator(managedReadOptions))) {
[Review thread]
Contributor: Are we sure there are no tombstones in this sst file?
Contributor: Can you confirm this? How are we generating this sst file?
Author (@ptlrs, Mar 28, 2025): We dump the files by reading all the keys via an iterator, so there should be no tombstones:

    public void dumpToFileWithPrefix(File externalFile, byte[] prefix)
        throws IOException {
      try (TableIterator<byte[], KeyValue<byte[], byte[]>> iter = iterator(prefix);
          DumpFileWriter fileWriter = new RDBSstFileWriter()) {
        fileWriter.open(externalFile);
        while (iter.hasNext()) {
          final KeyValue<byte[], byte[]> entry = iter.next();
          fileWriter.put(entry.getKey(), entry.getValue());
        }
      }
    }
[End of review thread]

for (iterator.get().seekToFirst(); iterator.get().isValid(); iterator.get().next()) {
byte[] key = iterator.get().key();
byte[] value = iterator.get().value();
K decodedKey = keyCodec.fromPersistedFormat(key);
V decodedValue = valueCodec.fromPersistedFormat(value);
table.putWithBatch(batch, decodedKey, decodedValue);
}
}
}
}

boolean isFileEmpty(File file) {
if (!file.exists()) {
return true;
}
- getDeleteTransactionTable().loadFromFile(
- getTableDumpFile(getDeleteTransactionTable(), dumpDir));
return file.length() == 0;
}

public static File getTableDumpFile(Table<String, ?> table,
@@ -211,10 +211,15 @@ public void verifyAndFixupContainerData(ContainerData containerData)
config);
if (kvContainer.getContainerState() == RECOVERING) {
if (shouldDelete) {
- kvContainer.markContainerUnhealthy();
- LOG.info("Stale recovering container {} marked UNHEALTHY",
- kvContainerData.getContainerID());
- containerSet.addContainer(kvContainer);
// delete Ratis replicated RECOVERING containers
if (kvContainer.getContainerData().getReplicaIndex() == 0) {
cleanupContainer(hddsVolume, kvContainer);
} else {
kvContainer.markContainerUnhealthy();
LOG.info("Stale recovering container {} marked UNHEALTHY",
kvContainerData.getContainerID());
containerSet.addContainer(kvContainer);
}
}
return;
}
@@ -882,6 +882,7 @@ void testAutoCompactionSmallSstFile(
TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
container.importContainerData(fis, packer);
containerList.add(container);
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, container.getContainerData().getState());
}
}

@@ -891,12 +892,16 @@ void testAutoCompactionSmallSstFile(
CONF).getStore();
List<LiveFileMetaData> fileMetaDataList1 =
((RDBStore)(dnStore.getStore())).getDb().getLiveFilesMetaData();
// When using Table.loadFromFile() in loadKVContainerData(),
// there were as many SST files generated as the number of imported containers
// After moving away from using Table.loadFromFile(), no SST files are generated unless the db is force flushed
assertEquals(0, fileMetaDataList1.size());
hddsVolume.compactDb();
// Sleep a while to wait for compaction to complete
Thread.sleep(7000);
List<LiveFileMetaData> fileMetaDataList2 =
((RDBStore)(dnStore.getStore())).getDb().getLiveFilesMetaData();
- assertThat(fileMetaDataList2.size()).isLessThan(fileMetaDataList1.size());
assertThat(fileMetaDataList2).hasSizeLessThanOrEqualTo(fileMetaDataList1.size());
} finally {
// clean up
for (KeyValueContainer c : containerList) {
@@ -23,7 +23,6 @@
import static org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker.CONTAINER_FILE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
- import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -72,7 +71,22 @@ public class TestTarContainerPacker {

private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk";

- private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
private static final String TEST_DESCRIPTOR_FILE_CONTENT = "!<KeyValueContainerData>\n" +
"checksum: 2215d39f2ae1de89fec837d18dc6387d8cba22fb5943cf4616f80c4b34e2edfe\n" +
"chunksPath: target/test-dir/MiniOzoneClusterImpl-23c1bb30-d86a-4f79-88dc-574d8259a5b3/ozone-meta/datanode-4" +
"/data-0/hdds/23c1bb30-d86a-4f79-88dc-574d8259a5b3/current/containerDir0/1/chunks\n" +
"containerDBType: RocksDB\n" +
"containerID: 1\n" +
"containerType: KeyValueContainer\n" +
"layOutVersion: 2\n" +
"maxSize: 5368709120\n" +
"metadata: {}\n" +
"metadataPath: target/test-dir/MiniOzoneClusterImpl-23c1bb30-d86a-4f79-88dc-574d8259a5b3/ozone-meta/datanode-4" +
"/data-0/hdds/23c1bb30-d86a-4f79-88dc-574d8259a5b3/current/containerDir0/1/metadata\n" +
"originNodeId: 25a48afa-f8d8-44ff-b268-642167e5354b\n" +
"originPipelineId: d7faca81-407f-4a50-a399-bd478c9795e5\n" +
"schemaVersion: '3'\n" +
"state: CLOSED";

private TarContainerPacker packer;

@@ -142,9 +156,9 @@ private KeyValueContainerData createContainer(Path dir, boolean createDir)
long id = CONTAINER_ID.getAndIncrement();

Path containerDir = dir.resolve(String.valueOf(id));
- Path dbDir = containerDir.resolve("db");
Path dataDir = containerDir.resolve("chunks");
Path metaDir = containerDir.resolve("metadata");
Path dbDir = metaDir.resolve("db");
if (createDir) {
Files.createDirectories(metaDir);
Files.createDirectories(dbDir);
@@ -245,9 +259,10 @@ public void pack(ContainerTestVersionInfo versionInfo,
assertExampleChunkFileIsGood(
Paths.get(destinationContainerData.getChunksPath()),
TEST_CHUNK_FILE_NAME);
- assertFalse(destinationContainer.getContainerFile().exists(),
- "Descriptor file should not have been extracted by the "
- + "unpackContainerData Call");

String containerFileData = new String(Files.readAllBytes(destinationContainer.getContainerFile().toPath()), UTF_8);
assertTrue(containerFileData.contains("RECOVERING"),
"The state of the container is not 'RECOVERING' in the container file");
assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
inputForUnpackData.assertClosedExactlyOnce();
}
@@ -237,10 +237,16 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
thread.start();
thread.join();

- //recovering container should be marked unhealthy, so the count should be 3
- assertEquals(UNHEALTHY, containerSet.getContainer(
- recoveringContainerData.getContainerID()).getContainerState());
- assertEquals(3, containerSet.containerCount());
// Ratis replicated recovering containers are deleted upon datanode startup
if (recoveringKeyValueContainer.getContainerData().getReplicaIndex() == 0) {
assertNull(containerSet.getContainer(recoveringContainerData.getContainerID()));
assertEquals(2, containerSet.containerCount());
} else {
//recovering container should be marked unhealthy, so the count should be 3
assertEquals(UNHEALTHY, containerSet.getContainer(
recoveringContainerData.getContainerID()).getContainerState());
assertEquals(3, containerSet.containerCount());
}

for (int i = 0; i < 2; i++) {
Container keyValueContainer = containerSet.getContainer(i);
@@ -26,12 +26,14 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.container.TestHelper.isContainerClosed;
import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
import static org.apache.ozone.test.GenericTestUtils.setLogLevel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.any;

Expand Down Expand Up @@ -282,6 +284,43 @@ private static void deleteContainer(MiniOzoneCluster cluster, DatanodeDetails dn
}


[Review comment (Contributor): Can we have a miniOzoneCluster test case for the container cleanup on DN restart?]

@Test
public void testImportedContainerIsClosed() throws Exception {
OzoneConfiguration conf = createConfiguration(false);
// create a 4 node cluster
try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).build()) {
cluster.waitForClusterToBeReady();

try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
List<DatanodeDetails> allNodes =
cluster.getHddsDatanodes().stream()
.map(HddsDatanodeService::getDatanodeDetails)
.collect(Collectors.toList());
// shutdown 4th node (node 3 is down now)
cluster.shutdownHddsDatanode(allNodes.get(allNodes.size() - 1));

createTestData(client);
final OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster);
long containerID = keyLocation.getContainerID();
waitForContainerClose(cluster, containerID);

// shutdown nodes 0 and 1. only node 2 is up now
for (int i = 0; i < 2; i++) {
cluster.shutdownHddsDatanode(allNodes.get(i));
}
waitForReplicaCount(containerID, 1, cluster);

// bring back up the 4th node
cluster.restartHddsDatanode(allNodes.get(allNodes.size() - 1), false);

// the container should have been imported on the 4th node
waitForReplicaCount(containerID, 2, cluster);
assertTrue(isContainerClosed(cluster, containerID, allNodes.get(allNodes.size() - 1)));
}
}
}


@Test
@Flaky("HDDS-11087")
public void testECContainerReplication() throws Exception {