Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,23 @@ public ReconContainerMetadataManagerImpl(ReconDBProvider reconDBProvider,
public void reinitWithNewContainerDataFromOm(Map<ContainerKeyPrefix, Integer>
containerKeyPrefixCounts)
throws IOException {
// clear and re-init all container-related tables
LOG.info("Starting reinitialization of container data...");

Instant truncateStart = Instant.now();

// Clear and re-init all container-related tables
truncateTable(this.containerKeyTable);
truncateTable(this.keyContainerTable);
truncateTable(this.containerKeyCountTable);

long truncateDuration = Duration.between(truncateStart, Instant.now()).toMillis();
LOG.info("Table truncation completed in {} ms", truncateDuration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should ideally be a metric.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems test log to capture time taken for a part of operation, may not need this, can have overall operation time taken.


initializeTables();

if (containerKeyPrefixCounts != null) {
KeyPrefixContainer tmpKeyPrefixContainer;
for (Map.Entry<ContainerKeyPrefix, Integer> entry :
containerKeyPrefixCounts.entrySet()) {
for (Map.Entry<ContainerKeyPrefix, Integer> entry : containerKeyPrefixCounts.entrySet()) {
containerKeyTable.put(entry.getKey(), entry.getValue());
tmpKeyPrefixContainer = entry.getKey().toKeyPrefixContainer();
if (tmpKeyPrefixContainer != null) {
Expand All @@ -126,7 +133,7 @@ public void reinitWithNewContainerDataFromOm(Map<ContainerKeyPrefix, Integer>
}
}

// reset total count of containers to zero
// Reset total count of containers to zero
storeContainerCount(0L);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ static void truncateTable(Table table) throws IOException {
if (table == null) {
return;
}
try (TableIterator<Object, ? extends KeyValue<Object, Object>>
tableIterator = table.iterator()) {
while (tableIterator.hasNext()) {
KeyValue<Object, Object> entry = tableIterator.next();
table.delete(entry.getKey());
try (TableIterator<Object, ? extends KeyValue<Object, Object>> iterator = table.iterator()) {
if (iterator.hasNext()) {
Object firstKey = iterator.next().getKey();
Object lastKey = null;
while (iterator.hasNext()) {
lastKey = iterator.next().getKey();
}
if (lastKey != null) {
table.deleteRange(firstKey, lastKey); // Efficient bulk deletion
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -81,102 +82,105 @@ public ContainerKeyMapperTask(ReconContainerMetadataManager
}

/**
* Read Key -&gt; ContainerId data from OM snapshot DB and write reverse map
* (container, key) -&gt; count to Recon Container DB.
* Read Key -> ContainerId data from OM snapshot DB and write reverse map
* (container, key) -> count to Recon Container DB.
*/
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
long omKeyCount = 0;
LOG.info("Starting parallel reprocess of ContainerKeyMapperTask.");

Instant start = Instant.now();

// In-memory maps for fast look up and batch write
// (container, key) -> count
Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
// containerId -> key count
Map<Long, Long> containerKeyCountMap = new HashMap<>();
try {
LOG.debug("Starting a 'reprocess' run of ContainerKeyMapperTask.");
Instant start = Instant.now();
// Step 1: Reset Recon DB before processing
reconContainerMetadataManager.reinitWithNewContainerDataFromOm(new HashMap<>());

// Step 2: Thread pool for parallel execution
int numThreads = 2; // Adjust based on performance requirements
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// Step 3: List of tasks for parallel execution
List<Future<Boolean>> futures = new ArrayList<>();
List<BucketLayout> layouts = Arrays.asList(
BucketLayout.LEGACY, BucketLayout.FILE_SYSTEM_OPTIMIZED
);

for (BucketLayout layout : layouts) {
futures.add(executor.submit(() -> {
LOG.info("Processing layout: {}", layout);

Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
Map<Long, Long> containerKeyCountMap = new HashMap<>();

try {
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(layout);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> iter =
omKeyInfoTable.iterator()) {

while (iter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = iter.next();
handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap);

// Partial flush if threshold is reached
if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) {
synchronized (this) {
writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList());
containerKeyMap.clear();
containerKeyCountMap.clear();
}
}
}
}

// initialize new container DB
reconContainerMetadataManager
.reinitWithNewContainerDataFromOm(new HashMap<>());

// loop over both key table and file table
for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY,
BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
// (HDDS-8580) Since "reprocess" iterate over the whole key table,
// containerKeyMap needs to be incrementally flushed to DB based on
// configured batch threshold.
// containerKeyCountMap can be flushed at the end since the number
// of containers in a cluster will not have significant memory overhead.
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(layout);
try (
TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
OmKeyInfo omKeyInfo = kv.getValue();
handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap,
containerKeyCountMap);
if (!checkAndCallFlushToDB(containerKeyMap)) {
LOG.error("Unable to flush containerKey information to the DB");
return new ImmutablePair<>(getTaskName(), false);
// Final flush for any remaining data
if (!containerKeyMap.isEmpty()) {
synchronized (this) {
writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList());
}
}
omKeyCount++;

LOG.info("Completed layout: {}", layout);
return true;
} catch (Exception e) {
LOG.error("Error processing layout {}", layout, e);
return false;
}
}));
}

// Step 4: Wait for all tasks to finish
boolean allSuccessful = true;
for (Future<Boolean> f : futures) {
if (!f.get()) {
allSuccessful = false;
}
}

// flush and commit left out keys at end,
// also batch write containerKeyCountMap to the containerKeyCountTable
if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
containerKeyCountMap)) {
LOG.error("Unable to flush Container Key Count and " +
"remaining Container Key information to the DB");
// Step 5: Shutdown executor
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);

if (!allSuccessful) {
LOG.error("One or more threads failed during reprocess().");
return new ImmutablePair<>(getTaskName(), false);
}

LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask.");
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
LOG.debug("It took me {} seconds to process {} keys.",
(double) duration / 1000.0, omKeyCount);
// Done, log total
long durationMillis = Duration.between(start, Instant.now()).toMillis();
LOG.info("Completed parallel reprocess in {} ms", durationMillis);
return new ImmutablePair<>(getTaskName(), true);

} catch (IOException ioEx) {
LOG.error("Unable to populate Container Key data in Recon DB. ",
ioEx);
LOG.error("Unable to re-init Recon DB.", ioEx);
return new ImmutablePair<>(getTaskName(), false);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("reprocess() was interrupted.", ie);
return new ImmutablePair<>(getTaskName(), false);
} catch (ExecutionException ee) {
LOG.error("Exception from parallel tasks in reprocess()", ee);
return new ImmutablePair<>(getTaskName(), false);
}
return new ImmutablePair<>(getTaskName(), true);
}

private boolean flushAndCommitContainerKeyInfoToDB(
Map<ContainerKeyPrefix, Integer> containerKeyMap,
Map<Long, Long> containerKeyCountMap) {
try {
// deleted container list is not needed since "reprocess" only has
// put operations
writeToTheDB(containerKeyMap, containerKeyCountMap,
Collections.emptyList());
containerKeyMap.clear();
containerKeyCountMap.clear();
} catch (IOException e) {
LOG.error("Unable to write Container Key and " +
"Container Key Count data in Recon DB.", e);
return false;
}
return true;
}

private boolean checkAndCallFlushToDB(
Map<ContainerKeyPrefix, Integer> containerKeyMap) {
// if containerKeyMap more than entries, flush to DB and clear the map
if (null != containerKeyMap && containerKeyMap.size() >=
containerKeyFlushToDBMaxThreshold) {
return flushAndCommitContainerKeyInfoToDB(containerKeyMap,
Collections.emptyMap());
}
return true;
}

@Override
Expand Down Expand Up @@ -208,7 +212,7 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Map<Long, Long> containerKeyCountMap = new HashMap<>();
// List of the deleted (container, key) pair's
List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
long startTime = System.currentTimeMillis();

while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
// Filter event inside process method to avoid duping
Expand Down Expand Up @@ -258,8 +262,8 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
return new ImmutablePair<>(getTaskName(), false);
}
LOG.debug("{} successfully processed {} OM DB update event(s) in {} milliseconds.",
getTaskName(), eventCount, (System.currentTimeMillis() - startTime));
LOG.debug("{} successfully processed {} OM DB update event(s).",
getTaskName(), eventCount);
return new ImmutablePair<>(getTaskName(), true);
}

Expand Down Expand Up @@ -451,7 +455,7 @@ private void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo,
* @param omKeyInfo omKeyInfo value
* @param containerKeyMap we keep the added containerKeys in this map
* to allow incremental batching to containerKeyTable
* @param containerKeyCountMap we keep the containerKey counts in this map
* @param containerKeyCountMap we keep the containerKey counts in this map
* to allow batching to containerKeyCountTable
* after reprocessing is done
* @throws IOException if unable to write to recon DB.
Expand Down
Loading