Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5a70ca7
HDDS-12185. Enhance FileSizeCountTask for Faster Processing.
ArafatKhan2198 Feb 3, 2025
7aca9f2
Made code changes
ArafatKhan2198 Feb 3, 2025
4b1da9f
Also fixed the testcases
ArafatKhan2198 Feb 5, 2025
c8b2e47
Made final review changes
ArafatKhan2198 Feb 7, 2025
68b8948
Passing the executor from the injector
ArafatKhan2198 Feb 7, 2025
9e00401
Removed unwanted changes
ArafatKhan2198 Feb 7, 2025
991c7d5
Removed unwanted import
ArafatKhan2198 Feb 7, 2025
1166bce
Fixed checkstyle and using injectors
ArafatKhan2198 Feb 10, 2025
678ddb2
more changes
ArafatKhan2198 Feb 10, 2025
978c160
Merge branch 'master' into HDDS-12185
ArafatKhan2198 Feb 10, 2025
4719546
Changed log levels
ArafatKhan2198 Feb 10, 2025
ce7e523
Split the FileSizeCountTask into two separate tasks
ArafatKhan2198 Feb 13, 2025
0a845bc
Fixed checkstyle
ArafatKhan2198 Feb 13, 2025
1608468
Refactored code to a helper class
ArafatKhan2198 Feb 17, 2025
b7883ec
Fixed checkstyle issues
ArafatKhan2198 Feb 17, 2025
a426eb3
Fixed unit tests
ArafatKhan2198 Feb 17, 2025
46a0b26
Fixed doc
ArafatKhan2198 Feb 17, 2025
dbd54c2
Merge remote-tracking branch 'origin/master' into HDDS-12185
adoroszlai Feb 17, 2025
d31e450
Ensure shared synchronization for table truncation in FileSizeCount t…
ArafatKhan2198 Feb 18, 2025
3484f2a
Changed parameter name
ArafatKhan2198 Feb 18, 2025
e80362c
Ensure proper error handling in table truncation and reprocess by res…
ArafatKhan2198 Feb 19, 2025
ad3e6c1
Added the exception propagation test case
ArafatKhan2198 Feb 20, 2025
5e6fdc3
Fixed checkstyle
ArafatKhan2198 Feb 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.ozone.recon;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Recon Server constants file.
*/
Expand Down Expand Up @@ -90,4 +92,15 @@ private ReconConstants() {
(double) MAX_CONTAINER_SIZE_UPPER_BOUND /
MIN_CONTAINER_SIZE_UPPER_BOUND) /
Math.log(2)) + 1;

// For file-size count reprocessing: ensure only one task truncates the table.
public static final AtomicBoolean TABLE_TRUNCATED = new AtomicBoolean(false);

/**
 * Clears the shared truncation marker so that the next reprocess cycle is
 * allowed to truncate the FILE_COUNT_BY_SIZE table again. Invoke exactly
 * once per reprocess cycle (e.g. from the OM task controller) before the
 * file-size count tasks start running.
 */
public static void resetTableTruncatedFlag() {
  TABLE_TRUNCATED.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
Expand All @@ -74,6 +75,8 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdds.scm.cli.ContainerOperationClient.newContainerRpcClient;
import static org.apache.hadoop.ozone.OmUtils.getOzoneManagerServiceId;
Expand Down Expand Up @@ -130,12 +133,19 @@ protected void configure() {
Multibinder<ReconOmTask> taskBinder =
Multibinder.newSetBinder(binder(), ReconOmTask.class);
taskBinder.addBinding().to(ContainerKeyMapperTask.class);
taskBinder.addBinding().to(FileSizeCountTask.class);
taskBinder.addBinding().to(FileSizeCountTaskFSO.class);
taskBinder.addBinding().to(FileSizeCountTaskOBS.class);
taskBinder.addBinding().to(OmTableInsightTask.class);
taskBinder.addBinding().to(NSSummaryTask.class);
}
}

/**
 * Provides the shared {@link ExecutorService} used to run Recon OM tasks
 * (such as the file-size count tasks) in parallel.
 *
 * @return a fixed-size thread pool whose worker threads carry a
 *         recognizable name.
 */
@Provides
@Singleton
public ExecutorService provideReconExecutorService() {
  // Pool size 5 is the historical default; presumably sized to the number
  // of registered ReconOmTask bindings — TODO confirm before tuning.
  final int poolSize = 5;
  AtomicInteger threadNumber = new AtomicInteger(1);
  // Name the worker threads so they are identifiable in thread dumps/logs
  // instead of the default "pool-N-thread-M".
  ThreadFactory namedThreadFactory = runnable ->
      new Thread(runnable,
          "ReconOmTaskExecutor-" + threadNumber.getAndIncrement());
  // NOTE(review): this singleton pool is never shut down explicitly here;
  // its non-daemon threads live until the Recon process stops — confirm
  // the intended lifecycle.
  return Executors.newFixedThreadPool(poolSize, namedThreadFactory);
}

