diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java index 42908a775a4a..742c4c942c5f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java @@ -108,16 +108,23 @@ public ReconContainerMetadataManagerImpl(ReconDBProvider reconDBProvider, public void reinitWithNewContainerDataFromOm(Map containerKeyPrefixCounts) throws IOException { - // clear and re-init all container-related tables + LOG.info("Starting reinitialization of container data..."); + + Instant truncateStart = Instant.now(); + + // Clear and re-init all container-related tables truncateTable(this.containerKeyTable); truncateTable(this.keyContainerTable); truncateTable(this.containerKeyCountTable); + + long truncateDuration = Duration.between(truncateStart, Instant.now()).toMillis(); + LOG.info("Table truncation completed in {} ms", truncateDuration); + initializeTables(); if (containerKeyPrefixCounts != null) { KeyPrefixContainer tmpKeyPrefixContainer; - for (Map.Entry entry : - containerKeyPrefixCounts.entrySet()) { + for (Map.Entry entry : containerKeyPrefixCounts.entrySet()) { containerKeyTable.put(entry.getKey(), entry.getValue()); tmpKeyPrefixContainer = entry.getKey().toKeyPrefixContainer(); if (tmpKeyPrefixContainer != null) { @@ -126,7 +133,7 @@ public void reinitWithNewContainerDataFromOm(Map } } - // reset total count of containers to zero + // Reset total count of containers to zero storeContainerCount(0L); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java index f3b32a4d1908..55df1129bd2c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java @@ -83,11 +83,16 @@ static void truncateTable(Table table) throws IOException { if (table == null) { return; } - try (TableIterator> - tableIterator = table.iterator()) { - while (tableIterator.hasNext()) { - KeyValue entry = tableIterator.next(); - table.delete(entry.getKey()); + try (TableIterator> iterator = table.iterator()) { + if (iterator.hasNext()) { + Object firstKey = iterator.next().getKey(); + Object lastKey = null; + while (iterator.hasNext()) { + lastKey = iterator.next().getKey(); + } + if (lastKey != null) { + table.deleteRange(firstKey, lastKey); // Efficient bulk deletion + } } } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java index 3202f6aa8bbb..3ce674b863f3 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.*; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -81,102 +82,105 @@ public ContainerKeyMapperTask(ReconContainerMetadataManager } /** - * Read Key -> ContainerId data from OM snapshot DB and write reverse map - * (container, key) -> count to Recon Container DB. + * Read Key -> ContainerId data from OM snapshot DB and write reverse map + * (container, key) -> count to Recon Container DB. */ @Override public Pair reprocess(OMMetadataManager omMetadataManager) { - long omKeyCount = 0; + LOG.info("Starting parallel reprocess of ContainerKeyMapperTask."); + + Instant start = Instant.now(); - // In-memory maps for fast look up and batch write - // (container, key) -> count - Map containerKeyMap = new HashMap<>(); - // containerId -> key count - Map containerKeyCountMap = new HashMap<>(); try { - LOG.debug("Starting a 'reprocess' run of ContainerKeyMapperTask."); - Instant start = Instant.now(); + // Step 1: Reset Recon DB before processing + reconContainerMetadataManager.reinitWithNewContainerDataFromOm(new HashMap<>()); + + // Step 2: Thread pool for parallel execution + int numThreads = 2; // Adjust based on performance requirements + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + // Step 3: List of tasks for parallel execution + List> futures = new ArrayList<>(); + List layouts = Arrays.asList( + BucketLayout.LEGACY, BucketLayout.FILE_SYSTEM_OPTIMIZED + ); + + for (BucketLayout layout : layouts) { + futures.add(executor.submit(() -> { + LOG.info("Processing layout: {}", layout); + + Map containerKeyMap = new HashMap<>(); + Map containerKeyCountMap = new HashMap<>(); + + try { + Table omKeyInfoTable = omMetadataManager.getKeyTable(layout); + try (TableIterator> iter = + omKeyInfoTable.iterator()) { + + while (iter.hasNext()) { + Table.KeyValue kv = iter.next(); + handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap); + + // Partial flush if threshold is reached + if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) { + synchronized (this) { + writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList()); + containerKeyMap.clear(); + containerKeyCountMap.clear(); + } + } + } + } - // initialize new container DB - reconContainerMetadataManager - .reinitWithNewContainerDataFromOm(new HashMap<>()); - - // loop over both key table and file table - for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY, - BucketLayout.FILE_SYSTEM_OPTIMIZED)) { - // (HDDS-8580) Since "reprocess" iterate over the whole key table, - // containerKeyMap needs to be incrementally flushed to DB based on - // configured batch threshold. - // containerKeyCountMap can be flushed at the end since the number - // of containers in a cluster will not have significant memory overhead. - Table omKeyInfoTable = - omMetadataManager.getKeyTable(layout); - try ( - TableIterator> - keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue kv = keyIter.next(); - OmKeyInfo omKeyInfo = kv.getValue(); - handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap, - containerKeyCountMap); - if (!checkAndCallFlushToDB(containerKeyMap)) { - LOG.error("Unable to flush containerKey information to the DB"); - return new ImmutablePair<>(getTaskName(), false); + // Final flush for any remaining data + if (!containerKeyMap.isEmpty()) { + synchronized (this) { + writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList()); + } } - omKeyCount++; + + LOG.info("Completed layout: {}", layout); + return true; + } catch (Exception e) { + LOG.error("Error processing layout {}", layout, e); + return false; } + })); + } + + // Step 4: Wait for all tasks to finish + boolean allSuccessful = true; + for (Future f : futures) { + if (!f.get()) { + allSuccessful = false; } } - // flush and commit left out keys at end, - // also batch write containerKeyCountMap to the containerKeyCountTable - if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, - containerKeyCountMap)) { - LOG.error("Unable to flush Container Key Count and " + - "remaining Container Key information to the DB"); + // Step 5: Shutdown executor + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.MINUTES); + + if (!allSuccessful) { + LOG.error("One or more threads failed during reprocess()."); return new ImmutablePair<>(getTaskName(), false); } - LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask."); - Instant end = Instant.now(); - long duration = Duration.between(start, end).toMillis(); - LOG.debug("It took me {} seconds to process {} keys.", - (double) duration / 1000.0, omKeyCount); + // Done, log total + long durationMillis = Duration.between(start, Instant.now()).toMillis(); + LOG.info("Completed parallel reprocess in {} ms", durationMillis); + return new ImmutablePair<>(getTaskName(), true); + } catch (IOException ioEx) { - LOG.error("Unable to populate Container Key data in Recon DB. ", - ioEx); + LOG.error("Unable to re-init Recon DB.", ioEx); + return new ImmutablePair<>(getTaskName(), false); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("reprocess() was interrupted.", ie); + return new ImmutablePair<>(getTaskName(), false); + } catch (ExecutionException ee) { + LOG.error("Exception from parallel tasks in reprocess()", ee); return new ImmutablePair<>(getTaskName(), false); } - return new ImmutablePair<>(getTaskName(), true); - } - - private boolean flushAndCommitContainerKeyInfoToDB( - Map containerKeyMap, - Map containerKeyCountMap) { - try { - // deleted container list is not needed since "reprocess" only has - // put operations - writeToTheDB(containerKeyMap, containerKeyCountMap, - Collections.emptyList()); - containerKeyMap.clear(); - containerKeyCountMap.clear(); - } catch (IOException e) { - LOG.error("Unable to write Container Key and " + - "Container Key Count data in Recon DB.", e); - return false; - } - return true; - } - - private boolean checkAndCallFlushToDB( - Map containerKeyMap) { - // if containerKeyMap more than entries, flush to DB and clear the map - if (null != containerKeyMap && containerKeyMap.size() >= - containerKeyFlushToDBMaxThreshold) { - return flushAndCommitContainerKeyInfoToDB(containerKeyMap, - Collections.emptyMap()); - } - return true; } @Override @@ -208,7 +212,7 @@ public Pair process(OMUpdateEventBatch events) { Map containerKeyCountMap = new HashMap<>(); // List of the deleted (container, key) pair's List deletedKeyCountList = new ArrayList<>(); - long startTime = System.currentTimeMillis(); + while (eventIterator.hasNext()) { OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); // Filter event inside process method to avoid duping @@ -258,8 +262,8 @@ public Pair process(OMUpdateEventBatch events) { LOG.error("Unable to write Container Key Prefix data in Recon DB.", e); return new ImmutablePair<>(getTaskName(), false); } - LOG.debug("{} successfully processed {} OM DB update event(s) in {} milliseconds.", - getTaskName(), eventCount, (System.currentTimeMillis() - startTime)); + LOG.debug("{} successfully processed {} OM DB update event(s).", + getTaskName(), eventCount); return new ImmutablePair<>(getTaskName(), true); } @@ -451,7 +455,7 @@ private void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo, * @param omKeyInfo omKeyInfo value * @param containerKeyMap we keep the added containerKeys in this map * to allow incremental batching to containerKeyTable - * @param containerKeyCountMap we keep the containerKey counts in this map + * @param containerKeyCountMap we keep the containerKey counts in this map * to allow batching to containerKeyCountTable * after reprocessing is done * @throws IOException if unable to write to recon DB.