Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,11 @@ public void start() {
String taskName = entry.getKey();
ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName);

return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && // Condition 1
!taskStatusUpdater.getLastUpdatedSeqNumber()
.equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); // Condition 2
return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) // Condition 1
&& !taskName.equals(OmSnapshotTaskName.OmSnapshotRequest.name()) // Condition 2
&&
taskStatusUpdater.getLastUpdatedSeqNumber().compareTo(
deltaTaskStatusUpdater.getLastUpdatedSeqNumber()) < 0; // Condition 3
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map
if (!reconOmTaskMap.isEmpty()) {
Expand Down Expand Up @@ -569,7 +571,10 @@ fromSequenceNumber, getCurrentOMDBSequenceNumber(), numUpdates,
*/
@VisibleForTesting
public boolean syncDataFromOM() {
ReconTaskStatusUpdater reconTaskUpdater;
ReconTaskStatusUpdater fullSnapshotReconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
OmSnapshotTaskName.OmSnapshotRequest.name());
ReconTaskStatusUpdater deltaReconTaskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
OmSnapshotTaskName.OmDeltaRequest.name());
if (isSyncDataFromOMRunning.compareAndSet(false, true)) {
try {
long currentSequenceNumber = getCurrentOMDBSequenceNumber();
Expand All @@ -579,11 +584,8 @@ public boolean syncDataFromOM() {
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
OmSnapshotTaskName.OmDeltaRequest.name());

// Get updates from OM and apply to local Recon OM DB and update task status in table
reconTaskUpdater.recordRunStart();
deltaReconTaskStatusUpdater.recordRunStart();
int loopCount = 0;
long fromSequenceNumber = currentSequenceNumber;
long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
Expand All @@ -605,9 +607,15 @@ public boolean syncDataFromOM() {
}
diffBetweenOMDbAndReconDBSeqNumber =
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler);
reconTaskUpdater.setLastTaskRunStatus(0);
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
reconTaskUpdater.recordRunCompletion();
deltaReconTaskStatusUpdater.setLastTaskRunStatus(0);
// Keeping last updated sequence number for both full and delta tasks to be same
// because sequence number of DB denotes and points to same OM DB copy of Recon,
// even though two different tasks are updating the DB at different conditions, but
// it tells the sync state with actual OM DB for the same Recon OM DB copy.
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
deltaReconTaskStatusUpdater.recordRunCompletion();
fullSnapshotReconTaskUpdater.updateDetails();
// Pass on DB update events to tasks that are listening.
reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
Expand All @@ -618,16 +626,16 @@ public boolean syncDataFromOM() {
LOG.error("OM DB Delta update sync thread was interrupted and delta sync failed.");
// We are updating the table even if it didn't run i.e. got interrupted beforehand
// to indicate that a task was supposed to run, but it didn't.
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
deltaReconTaskStatusUpdater.recordRunCompletion();
Thread.currentThread().interrupt();
// Since thread is interrupted, we do not fall back to snapshot sync.
// Return with sync failed status.
return false;
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1);
deltaReconTaskStatusUpdater.recordRunCompletion();
LOG.warn("Unable to get and apply delta updates from OM: {}, falling back to full snapshot",
e.getMessage());
fullSnapshot = true;
Expand All @@ -641,8 +649,6 @@ public boolean syncDataFromOM() {
}

if (fullSnapshot) {
reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
OmSnapshotTaskName.OmSnapshotRequest.name());
try {
metrics.incrNumSnapshotRequests();
LOG.info("Obtaining full snapshot from Ozone Manager");
Expand All @@ -654,13 +660,19 @@ public boolean syncDataFromOM() {
}

// Update local Recon OM DB to new snapshot.
reconTaskUpdater.recordRunStart();
fullSnapshotReconTaskUpdater.recordRunStart();
boolean success = updateReconOmDBWithNewSnapshot();
// Update timestamp of successful delta updates query.
if (success) {
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
reconTaskUpdater.setLastTaskRunStatus(0);
reconTaskUpdater.recordRunCompletion();
// Keeping last updated sequence number for both full and delta tasks to be same
// because sequence number of DB denotes and points to same OM DB copy of Recon,
// even though two different tasks are updating the DB at different conditions, but
// it tells the sync state with actual OM DB for the same Recon OM DB copy.
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
fullSnapshotReconTaskUpdater.setLastTaskRunStatus(0);
fullSnapshotReconTaskUpdater.recordRunCompletion();
deltaReconTaskStatusUpdater.updateDetails();

// Reinitialize tasks that are listening.
LOG.info("Calling reprocess on Recon tasks.");
Expand All @@ -671,23 +683,23 @@ public boolean syncDataFromOM() {
reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
} else {
metrics.incrNumSnapshotRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
fullSnapshotReconTaskUpdater.recordRunCompletion();
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
}
} catch (InterruptedException intEx) {
LOG.error("OM DB Snapshot update sync thread was interrupted.");
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
fullSnapshotReconTaskUpdater.recordRunCompletion();
Thread.currentThread().interrupt();
// Mark sync status as failed.
return false;
} catch (Exception e) {
metrics.incrNumSnapshotRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
fullSnapshotReconTaskUpdater.recordRunCompletion();
LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.net.HttpURLConnection;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
Expand Down Expand Up @@ -491,8 +492,10 @@ public void testSyncDataFromOMFullSnapshot(
ozoneManagerServiceProvider.syncDataFromOM();

ArgumentCaptor<String> taskNameCaptor = ArgumentCaptor.forClass(String.class);
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture());
assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue());
verify(reconTaskStatusUpdaterManager, times(2)).getTaskStatusUpdater(taskNameCaptor.capture());
List<String> capturedValues = taskNameCaptor.getAllValues();
assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
assertTrue(capturedValues.contains(OmDeltaRequest.name()));
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager, null);
assertEquals(1, metrics.getNumSnapshotRequests());
Expand Down Expand Up @@ -524,8 +527,10 @@ public void testSyncDataFromOMDeltaUpdates(

ArgumentCaptor<String> captor =
ArgumentCaptor.forClass(String.class);
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
assertEquals(OmDeltaRequest.name(), captor.getValue());
verify(reconTaskStatusUpdaterManager, times(2)).getTaskStatusUpdater(captor.capture());
List<String> capturedValues = captor.getAllValues();
assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
assertTrue(capturedValues.contains(OmDeltaRequest.name()));

verify(reconTaskControllerMock, times(1))
.consumeOMEvents(any(OMUpdateEventBatch.class),
Expand Down Expand Up @@ -559,8 +564,10 @@ public void testSyncDataFromOMFullSnapshotForSNNFE(

ArgumentCaptor<String> captor =
ArgumentCaptor.forClass(String.class);
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
assertEquals(OmSnapshotRequest.name(), captor.getValue());
verify(reconTaskStatusUpdaterManager, times(2)).getTaskStatusUpdater(captor.capture());
List<String> capturedValues = captor.getAllValues();
assertTrue(capturedValues.contains(OmSnapshotRequest.name()));
assertTrue(capturedValues.contains(OmDeltaRequest.name()));
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager, null);
assertEquals(1, metrics.getNumSnapshotRequests());
Expand Down