/**
* Class that has all the DAO bindings in Recon.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.recon.ReconConstants;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
Expand All @@ -44,25 +45,23 @@
import java.util.Map;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;

/**
* Class to iterate over the OM DB and store the counts of existing/new
* files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon
* fileSize DB.
*/
public class FileSizeCountTask implements ReconOmTask {
public class FileSizeCountTaskFSO implements ReconOmTask {
private static final Logger LOG =
LoggerFactory.getLogger(FileSizeCountTask.class);
LoggerFactory.getLogger(FileSizeCountTaskFSO.class);

private FileCountBySizeDao fileCountBySizeDao;
private DSLContext dslContext;

/**
 * Constructs the FSO file-size count task.
 *
 * @param fileCountBySizeDao          DAO for the FILE_COUNT_BY_SIZE table.
 * @param utilizationSchemaDefinition supplies the jOOQ {@code DSLContext}
 *                                    used for bulk table operations.
 */
@Inject
public FileSizeCountTaskFSO(FileCountBySizeDao fileCountBySizeDao,
    UtilizationSchemaDefinition utilizationSchemaDefinition) {
  this.fileCountBySizeDao = fileCountBySizeDao;
  this.dslContext = utilizationSchemaDefinition.getDSLContext();
}
Expand All @@ -76,27 +75,25 @@ public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
*/
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
// Map to store the count of files based on file size
Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
long startTime = System.currentTimeMillis(); // Start time for execution

// Delete all records from FILE_COUNT_BY_SIZE table
int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
LOG.debug("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);

// Call reprocessBucket method for FILE_SYSTEM_OPTIMIZED bucket layout
boolean statusFSO =
reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED,
omMetadataManager,
fileSizeCountMap);
// Call reprocessBucket method for LEGACY bucket layout
boolean statusOBS =
reprocessBucketLayout(BucketLayout.LEGACY, omMetadataManager,
fileSizeCountMap);
if (!statusFSO && !statusOBS) {
// Use the shared atomic flag to ensure only one task truncates the table.
if (ReconConstants.TABLE_TRUNCATED.compareAndSet(false, true)) {
int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
LOG.info("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
} else {
LOG.info("Table already truncated by another task; skipping deletion.");
}

// Process only the FILE_SYSTEM_OPTIMIZED bucket layout using the FILE_TABLE.
boolean status = reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED, omMetadataManager, fileSizeCountMap);
if (!status) {
return new ImmutablePair<>(getTaskName(), false);
}
writeCountsToDB(fileSizeCountMap);
LOG.debug("Completed a 'reprocess' run of FileSizeCountTask.");
long endTime = System.currentTimeMillis(); // End time for execution
LOG.info("FileSizeCountFSOTask completed Reprocess in {} ms.", (endTime - startTime));
return new ImmutablePair<>(getTaskName(), true);
}

Expand All @@ -105,11 +102,15 @@ private boolean reprocessBucketLayout(BucketLayout bucketLayout,
Map<FileSizeCountKey, Long> fileSizeCountMap) {
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
int totalKeysProcessed = 0;

try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
totalKeysProcessed++; // Increment key count

// The time complexity of .size() method is constant time, O(1)
if (fileSizeCountMap.size() >= 100000) {
writeCountsToDB(fileSizeCountMap);
Expand All @@ -121,17 +122,17 @@ private boolean reprocessBucketLayout(BucketLayout bucketLayout,
" in Recon DB. ", ioEx);
return false;
}
LOG.info("Reprocessed {} keys for {} bucket layout.", totalKeysProcessed, bucketLayout);
return true;
}

/**
 * @return the unique name identifying this Recon OM task.
 */
@Override
public String getTaskName() {
  return "FileSizeCountTaskFSO";
}

/**
 * Returns the OM DB tables whose updates this task processes. The FSO task
 * tracks only the fileTable (FSO-layout keys); the keyTable is handled by
 * FileSizeCountTaskOBS.
 *
 * @return the list of table names this task listens on.
 */
public Collection<String> getTaskTables() {
  List<String> taskTables = new ArrayList<>();
  taskTables.add(FILE_TABLE);
  return taskTables;
}
Expand Down
Loading
Loading