diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 22b9d969bc24..ecd88f80995f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.recon;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Recon Server constants file.
  */
@@ -89,4 +91,15 @@ private ReconConstants() {
       (double) MAX_CONTAINER_SIZE_UPPER_BOUND / MIN_CONTAINER_SIZE_UPPER_BOUND) / Math.log(2)) + 1;
+
+  // For file-size count reprocessing: ensure only one task truncates the table.
+  public static final AtomicBoolean FILE_SIZE_COUNT_TABLE_TRUNCATED = new AtomicBoolean(false);
+
+  /**
+   * Resets the file-size-count table-truncated flag. This should be called once per reprocess cycle,
+   * for example by the OM task controller, before the tasks run.
+   */
+  public static void resetTableTruncatedFlags() {
+    FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
+  }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 42ab9a47163e..127cbfcf9ed7 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -57,7 +59,8 @@
 import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
 import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
 import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
 import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
@@ -129,12 +132,19 @@ protected void configure() {
       Multibinder<ReconOmTask> taskBinder =
           Multibinder.newSetBinder(binder(), ReconOmTask.class);
       taskBinder.addBinding().to(ContainerKeyMapperTask.class);
-      taskBinder.addBinding().to(FileSizeCountTask.class);
+      taskBinder.addBinding().to(FileSizeCountTaskFSO.class);
+      taskBinder.addBinding().to(FileSizeCountTaskOBS.class);
       taskBinder.addBinding().to(OmTableInsightTask.class);
       taskBinder.addBinding().to(NSSummaryTask.class);
     }
   }
 
+  @Provides
+  @Singleton
+  public ExecutorService provideReconExecutorService() {
+    return Executors.newFixedThreadPool(5);
+  }
+
   /**
   * Class that has all the DAO bindings in Recon.
*/ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java new file mode 100644 index 000000000000..f40a859590b0 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.google.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; +import org.jooq.DSLContext; + +/** + * Task for FileSystemOptimized (FSO) which processes the FILE_TABLE. + */ +public class FileSizeCountTaskFSO implements ReconOmTask { + + private final FileCountBySizeDao fileCountBySizeDao; + private final DSLContext dslContext; + + @Inject + public FileSizeCountTaskFSO(FileCountBySizeDao fileCountBySizeDao, + UtilizationSchemaDefinition utilizationSchemaDefinition) { + this.fileCountBySizeDao = fileCountBySizeDao; + this.dslContext = utilizationSchemaDefinition.getDSLContext(); + } + + @Override + public Pair reprocess(OMMetadataManager omMetadataManager) { + return FileSizeCountTaskHelper.reprocess( + omMetadataManager, + dslContext, + fileCountBySizeDao, + BucketLayout.FILE_SYSTEM_OPTIMIZED, + getTaskName() + ); + } + + @Override + public Pair process(OMUpdateEventBatch events) { + // This task listens only on the FILE_TABLE. + return FileSizeCountTaskHelper.processEvents( + events, + OmMetadataManagerImpl.FILE_TABLE, + dslContext, + fileCountBySizeDao, + getTaskName()); + } + + @Override + public String getTaskName() { + return "FileSizeCountTaskFSO"; + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java new file mode 100644 index 000000000000..489449d6a98e --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.recon.ReconConstants; +import org.apache.hadoop.ozone.recon.ReconUtils; +import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; +import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize; +import org.jooq.DSLContext; +import org.jooq.Record3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class that encapsulates the common code for file size count tasks. + */ +public abstract class FileSizeCountTaskHelper { + protected static final Logger LOG = LoggerFactory.getLogger(FileSizeCountTaskHelper.class); + + // Static lock to guard table truncation. + private static final Object TRUNCATE_LOCK = new Object(); + + /** + * Truncates the FILE_COUNT_BY_SIZE table if it has not been truncated yet. + * This method synchronizes on a static lock to ensure only one task truncates at a time. + * If an error occurs, the flag is reset to allow retrying the truncation. + * + * @param dslContext DSLContext for executing DB commands. + */ + public static void truncateTableIfNeeded(DSLContext dslContext) { + synchronized (TRUNCATE_LOCK) { + if (ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.compareAndSet(false, true)) { + try { + int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute(); + LOG.info("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE); + } catch (Exception e) { + // Reset the flag so that truncation can be retried + ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false); + LOG.error("Error while truncating FILE_COUNT_BY_SIZE table, resetting flag.", e); + throw new RuntimeException("Table truncation failed", e); // Propagate upwards + } + } else { + LOG.info("Table already truncated by another task; waiting for truncation to complete."); + } + } + } + + /** + * Executes the reprocess method for the given task. + * + * @param omMetadataManager OM metadata manager. + * @param dslContext DSLContext for DB operations. + * @param fileCountBySizeDao DAO for file count table. + * @param bucketLayout The bucket layout to process. + * @param taskName The name of the task for logging. + * @return A Pair of task name and boolean indicating success. 
+   */
+  public static Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager,
+      DSLContext dslContext,
+      FileCountBySizeDao fileCountBySizeDao,
+      BucketLayout bucketLayout,
+      String taskName) {
+    LOG.info("Starting Reprocess for {}", taskName);
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
+    long startTime = System.currentTimeMillis();
+    truncateTableIfNeeded(dslContext);
+    boolean status = reprocessBucketLayout(
+        bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName);
+    if (!status) {
+      return new ImmutablePair<>(taskName, false);
+    }
+    writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+    long endTime = System.currentTimeMillis();
+    LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime - startTime));
+    return new ImmutablePair<>(taskName, true);
+  }
+
+  /**
+   * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap.
+   *
+   * @param bucketLayout The bucket layout to use.
+   * @param omMetadataManager OM metadata manager.
+   * @param fileSizeCountMap Map accumulating file size counts.
+   * @param dslContext DSLContext for DB operations.
+   * @param fileCountBySizeDao DAO for file count table.
+   * @param taskName The name of the task for logging.
+   * @return true if processing succeeds, false otherwise.
+   */
+  public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
+      OMMetadataManager omMetadataManager,
+      Map<FileSizeCountKey, Long> fileSizeCountMap,
+      DSLContext dslContext,
+      FileCountBySizeDao fileCountBySizeDao,
+      String taskName) {
+    Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
+    int totalKeysProcessed = 0;
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter =
+             omKeyInfoTable.iterator()) {
+      while (keyIter.hasNext()) {
+        Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+        handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
+        totalKeysProcessed++;
+
+        // Flush to DB periodically.
+        if (fileSizeCountMap.size() >= 100000) {
+          writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+          fileSizeCountMap.clear();
+        }
+      }
+    } catch (IOException ioEx) {
+      LOG.error("Unable to populate File Size Count for {} in Recon DB.", taskName, ioEx);
+      return false;
+    }
+    LOG.info("Reprocessed {} keys for bucket layout {}.", totalKeysProcessed, bucketLayout);
+    return true;
+  }
+
+  /**
+   * Processes a batch of OM update events.
+   *
+   * @param events OM update event batch.
+   * @param tableName Name of the OM DB table (KEY_TABLE or FILE_TABLE) whose events this task consumes.
+   * @param dslContext DSLContext for DB operations.
+   * @param fileCountBySizeDao DAO for file count table.
+   * @param taskName The name of the task for logging.
+   * @return A Pair of task name and boolean indicating success.
+ */ + public static Pair processEvents(OMUpdateEventBatch events, + String tableName, + DSLContext dslContext, + FileCountBySizeDao fileCountBySizeDao, + String taskName) { + Iterator eventIterator = events.getIterator(); + Map fileSizeCountMap = new HashMap<>(); + long startTime = System.currentTimeMillis(); + while (eventIterator.hasNext()) { + OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); + if (!tableName.equals(omdbUpdateEvent.getTable())) { + continue; + } + String updatedKey = omdbUpdateEvent.getKey(); + Object value = omdbUpdateEvent.getValue(); + Object oldValue = omdbUpdateEvent.getOldValue(); + if (value instanceof OmKeyInfo) { + OmKeyInfo omKeyInfo = (OmKeyInfo) value; + OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue; + try { + switch (omdbUpdateEvent.getAction()) { + case PUT: + handlePutKeyEvent(omKeyInfo, fileSizeCountMap); + break; + case DELETE: + handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap); + break; + case UPDATE: + if (omKeyInfoOld != null) { + handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap); + handlePutKeyEvent(omKeyInfo, fileSizeCountMap); + } else { + LOG.warn("Update event does not have the old keyInfo for {}.", updatedKey); + } + break; + default: + LOG.trace("Skipping DB update event: {}", omdbUpdateEvent.getAction()); + } + } catch (Exception e) { + LOG.error("Unexpected exception while processing key {}.", updatedKey, e); + return new ImmutablePair<>(taskName, false); + } + } else { + LOG.warn("Unexpected value type {} for key {}. Skipping processing.", + value.getClass().getName(), updatedKey); + } + } + writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao); + LOG.debug("{} successfully processed in {} milliseconds", taskName, + (System.currentTimeMillis() - startTime)); + return new ImmutablePair<>(taskName, true); + } + + /** + * Writes the accumulated file size counts to the DB. + * + * @param fileSizeCountMap Map of file size counts. + * @param dslContext DSLContext for DB operations. + * @param fileCountBySizeDao DAO for file count table. + */ + public static void writeCountsToDB(Map fileSizeCountMap, + DSLContext dslContext, + FileCountBySizeDao fileCountBySizeDao) { + + List insertToDb = new ArrayList<>(); + List updateInDb = new ArrayList<>(); + boolean isDbTruncated = isFileCountBySizeTableEmpty(dslContext); // Check if table is empty + + fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> { + FileCountBySize newRecord = new FileCountBySize(); + newRecord.setVolume(key.volume); + newRecord.setBucket(key.bucket); + newRecord.setFileSize(key.fileSizeUpperBound); + newRecord.setCount(fileSizeCountMap.get(key)); + if (!isDbTruncated) { + // Get the current count from database and update + Record3 recordToFind = + dslContext.newRecord( + FILE_COUNT_BY_SIZE.VOLUME, + FILE_COUNT_BY_SIZE.BUCKET, + FILE_COUNT_BY_SIZE.FILE_SIZE) + .value1(key.volume) + .value2(key.bucket) + .value3(key.fileSizeUpperBound); + FileCountBySize fileCountRecord = + fileCountBySizeDao.findById(recordToFind); + if (fileCountRecord == null && newRecord.getCount() > 0L) { + // insert new row only for non-zero counts. + insertToDb.add(newRecord); + } else if (fileCountRecord != null) { + newRecord.setCount(fileCountRecord.getCount() + + fileSizeCountMap.get(key)); + updateInDb.add(newRecord); + } + } else if (newRecord.getCount() > 0) { + // insert new row only for non-zero counts. 
+ insertToDb.add(newRecord); + } + }); + fileCountBySizeDao.insert(insertToDb); + fileCountBySizeDao.update(updateInDb); + } + + /** + * Increments the count for a given key on a PUT event. + */ + public static void handlePutKeyEvent(OmKeyInfo omKeyInfo, + Map fileSizeCountMap) { + FileSizeCountKey key = getFileSizeCountKey(omKeyInfo); + Long count = fileSizeCountMap.containsKey(key) ? fileSizeCountMap.get(key) + 1L : 1L; + fileSizeCountMap.put(key, count); + } + + /** + * Decrements the count for a given key on a DELETE event. + */ + public static void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo, + Map fileSizeCountMap) { + if (omKeyInfo == null) { + LOG.warn("Deleting a key not found while handling DELETE key event. Key not found in Recon OM DB: {}", key); + } else { + FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo); + Long count = fileSizeCountMap.containsKey(countKey) ? fileSizeCountMap.get(countKey) - 1L : -1L; + fileSizeCountMap.put(countKey, count); + } + } + + /** + * Returns a FileSizeCountKey for the given OmKeyInfo. + */ + public static FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) { + return new FileSizeCountKey(omKeyInfo.getVolumeName(), + omKeyInfo.getBucketName(), + ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize())); + } + + /** + * Checks if the FILE_COUNT_BY_SIZE table is empty. + */ + public static boolean isFileCountBySizeTableEmpty(DSLContext dslContext) { + return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0; + } + + /** + * Helper key class used for grouping file size counts. + */ + public static class FileSizeCountKey { + private final String volume; + private final String bucket; + private final Long fileSizeUpperBound; + + public FileSizeCountKey(String volume, String bucket, Long fileSizeUpperBound) { + this.volume = volume; + this.bucket = bucket; + this.fileSizeUpperBound = fileSizeUpperBound; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FileSizeCountKey) { + FileSizeCountKey other = (FileSizeCountKey) obj; + return volume.equals(other.volume) && + bucket.equals(other.bucket) && + fileSizeUpperBound.equals(other.fileSizeUpperBound); + } + return false; + } + + @Override + public int hashCode() { + return (volume + bucket + fileSizeUpperBound).hashCode(); + } + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java new file mode 100644 index 000000000000..acaab763ac0a --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.google.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; +import org.jooq.DSLContext; + +/** + * Task for ObjectStore (OBS) which processes the KEY_TABLE. + */ +public class FileSizeCountTaskOBS implements ReconOmTask { + + private final FileCountBySizeDao fileCountBySizeDao; + private final DSLContext dslContext; + + @Inject + public FileSizeCountTaskOBS(FileCountBySizeDao fileCountBySizeDao, + UtilizationSchemaDefinition utilizationSchemaDefinition) { + this.fileCountBySizeDao = fileCountBySizeDao; + this.dslContext = utilizationSchemaDefinition.getDSLContext(); + } + + @Override + public Pair reprocess(OMMetadataManager omMetadataManager) { + return FileSizeCountTaskHelper.reprocess( + omMetadataManager, + dslContext, + fileCountBySizeDao, + BucketLayout.OBJECT_STORE, + getTaskName() + ); + } + + @Override + public Pair process(OMUpdateEventBatch events) { + // This task listens only on the KEY_TABLE. + return FileSizeCountTaskHelper.processEvents( + events, + OmMetadataManagerImpl.KEY_TABLE, + dslContext, + fileCountBySizeDao, + getTaskName()); + } + + @Override + public String getTaskName() { + return "FileSizeCountTaskOBS"; + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index c2a967415c99..c1d786db1a9d 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.recon.ReconConstants; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.tasks.types.NamedCallableTask; import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException; @@ -122,6 +123,7 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa } // Reprocess the failed tasks. 
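      // The call added below clears the FILE_SIZE_COUNT_TABLE_TRUNCATED guard in ReconConstants before
      // the failed tasks are re-run, so the first file-size-count task of the new cycle wins the
      // compareAndSet and truncates FILE_COUNT_BY_SIZE exactly once, while the other task skips it.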
+ ReconConstants.resetTableTruncatedFlags(); if (!retryFailedTasks.isEmpty()) { tasks.clear(); for (String taskName : failedTasks) { @@ -154,6 +156,7 @@ private void ignoreFailedTasks(List failedTasks) { @Override public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager) { Collection>> tasks = new ArrayList<>(); + ReconConstants.resetTableTruncatedFlags(); for (Map.Entry taskEntry : reconOmTasks.entrySet()) { ReconOmTask task = taskEntry.getValue(); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java index 5a667bd7c7c2..52ac1d64f4b1 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java @@ -27,6 +27,7 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDeletedKeysToOm; import static org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl.PROMETHEUS_INSTANT_QUERY_API; import static org.assertj.core.api.Assertions.assertThat; +import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE; import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -131,7 +132,8 @@ import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask; -import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask; +import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO; +import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS; import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.ozone.test.LambdaTestUtils; @@ -143,6 +145,7 @@ import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize; import org.jooq.Configuration; import org.jooq.DSLContext; +import org.jooq.Record3; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -162,7 +165,8 @@ public class TestEndpoints extends AbstractReconSqlDBTest { private VolumeEndpoint volumeEndpoint; private BucketEndpoint bucketEndpoint; private ReconOMMetadataManager reconOMMetadataManager; - private FileSizeCountTask fileSizeCountTask; + private FileSizeCountTaskFSO fileSizeCountTaskFSO; + private FileSizeCountTaskOBS fileSizeCountTaskOBS; private ContainerSizeCountTask containerSizeCountTask; private OmTableInsightTask omTableInsightTask; private ReconStorageContainerManagerFacade reconScm; @@ -305,8 +309,10 @@ private void initializeInjector() throws Exception { fileCountBySizeDao, containerCountBySizeDao, utilizationSchemaDefinition); - fileSizeCountTask = - new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition); + fileSizeCountTaskFSO = + new FileSizeCountTaskFSO(fileCountBySizeDao, utilizationSchemaDefinition); + fileSizeCountTaskOBS = + new FileSizeCountTaskOBS(fileCountBySizeDao, utilizationSchemaDefinition); omTableInsightTask = new OmTableInsightTask(globalStatsDao, sqlConfiguration, reconOMMetadataManager); @@ -835,7 +841,7 @@ public void testGetFileCounts() throws Exception { 
when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy); when(keyTableFso.iterator()).thenReturn(mockKeyIterFso); - when(omMetadataManager.getKeyTable(BucketLayout.LEGACY)).thenReturn( + when(omMetadataManager.getKeyTable(BucketLayout.OBJECT_STORE)).thenReturn( keyTableLegacy); when(omMetadataManager.getKeyTable( BucketLayout.FILE_SYSTEM_OPTIMIZED)).thenReturn(keyTableFso); @@ -862,11 +868,37 @@ public void testGetFileCounts() throws Exception { .thenReturn(omKeyInfo2) .thenReturn(omKeyInfo3); - Pair result = - fileSizeCountTask.reprocess(omMetadataManager); - assertTrue(result.getRight()); + // Call reprocess on both endpoints. + Pair resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); + Pair resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); + assertTrue(resultOBS.getRight()); + assertTrue(resultFSO.getRight()); + // The two tasks should result in 3 rows. assertEquals(3, fileCountBySizeDao.count()); + + // Verify counts: + // For vol1/bucket1 with fileSize 1000L, the upper bound is 1024L and expected count is 2. + Record3 recordToFind = dslContext.newRecord( + FILE_COUNT_BY_SIZE.VOLUME, + FILE_COUNT_BY_SIZE.BUCKET, + FILE_COUNT_BY_SIZE.FILE_SIZE) + .value1("vol1") + .value2("bucket1") + .value3(1024L); + assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue()); + + // For vol1/bucket1 with fileSize 100000L, the upper bound is 131072L and expected count is 2. + recordToFind.value3(131072L); + assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue()); + + // For vol2/bucket1 with fileSize 1000L, the upper bound is 1024L and expected count is 2. + recordToFind.value1("vol2"); + recordToFind.value2("bucket1"); + recordToFind.value3(1024L); + assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue()); + + // --- Now test the query endpoints of the utilization service --- Response response = utilizationEndpoint.getFileCounts(null, null, 0); List resultSet = (List) response.getEntity(); @@ -881,38 +913,38 @@ public void testGetFileCounts() throws Exception { o.getBucket().equals("bucket1") && o.getFileSize() == 1024L && o.getCount() == 2L)); - // Test for "volume" query param + // Test for "volume" query param. response = utilizationEndpoint.getFileCounts("vol1", null, 0); resultSet = (List) response.getEntity(); assertEquals(2, resultSet.size()); assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1"))); - // Test for non-existent volume + // Test for non-existent volume. response = utilizationEndpoint.getFileCounts("vol", null, 0); resultSet = (List) response.getEntity(); assertEquals(0, resultSet.size()); - // Test for "volume" + "bucket" query param + // Test for "volume" + "bucket" query param. response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 0); resultSet = (List) response.getEntity(); assertEquals(2, resultSet.size()); assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1") && o.getBucket().equals("bucket1"))); - // Test for non-existent bucket + // Test for non-existent bucket. response = utilizationEndpoint.getFileCounts("vol1", "bucket", 0); resultSet = (List) response.getEntity(); assertEquals(0, resultSet.size()); - // Test for "volume" + "bucket" + "fileSize" query params + // Test for "volume" + "bucket" + "fileSize" query params. 
response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 131072); resultSet = (List) response.getEntity(); assertEquals(1, resultSet.size()); FileCountBySize o = resultSet.get(0); - assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals( - "bucket1") && o.getFileSize() == 131072); + assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals("bucket1") && + o.getFileSize() == 131072); - // Test for non-existent fileSize + // Test for non-existent fileSize. response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 1310725); resultSet = (List) response.getEntity(); assertEquals(0, resultSet.size()); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java index 2c22541d022d..30ca22154bef 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE; import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.AdditionalAnswers.returnsElementsOf; import static org.mockito.BDDMockito.given; @@ -55,8 +56,10 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest { private FileCountBySizeDao fileCountBySizeDao; - private FileSizeCountTask fileSizeCountTask; + private FileSizeCountTaskOBS fileSizeCountTaskOBS; + private FileSizeCountTaskFSO fileSizeCountTaskFSO; private DSLContext dslContext; + private UtilizationSchemaDefinition utilizationSchemaDefinition; public TestFileSizeCountTask() { super(); @@ -65,24 +68,28 @@ public TestFileSizeCountTask() { @BeforeEach public void setUp() { fileCountBySizeDao = getDao(FileCountBySizeDao.class); - UtilizationSchemaDefinition utilizationSchemaDefinition = - getSchemaDefinition(UtilizationSchemaDefinition.class); - fileSizeCountTask = - new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition); + utilizationSchemaDefinition = getSchemaDefinition(UtilizationSchemaDefinition.class); + // Create separate task instances. + fileSizeCountTaskOBS = new FileSizeCountTaskOBS(fileCountBySizeDao, utilizationSchemaDefinition); + fileSizeCountTaskFSO = new FileSizeCountTaskFSO(fileCountBySizeDao, utilizationSchemaDefinition); dslContext = utilizationSchemaDefinition.getDSLContext(); - // Truncate table before running each test + // Truncate table before each test. dslContext.truncate(FILE_COUNT_BY_SIZE); } @Test public void testReprocess() throws IOException { + // Create three sample OmKeyInfo objects. OmKeyInfo[] omKeyInfos = new OmKeyInfo[3]; String[] keyNames = {"key1", "key2", "key3"}; String[] volumeNames = {"vol1", "vol1", "vol1"}; String[] bucketNames = {"bucket1", "bucket1", "bucket1"}; + // Use sizes so that each falls into a distinct bin: + // - 1000L falls into first bin (upper bound 1024L) + // - 100000L falls into second bin (upper bound 131072L) + // - 4PB (i.e. 
1125899906842624L * 4) falls into the highest bin (upper bound Long.MAX_VALUE) Long[] dataSizes = {1000L, 100000L, 1125899906842624L * 4}; - // Loop to initialize each instance of OmKeyInfo for (int i = 0; i < 3; i++) { omKeyInfos[i] = mock(OmKeyInfo.class); given(omKeyInfos[i].getKeyName()).willReturn(keyNames[i]); @@ -91,88 +98,82 @@ public void testReprocess() throws IOException { given(omKeyInfos[i].getDataSize()).willReturn(dataSizes[i]); } - // Create two mock instances of TypedTable, one for FILE_SYSTEM_OPTIMIZED - // layout and one for LEGACY layout + // Prepare the OMMetadataManager mock. OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class); - TypedTable keyTableLegacy = mock(TypedTable.class); - TypedTable keyTableFso = mock(TypedTable.class); - // Set return values for getKeyTable() for FILE_SYSTEM_OPTIMIZED - // and LEGACY layout - when(omMetadataManager.getKeyTable(eq(BucketLayout.LEGACY))) - .thenReturn(keyTableLegacy); + // Configure the OBS (OBJECT_STORE) endpoint. + TypedTable keyTableOBS = mock(TypedTable.class); + // Note: Even though legacy and OBS share the same underlying table, we simulate OBS here. + when(omMetadataManager.getKeyTable(eq(BucketLayout.OBJECT_STORE))) + .thenReturn(keyTableOBS); + TypedTable.TypedTableIterator mockIterOBS = mock(TypedTable.TypedTableIterator.class); + when(keyTableOBS.iterator()).thenReturn(mockIterOBS); + // Simulate three keys then end. + when(mockIterOBS.hasNext()).thenReturn(true, true, true, false); + TypedTable.TypedKeyValue mockKeyValueOBS = mock(TypedTable.TypedKeyValue.class); + when(mockIterOBS.next()).thenReturn(mockKeyValueOBS); + when(mockKeyValueOBS.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], omKeyInfos[2]); + + // Configure the FSO (FILE_SYSTEM_OPTIMIZED) endpoint. + TypedTable keyTableFSO = mock(TypedTable.class); when(omMetadataManager.getKeyTable(eq(BucketLayout.FILE_SYSTEM_OPTIMIZED))) - .thenReturn(keyTableFso); - - // Create two mock instances of TypedTableIterator, one for each - // instance of TypedTable - TypedTable.TypedTableIterator mockKeyIterLegacy = - mock(TypedTable.TypedTableIterator.class); - when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy); - // Set return values for hasNext() and next() of the mock instance of - // TypedTableIterator for keyTableLegacy - when(mockKeyIterLegacy.hasNext()).thenReturn(true, true, true, false); - TypedTable.TypedKeyValue mockKeyValueLegacy = - mock(TypedTable.TypedKeyValue.class); - when(mockKeyIterLegacy.next()).thenReturn(mockKeyValueLegacy); - when(mockKeyValueLegacy.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], - omKeyInfos[2]); - - - // Same as above, but for keyTableFso - TypedTable.TypedTableIterator mockKeyIterFso = - mock(TypedTable.TypedTableIterator.class); - when(keyTableFso.iterator()).thenReturn(mockKeyIterFso); - when(mockKeyIterFso.hasNext()).thenReturn(true, true, true, false); - TypedTable.TypedKeyValue mockKeyValueFso = - mock(TypedTable.TypedKeyValue.class); - when(mockKeyIterFso.next()).thenReturn(mockKeyValueFso); - when(mockKeyValueFso.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], - omKeyInfos[2]); - - // Reprocess could be called from table having existing entries. Adding - // an entry to simulate that. 
- fileCountBySizeDao.insert( - new FileCountBySize("vol1", "bucket1", 1024L, 10L)); - - Pair result = - fileSizeCountTask.reprocess(omMetadataManager); - - // Verify that the result of reprocess is true - assertTrue(result.getRight()); - - // Verify that the number of entries in fileCountBySizeDao is 3 - assertEquals(3, fileCountBySizeDao.count()); - - // Create a record to find the count of files in a specific volume, - // bucket and file size - Record3 recordToFind = dslContext - .newRecord(FILE_COUNT_BY_SIZE.VOLUME, + .thenReturn(keyTableFSO); + TypedTable.TypedTableIterator mockIterFSO = mock(TypedTable.TypedTableIterator.class); + when(keyTableFSO.iterator()).thenReturn(mockIterFSO); + when(mockIterFSO.hasNext()).thenReturn(true, true, true, false); + TypedTable.TypedKeyValue mockKeyValueFSO = mock(TypedTable.TypedKeyValue.class); + when(mockIterFSO.next()).thenReturn(mockKeyValueFSO); + when(mockKeyValueFSO.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], omKeyInfos[2]); + + // Simulate a preexisting entry in the DB. + // (This record will be removed by the first task via table truncation.) + fileCountBySizeDao.insert(new FileCountBySize("vol1", "bucket1", 1024L, 10L)); + + // Call reprocess on both tasks. + Pair resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); + Pair resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); + + // Verify that both tasks reported success. + assertTrue(resultOBS.getRight(), "OBS reprocess should return true"); + assertTrue(resultFSO.getRight(), "FSO reprocess should return true"); + + // After processing, there should be 3 rows (one per bin). + assertEquals(3, fileCountBySizeDao.count(), "Expected 3 rows in the DB"); + + // Now verify the counts in each bin. + // Because each task processes the same 3 keys and each key contributes a count of 1, + // the final count per bin should be 2 (1 from OBS + 1 from FSO). + + // Verify bin for key size 1000L -> upper bound 1024L. + Record3 recordToFind = dslContext.newRecord( + FILE_COUNT_BY_SIZE.VOLUME, FILE_COUNT_BY_SIZE.BUCKET, FILE_COUNT_BY_SIZE.FILE_SIZE) .value1("vol1") .value2("bucket1") .value3(1024L); - assertEquals(2L, - fileCountBySizeDao.findById(recordToFind).getCount().longValue()); - // file size upper bound for 100000L is 131072L (next highest power of 2) + assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(), + "Expected bin 1024 to have count 2"); + + // Verify bin for key size 100000L -> upper bound 131072L. recordToFind.value3(131072L); - assertEquals(2L, - fileCountBySizeDao.findById(recordToFind).getCount().longValue()); - // file size upper bound for 4PB is Long.MAX_VALUE + assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(), + "Expected bin 131072 to have count 2"); + + // Verify bin for key size 4PB -> upper bound Long.MAX_VALUE. recordToFind.value3(Long.MAX_VALUE); - assertEquals(2L, - fileCountBySizeDao.findById(recordToFind).getCount().longValue()); + assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(), + "Expected bin Long.MAX_VALUE to have count 2"); } @Test public void testProcess() { - // Write 2 keys. + // First batch: Write 2 keys. 
OmKeyInfo toBeDeletedKey = mock(OmKeyInfo.class); given(toBeDeletedKey.getVolumeName()).willReturn("vol1"); given(toBeDeletedKey.getBucketName()).willReturn("bucket1"); given(toBeDeletedKey.getKeyName()).willReturn("deletedKey"); - given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Bin 1 + given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Falls in bin with upper bound 2048L OMDBUpdateEvent event = new OMUpdateEventBuilder() .setAction(PUT) .setKey("deletedKey") @@ -184,7 +185,7 @@ public void testProcess() { given(toBeUpdatedKey.getVolumeName()).willReturn("vol1"); given(toBeUpdatedKey.getBucketName()).willReturn("bucket1"); given(toBeUpdatedKey.getKeyName()).willReturn("updatedKey"); - given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Bin 4 + given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Falls in bin with upper bound 16384L OMDBUpdateEvent event2 = new OMUpdateEventBuilder() .setAction(PUT) .setKey("updatedKey") @@ -194,9 +195,14 @@ public void testProcess() { OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(Arrays.asList(event, event2), 0L); - fileSizeCountTask.process(omUpdateEventBatch); - // Verify 2 keys are in correct bins. + // Process the same batch on both endpoints. + fileSizeCountTaskOBS.process(omUpdateEventBatch); + fileSizeCountTaskFSO.process(omUpdateEventBatch); + + // After processing the first batch: + // Since each endpoint processes the same events, the counts are doubled. + // Expected: 2 rows (bins 2048 and 16384) with counts 2 each. assertEquals(2, fileCountBySizeDao.count()); Record3 recordToFind = dslContext .newRecord(FILE_COUNT_BY_SIZE.VOLUME, @@ -212,6 +218,7 @@ public void testProcess() { assertEquals(1L, fileCountBySizeDao.findById(recordToFind).getCount().longValue()); + // Second batch: Process update events. // Add new key. OmKeyInfo newKey = mock(OmKeyInfo.class); given(newKey.getVolumeName()).willReturn("vol1"); @@ -249,7 +256,8 @@ public void testProcess() { omUpdateEventBatch = new OMUpdateEventBatch( Arrays.asList(updateEvent, putEvent, deleteEvent), 0L); - fileSizeCountTask.process(omUpdateEventBatch); + fileSizeCountTaskOBS.process(omUpdateEventBatch); + fileSizeCountTaskFSO.process(omUpdateEventBatch); assertEquals(4, fileCountBySizeDao.count()); recordToFind.value3(1024L); @@ -268,8 +276,7 @@ public void testProcess() { @Test public void testReprocessAtScale() throws IOException { - // generate mocks for 2 volumes, 500 buckets each volume - // and 42 keys in each bucket. + // generate mocks for 2 volumes, 500 buckets each volume, and 42 keys in each bucket. List omKeyInfoList = new ArrayList<>(); List hasNextAnswer = new ArrayList<>(); for (int volIndex = 1; volIndex <= 2; volIndex++) { @@ -279,8 +286,8 @@ public void testReprocessAtScale() throws IOException { given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex); given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex); given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex); - // Place keys in each bin - long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L; + // Each key's fileSize = 2^(keyIndex+9) - 1, so that it falls into its respective bin. 
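        // For example, keyIndex = 1 gives 2^10 - 1 = 1023 bytes (bin upper bound 1024L) and
        // keyIndex = 2 gives 2^11 - 1 = 2047 bytes (bin upper bound 2048L), so the 42 keys in a
        // bucket land in 42 distinct bins.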
+ long fileSize = (long) Math.pow(2, keyIndex + 9) - 1L; given(omKeyInfo.getDataSize()).willReturn(fileSize); omKeyInfoList.add(omKeyInfo); hasNextAnswer.add(true); @@ -290,46 +297,43 @@ public void testReprocessAtScale() throws IOException { hasNextAnswer.add(false); OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class); + // Create two mock key tables: one for OBS (using LEGACY in this test) and one for FSO. TypedTable keyTableLegacy = mock(TypedTable.class); TypedTable keyTableFso = mock(TypedTable.class); - TypedTable.TypedTableIterator mockKeyIterLegacy = mock(TypedTable - .TypedTableIterator.class); - TypedTable.TypedTableIterator mockKeyIterFso = mock(TypedTable - .TypedTableIterator.class); - TypedTable.TypedKeyValue mockKeyValueLegacy = mock( - TypedTable.TypedKeyValue.class); - TypedTable.TypedKeyValue mockKeyValueFso = mock( - TypedTable.TypedKeyValue.class); + TypedTable.TypedTableIterator mockKeyIterLegacy = mock(TypedTable.TypedTableIterator.class); + TypedTable.TypedTableIterator mockKeyIterFso = mock(TypedTable.TypedTableIterator.class); + TypedTable.TypedKeyValue mockKeyValueLegacy = mock(TypedTable.TypedKeyValue.class); + TypedTable.TypedKeyValue mockKeyValueFso = mock(TypedTable.TypedKeyValue.class); when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy); when(keyTableFso.iterator()).thenReturn(mockKeyIterFso); - when(omMetadataManager.getKeyTable(BucketLayout.LEGACY)) - .thenReturn(keyTableLegacy); - when(omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)) - .thenReturn(keyTableFso); + // In this test, assume OBS task uses BucketLayout.LEGACY and FSO uses FILE_SYSTEM_OPTIMIZED. + when(omMetadataManager.getKeyTable(BucketLayout.OBJECT_STORE)).thenReturn(keyTableLegacy); + when(omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)).thenReturn(keyTableFso); - when(mockKeyIterLegacy.hasNext()) - .thenAnswer(returnsElementsOf(hasNextAnswer)); - when(mockKeyIterFso.hasNext()) - .thenAnswer(returnsElementsOf(hasNextAnswer)); + when(mockKeyIterLegacy.hasNext()).thenAnswer(returnsElementsOf(hasNextAnswer)); + when(mockKeyIterFso.hasNext()).thenAnswer(returnsElementsOf(hasNextAnswer)); when(mockKeyIterLegacy.next()).thenReturn(mockKeyValueLegacy); when(mockKeyIterFso.next()).thenReturn(mockKeyValueFso); - when(mockKeyValueLegacy.getValue()) - .thenAnswer(returnsElementsOf(omKeyInfoList)); - when(mockKeyValueFso.getValue()) - .thenAnswer(returnsElementsOf(omKeyInfoList)); + when(mockKeyValueLegacy.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList)); + when(mockKeyValueFso.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList)); - Pair result = - fileSizeCountTask.reprocess(omMetadataManager); - assertTrue(result.getRight()); + // Call reprocess on both endpoints. + Pair resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); + Pair resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); + assertTrue(resultOBS.getRight()); + assertTrue(resultFSO.getRight()); // 2 volumes * 500 buckets * 42 bins = 42000 rows assertEquals(42000, fileCountBySizeDao.count()); - Record3 recordToFind = dslContext - .newRecord(FILE_COUNT_BY_SIZE.VOLUME, + + // Verify counts for a few representative bins. + // For volume "vol1", bucket "bucket1", the first bin (upper bound 1024L) should have a count of 2. 
+ Record3 recordToFind = dslContext.newRecord( + FILE_COUNT_BY_SIZE.VOLUME, FILE_COUNT_BY_SIZE.BUCKET, FILE_COUNT_BY_SIZE.FILE_SIZE) .value1("vol1") @@ -337,11 +341,13 @@ public void testReprocessAtScale() throws IOException { .value3(1024L); assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue()); - // file size upper bound for 100000L is 131072L (next highest power of 2) - recordToFind.value1("vol1"); + + // For volume "vol1", bucket "bucket1", the bin with upper bound 131072L should have a count of 2. recordToFind.value3(131072L); assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue()); + + // For volume "vol1", bucket "bucket500", the highest bin (upper bound Long.MAX_VALUE) should have a count of 2. recordToFind.value2("bucket500"); recordToFind.value3(Long.MAX_VALUE); assertEquals(2L, @@ -385,9 +391,10 @@ public void testProcessAtScale() { } } - OMUpdateEventBatch omUpdateEventBatch = - new OMUpdateEventBatch(omDbEventList, 0L); - fileSizeCountTask.process(omUpdateEventBatch); + OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L); + // Process the same batch on both endpoints. + fileSizeCountTaskOBS.process(omUpdateEventBatch); + fileSizeCountTaskFSO.process(omUpdateEventBatch); // Verify 2 keys are in correct bins. assertEquals(10000, fileCountBySizeDao.count()); @@ -463,7 +470,8 @@ public void testProcessAtScale() { } omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L); - fileSizeCountTask.process(omUpdateEventBatch); + fileSizeCountTaskOBS.process(omUpdateEventBatch); + fileSizeCountTaskFSO.process(omUpdateEventBatch); assertEquals(10000, fileCountBySizeDao.count()); recordToFind = dslContext @@ -488,4 +496,34 @@ public void testProcessAtScale() { .getCount().longValue()); } + + @Test + public void testTruncateTableExceptionPropagation() { + // Mock DSLContext and FileCountBySizeDao + DSLContext mockDslContext = mock(DSLContext.class); + FileCountBySizeDao mockDao = mock(FileCountBySizeDao.class); + + // Mock schema definition and ensure it returns our mocked DSLContext + UtilizationSchemaDefinition mockSchema = mock(UtilizationSchemaDefinition.class); + when(mockSchema.getDSLContext()).thenReturn(mockDslContext); + + // Mock delete operation to throw an exception + when(mockDslContext.delete(FILE_COUNT_BY_SIZE)) + .thenThrow(new RuntimeException("Simulated DB failure")); + + // Create instances of FileSizeCountTaskOBS and FileSizeCountTaskFSO using mocks + fileSizeCountTaskOBS = new FileSizeCountTaskOBS(mockDao, mockSchema); + fileSizeCountTaskFSO = new FileSizeCountTaskFSO(mockDao, mockSchema); + + // Mock OMMetadataManager + OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class); + + // Verify that an exception is thrown from reprocess() for both tasks. + assertThrows(RuntimeException.class, () -> fileSizeCountTaskOBS.reprocess(omMetadataManager), + "Expected reprocess to propagate exception but it didn't."); + + assertThrows(RuntimeException.class, () -> fileSizeCountTaskFSO.reprocess(omMetadataManager), + "Expected reprocess to propagate exception but it didn't."); + } + }
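A minimal stand-alone sketch of the truncate-once pattern this patch introduces (illustrative only, not part of the patch; the class and method names below are hypothetical): two tasks race to clear the same table at the start of a reprocess cycle, the AtomicBoolean compareAndSet lets exactly one of them perform the delete, and resetting the flag opens the next cycle.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the guard used by FileSizeCountTaskHelper.truncateTableIfNeeded().
public final class TruncateOnceDemo {

  // Mirrors ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.
  private static final AtomicBoolean TABLE_TRUNCATED = new AtomicBoolean(false);
  private static final Object TRUNCATE_LOCK = new Object();
  // Counts how many times the "table" was actually truncated.
  private static final AtomicInteger TRUNCATIONS = new AtomicInteger();

  // Called by every task at the start of reprocess(); only the first caller per cycle truncates.
  static void truncateTableIfNeeded() {
    synchronized (TRUNCATE_LOCK) {
      if (TABLE_TRUNCATED.compareAndSet(false, true)) {
        TRUNCATIONS.incrementAndGet(); // stand-in for dslContext.delete(FILE_COUNT_BY_SIZE)
      }
      // else: another task already truncated the table in this cycle, nothing to do.
    }
  }

  // Mirrors ReconConstants.resetTableTruncatedFlags(), called once per reprocess cycle.
  static void resetFlag() {
    TABLE_TRUNCATED.set(false);
  }

  public static void main(String[] args) throws InterruptedException {
    // Two tasks (OBS and FSO) reprocess concurrently; the table must be cleared exactly once.
    Thread obs = new Thread(TruncateOnceDemo::truncateTableIfNeeded);
    Thread fso = new Thread(TruncateOnceDemo::truncateTableIfNeeded);
    obs.start();
    fso.start();
    obs.join();
    fso.join();
    System.out.println("truncations in cycle 1: " + TRUNCATIONS.get()); // prints 1

    resetFlag(); // next reprocess cycle
    truncateTableIfNeeded();
    System.out.println("truncations after cycle 2: " + TRUNCATIONS.get()); // prints 2
  }
}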