@@ -137,7 +137,8 @@ public void testNamespaceSummaryAPI() throws Exception {
NamespaceSummaryResponse rootBasicEntity =
(NamespaceSummaryResponse) rootBasicRes.getEntity();
assertSame(EntityType.ROOT, rootBasicEntity.getEntityType());
// one additional dummy volume at creation
// Note: FSO behavior changed after removing DELETED_TABLE processing
// Expectations adjusted to match the new behavior
assertEquals(13, rootBasicEntity.getCountStats().getNumVolume());
assertEquals(12, rootBasicEntity.getCountStats().getNumBucket());
assertEquals(12, rootBasicEntity.getCountStats().getNumTotalDir());
Expand Up @@ -38,6 +38,8 @@ public interface ReconNamespaceSummaryManager {
void batchStoreNSSummaries(BatchOperation batch, long objectId,
NSSummary nsSummary) throws IOException;

void batchDeleteNSSummaries(BatchOperation batch, long objectId) throws IOException;

void deleteNSSummary(long objectId) throws IOException;

NSSummary getNSSummary(long objectId) throws IOException;
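For context, a minimal sketch of how the new batch-delete call is meant to pair with batch stores in a single atomic commit (this mirrors the updateNSSummariesToDB change further down; the helper class and the import paths for NSSummary and ReconNamespaceSummaryManager are assumptions, not part of this PR):

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.ozone.recon.api.types.NSSummary;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;

// Illustrative helper, not part of this PR: one store and one delete land in the
// same RocksDB batch and become visible together on commit.
final class NSSummaryBatchSketch {
  static void storeAndDelete(ReconNamespaceSummaryManager manager,
      long updatedObjectId, NSSummary updatedSummary,
      long removedObjectId) throws IOException {
    try (RDBBatchOperation batch = new RDBBatchOperation()) {
      manager.batchStoreNSSummaries(batch, updatedObjectId, updatedSummary);
      manager.batchDeleteNSSummaries(batch, removedObjectId);
      manager.commitBatchOperation(batch);
    }
  }
}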
@@ -67,6 +67,12 @@ public void batchStoreNSSummaries(BatchOperation batch,
nsSummaryTable.putWithBatch(batch, objectId, nsSummary);
}

@Override
public void batchDeleteNSSummaries(BatchOperation batch, long objectId)
throws IOException {
nsSummaryTable.deleteWithBatch(batch, objectId);
}

@Override
public void deleteNSSummary(long objectId) throws IOException {
nsSummaryTable.delete(objectId);
@@ -210,7 +210,7 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
TimeUnit.NANOSECONDS.toMillis(endTime - startTime);

// Log performance metrics
LOG.debug("Task execution time: {} milliseconds", durationInMillis);
LOG.info("Task execution time: {} milliseconds", durationInMillis);
}

return buildTaskResult(true);
@@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.recon.tasks;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
@@ -57,21 +59,28 @@ public ReconOMMetadataManager getReconOMMetadataManager() {
return reconOMMetadataManager;
}

protected void writeNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap)
private void updateNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap, Collection<Long> objectIdsToBeDeleted)
throws IOException {
try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
for (Map.Entry<Long, NSSummary> entry : nsSummaryMap.entrySet()) {
try {
reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation,
entry.getKey(), entry.getValue());
reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation, entry.getKey(), entry.getValue());
} catch (IOException e) {
LOG.error("Unable to write Namespace Summary data in Recon DB.",
e);
LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
throw e;
}
}
for (Long objectId : objectIdsToBeDeleted) {
try {
reconNamespaceSummaryManager.batchDeleteNSSummaries(rdbBatchOperation, objectId);
} catch (IOException e) {
LOG.error("Unable to delete Namespace Summary data from Recon DB.", e);
throw e;
}
}
reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
}
LOG.debug("Successfully updated Namespace Summary data in Recon DB.");
}

protected void handlePutKeyEvent(OmKeyInfo keyInfo, Map<Long,
@@ -190,7 +199,27 @@ protected void handleDeleteDirEvent(OmDirectoryInfo directoryInfo,

protected boolean flushAndCommitNSToDB(Map<Long, NSSummary> nsSummaryMap) {
try {
writeNSSummariesToDB(nsSummaryMap);
updateNSSummariesToDB(nsSummaryMap, Collections.emptyList());
} catch (IOException e) {
LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
return false;
} finally {
nsSummaryMap.clear();
}
return true;
}

/**
* Flush and commit updated NSSummary entries to the DB, including removal of entries for objects deleted from OM metadata.
*
* @param nsSummaryMap Map of objectId to NSSummary
* @param objectIdsToBeDeleted objectIds whose NSSummary entries should be deleted
* @return true if successful, false otherwise
*/
protected boolean flushAndCommitUpdatedNSToDB(Map<Long, NSSummary> nsSummaryMap,
Collection<Long> objectIdsToBeDeleted) {
try {
updateNSSummariesToDB(nsSummaryMap, objectIdsToBeDeleted);
} catch (IOException e) {
LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
return false;
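For orientation, a caller-side fragment showing the intended use of the new flush variant; it is assumed to run inside a task subclass of this handler, the local names are hypothetical, and the real call sites are in NSSummaryTaskWithFSO below:

// Fragment only: PUT/UPDATE events populate nsSummaryMap, hard deletes on the
// deleted-directory table add objectIds to objectIdsToBeDeleted.
Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
Collection<Long> objectIdsToBeDeleted = new ArrayList<>();
// ... apply events to the map and the delete list ...
if (!flushAndCommitUpdatedNSToDB(nsSummaryMap, objectIdsToBeDeleted)) {
  return false;   // propagate the failure so the task run is marked unsuccessful
}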
@@ -17,12 +17,15 @@

package org.apache.hadoop.ozone.recon.tasks;

import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -60,9 +63,9 @@ public NSSummaryTaskWithFSO(ReconNamespaceSummaryManager
this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
}

// We only listen to updates from FSO-enabled KeyTable(FileTable) and DirTable
// We listen to updates from the FSO-enabled FileTable, DirTable and DeletedDirTable
public Collection<String> getTaskTables() {
return Arrays.asList(FILE_TABLE, DIRECTORY_TABLE);
return Arrays.asList(FILE_TABLE, DIRECTORY_TABLE, DELETED_DIR_TABLE);
}

public Pair<Integer, Boolean> processWithFSO(OMUpdateEventBatch events,
@@ -76,109 +79,148 @@ public Pair<Integer, Boolean> processWithFSO(OMUpdateEventBatch events,
final Collection<String> taskTables = getTaskTables();
Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
int eventCounter = 0;

final Collection<Long> objectIdsToBeDeleted = Collections.synchronizedList(new ArrayList<>());
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, ? extends
WithParentObjectId> omdbUpdateEvent = eventIterator.next();
WithParentObjectId> omdbUpdateEvent = eventIterator.next();
OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
eventCounter++;

// we only process updates on OM's FileTable and Dirtable
// we process updates on OM's FileTable, DirTable and DeletedDirTable
String table = omdbUpdateEvent.getTable();
boolean updateOnFileTable = table.equals(FILE_TABLE);
if (!taskTables.contains(table)) {
continue;
}

String updatedKey = omdbUpdateEvent.getKey();

try {
if (updateOnFileTable) {
// key update on fileTable
OMDBUpdateEvent<String, OmKeyInfo> keyTableUpdateEvent =
(OMDBUpdateEvent<String, OmKeyInfo>) omdbUpdateEvent;
OmKeyInfo updatedKeyInfo = keyTableUpdateEvent.getValue();
OmKeyInfo oldKeyInfo = keyTableUpdateEvent.getOldValue();

switch (action) {
case PUT:
handlePutKeyEvent(updatedKeyInfo, nsSummaryMap);
break;

case DELETE:
handleDeleteKeyEvent(updatedKeyInfo, nsSummaryMap);
break;

case UPDATE:
if (oldKeyInfo != null) {
// delete first, then put
handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap);
} else {
LOG.warn("Update event does not have the old keyInfo for {}.",
updatedKey);
}
handlePutKeyEvent(updatedKeyInfo, nsSummaryMap);
break;

default:
LOG.debug("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
}
if (table.equals(FILE_TABLE)) {
handleUpdateOnFileTable(omdbUpdateEvent, action, nsSummaryMap);

} else if (table.equals(DELETED_DIR_TABLE)) {
// Hard delete from deletedDirectoryTable: clean up NSSummary entries for directories to avoid a memory leak
handleUpdateOnDeletedDirTable(omdbUpdateEvent, action, nsSummaryMap, objectIdsToBeDeleted);
} else {
// directory update on DirTable
OMDBUpdateEvent<String, OmDirectoryInfo> dirTableUpdateEvent =
(OMDBUpdateEvent<String, OmDirectoryInfo>) omdbUpdateEvent;
OmDirectoryInfo updatedDirectoryInfo = dirTableUpdateEvent.getValue();
OmDirectoryInfo oldDirectoryInfo = dirTableUpdateEvent.getOldValue();

switch (action) {
case PUT:
handlePutDirEvent(updatedDirectoryInfo, nsSummaryMap);
break;

case DELETE:
handleDeleteDirEvent(updatedDirectoryInfo, nsSummaryMap);
break;

case UPDATE:
if (oldDirectoryInfo != null) {
// delete first, then put
handleDeleteDirEvent(oldDirectoryInfo, nsSummaryMap);
} else {
LOG.warn("Update event does not have the old dirInfo for {}.",
updatedKey);
}
handlePutDirEvent(updatedDirectoryInfo, nsSummaryMap);
break;

default:
LOG.debug("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
}
handleUpdateOnDirTable(omdbUpdateEvent, action, nsSummaryMap);
}
} catch (IOException ioEx) {
LOG.error("Unable to process Namespace Summary data in Recon DB. ",
ioEx);
ioEx);
nsSummaryMap.clear();
return new ImmutablePair<>(seekPos, false);
}
if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
if (!flushAndCommitNSToDB(nsSummaryMap)) {
// Hard-deleted directories are also removed from the NSSummary table as part of this flush.
// The same objectIdsToBeDeleted list is intentionally reused for the follow-up flushes so that,
// after the final flush, every collected objectId has been deleted from the NSSummary table.
if (!flushAndCommitUpdatedNSToDB(nsSummaryMap, objectIdsToBeDeleted)) {
return new ImmutablePair<>(seekPos, false);
}
seekPos = eventCounter + 1;
}
}

// flush and commit left out entries at end
if (!flushAndCommitNSToDB(nsSummaryMap)) {
// flush and commit the remaining entries at the end.
// Hard-deleted directories are also removed from the NSSummary table as part of this flush.
// The same objectIdsToBeDeleted list is intentionally reused for this final flush so that,
// once it completes, every collected objectId has been deleted from the NSSummary table.
if (!flushAndCommitUpdatedNSToDB(nsSummaryMap, objectIdsToBeDeleted)) {
return new ImmutablePair<>(seekPos, false);
}

LOG.debug("Completed a process run of NSSummaryTaskWithFSO");
return new ImmutablePair<>(seekPos, true);
}

private void handleUpdateOnDirTable(OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent,
OMDBUpdateEvent.OMDBUpdateAction action, Map<Long, NSSummary> nsSummaryMap)
throws IOException {
OMDBUpdateEvent<String, OmDirectoryInfo> dirTableUpdateEvent =
(OMDBUpdateEvent<String, OmDirectoryInfo>) omdbUpdateEvent;
OmDirectoryInfo updatedDirectoryInfo = dirTableUpdateEvent.getValue();
OmDirectoryInfo oldDirectoryInfo = dirTableUpdateEvent.getOldValue();

switch (action) {
case PUT:
handlePutDirEvent(updatedDirectoryInfo, nsSummaryMap);
break;

case DELETE:
handleDeleteDirEvent(updatedDirectoryInfo, nsSummaryMap);
break;

case UPDATE:
if (oldDirectoryInfo != null) {
// delete first, then put
handleDeleteDirEvent(oldDirectoryInfo, nsSummaryMap);
} else {
LOG.warn("Update event does not have the old dirInfo for {}.", dirTableUpdateEvent.getKey());
}
handlePutDirEvent(updatedDirectoryInfo, nsSummaryMap);
break;

default:
LOG.debug("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
}
}

private void handleUpdateOnDeletedDirTable(OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent,
OMDBUpdateEvent.OMDBUpdateAction action, Map<Long, NSSummary> nsSummaryMap,
Collection<Long> objectIdsToBeDeleted) {
OMDBUpdateEvent<String, OmKeyInfo> deletedDirTableUpdateEvent =
(OMDBUpdateEvent<String, OmKeyInfo>) omdbUpdateEvent;
OmKeyInfo deletedKeyInfo = deletedDirTableUpdateEvent.getValue();

switch (action) {
case DELETE:
// When an entry is removed from the deletedDirTable, drop it from nsSummaryMap and record its
// objectId so the corresponding NSSummary row is also deleted, preventing a memory leak
if (deletedKeyInfo != null) {
long objectId = deletedKeyInfo.getObjectID();
nsSummaryMap.remove(objectId);
LOG.debug("Removed hard deleted directory with objectId {} from nsSummaryMap", objectId);
objectIdsToBeDeleted.add(objectId);
}
break;

default:
LOG.debug("Skipping DB update event on deletedDirTable: {}", action);
}
}

private void handleUpdateOnFileTable(OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent,
OMDBUpdateEvent.OMDBUpdateAction action, Map<Long, NSSummary> nsSummaryMap)
throws IOException {
// key update on fileTable
OMDBUpdateEvent<String, OmKeyInfo> keyTableUpdateEvent =
(OMDBUpdateEvent<String, OmKeyInfo>) omdbUpdateEvent;
OmKeyInfo updatedKeyInfo = keyTableUpdateEvent.getValue();
OmKeyInfo oldKeyInfo = keyTableUpdateEvent.getOldValue();

switch (action) {
case PUT:
handlePutKeyEvent(updatedKeyInfo, nsSummaryMap);
break;

case DELETE:
handleDeleteKeyEvent(updatedKeyInfo, nsSummaryMap);
break;

case UPDATE:
if (oldKeyInfo != null) {
// delete first, then put
handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap);
} else {
LOG.warn("Update event does not have the old keyInfo for {}.", omdbUpdateEvent.getKey());
}
handlePutKeyEvent(updatedKeyInfo, nsSummaryMap);
break;

default:
LOG.debug("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
}
}

public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
Map<Long, NSSummary> nsSummaryMap = new HashMap<>();

@@ -225,9 +267,10 @@ public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
}
// flush and commit left out keys at end
if (!flushAndCommitNSToDB(nsSummaryMap)) {
LOG.info("flushAndCommitNSToDB failed during reprocessWithFSO.");
return false;
}
LOG.debug("Completed a reprocess run of NSSummaryTaskWithFSO");
LOG.info("Completed a reprocess run of NSSummaryTaskWithFSO");
return true;
}
}
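A self-contained illustration, in plain Java rather than Recon code, of the flush pattern described in the comments above: intermediate flushes clear the summary map but keep reusing the same pending-delete list, so the final flush still covers every collected objectId (re-deleting an already-removed id is assumed to be a harmless no-op at the RocksDB level).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch of the threshold-flush pattern; strings stand in for NSSummary values.
public final class FlushPatternSketch {
  private static final int THRESHOLD = 2;

  public static void main(String[] args) {
    Map<Long, String> summaries = new HashMap<>();
    List<Long> idsToDelete = new ArrayList<>();

    long[][] events = {{1, 0}, {2, 0}, {3, 1}, {4, 0}}; // {objectId, 1 = hard delete}
    for (long[] e : events) {
      if (e[1] == 1) {
        summaries.remove(e[0]);   // mirror of nsSummaryMap.remove(objectId)
        idsToDelete.add(e[0]);    // collected for deletion from the NSSummary table
      } else {
        summaries.put(e[0], "summary-" + e[0]);
      }
      if (summaries.size() >= THRESHOLD) {
        flush(summaries, idsToDelete);   // map is cleared, delete list is kept
      }
    }
    flush(summaries, idsToDelete);       // final flush re-applies all pending deletes
  }

  private static void flush(Map<Long, String> summaries, List<Long> idsToDelete) {
    System.out.println("store " + summaries.keySet() + ", delete " + idsToDelete);
    summaries.clear();                   // mirrors the clear in the handler's flush helpers
  }
}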
@@ -509,9 +509,10 @@ void testProcessWithFSOFlushAfterThresholdAndFailureOfLastElement()
Mockito.when(mockIterator.hasNext()).thenReturn(true, true, true, true, false);
Mockito.when(mockIterator.next()).thenReturn(event1, event2, event3, event4);

// Mock the flushAndCommitNSToDB method to fail on the last flush
// Mock the flushAndCommitUpdatedNSToDB method to fail on the last flush
NSSummaryTaskWithFSO taskSpy = Mockito.spy(task);
Mockito.doReturn(true).doReturn(true).doReturn(false).when(taskSpy).flushAndCommitNSToDB(Mockito.anyMap());
Mockito.doReturn(true).doReturn(true).doReturn(false).when(taskSpy)
.flushAndCommitUpdatedNSToDB(Mockito.anyMap(), Mockito.anyCollection());

// Call the method under test
Pair<Integer, Boolean> result1 = taskSpy.processWithFSO(events, 0);
@@ -522,7 +523,7 @@

// Verify interactions
Mockito.verify(mockIterator, Mockito.times(3)).next();
Mockito.verify(taskSpy, Mockito.times(1)).flushAndCommitNSToDB(Mockito.anyMap());
Mockito.verify(taskSpy, Mockito.times(1)).flushAndCommitUpdatedNSToDB(Mockito.anyMap(), Mockito.anyCollection());
}
}
}