From 672a21ef7d74b8580e350d064c4d8c1d28a6c58c Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Sun, 16 Mar 2025 22:48:21 +0530 Subject: [PATCH 1/3] HDDS-12615. Ozone Recon - Failure of any OM task during bootstrapping of Recon needs to be handled. --- .../impl/OzoneManagerServiceProviderImpl.java | 154 +++++++++++++++--- 1 file changed, 132 insertions(+), 22 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 455dabf6ac04..cdf2e66d9358 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -51,6 +51,7 @@ import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -269,36 +270,145 @@ public void start() { .values() .forEach(ReconOmTask::init); - // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not matching with + // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than zero and greater than + // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching with // lastUpdatedSeqNumber number for any of the OM task, then just run reprocess for such tasks. + + ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name()); ReconTaskStatusUpdater deltaTaskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name()); - Map reconOmTaskMap = reconTaskController.getRegisteredTasks() - .entrySet() - .stream() - .filter(entry -> { - String taskName = entry.getKey(); - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); - - return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && // Condition 1 - !taskStatusUpdater.getLastUpdatedSeqNumber() - .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); // Condition 2 - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map + Map reconOmTaskMap = + verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater, deltaTaskStatusUpdater); + reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); + startSyncDataFromOM(initialDelay); + } + + /** + * This method validates the following cases for OM tasks to be reprocessed during Recon bootstrap process. + * + * Case 1: Normal bootstrap flow will take care of this scenario. + * full snapshot: DB not Updated + * - Om Snapshot number - 0 + * - Om Delta snapshot number - 0 + * - All Om Tasks snapshot number - 0 + * + * Case 2: This case will force Recon to run reprocess of only those OM tasks whose + * last updated sequence number is zero. + * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash + * - Om Snapshot number - 100000 + * - Om Delta snapshot number - 0 + * - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number - 100000 + * + * Case 3: This case will force Recon to run reprocess of all OM tasks. + * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash + * - Om Snapshot number - 100000 + * - Om Delta snapshot number - 0 + * - All Om Tasks snapshot number - 0 + * + * Case 4: This case will not force to reprocess any OM tasks and on restart of Recon, + * bootstrap normal flow will be okay. + * full snapshot: DB Updated, Tasks reprocessed, but before delta DB applied, Recon restarted or crash + * - Om Snapshot number - 100000 + * - Om Delta snapshot number - 0 + * - All Om Tasks snapshot number - 100000 + * + * Case 5: This case will force Recon to run reprocess of all OM tasks. + * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also applied, recon restarted or crash, + * but all delta tasks not processed. + * - Om Snapshot number - 100000 + * - Om Delta snapshot number - 100010 + * - All Om Tasks snapshot number - 100000 + * + * Case 6: This case will force Recon to run reprocess of only those OM tasks whose + * last updated sequence number is less than Om Delta snapshot number. + * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also applied, recon restarted or crash, + * but delta tasks not processed. + * - Om Snapshot number - 100000 + * - Om Delta snapshot number - 100010 + * - Few Om Tasks snapshot number - 100000 , Remaining Om Tasks snapshot number - 100010 + * + * @param fullSnapshotTaskStatusUpdater + * @param deltaTaskStatusUpdater + * @return reconOmTaskMap + */ + private Map verifyAndGetReconOmTaskMapForReprocess( + ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater, + ReconTaskStatusUpdater deltaTaskStatusUpdater) { + + Map reconOmTaskMap = new HashMap<>(); + + for (Map.Entry entry : reconTaskController.getRegisteredTasks().entrySet()) { + String taskName = entry.getKey(); + ReconOmTask task = entry.getValue(); + ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + + if (taskStatusUpdater == null) { + continue; // Handle null cases + } + + boolean isBootstrapTask = + shouldProcessTaskForBootstrap(fullSnapshotTaskStatusUpdater, deltaTaskStatusUpdater, taskName, + taskStatusUpdater); + boolean isDeltaTask = + shouldProcessTaskForDelta(fullSnapshotTaskStatusUpdater, deltaTaskStatusUpdater, taskName, taskStatusUpdater); + + if (isBootstrapTask || isDeltaTask) { + if (reconOmTaskMap.containsKey(taskName)) { + LOG.warn("Duplicate task detected in reconOmTaskMap: {}", taskName); + } + reconOmTaskMap.put(taskName, task); + } + } + if (!reconOmTaskMap.isEmpty()) { - LOG.info("Task name and last updated sequence number of tasks, that are not matching with " + - "the last updated sequence number of OmDeltaRequest task:\n"); + LOG.info("Tasks with mismatched last updated sequence numbers:"); LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); - reconOmTaskMap.keySet() - .forEach(taskName -> { - LOG.info("{} -> {}", taskName, - taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); - }); + reconOmTaskMap.forEach((taskName, task) -> { + long lastUpdatedSeqNum = taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber(); + LOG.info("{} -> {}", taskName, lastUpdatedSeqNum); + }); } - reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); - startSyncDataFromOM(initialDelay); + + return reconOmTaskMap; + } + + /** + * Determines if a task should be processed under the "bootstrap" condition. + */ + private boolean shouldProcessTaskForBootstrap( + ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater, + ReconTaskStatusUpdater deltaTaskStatusUpdater, + String taskName, + ReconTaskStatusUpdater taskStatusUpdater) { + return fullSnapshotTaskStatusUpdater.getLastUpdatedSeqNumber() > 0 + && deltaTaskStatusUpdater.getLastUpdatedSeqNumber() == 0 + && !isOmSnapshotTask(taskName) + && taskStatusUpdater.getLastUpdatedSeqNumber() == 0; + } + + /** + * Determines if a task should be processed under the "delta" condition. + */ + private boolean shouldProcessTaskForDelta( + ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater, + ReconTaskStatusUpdater deltaTaskStatusUpdater, + String taskName, + ReconTaskStatusUpdater taskStatusUpdater) { + return deltaTaskStatusUpdater.getLastUpdatedSeqNumber() > 0 + && deltaTaskStatusUpdater.getLastUpdatedSeqNumber() > fullSnapshotTaskStatusUpdater.getLastUpdatedSeqNumber() + && !isOmSnapshotTask(taskName) + && !taskStatusUpdater.getLastUpdatedSeqNumber().equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); + } + + /** + * Checks if the given task is an OM full snapshot or delta snapshot task. + */ + private boolean isOmSnapshotTask(String taskName) { + return taskName.equals(OmSnapshotTaskName.OmSnapshotRequest.name()) + || taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()); } private void startSyncDataFromOM(long initialDelay) { From 661b7dd22d046e0c7583a2edb852f0ccbb396104 Mon Sep 17 00:00:00 2001 From: Devesh Singh Date: Tue, 18 Mar 2025 23:16:59 +0530 Subject: [PATCH 2/3] HDDS-12615. Fixing review comments. --- .../impl/OzoneManagerServiceProviderImpl.java | 214 +++++------------- 1 file changed, 58 insertions(+), 156 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index cdf2e66d9358..97ed5acd6c1a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -51,7 +51,6 @@ import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -270,145 +269,38 @@ public void start() { .values() .forEach(ReconOmTask::init); - // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than zero and greater than - // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching with + // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not matching with // lastUpdatedSeqNumber number for any of the OM task, then just run reprocess for such tasks. - - ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater = - taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name()); ReconTaskStatusUpdater deltaTaskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name()); - Map reconOmTaskMap = - verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater, deltaTaskStatusUpdater); - reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); - startSyncDataFromOM(initialDelay); - } - - /** - * This method validates the following cases for OM tasks to be reprocessed during Recon bootstrap process. - * - * Case 1: Normal bootstrap flow will take care of this scenario. - * full snapshot: DB not Updated - * - Om Snapshot number - 0 - * - Om Delta snapshot number - 0 - * - All Om Tasks snapshot number - 0 - * - * Case 2: This case will force Recon to run reprocess of only those OM tasks whose - * last updated sequence number is zero. - * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash - * - Om Snapshot number - 100000 - * - Om Delta snapshot number - 0 - * - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number - 100000 - * - * Case 3: This case will force Recon to run reprocess of all OM tasks. - * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash - * - Om Snapshot number - 100000 - * - Om Delta snapshot number - 0 - * - All Om Tasks snapshot number - 0 - * - * Case 4: This case will not force to reprocess any OM tasks and on restart of Recon, - * bootstrap normal flow will be okay. - * full snapshot: DB Updated, Tasks reprocessed, but before delta DB applied, Recon restarted or crash - * - Om Snapshot number - 100000 - * - Om Delta snapshot number - 0 - * - All Om Tasks snapshot number - 100000 - * - * Case 5: This case will force Recon to run reprocess of all OM tasks. - * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also applied, recon restarted or crash, - * but all delta tasks not processed. - * - Om Snapshot number - 100000 - * - Om Delta snapshot number - 100010 - * - All Om Tasks snapshot number - 100000 - * - * Case 6: This case will force Recon to run reprocess of only those OM tasks whose - * last updated sequence number is less than Om Delta snapshot number. - * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also applied, recon restarted or crash, - * but delta tasks not processed. - * - Om Snapshot number - 100000 - * - Om Delta snapshot number - 100010 - * - Few Om Tasks snapshot number - 100000 , Remaining Om Tasks snapshot number - 100010 - * - * @param fullSnapshotTaskStatusUpdater - * @param deltaTaskStatusUpdater - * @return reconOmTaskMap - */ - private Map verifyAndGetReconOmTaskMapForReprocess( - ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater, - ReconTaskStatusUpdater deltaTaskStatusUpdater) { - - Map reconOmTaskMap = new HashMap<>(); - - for (Map.Entry entry : reconTaskController.getRegisteredTasks().entrySet()) { - String taskName = entry.getKey(); - ReconOmTask task = entry.getValue(); - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); - - if (taskStatusUpdater == null) { - continue; // Handle null cases - } - - boolean isBootstrapTask = - shouldProcessTaskForBootstrap(fullSnapshotTaskStatusUpdater, deltaTaskStatusUpdater, taskName, - taskStatusUpdater); - boolean isDeltaTask = - shouldProcessTaskForDelta(fullSnapshotTaskStatusUpdater, deltaTaskStatusUpdater, taskName, taskStatusUpdater); - - if (isBootstrapTask || isDeltaTask) { - if (reconOmTaskMap.containsKey(taskName)) { - LOG.warn("Duplicate task detected in reconOmTaskMap: {}", taskName); - } - reconOmTaskMap.put(taskName, task); - } - } - + Map reconOmTaskMap = reconTaskController.getRegisteredTasks() + .entrySet() + .stream() + .filter(entry -> { + String taskName = entry.getKey(); + ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + + return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) // Condition 1 + && !taskName.equals(OmSnapshotTaskName.OmSnapshotRequest.name()) // Condition 2 + && + taskStatusUpdater.getLastUpdatedSeqNumber().compareTo( + deltaTaskStatusUpdater.getLastUpdatedSeqNumber()) < 0; // Condition 3 + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map if (!reconOmTaskMap.isEmpty()) { - LOG.info("Tasks with mismatched last updated sequence numbers:"); + LOG.info("Task name and last updated sequence number of tasks, that are not matching with " + + "the last updated sequence number of OmDeltaRequest task:\n"); LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); + reconOmTaskMap.keySet() + .forEach(taskName -> { + LOG.info("{} -> {}", taskName, + taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); - reconOmTaskMap.forEach((taskName, task) -> { - long lastUpdatedSeqNum = taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber(); - LOG.info("{} -> {}", taskName, lastUpdatedSeqNum); - }); + }); } - - return reconOmTaskMap; - } - - /** - * Determines if a task should be processed under the "bootstrap" condition. - */ - private boolean shouldProcessTaskForBootstrap( - ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater, - ReconTaskStatusUpdater deltaTaskStatusUpdater, - String taskName, - ReconTaskStatusUpdater taskStatusUpdater) { - return fullSnapshotTaskStatusUpdater.getLastUpdatedSeqNumber() > 0 - && deltaTaskStatusUpdater.getLastUpdatedSeqNumber() == 0 - && !isOmSnapshotTask(taskName) - && taskStatusUpdater.getLastUpdatedSeqNumber() == 0; - } - - /** - * Determines if a task should be processed under the "delta" condition. - */ - private boolean shouldProcessTaskForDelta( - ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater, - ReconTaskStatusUpdater deltaTaskStatusUpdater, - String taskName, - ReconTaskStatusUpdater taskStatusUpdater) { - return deltaTaskStatusUpdater.getLastUpdatedSeqNumber() > 0 - && deltaTaskStatusUpdater.getLastUpdatedSeqNumber() > fullSnapshotTaskStatusUpdater.getLastUpdatedSeqNumber() - && !isOmSnapshotTask(taskName) - && !taskStatusUpdater.getLastUpdatedSeqNumber().equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); - } - - /** - * Checks if the given task is an OM full snapshot or delta snapshot task. - */ - private boolean isOmSnapshotTask(String taskName) { - return taskName.equals(OmSnapshotTaskName.OmSnapshotRequest.name()) - || taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()); + reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); + startSyncDataFromOM(initialDelay); } private void startSyncDataFromOM(long initialDelay) { @@ -679,7 +571,10 @@ fromSequenceNumber, getCurrentOMDBSequenceNumber(), numUpdates, */ @VisibleForTesting public boolean syncDataFromOM() { - ReconTaskStatusUpdater reconTaskUpdater; + ReconTaskStatusUpdater fullSnapshotReconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater( + OmSnapshotTaskName.OmSnapshotRequest.name()); + ReconTaskStatusUpdater deltaReconTaskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater( + OmSnapshotTaskName.OmDeltaRequest.name()); if (isSyncDataFromOMRunning.compareAndSet(false, true)) { try { long currentSequenceNumber = getCurrentOMDBSequenceNumber(); @@ -689,11 +584,8 @@ public boolean syncDataFromOM() { if (currentSequenceNumber <= 0) { fullSnapshot = true; } else { - reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater( - OmSnapshotTaskName.OmDeltaRequest.name()); - // Get updates from OM and apply to local Recon OM DB and update task status in table - reconTaskUpdater.recordRunStart(); + deltaReconTaskStatusUpdater.recordRunStart(); int loopCount = 0; long fromSequenceNumber = currentSequenceNumber; long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1; @@ -715,9 +607,15 @@ public boolean syncDataFromOM() { } diffBetweenOMDbAndReconDBSeqNumber = getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler); - reconTaskUpdater.setLastTaskRunStatus(0); - reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); - reconTaskUpdater.recordRunCompletion(); + deltaReconTaskStatusUpdater.setLastTaskRunStatus(0); + // Keeping last updated sequence number for both full and delta tasks to be same + // because sequence number of DB denotes and points to same OM DB copy of Recon, + // even though two different tasks are updating the DB at different conditions, but + // it tells the sync state with actual OM DB for the same Recon OM DB copy. + deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); + fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); + deltaReconTaskStatusUpdater.recordRunCompletion(); + fullSnapshotReconTaskUpdater.updateDetails(); // Pass on DB update events to tasks that are listening. reconTaskController.consumeOMEvents(new OMUpdateEventBatch( omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager); @@ -728,16 +626,16 @@ public boolean syncDataFromOM() { LOG.error("OM DB Delta update sync thread was interrupted and delta sync failed."); // We are updating the table even if it didn't run i.e. got interrupted beforehand // to indicate that a task was supposed to run, but it didn't. - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); + deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1); + deltaReconTaskStatusUpdater.recordRunCompletion(); Thread.currentThread().interrupt(); // Since thread is interrupted, we do not fall back to snapshot sync. // Return with sync failed status. return false; } catch (Exception e) { metrics.incrNumDeltaRequestsFailed(); - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); + deltaReconTaskStatusUpdater.setLastTaskRunStatus(-1); + deltaReconTaskStatusUpdater.recordRunCompletion(); LOG.warn("Unable to get and apply delta updates from OM: {}, falling back to full snapshot", e.getMessage()); fullSnapshot = true; @@ -751,8 +649,6 @@ public boolean syncDataFromOM() { } if (fullSnapshot) { - reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater( - OmSnapshotTaskName.OmSnapshotRequest.name()); try { metrics.incrNumSnapshotRequests(); LOG.info("Obtaining full snapshot from Ozone Manager"); @@ -764,13 +660,19 @@ public boolean syncDataFromOM() { } // Update local Recon OM DB to new snapshot. - reconTaskUpdater.recordRunStart(); + fullSnapshotReconTaskUpdater.recordRunStart(); boolean success = updateReconOmDBWithNewSnapshot(); // Update timestamp of successful delta updates query. if (success) { - reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); - reconTaskUpdater.setLastTaskRunStatus(0); - reconTaskUpdater.recordRunCompletion(); + // Keeping last updated sequence number for both full and delta tasks to be same + // because sequence number of DB denotes and points to same OM DB copy of Recon, + // even though two different tasks are updating the DB at different conditions, but + // it tells the sync state with actual OM DB for the same Recon OM DB copy. + fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); + deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); + fullSnapshotReconTaskUpdater.setLastTaskRunStatus(0); + fullSnapshotReconTaskUpdater.recordRunCompletion(); + deltaReconTaskStatusUpdater.updateDetails(); // Reinitialize tasks that are listening. LOG.info("Calling reprocess on Recon tasks."); @@ -781,23 +683,23 @@ public boolean syncDataFromOM() { reconContext.getErrors().remove(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED); } else { metrics.incrNumSnapshotRequestsFailed(); - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); + fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1); + fullSnapshotReconTaskUpdater.recordRunCompletion(); // Update health status in ReconContext reconContext.updateHealthStatus(new AtomicBoolean(false)); reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED); } } catch (InterruptedException intEx) { LOG.error("OM DB Snapshot update sync thread was interrupted."); - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); + fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1); + fullSnapshotReconTaskUpdater.recordRunCompletion(); Thread.currentThread().interrupt(); // Mark sync status as failed. return false; } catch (Exception e) { metrics.incrNumSnapshotRequestsFailed(); - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); + fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1); + fullSnapshotReconTaskUpdater.recordRunCompletion(); LOG.error("Unable to update Recon's metadata with new OM DB. ", e); // Update health status in ReconContext reconContext.updateHealthStatus(new AtomicBoolean(false)); From eb720893cbc26d6c48936d9970ab302580d4ac2c Mon Sep 17 00:00:00 2001 From: Devesh Singh Date: Wed, 19 Mar 2025 09:33:31 +0530 Subject: [PATCH 3/3] HDDS-12615. Fixing failed testcases. --- .../TestOzoneManagerServiceProviderImpl.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 6771f727f8c4..6eef1d4c1e46 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -50,6 +50,7 @@ import java.net.HttpURLConnection; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; @@ -491,8 +492,10 @@ public void testSyncDataFromOMFullSnapshot( ozoneManagerServiceProvider.syncDataFromOM(); ArgumentCaptor taskNameCaptor = ArgumentCaptor.forClass(String.class); - verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture()); - assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue()); + verify(reconTaskStatusUpdaterManager, times(2)).getTaskStatusUpdater(taskNameCaptor.capture()); + List capturedValues = taskNameCaptor.getAllValues(); + assertTrue(capturedValues.contains(OmSnapshotRequest.name())); + assertTrue(capturedValues.contains(OmDeltaRequest.name())); verify(reconTaskControllerMock, times(1)) .reInitializeTasks(omMetadataManager, null); assertEquals(1, metrics.getNumSnapshotRequests()); @@ -524,8 +527,10 @@ public void testSyncDataFromOMDeltaUpdates( ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); - verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture()); - assertEquals(OmDeltaRequest.name(), captor.getValue()); + verify(reconTaskStatusUpdaterManager, times(2)).getTaskStatusUpdater(captor.capture()); + List capturedValues = captor.getAllValues(); + assertTrue(capturedValues.contains(OmSnapshotRequest.name())); + assertTrue(capturedValues.contains(OmDeltaRequest.name())); verify(reconTaskControllerMock, times(1)) .consumeOMEvents(any(OMUpdateEventBatch.class), @@ -559,8 +564,10 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); - verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture()); - assertEquals(OmSnapshotRequest.name(), captor.getValue()); + verify(reconTaskStatusUpdaterManager, times(2)).getTaskStatusUpdater(captor.capture()); + List capturedValues = captor.getAllValues(); + assertTrue(capturedValues.contains(OmSnapshotRequest.name())); + assertTrue(capturedValues.contains(OmDeltaRequest.name())); verify(reconTaskControllerMock, times(1)) .reInitializeTasks(omMetadataManager, null); assertEquals(1, metrics.getNumSnapshotRequests());