diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java
index 6107c1eb3ada..0fffeb9edeff 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java
@@ -29,6 +29,8 @@
 import org.jooq.DSLContext;
 import org.jooq.impl.DSL;
 import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class used to create tables that are required for tracking containers.
@@ -38,6 +40,8 @@ public class ContainerSchemaDefinition implements ReconSchemaDefinition {
 
   public static final String UNHEALTHY_CONTAINERS_TABLE_NAME =
       "UNHEALTHY_CONTAINERS";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerSchemaDefinition.class);
 
   /**
    * ENUM describing the allowed container states which can be stored in the
@@ -68,6 +72,7 @@ public void initializeSchema() throws SQLException {
     Connection conn = dataSource.getConnection();
     dslContext = DSL.using(conn);
     if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
+      LOG.info("UNHEALTHY_CONTAINERS table is missing; creating a new one.");
       createUnhealthyContainersTable();
     }
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index ecd88f80995f..6927d77c0331 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -95,11 +95,14 @@ private ReconConstants() {
   // For file-size count reprocessing: ensure only one task truncates the table.
   public static final AtomicBoolean FILE_SIZE_COUNT_TABLE_TRUNCATED = new AtomicBoolean(false);
+  public static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new AtomicBoolean(false);
+
   /**
-   * Resets the table-truncated flag for the given tables. This should be called once per reprocess cycle,
+   * Resets the table-truncated flags for the given tables. This should be called once per reprocess cycle,
+   * for example by the OM task controller, before the tasks run.
    */
   public static void resetTableTruncatedFlags() {
     FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
+    CONTAINER_KEY_TABLES_TRUNCATED.set(false);
   }
 }
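The pair of AtomicBoolean flags above is what lets several tasks share a single truncation per reprocess cycle: the controller resets the flags once, then whichever mapper task reaches the tables first wins the compareAndSet and truncates. A standalone sketch of that handshake (the demo class and driver loop are illustrative; only the flag semantics come from this patch):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class TruncateOnceDemo {
  // Plays the role of ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.
  private static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new AtomicBoolean(false);

  // Stand-in for one ContainerKeyMapper task (FSO or OBS).
  private static void runTask(String name) {
    // compareAndSet(false, true) succeeds for exactly one caller per cycle,
    // so only the first task to arrive truncates the shared tables.
    if (CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, true)) {
      System.out.println(name + ": truncating container key tables");
    } else {
      System.out.println(name + ": tables already truncated, skipping");
    }
  }

  public static void main(String[] args) {
    List<String> tasks = Arrays.asList("ContainerKeyMapperTaskFSO", "ContainerKeyMapperTaskOBS");
    for (int cycle = 1; cycle <= 2; cycle++) {
      // The controller resets the flag once, before the tasks of this cycle run.
      CONTAINER_KEY_TABLES_TRUNCATED.set(false);
      tasks.forEach(TruncateOnceDemo::runTask);
    }
  }
}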
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 5bbda0356217..ad2d25a29d4e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -58,7 +58,8 @@
 import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
-import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
@@ -131,7 +132,8 @@ static class ReconOmTaskBindingModule extends AbstractModule {
     protected void configure() {
       Multibinder<ReconOmTask> taskBinder =
           Multibinder.newSetBinder(binder(), ReconOmTask.class);
-      taskBinder.addBinding().to(ContainerKeyMapperTask.class);
+      taskBinder.addBinding().to(ContainerKeyMapperTaskFSO.class);
+      taskBinder.addBinding().to(ContainerKeyMapperTaskOBS.class);
       taskBinder.addBinding().to(FileSizeCountTaskFSO.class);
       taskBinder.addBinding().to(FileSizeCountTaskOBS.class);
       taskBinder.addBinding().to(OmTableInsightTask.class);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index c3fd5bb3592b..ce4baa604790 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -106,7 +106,7 @@ public final class ReconServerConfigKeys {
   public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
       "ozone.recon.task.thread.count";
-  public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
+  public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 8;
 
   public static final String OZONE_RECON_HTTP_AUTH_CONFIG_PREFIX =
       "ozone.recon.http.auth.";
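Splitting the mapper means the Guice set binding now contributes one task per layout; the task controller keeps receiving all of them as a single injected Set<ReconOmTask>, and the default worker-thread count rises from 5 to 8, presumably to give the extra per-layout tasks headroom to run concurrently. A minimal, self-contained sketch of the multibinding pattern (Task, FsoTask, ObsTask and Controller are illustrative stand-ins, not Recon classes):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.multibindings.Multibinder;
import java.util.Set;

public class MultibinderDemo {
  interface Task { String name(); }
  static class FsoTask implements Task { public String name() { return "FSO"; } }
  static class ObsTask implements Task { public String name() { return "OBS"; } }

  // Each addBinding() contributes one element to the injected Set<Task>; this
  // mirrors how ReconOmTaskBindingModule registers the Recon OM tasks.
  static class TaskModule extends AbstractModule {
    @Override
    protected void configure() {
      Multibinder<Task> taskBinder = Multibinder.newSetBinder(binder(), Task.class);
      taskBinder.addBinding().to(FsoTask.class);
      taskBinder.addBinding().to(ObsTask.class);
    }
  }

  static class Controller {
    private final Set<Task> tasks;

    @Inject
    Controller(Set<Task> tasks) {
      this.tasks = tasks;
    }

    void run() {
      tasks.forEach(t -> System.out.println("registered task: " + t.name()));
    }
  }

  public static void main(String[] args) {
    Injector injector = Guice.createInjector(new TaskModule());
    injector.getInstance(Controller.class).run();
  }
}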
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
similarity index 50%
rename from hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
rename to hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
index e42e021b9e45..7e5a02ff99a1 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
@@ -17,16 +17,10 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-
-import com.google.inject.Inject;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,7 +28,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -43,7 +36,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.ReconConstants;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
@@ -51,147 +44,111 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Class to iterate over the OM DB and populate the Recon container DB with
- * the container -> Key reverse mapping.
+ * Helper class that encapsulates the common logic for ContainerKeyMapperTaskFSO and ContainerKeyMapperTaskOBS.
  */
-public class ContainerKeyMapperTask implements ReconOmTask {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerKeyMapperTask.class);
-
-  private ReconContainerMetadataManager reconContainerMetadataManager;
-  private final long containerKeyFlushToDBMaxThreshold;
-
-  @Inject
-  public ContainerKeyMapperTask(ReconContainerMetadataManager
-                                    reconContainerMetadataManager,
-                                OzoneConfiguration configuration) {
-    this.reconContainerMetadataManager = reconContainerMetadataManager;
-    this.containerKeyFlushToDBMaxThreshold = configuration.getLong(
-        ReconServerConfigKeys.
-            OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD,
-        ReconServerConfigKeys.
-            OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT
-    );
-  }
+public abstract class ContainerKeyMapperHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerKeyMapperHelper.class);
+
+  // Static lock to guard table truncation.
+  private static final Object TRUNCATE_LOCK = new Object();
 
   /**
-   * Read Key -> ContainerId data from OM snapshot DB and write reverse map
-   * (container, key) -> count to Recon Container DB.
+   * Ensures that the container key tables are truncated only once before reprocessing.
+   * Uses an AtomicBoolean to track if truncation has already been performed.
+   *
+   * @param reconContainerMetadataManager The metadata manager instance responsible for DB operations.
+   * @param taskName Name of the task requesting the truncation, used for logging.
    */
-  @Override
-  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
-    long omKeyCount = 0;
+  public static void truncateTablesIfNeeded(ReconContainerMetadataManager reconContainerMetadataManager,
+                                            String taskName) {
+    synchronized (TRUNCATE_LOCK) {
+      if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, true)) {
+        try {
+          // Perform table truncation
+          reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap());
+          LOG.debug("Successfully truncated container key tables.");
+        } catch (Exception e) {
+          // Reset the flag so truncation can be retried
+          ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.set(false);
+          LOG.error("Error while truncating container key tables for task {}. Resetting flag.", taskName, e);
+          throw new RuntimeException("Table truncation failed", e);
+        }
+      } else {
+        LOG.debug("Container key tables already truncated by another task.");
+      }
+    }
+  }
 
-  // In-memory maps for fast look up and batch write
-  // (container, key) -> count
+  public static boolean reprocess(OMMetadataManager omMetadataManager,
+                                  ReconContainerMetadataManager reconContainerMetadataManager,
+                                  BucketLayout bucketLayout,
+                                  String taskName,
+                                  long containerKeyFlushToDBMaxThreshold) {
+    long omKeyCount = 0;
     Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
-    // containerId -> key count
     Map<Long, Long> containerKeyCountMap = new HashMap<>();
+
     try {
-      LOG.debug("Starting a 'reprocess' run of ContainerKeyMapperTask.");
+      LOG.debug("Starting a 'reprocess' run for {}.", taskName);
       Instant start = Instant.now();
 
-      // initialize new container DB
-      reconContainerMetadataManager
-          .reinitWithNewContainerDataFromOm(new HashMap<>());
-
-      // loop over both key table and file table
-      for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY,
-          BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
-        // (HDDS-8580) Since "reprocess" iterate over the whole key table,
-        // containerKeyMap needs to be incrementally flushed to DB based on
-        // configured batch threshold.
-        // containerKeyCountMap can be flushed at the end since the number
-        // of containers in a cluster will not have significant memory overhead.
-        Table<String, OmKeyInfo> omKeyInfoTable =
-            omMetadataManager.getKeyTable(layout);
-        try (
-            TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
-                keyIter = omKeyInfoTable.iterator()) {
-          while (keyIter.hasNext()) {
-            Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-            OmKeyInfo omKeyInfo = kv.getValue();
-            handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap,
-                containerKeyCountMap);
-            if (!checkAndCallFlushToDB(containerKeyMap)) {
-              LOG.error("Unable to flush containerKey information to the DB");
-              return buildTaskResult(false);
-            }
-            omKeyCount++;
+      // Ensure the tables are truncated only once
+      truncateTablesIfNeeded(reconContainerMetadataManager, taskName);
+
+      // Get the appropriate table based on BucketLayout
+      Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
+
+      // Iterate through the table and process keys
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter = omKeyInfoTable.iterator()) {
+        while (keyIter.hasNext()) {
+          Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+          handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
+              reconContainerMetadataManager);
+          omKeyCount++;
+
+          // Check and flush data if it reaches the batch threshold
+          if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold,
+              reconContainerMetadataManager)) {
+            LOG.error("Failed to flush container key data for {}", taskName);
+            return false;
           }
         }
       }
 
-      // flush and commit left out keys at end,
-      // also batch write containerKeyCountMap to the containerKeyCountTable
-      if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
-          containerKeyCountMap)) {
-        LOG.error("Unable to flush Container Key Count and " +
-            "remaining Container Key information to the DB");
-        return buildTaskResult(false);
+      // Final flush and commit
+      if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, containerKeyCountMap, reconContainerMetadataManager)) {
+        LOG.error("Failed to flush Container Key data to DB for {}", taskName);
        return false;
       }
 
-      LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask.");
       Instant end = Instant.now();
-      long duration = Duration.between(start, end).toMillis();
-      LOG.debug("It took me {} seconds to process {} keys.",
-          (double) duration / 1000.0, omKeyCount);
-    } catch (IOException ioEx) {
-      LOG.error("Unable to populate Container Key data in Recon DB. ",
-          ioEx);
-      return buildTaskResult(false);
-    }
-    return buildTaskResult(true);
-  }
+      long durationMillis = Duration.between(start, end).toMillis();
+      double durationSeconds = (double) durationMillis / 1000.0;
+      LOG.debug("Completed 'reprocess' for {}. Processed {} keys in {} ms ({} seconds).",
+          taskName, omKeyCount, durationMillis, durationSeconds);
 
-  private boolean flushAndCommitContainerKeyInfoToDB(
-      Map<ContainerKeyPrefix, Integer> containerKeyMap,
-      Map<Long, Long> containerKeyCountMap) {
-    try {
-      // deleted container list is not needed since "reprocess" only has
-      // put operations
-      writeToTheDB(containerKeyMap, containerKeyCountMap,
-          Collections.emptyList());
-      containerKeyMap.clear();
-      containerKeyCountMap.clear();
-    } catch (IOException e) {
-      LOG.error("Unable to write Container Key and " +
-          "Container Key Count data in Recon DB.", e);
+    } catch (IOException ioEx) {
+      LOG.error("Error populating Container Key data for {} in Recon DB.", taskName, ioEx);
       return false;
     }
     return true;
   }
 
-  private boolean checkAndCallFlushToDB(
-      Map<ContainerKeyPrefix, Integer> containerKeyMap) {
-    // if containerKeyMap more than entries, flush to DB and clear the map
-    if (null != containerKeyMap && containerKeyMap.size() >=
-        containerKeyFlushToDBMaxThreshold) {
-      return flushAndCommitContainerKeyInfoToDB(containerKeyMap,
-          Collections.emptyMap());
+  private static boolean checkAndCallFlushToDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                               long containerKeyFlushToDBMaxThreshold,
+                                               ReconContainerMetadataManager reconContainerMetadataManager) {
+    if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) {
+      return flushAndCommitContainerKeyInfoToDB(containerKeyMap, Collections.emptyMap(), reconContainerMetadataManager);
     }
     return true;
   }
 
-  @Override
-  public String getTaskName() {
-    return "ContainerKeyMapperTask";
-  }
-
-  public Collection<String> getTaskTables() {
-    List<String> taskTables = new ArrayList<>();
-    taskTables.add(KEY_TABLE);
-    taskTables.add(FILE_TABLE);
-    return taskTables;
-  }
-
-  @Override
-  public TaskResult process(OMUpdateEventBatch events,
-      Map<String, Integer> subTaskSeekPosMap) {
+  public static boolean process(OMUpdateEventBatch events,
+                                String tableName,
+                                ReconContainerMetadataManager reconContainerMetadataManager,
+                                String taskName) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
     int eventCount = 0;
-    final Collection<String> taskTables = getTaskTables();
 
     // In-memory maps for fast look up and batch write
     // (HDDS-8580) containerKeyMap map is allowed to be used
@@ -205,10 +162,11 @@ public TaskResult process(OMUpdateEventBatch events,
     // List of the deleted (container, key) pair's
     List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
     long startTime = System.currentTimeMillis();
+
     while (eventIterator.hasNext()) {
       OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
       // Filter event inside process method to avoid duping
-      if (!taskTables.contains(omdbUpdateEvent.getTable())) {
+      if (!tableName.equals(omdbUpdateEvent.getTable())) {
         continue;
       }
       String updatedKey = omdbUpdateEvent.getKey();
@@ -217,92 +175,108 @@
         switch (omdbUpdateEvent.getAction()) {
         case PUT:
           handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap,
-              containerKeyCountMap, deletedKeyCountList);
+              containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
           break;
 
         case DELETE:
           handleDeleteOMKeyEvent(updatedKey, containerKeyMap,
-              containerKeyCountMap, deletedKeyCountList);
+              containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
           break;
 
        case UPDATE:
          if (omdbUpdateEvent.getOldValue() != null) {
            handleDeleteOMKeyEvent(
                omdbUpdateEvent.getOldValue().getKeyName(), containerKeyMap,
-                containerKeyCountMap, deletedKeyCountList);
+                containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
          } else {
-            LOG.warn("Update event does not have the old Key Info for {}.",
-                updatedKey);
+            LOG.warn("Update event does not have the old Key Info for {}.", updatedKey);
          }
          handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap,
-              containerKeyCountMap, deletedKeyCountList);
+              containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
          break;
 
-        default: LOG.debug("Skipping DB update event : {}",
-            omdbUpdateEvent.getAction());
+        default:
+          LOG.info("Skipping DB update event: {}", omdbUpdateEvent.getAction());
        }
        eventCount++;
      } catch (IOException e) {
-        LOG.error("Unexpected exception while updating key data : {} ",
-            updatedKey, e);
-        return buildTaskResult(false);
+        LOG.error("Unexpected exception while updating key data: {} ", updatedKey, e);
+        return false;
      }
    }
    try {
-      writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList);
+      writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
    } catch (IOException e) {
      LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
-      return buildTaskResult(false);
+      return false;
    }
    LOG.debug("{} successfully processed {} OM DB update event(s) in {} milliseconds.",
-        getTaskName(), eventCount, (System.currentTimeMillis() - startTime));
-    return buildTaskResult(true);
+        taskName, eventCount, (System.currentTimeMillis() - startTime));
+    return true;
  }
 
-  private void writeToTheDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
-      Map<Long, Long> containerKeyCountMap,
-      List<ContainerKeyPrefix> deletedContainerKeyList)
+  /**
+   * Note to add an OM key and update containerID -> no. of keys count.
+   *
+   * @param key key String
+   * @param omKeyInfo omKeyInfo value
+   * @param containerKeyMap we keep the added containerKeys in this map
+   *                        (in this batch)
+   * @param containerKeyCountMap we keep the containerKey counts in this map
+   * @param deletedContainerKeyList list of the deleted containerKeys
+   * @param reconContainerMetadataManager Recon metadata manager instance
+   * @throws IOException if unable to write to recon DB.
+   */
+  private static void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo,
+                                          Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                          Map<Long, Long> containerKeyCountMap,
+                                          List<ContainerKeyPrefix> deletedContainerKeyList,
+                                          ReconContainerMetadataManager reconContainerMetadataManager)
      throws IOException {
-    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
-      containerKeyMap.keySet().forEach((ContainerKeyPrefix key) -> {
-        try {
-          reconContainerMetadataManager
-              .batchStoreContainerKeyMapping(rdbBatchOperation, key,
-                  containerKeyMap.get(key));
-        } catch (IOException e) {
-          LOG.error("Unable to write Container Key Prefix data in Recon DB.",
-              e);
-        }
-      });
+    long containerCountToIncrement = 0;
+    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo.getKeyLocationVersions()) {
+      long keyVersion = omKeyLocationInfoGroup.getVersion();
+      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup.getLocationList()) {
+        long containerId = omKeyLocationInfo.getContainerID();
+        ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(containerId, key, keyVersion);
+        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(containerKeyPrefix) == 0 &&
+            !containerKeyMap.containsKey(containerKeyPrefix)) {
+          // Save on writes. No need to save same container-key prefix
+          // mapping again.
+          containerKeyMap.put(containerKeyPrefix, 1);
+          // Remove the container-key prefix from the deleted list if we
+          // previously deleted it in this batch (and now we add it again)
+          deletedContainerKeyList.remove(containerKeyPrefix);
+          // check if container already exists and
+          // increment the count of containers if it does not exist
+          if (!reconContainerMetadataManager.doesContainerExists(containerId) &&
+              !containerKeyCountMap.containsKey(containerId)) {
+            containerCountToIncrement++;
+          }
 
-      containerKeyCountMap.keySet().forEach((Long key) -> {
-        try {
-          reconContainerMetadataManager
-              .batchStoreContainerKeyCounts(rdbBatchOperation, key,
-                  containerKeyCountMap.get(key));
-        } catch (IOException e) {
-          LOG.error("Unable to write Container Key Prefix data in Recon DB.",
-              e);
-        }
-      });
+          // update the count of keys for the given containerID
+          long keyCount;
+          if (containerKeyCountMap.containsKey(containerId)) {
+            keyCount = containerKeyCountMap.get(containerId);
+          } else {
+            keyCount = reconContainerMetadataManager.getKeyCountForContainer(containerId);
+          }
 
-      deletedContainerKeyList.forEach((ContainerKeyPrefix key) -> {
-        try {
-          reconContainerMetadataManager
-              .batchDeleteContainerMapping(rdbBatchOperation, key);
-        } catch (IOException e) {
-          LOG.error("Unable to write Container Key Prefix data in Recon DB.",
-              e);
+          // increment the count and update containerKeyCount.
+          // keyCount will be 0 if containerID is not found. So, there is no
+          // need to initialize keyCount for the first time.
+          containerKeyCountMap.put(containerId, ++keyCount);
        }
-      });
+      }
+    }
 
-      reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation);
+    if (containerCountToIncrement > 0) {
+      reconContainerMetadataManager.incrementContainerCountBy(containerCountToIncrement);
    }
  }
 
  /**
-   * Note to delete an OM Key and update the containerID -> no. of keys counts
+   * Note to delete an OM Key and update the containerID -> no. of keys counts
   * (we are preparing for batch deletion in these data structures).
   *
   * @param key key String.
@@ -312,24 +286,21 @@
   * @param deletedContainerKeyList list of the deleted containerKeys
+   * @param reconContainerMetadataManager Recon metadata manager instance
   * @throws IOException If Unable to write to container DB.
   */
-  private void handleDeleteOMKeyEvent(String key,
-      Map<ContainerKeyPrefix, Integer>
-          containerKeyMap,
-      Map<Long, Long> containerKeyCountMap,
-      List<ContainerKeyPrefix>
-          deletedContainerKeyList)
+  private static void handleDeleteOMKeyEvent(String key,
+                                             Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                             Map<Long, Long> containerKeyCountMap,
+                                             List<ContainerKeyPrefix> deletedContainerKeyList,
+                                             ReconContainerMetadataManager reconContainerMetadataManager)
      throws IOException {
 
    Set<ContainerKeyPrefix> keysToBeDeleted = new HashSet<>();
-    try (TableIterator<KeyPrefixContainer, ? extends
-        Table.KeyValue<KeyPrefixContainer, Integer>> keyContainerIterator =
-        reconContainerMetadataManager.getKeyContainerTableIterator()) {
+    try (TableIterator<KeyPrefixContainer, ? extends Table.KeyValue<KeyPrefixContainer, Integer>>
+             keyContainerIterator = reconContainerMetadataManager.getKeyContainerTableIterator()) {
 
      // Check if we have keys in this container in the DB
      keyContainerIterator.seek(KeyPrefixContainer.get(key));
      while (keyContainerIterator.hasNext()) {
-        Table.KeyValue<KeyPrefixContainer, Integer> keyValue =
-            keyContainerIterator.next();
+        Table.KeyValue<KeyPrefixContainer, Integer> keyValue = keyContainerIterator.next();
        String keyPrefix = keyValue.getKey().getKeyPrefix();
        if (keyPrefix.equals(key)) {
          if (keyValue.getKey().getContainerId() != -1) {
@@ -342,13 +313,12 @@
    }
 
    // Check if we have keys in this container in our containerKeyMap
-    containerKeyMap.keySet()
-        .forEach((ContainerKeyPrefix containerKeyPrefix) -> {
-          String keyPrefix = containerKeyPrefix.getKeyPrefix();
-          if (keyPrefix.equals(key)) {
-            keysToBeDeleted.add(containerKeyPrefix);
-          }
-        });
+    containerKeyMap.keySet().forEach((ContainerKeyPrefix containerKeyPrefix) -> {
+      String keyPrefix = containerKeyPrefix.getKeyPrefix();
+      if (keyPrefix.equals(key)) {
+        keysToBeDeleted.add(containerKeyPrefix);
+      }
+    });
 
    for (ContainerKeyPrefix containerKeyPrefix : keysToBeDeleted) {
      deletedContainerKeyList.add(containerKeyPrefix);
@@ -356,14 +326,13 @@
      // it in this batch (and now we delete it)
      containerKeyMap.remove(containerKeyPrefix);
 
-      // decrement count and update containerKeyCount.
+      // Decrement count and update containerKeyCount.
      Long containerID = containerKeyPrefix.getContainerId();
      long keyCount;
      if (containerKeyCountMap.containsKey(containerID)) {
        keyCount = containerKeyCountMap.get(containerID);
      } else {
-        keyCount = reconContainerMetadataManager
-            .getKeyCountForContainer(containerID);
+        keyCount = reconContainerMetadataManager.getKeyCountForContainer(containerID);
      }
      if (keyCount > 0) {
        containerKeyCountMap.put(containerID, --keyCount);
@@ -371,130 +340,121 @@
    }
  }
 
-  /**
-   * Note to add an OM key and update containerID -> no. of keys count.
-   *
-   * @param key key String
-   * @param omKeyInfo omKeyInfo value
-   * @param containerKeyMap we keep the added containerKeys in this map
-   *                        (in this batch)
-   * @param containerKeyCountMap we keep the containerKey counts in this map
-   * @param deletedContainerKeyList list of the deleted containerKeys
-   * @throws IOException if unable to write to recon DB.
-   */
-  private void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo,
-      Map<ContainerKeyPrefix, Integer>
-          containerKeyMap,
+  private static void writeToTheDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
      Map<Long, Long> containerKeyCountMap,
-      List<ContainerKeyPrefix>
-          deletedContainerKeyList)
+      List<ContainerKeyPrefix> deletedContainerKeyList,
+      ReconContainerMetadataManager reconContainerMetadataManager)
      throws IOException {
-    long containerCountToIncrement = 0;
-    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
-        .getKeyLocationVersions()) {
-      long keyVersion = omKeyLocationInfoGroup.getVersion();
-      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
-          .getLocationList()) {
-        long containerId = omKeyLocationInfo.getContainerID();
-        ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(
-            containerId, key, keyVersion);
-        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(
-            containerKeyPrefix) == 0
-            && !containerKeyMap.containsKey(containerKeyPrefix)) {
-          // Save on writes. No need to save same container-key prefix
-          // mapping again.
-          containerKeyMap.put(containerKeyPrefix, 1);
-          // Remove the container-key prefix from the deleted list if we
-          // previously deleted it in this batch (and now we add it again)
-          deletedContainerKeyList.remove(containerKeyPrefix);
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
 
-          // check if container already exists and
-          // increment the count of containers if it does not exist
-          if (!reconContainerMetadataManager.doesContainerExists(containerId)
-              && !containerKeyCountMap.containsKey(containerId)) {
-            containerCountToIncrement++;
-          }
+      // Write container key mappings
+      containerKeyMap.keySet().forEach((ContainerKeyPrefix key) -> {
+        try {
+          reconContainerMetadataManager.batchStoreContainerKeyMapping(
+              rdbBatchOperation, key, containerKeyMap.get(key));
+        } catch (IOException e) {
+          LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
+        }
+      });
 
-          // update the count of keys for the given containerID
-          long keyCount;
-          if (containerKeyCountMap.containsKey(containerId)) {
-            keyCount = containerKeyCountMap.get(containerId);
-          } else {
-            keyCount = reconContainerMetadataManager
-                .getKeyCountForContainer(containerId);
-          }
+      // Write container key count mappings
+      containerKeyCountMap.keySet().forEach((Long key) -> {
+        try {
+          reconContainerMetadataManager.batchStoreContainerKeyCounts(
+              rdbBatchOperation, key, containerKeyCountMap.get(key));
+        } catch (IOException e) {
+          LOG.error("Unable to write Container Key Count data in Recon DB.", e);
+        }
+      });
 
-          // increment the count and update containerKeyCount.
-          // keyCount will be 0 if containerID is not found. So, there is no
-          // need to initialize keyCount for the first time.
-          containerKeyCountMap.put(containerId, ++keyCount);
+      // Delete container key mappings
+      deletedContainerKeyList.forEach((ContainerKeyPrefix key) -> {
+        try {
+          reconContainerMetadataManager.batchDeleteContainerMapping(
+              rdbBatchOperation, key);
+        } catch (IOException e) {
+          LOG.error("Unable to delete Container Key Prefix data in Recon DB.", e);
        }
-      }
-    }
+      });
 
-    if (containerCountToIncrement > 0) {
-      reconContainerMetadataManager
-          .incrementContainerCountBy(containerCountToIncrement);
+      // Commit batch operation
+      reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation);
    }
  }
 
  /**
-   * Write an OM key to container DB and update containerID -> no. of keys
+   * Write an OM key to container DB and update containerID -> no. of keys
   * count to the Global Stats table.
   *
   * @param key key String
   * @param omKeyInfo omKeyInfo value
   * @param containerKeyMap we keep the added containerKeys in this map
   *                        to allow incremental batching to containerKeyTable
-   * @param containerKeyCountMap we keep the containerKey counts in this map
+   * @param containerKeyCountMap we keep the containerKey counts in this map
   *                             to allow batching to containerKeyCountTable
   *                             after reprocessing is done
+   * @param reconContainerMetadataManager Recon metadata manager instance
   * @throws IOException if unable to write to recon DB.
   */
-  private void handleKeyReprocess(String key,
-      OmKeyInfo omKeyInfo,
-      Map<ContainerKeyPrefix, Integer>
-          containerKeyMap,
-      Map<Long, Long> containerKeyCountMap)
+  public static void handleKeyReprocess(String key,
+                                        OmKeyInfo omKeyInfo,
+                                        Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                        Map<Long, Long> containerKeyCountMap,
+                                        ReconContainerMetadataManager reconContainerMetadataManager)
      throws IOException {
+
    long containerCountToIncrement = 0;
-    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
-        .getKeyLocationVersions()) {
+
+    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo.getKeyLocationVersions()) {
      long keyVersion = omKeyLocationInfoGroup.getVersion();
-      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
-          .getLocationList()) {
+      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup.getLocationList()) {
        long containerId = omKeyLocationInfo.getContainerID();
-        ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(
-            containerId, key, keyVersion);
-        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(
-            containerKeyPrefix) == 0
+        ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(containerId, key, keyVersion);
+
+        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(containerKeyPrefix) == 0
            && !containerKeyMap.containsKey(containerKeyPrefix)) {
-          // Save on writes. No need to save same container-key prefix
-          // mapping again.
+          // Save on writes. No need to save same container-key prefix mapping again.
          containerKeyMap.put(containerKeyPrefix, 1);
 
-          // check if container already exists and
-          // if it exists, update the count of keys for the given containerID
-          // else, increment the count of containers and initialize keyCount
-          long keyCount;
-          if (containerKeyCountMap.containsKey(containerId)) {
-            keyCount = containerKeyCountMap.get(containerId);
-          } else {
+          // Check if container already exists; if not, increment the count
+          if (!reconContainerMetadataManager.doesContainerExists(containerId)
+              && !containerKeyCountMap.containsKey(containerId)) {
            containerCountToIncrement++;
-            keyCount = 0;
          }
 
-          // increment the count and update containerKeyCount.
-          containerKeyCountMap.put(containerId, ++keyCount);
+          // Update the count of keys for the given containerID
+          long keyCount = containerKeyCountMap.getOrDefault(containerId,
+              reconContainerMetadataManager.getKeyCountForContainer(containerId));
+
+          containerKeyCountMap.put(containerId, keyCount + 1);
        }
      }
    }
 
    if (containerCountToIncrement > 0) {
-      reconContainerMetadataManager
-          .incrementContainerCountBy(containerCountToIncrement);
+      reconContainerMetadataManager.incrementContainerCountBy(containerCountToIncrement);
    }
  }
 
+  public static boolean flushAndCommitContainerKeyInfoToDB(
+      Map<ContainerKeyPrefix, Integer> containerKeyMap,
+      Map<Long, Long> containerKeyCountMap,
+      ReconContainerMetadataManager reconContainerMetadataManager) {
+
+    try {
+      // No deleted container list needed since "reprocess" only has put operations
+      writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList(), reconContainerMetadataManager);
+
+      // Clear in-memory maps after successful commit
+      containerKeyMap.clear();
+      containerKeyCountMap.clear();
+
+    } catch (IOException e) {
+      LOG.error("Unable to write Container Key and Container Key Count data in Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
 }
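The helper's process() contract is easier to see with the Recon types stripped away: PUT inserts a (container, key) mapping and un-stages any pending delete for it, DELETE stages a removal, and UPDATE is handled as DELETE of the old value followed by PUT of the new one, so stale mappings cannot survive a batch. A standalone sketch of those semantics (all names illustrative; only the event handling mirrors the code above):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EventContractDemo {
  enum Action { PUT, DELETE, UPDATE }

  private static final Map<String, Integer> CONTAINER_KEY_MAP = new LinkedHashMap<>();
  private static final List<String> STAGED_DELETES = new ArrayList<>();

  private static void apply(Action action, String key, String oldKey) {
    switch (action) {
    case PUT:
      CONTAINER_KEY_MAP.put(key, 1);
      STAGED_DELETES.remove(key); // re-added in the same batch: undo the staged delete
      break;
    case DELETE:
      CONTAINER_KEY_MAP.remove(key);
      STAGED_DELETES.add(key);
      break;
    case UPDATE:
      // UPDATE == DELETE(old value) + PUT(new value).
      apply(Action.DELETE, oldKey != null ? oldKey : key, null);
      apply(Action.PUT, key, null);
      break;
    default:
      break;
    }
  }

  public static void main(String[] args) {
    apply(Action.PUT, "/vol/bucket/key1", null);
    apply(Action.UPDATE, "/vol/bucket/key1", "/vol/bucket/key1");
    apply(Action.DELETE, "/vol/bucket/key2", null);
    System.out.println("live=" + CONTAINER_KEY_MAP.keySet() + ", staged deletes=" + STAGED_DELETES);
  }
}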
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java
new file mode 100644
index 000000000000..fd8df1a1e4f4
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import java.util.Map;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+
+/**
+ * Task for processing ContainerKey mapping specifically for FSO buckets.
+ */
+public class ContainerKeyMapperTaskFSO implements ReconOmTask {
+
+  private final ReconContainerMetadataManager reconContainerMetadataManager;
+  private final OzoneConfiguration ozoneConfiguration;
+
+  @Inject
+  public ContainerKeyMapperTaskFSO(ReconContainerMetadataManager reconContainerMetadataManager,
+                                   OzoneConfiguration configuration) {
+    this.reconContainerMetadataManager = reconContainerMetadataManager;
+    this.ozoneConfiguration = configuration;
+  }
+
+  @Override
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
+    long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+        ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD,
+        ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
+    boolean result = ContainerKeyMapperHelper.reprocess(
+        omMetadataManager, reconContainerMetadataManager,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED, getTaskName(), containerKeyFlushToDBMaxThreshold);
+    return buildTaskResult(result);
+  }
+
+  @Override
+  public String getTaskName() {
+    return "ContainerKeyMapperTaskFSO";
+  }
+
+  @Override
+  public TaskResult process(OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
+    boolean result =
+        ContainerKeyMapperHelper.process(events, "fileTable", reconContainerMetadataManager, getTaskName());
+    return buildTaskResult(result);
+  }
+}
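The FSO task reads the flush threshold from configuration and passes it into the helper, which drains the in-memory map every time it fills up instead of holding the whole key table in memory (the HDDS-8580 concern from the removed comments). A standalone sketch of that batching rhythm (threshold value and names are illustrative; the real limit comes from OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD):

import java.util.HashMap;
import java.util.Map;

public class BatchFlushDemo {
  private static final long FLUSH_THRESHOLD = 3;

  private static void flush(Map<String, Integer> buffer) {
    // The real code clears the map only after the batch commit succeeds.
    System.out.println("flushing " + buffer.size() + " entries");
    buffer.clear();
  }

  public static void main(String[] args) {
    Map<String, Integer> containerKeyBuffer = new HashMap<>();
    for (int key = 0; key < 10; key++) {
      containerKeyBuffer.put("container#1/key" + key, 1);
      if (containerKeyBuffer.size() >= FLUSH_THRESHOLD) {
        flush(containerKeyBuffer); // incremental flush at the threshold
      }
    }
    flush(containerKeyBuffer); // final flush for the remainder
  }
}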
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java
new file mode 100644
index 000000000000..178ee8b02867
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import java.util.Map;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+
+/**
+ * Task for processing ContainerKey mapping specifically for OBS buckets.
+ */
+public class ContainerKeyMapperTaskOBS implements ReconOmTask {
+
+  private final ReconContainerMetadataManager reconContainerMetadataManager;
+  private final OzoneConfiguration ozoneConfiguration;
+
+  @Inject
+  public ContainerKeyMapperTaskOBS(ReconContainerMetadataManager reconContainerMetadataManager,
+                                   OzoneConfiguration configuration) {
+    this.reconContainerMetadataManager = reconContainerMetadataManager;
+    this.ozoneConfiguration = configuration;
+  }
+
+  @Override
+  public TaskResult reprocess(OMMetadataManager omMetadataManager) {
+    long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+        ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD,
+        ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
+    boolean result = ContainerKeyMapperHelper.reprocess(
+        omMetadataManager, reconContainerMetadataManager, BucketLayout.OBJECT_STORE, getTaskName(),
+        containerKeyFlushToDBMaxThreshold);
+    return buildTaskResult(result);
+  }
+
+  @Override
+  public String getTaskName() {
+    return "ContainerKeyMapperTaskOBS";
+  }
+
+  @Override
+  public TaskResult process(OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
+    boolean result = ContainerKeyMapperHelper.process(events, "keyTable", reconContainerMetadataManager, getTaskName());
+    return buildTaskResult(result);
+  }
+}
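Between them, the two tasks still cover all three bucket layouts: OMMetadataManager.getKeyTable() only distinguishes FSO from everything else, so the OBS task's pass over "keyTable" also picks up LEGACY keys, matching the old task's LEGACY + FILE_SYSTEM_OPTIMIZED loop. A paraphrased sketch of that dispatch (not the verbatim Ozone method):

// Paraphrased from OmMetadataManagerImpl.getKeyTable(BucketLayout):
public Table<String, OmKeyInfo> getKeyTable(BucketLayout bucketLayout) {
  if (bucketLayout.isFileSystemOptimized()) {
    return fileTable; // FSO keys, watched by ContainerKeyMapperTaskFSO
  }
  return keyTable;    // OBJECT_STORE and LEGACY keys, watched by ContainerKeyMapperTaskOBS
}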
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index 9353f406a779..76566b6f9eac 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@ -99,7 +99,8 @@
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
-import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO;
 import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
 import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
@@ -297,10 +298,13 @@ public void setUp() throws Exception {
   }
 
   private void reprocessContainerKeyMapper() {
-    ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager,
-            omConfiguration);
-    containerKeyMapperTask.reprocess(reconOMMetadataManager);
+    ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS =
+        new ContainerKeyMapperTaskOBS(reconContainerMetadataManager, omConfiguration);
+    containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager);
+
+    ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO =
+        new ContainerKeyMapperTaskFSO(reconContainerMetadataManager, omConfiguration);
+    containerKeyMapperTaskFSO.reprocess(reconOMMetadataManager);
   }
 
   private void setUpFSOData() throws IOException {
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index aea63b41000e..eb45c6abe847 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -82,7 +82,7 @@
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
-import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithLegacy;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithOBS;
@@ -387,8 +387,8 @@ private void setUpOmData() throws Exception {
     when(tableMock.getName()).thenReturn("KeyTable");
     when(omMetadataManagerMock.getKeyTable(getBucketLayout()))
         .thenReturn(tableMock);
-    ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager,
+    ContainerKeyMapperTaskOBS containerKeyMapperTask =
+        new ContainerKeyMapperTaskOBS(reconContainerMetadataManager,
             ozoneConfiguration);
     containerKeyMapperTask.reprocess(reconOMMetadataManager);
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index 36b335c1b46b..fb31537ec7fb 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -132,10 +132,10 @@ public void testKeyTableReprocess() throws Exception {
         VOLUME_NAME,
         Collections.singletonList(omKeyLocationInfoGroup));
 
-    ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager,
+    ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS =
+        new ContainerKeyMapperTaskOBS(reconContainerMetadataManager,
             omConfiguration);
-    containerKeyMapperTask.reprocess(reconOMMetadataManager);
+    containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager);
 
     keyPrefixesForContainer =
         reconContainerMetadataManager.getKeyPrefixesForContainer(1);
@@ -205,10 +205,10 @@ public void testFileTableReprocess() throws Exception {
         KEY_ONE_SIZE);
 
     // Reprocess container key mappings
-    ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager,
+    ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO =
+        new ContainerKeyMapperTaskFSO(reconContainerMetadataManager,
             omConfiguration);
-    containerKeyMapperTask.reprocess(reconOMMetadataManager);
+    containerKeyMapperTaskFSO.reprocess(reconOMMetadataManager);
 
     // Check the key prefixes for container 1
     keyPrefixesForContainer =
@@ -314,10 +314,10 @@ public void testKeyTableProcess() throws IOException {
         add(keyEvent2);
       }}, 0L);
 
-    ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager,
+    ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS =
+        new ContainerKeyMapperTaskOBS(reconContainerMetadataManager,
             omConfiguration);
-    containerKeyMapperTask.reprocess(reconOMMetadataManager);
+    containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager);
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);
@@ -336,7 +336,7 @@ public void testKeyTableProcess() throws IOException {
     assertEquals(1, reconContainerMetadataManager.getKeyCountForContainer(3L));
 
     // Process PUT & DELETE event.
-    containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap());
+    containerKeyMapperTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);
@@ -384,8 +384,8 @@ public void testFileTableProcess() throws Exception {
         new OmKeyLocationInfoGroup(0L, omKeyLocationInfoList);
 
     // Reprocess container key mappings
-    ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(reconContainerMetadataManager,
+    ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO =
+        new ContainerKeyMapperTaskFSO(reconContainerMetadataManager,
             omConfiguration);
 
     String bucket = BUCKET_NAME;
@@ -427,7 +427,7 @@ public void testFileTableProcess() throws Exception {
     }, 0L);
 
     // Process PUT event for both the keys
-    containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap());
+    containerKeyMapperTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);
@@ -460,7 +460,7 @@ public void testFileTableProcess() throws Exception {
     }, 0L);
 
     // Process DELETE event for key2
-    containerKeyMapperTask.process(omUpdateEventBatch2, Collections.emptyMap());
+    containerKeyMapperTaskFSO.process(omUpdateEventBatch2, Collections.emptyMap());
 
     keyPrefixesForContainer = reconContainerMetadataManager
         .getKeyPrefixesForContainer(1);