Merged
Commits
5a70ca7
HDDS-12185. Enhance FileSizeCountTask for Faster Processing.
ArafatKhan2198 Feb 3, 2025
7aca9f2
Made code changes
ArafatKhan2198 Feb 3, 2025
4b1da9f
Also fixed the testcases
ArafatKhan2198 Feb 5, 2025
c8b2e47
Made final review changes
ArafatKhan2198 Feb 7, 2025
68b8948
Passing the executor from the injector
ArafatKhan2198 Feb 7, 2025
9e00401
Removed unwanted changes
ArafatKhan2198 Feb 7, 2025
991c7d5
Removed unwanted import
ArafatKhan2198 Feb 7, 2025
1166bce
Fixed checkstyle and using injectors
ArafatKhan2198 Feb 10, 2025
678ddb2
more changes
ArafatKhan2198 Feb 10, 2025
978c160
Merge branch 'master' into HDDS-12185
ArafatKhan2198 Feb 10, 2025
4719546
Changed log levels
ArafatKhan2198 Feb 10, 2025
ce7e523
Split the FileSizeCountTask into two separate tasks
ArafatKhan2198 Feb 13, 2025
0a845bc
Fixed checkstyle
ArafatKhan2198 Feb 13, 2025
1608468
Refactored code to a helper class
ArafatKhan2198 Feb 17, 2025
b7883ec
Fixed checkstyle issues
ArafatKhan2198 Feb 17, 2025
a426eb3
Fixed unit tests
ArafatKhan2198 Feb 17, 2025
46a0b26
Fixed doc
ArafatKhan2198 Feb 17, 2025
dbd54c2
Merge remote-tracking branch 'origin/master' into HDDS-12185
adoroszlai Feb 17, 2025
d31e450
Ensure shared synchronization for table truncation in FileSizeCount t…
ArafatKhan2198 Feb 18, 2025
3484f2a
Changed parameter name
ArafatKhan2198 Feb 18, 2025
e80362c
Ensure proper error handling in table truncation and reprocess by res…
ArafatKhan2198 Feb 19, 2025
ad3e6c1
Added the exception propagation test case
ArafatKhan2198 Feb 20, 2025
5e6fdc3
Fixed checkstyle
ArafatKhan2198 Feb 20, 2025
@@ -74,6 +74,8 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.hadoop.hdds.scm.cli.ContainerOperationClient.newContainerRpcClient;
import static org.apache.hadoop.ozone.OmUtils.getOzoneManagerServiceId;
@@ -136,6 +138,12 @@ protected void configure() {
}
}

@Provides
@Singleton
public ExecutorService provideReconExecutorService() {
return Executors.newFixedThreadPool(5);
}
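
The pool above gets default worker names (pool-N-thread-M). A minimal sketch of naming the threads at construction time instead, assuming Guava's ThreadFactoryBuilder is available (a common dependency in Ozone, but the wiring here is illustrative, not part of the PR):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Sketch: same fixed pool, with descriptive worker names baked in,
// which would make the per-task renaming in FileSizeCountTask optional.
ExecutorService pool = Executors.newFixedThreadPool(5,
    new ThreadFactoryBuilder()
        .setNameFormat("recon-task-%d") // %d expands to the worker index
        .setDaemon(true)                // do not block JVM shutdown
        .build());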

/**
* Class that has all the DAO bindings in Recon.
*/
@@ -42,6 +42,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Arrays;

import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
@@ -58,13 +64,16 @@ public class FileSizeCountTask implements ReconOmTask {

private FileCountBySizeDao fileCountBySizeDao;
private DSLContext dslContext;
private final ConcurrentHashMap<FileSizeCountKey, Long> sharedFileSizeCountMap = new ConcurrentHashMap<>();
private final ExecutorService executorService;

@Inject
public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
UtilizationSchemaDefinition
utilizationSchemaDefinition) {
UtilizationSchemaDefinition utilizationSchemaDefinition,
ExecutorService executorService) {
this.fileCountBySizeDao = fileCountBySizeDao;
this.dslContext = utilizationSchemaDefinition.getDSLContext();
this.executorService = executorService;
}

/**
@@ -76,49 +85,85 @@ public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
*/
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
// Map to store the count of files based on file size
Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
LOG.info("Starting reprocess of FileSizeCountTask...");
long startTime = System.currentTimeMillis();

// Delete all records from FILE_COUNT_BY_SIZE table
// Truncate table first
int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
LOG.debug("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);

// Call reprocessBucket method for FILE_SYSTEM_OPTIMIZED bucket layout
boolean statusFSO =
reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED,
omMetadataManager,
fileSizeCountMap);
// Call reprocessBucket method for LEGACY bucket layout
boolean statusOBS =
reprocessBucketLayout(BucketLayout.LEGACY, omMetadataManager,
fileSizeCountMap);
if (!statusFSO && !statusOBS) {
return new ImmutablePair<>(getTaskName(), false);
LOG.debug("Cleared {} existing records from {}", execute, FILE_COUNT_BY_SIZE);

List<Future<Boolean>> futures = Arrays.asList(
submitReprocessTask("FSO", BucketLayout.FILE_SYSTEM_OPTIMIZED, omMetadataManager),
submitReprocessTask("LEGACY", BucketLayout.LEGACY, omMetadataManager)
);

boolean allSuccess = true;
try {
for (Future<Boolean> future : futures) {
if (!future.get()) {
allSuccess = false;
}
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Parallel processing failed: ", e);
allSuccess = false;
Thread.currentThread().interrupt();
}
writeCountsToDB(fileSizeCountMap);
LOG.debug("Completed a 'reprocess' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);

// Write any remaining entries to the database
if (!sharedFileSizeCountMap.isEmpty()) {
writeCountsToDB(sharedFileSizeCountMap);
sharedFileSizeCountMap.clear();
}

LOG.info("Reprocess completed. Success: {}. Time taken: {} ms",
allSuccess, (System.currentTimeMillis() - startTime));
return new ImmutablePair<>(getTaskName(), allSuccess);
}
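
One judgment call in the catch block above: the current thread is re-interrupted even when the failure was an ExecutionException rather than an actual interrupt. A minimal sketch of splitting the two cases (same loop shape; an alternative reading, not what was merged):

try {
  for (Future<Boolean> future : futures) {
    allSuccess &= future.get();
  }
} catch (ExecutionException e) {
  // The submitted task failed; the useful stack trace is the cause.
  LOG.error("Bucket layout reprocess failed: ", e.getCause());
  allSuccess = false;
} catch (InterruptedException e) {
  // Restore interrupt status only when we were actually interrupted.
  Thread.currentThread().interrupt();
  allSuccess = false;
}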

/**
* Submits a reprocess task with proper thread naming.
*/
private Future<Boolean> submitReprocessTask(String bucketType, BucketLayout layout,
OMMetadataManager omMetadataManager) {
return executorService.submit(() -> {
Thread currentThread = Thread.currentThread();
String originalName = currentThread.getName();
try {
currentThread.setName("FileSizeCountTask-" + bucketType + "-" + originalName);
return reprocessBucketLayout(layout, omMetadataManager);
} finally {
currentThread.setName(originalName); // Restore original name after execution
}
});
}

private boolean reprocessBucketLayout(BucketLayout bucketLayout,
OMMetadataManager omMetadataManager,
Map<FileSizeCountKey, Long> fileSizeCountMap) {
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
// The time complexity of .size() method is constant time, O(1)
if (fileSizeCountMap.size() >= 100000) {
writeCountsToDB(fileSizeCountMap);
fileSizeCountMap.clear();
private Boolean reprocessBucketLayout(BucketLayout bucketLayout,
OMMetadataManager omMetadataManager) {
long keysProcessed = 0;

try {
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(bucketLayout);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
FileSizeCountKey key = getFileSizeCountKey(kv.getValue());

// Atomically update the count in the shared map
sharedFileSizeCountMap.merge(key, 1L, Long::sum);
keysProcessed++;

// Periodically write to the database to avoid memory overflow
if (sharedFileSizeCountMap.size() >= 100_000) {
writeCountsToDB(sharedFileSizeCountMap);
sharedFileSizeCountMap.clear();
}
}
}
} catch (IOException ioEx) {
LOG.error("Unable to populate File Size Count for " + bucketLayout +
" in Recon DB. ", ioEx);
LOG.error("Failed to process {} layout: ", bucketLayout, ioEx);
return false;
}
return true;
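
The merge call above is what makes the shared map safe across the two worker threads: ConcurrentHashMap.merge applies its remapping function atomically per key, so concurrent increments for the same file-size bucket are never lost. A self-contained sketch of the idiom (hypothetical key strings):

import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
// Each call either inserts 1 or atomically adds 1 to the existing value,
// even when invoked from multiple threads for the same key.
counts.merge("vol1/bucket1/1KB", 1L, Long::sum);
counts.merge("vol1/bucket1/1KB", 1L, Long::sum);
// counts.get("vol1/bucket1/1KB") now returns 2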
@@ -198,7 +243,9 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
value.getClass().getName(), updatedKey);
}
}
writeCountsToDB(fileSizeCountMap);
if (!fileSizeCountMap.isEmpty()) {
writeCountsToDB(fileSizeCountMap);
}
LOG.debug("{} successfully processed in {} milliseconds",
getTaskName(), (System.currentTimeMillis() - startTime));
return new ImmutablePair<>(getTaskName(), true);
@@ -214,15 +261,15 @@ private void writeCountsToDB(Map<FileSizeCountKey, Long> fileSizeCountMap) {
List<FileCountBySize> insertToDb = new ArrayList<>();
List<FileCountBySize> updateInDb = new ArrayList<>();
boolean isDbTruncated = isFileCountBySizeTableEmpty(); // Check if table is empty

fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
fileSizeCountMap.forEach((key, count) -> {
FileCountBySize newRecord = new FileCountBySize();
newRecord.setVolume(key.volume);
newRecord.setBucket(key.bucket);
newRecord.setFileSize(key.fileSizeUpperBound);
newRecord.setCount(fileSizeCountMap.get(key));
newRecord.setCount(count);

if (!isDbTruncated) {
// Get the current count from database and update
// Get the current count from the database and update
Record3<String, String, Long> recordToFind =
dslContext.newRecord(
FILE_COUNT_BY_SIZE.VOLUME,
@@ -234,18 +281,19 @@ private void writeCountsToDB(Map<FileSizeCountKey, Long> fileSizeCountMap) {
FileCountBySize fileCountRecord =
fileCountBySizeDao.findById(recordToFind);
if (fileCountRecord == null && newRecord.getCount() > 0L) {
// insert new row only for non-zero counts.
// Insert new row only for non-zero counts.
insertToDb.add(newRecord);
} else if (fileCountRecord != null) {
newRecord.setCount(fileCountRecord.getCount() +
fileSizeCountMap.get(key));
newRecord.setCount(fileCountRecord.getCount() + count);
updateInDb.add(newRecord);
}
} else if (newRecord.getCount() > 0) {
// insert new row only for non-zero counts.
// Insert new row only for non-zero counts.
insertToDb.add(newRecord);
}
});

// Perform batch inserts and updates
fileCountBySizeDao.insert(insertToDb);
fileCountBySizeDao.update(updateInDb);
}
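
The insert and update batches above run as two separate DAO calls. If a failure between them must not leave partial counts behind, one option is a single jOOQ transaction; a sketch, assuming the generated FileCountBySizeDao exposes the usual Configuration constructor and the connection provider supports transactions (illustrative only, not part of the PR):

dslContext.transaction(configuration -> {
  // A DAO bound to the transaction's configuration, so both
  // batches commit or roll back together.
  FileCountBySizeDao txDao = new FileCountBySizeDao(configuration);
  txDao.insert(insertToDb);
  txDao.update(updateInDb);
});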
@@ -305,6 +353,16 @@ private boolean isFileCountBySizeTableEmpty() {
return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0;
}

private static class BucketLayoutProcessResult {
private final boolean success;
private final long keysProcessed;

BucketLayoutProcessResult(boolean success, long keysProcessed) {
this.success = success;
this.keysProcessed = keysProcessed;
}
}

private static class FileSizeCountKey {
private String volume;
private String bucket;
@@ -151,6 +151,8 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
@@ -310,8 +312,9 @@ private void initializeInjector() throws Exception {
fileCountBySizeDao,
containerCountBySizeDao,
utilizationSchemaDefinition);
ExecutorService executor = Executors.newFixedThreadPool(2);
fileSizeCountTask =
new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition, executor);
omTableInsightTask =
new OmTableInsightTask(globalStatsDao, sqlConfiguration,
reconOMMetadataManager);
@@ -38,6 +38,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
@@ -69,8 +71,8 @@ public void setUp() {
fileCountBySizeDao = getDao(FileCountBySizeDao.class);
UtilizationSchemaDefinition utilizationSchemaDefinition =
getSchemaDefinition(UtilizationSchemaDefinition.class);
fileSizeCountTask =
new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
ExecutorService executor = Executors.newFixedThreadPool(2);
fileSizeCountTask = new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition, executor);
dslContext = utilizationSchemaDefinition.getDSLContext();
// Truncate table before running each test
dslContext.truncate(FILE_COUNT_BY_SIZE);
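
Both test fixtures above build their own two-thread pool. A teardown sketch so workers do not leak across tests, assuming JUnit 5 and that the executor local is promoted to a field (both assumptions, not shown in the PR):

import org.junit.jupiter.api.AfterEach;

private ExecutorService executor; // promoted from the setUp() local

@AfterEach
public void tearDown() {
  if (executor != null) {
    executor.shutdownNow(); // interrupt any stragglers from the last test
  }
}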