diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 4693392a217e..f47f2ab440c7 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3358,7 +3358,7 @@
5s
OZONE, RECON, OM
- Interval in MINUTES by Recon to request OM DB Snapshot.
+ Interval in SECONDS by Recon to request OM DB Snapshot.
@@ -3391,12 +3391,13 @@
- recon.om.delta.update.loop.limit
- 50
+ recon.om.delta.update.lag.threshold
+ 0
OZONE, RECON
- The sync between Recon and OM consists of several small
- fetch loops.
+ At every Recon OM sync, recon starts fetching OM DB updates, and it continues to
+ fetch from OM till the lag, between OM DB WAL sequence number and Recon OM DB
+ snapshot WAL sequence number, is less than this lag threshold value.
diff --git a/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot b/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot
index 3b5b011523ee..f04a3ce741ed 100644
--- a/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot
@@ -105,6 +105,8 @@ Validate All Tasks Updated After Sync
Validate Sequence number is updated after sync
Log To Console Triggering OM DB sync for updates
+ Log To Console Wait for few seconds to let previous OM DB Sync thread to finish
+ Sleep 2s # Waits for 2 seconds
Sync OM Data
${tasks} = Fetch Task Status
Should Not Be Empty ${tasks}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
index daf506bc7c32..0d1e072ad938 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
@@ -25,7 +25,6 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
import static org.assertj.core.api.Assertions.assertThat;
@@ -107,8 +106,7 @@ public static void init() throws Exception {
OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
TimeUnit.MILLISECONDS
);
- conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2);
- conf.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 10);
+ conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10);
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(socketTimeout)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 02060c03ef8a..72efe1ffea52 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -99,9 +99,10 @@ public final class ReconServerConfigKeys {
public static final String RECON_OM_DELTA_UPDATE_LIMIT =
"recon.om.delta.update.limit";
public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 50000;
- public static final String RECON_OM_DELTA_UPDATE_LOOP_LIMIT =
- "recon.om.delta.update.loop.limit";
- public static final int RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT = 50;
+
+ public static final String RECON_OM_DELTA_UPDATE_LAG_THRESHOLD =
+ "recon.om.delta.update.lag.threshold";
+ public static final long RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT = 0;
public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
"ozone.recon.task.thread.count";
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 5a49e55b06bd..72ce5100194b 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -40,6 +40,7 @@
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -92,10 +93,10 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconUtils.convertNumericToSymbolic;
import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER;
@@ -127,8 +128,8 @@ public class OzoneManagerServiceProviderImpl
private ReconUtils reconUtils;
private OzoneManagerSyncMetrics metrics;
- private long deltaUpdateLimit;
- private int deltaUpdateLoopLimit;
+ private final long deltaUpdateLimit;
+ private final long omDBLagThreshold;
private AtomicBoolean isSyncDataFromOMRunning;
private final String threadNamePrefix;
@@ -181,9 +182,6 @@ public OzoneManagerServiceProviderImpl(
long deltaUpdateLimits = configuration.getLong(RECON_OM_DELTA_UPDATE_LIMIT,
RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT);
- int deltaUpdateLoopLimits = configuration.getInt(
- RECON_OM_DELTA_UPDATE_LOOP_LIMIT,
- RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT);
omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
OZONE_RECON_OM_SNAPSHOT_DB_DIR);
@@ -218,7 +216,6 @@ public OzoneManagerServiceProviderImpl(
this.configuration = configuration;
this.metrics = OzoneManagerSyncMetrics.create();
this.deltaUpdateLimit = deltaUpdateLimits;
- this.deltaUpdateLoopLimit = deltaUpdateLoopLimits;
this.isSyncDataFromOMRunning = new AtomicBoolean();
this.threadNamePrefix =
reconUtils.getReconNodeDetails(configuration).threadNamePrefix();
@@ -227,6 +224,8 @@ public OzoneManagerServiceProviderImpl(
.build();
this.reconContext = reconContext;
this.taskStatusUpdaterManager = taskStatusUpdaterManager;
+ this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD,
+ RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT);
}
@Override
@@ -256,7 +255,7 @@ public void start() {
LOG.error("Failed fetching a full snapshot from Ozone Manager");
}
} catch (IOException e) {
- LOG.error("Unexpected IOException occurred while trying to fetch a full snapshot: {}", e);
+ LOG.error("Unexpected IOException occurred while trying to fetch a full snapshot", e);
throw new RuntimeException(runtimeException);
}
}
@@ -311,7 +310,7 @@ public boolean triggerSyncDataFromOMImmediately() {
startSyncDataFromOM(0L);
return true;
} else {
- LOG.debug("OM DB sync is already running.");
+ LOG.info("OM DB sync is already running when trying to trigger OM DB sync manually.");
}
return false;
}
@@ -444,54 +443,46 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException {
/**
* Get Delta updates from OM through RPC call and apply to local OM DB as
* well as accumulate in a buffer.
+ *
* @param fromSequenceNumber from sequence number to request from.
* @param omdbUpdatesHandler OM DB updates handler to buffer updates.
- * @throws IOException when OM RPC request fails.
+ * @return lag count which tells how much Recon OM DB snapshot is lagging from OM DB.
+ * @throws IOException when OM RPC request fails.
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
- void getAndApplyDeltaUpdatesFromOM(
+ Long getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
- int loopCount = 0;
LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber);
- long deltaUpdateCnt = Long.MAX_VALUE;
- long inLoopStartSequenceNumber = fromSequenceNumber;
- long inLoopLatestSequenceNumber;
- while (loopCount < deltaUpdateLoopLimit &&
- deltaUpdateCnt >= deltaUpdateLimit) {
- if (!innerGetAndApplyDeltaUpdatesFromOM(
- inLoopStartSequenceNumber, omdbUpdatesHandler)) {
- LOG.error(
- "Retrieve OM DB delta update failed for sequence number : {}, " +
- "so falling back to full snapshot.", inLoopStartSequenceNumber);
- throw new RocksDBException(
- "Unable to get delta updates since sequenceNumber - " +
- inLoopStartSequenceNumber);
- }
- inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber();
- deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber;
- inLoopStartSequenceNumber = inLoopLatestSequenceNumber;
- loopCount++;
+ ImmutablePair dbUpdatesLatestSeqNumOfOMDB =
+ innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber, omdbUpdatesHandler);
+ if (!dbUpdatesLatestSeqNumOfOMDB.getLeft()) {
+ LOG.error(
+ "Retrieve OM DB delta update failed for sequence number : {}, " +
+ "so falling back to full snapshot.", fromSequenceNumber);
+ throw new RocksDBException(
+ "Unable to get delta updates since sequenceNumber - " +
+ fromSequenceNumber);
}
-
omdbUpdatesHandler.setLatestSequenceNumber(getCurrentOMDBSequenceNumber());
- LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
- getCurrentOMDBSequenceNumber() - fromSequenceNumber
- );
+ LOG.info("Delta updates received from OM : {} records", getCurrentOMDBSequenceNumber() - fromSequenceNumber);
+ return dbUpdatesLatestSeqNumOfOMDB.getRight();
}
/**
* Get Delta updates from OM through RPC call and apply to local OM DB as
* well as accumulate in a buffer.
+ *
* @param fromSequenceNumber from sequence number to request from.
* @param omdbUpdatesHandler OM DB updates handler to buffer updates.
- * @throws IOException when OM RPC request fails.
+ * @return Pair of dbUpdatesSuccess, lag (lag between OM and Recom)
+ * @throws IOException when OM RPC request fails.
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
- boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
- OMDBUpdatesHandler omdbUpdatesHandler)
+ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
+ OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber)
@@ -528,7 +519,7 @@ boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
"isDBUpdateSuccess: {}", numUpdates, getCurrentOMDBSequenceNumber()
- fromSequenceNumber, lag,
null != dbUpdates && dbUpdates.isDBUpdateSuccess());
- return null != dbUpdates && dbUpdates.isDBUpdateSuccess();
+ return new ImmutablePair<>(null != dbUpdates && dbUpdates.isDBUpdateSuccess(), lag);
}
/**
@@ -572,15 +563,31 @@ public boolean syncDataFromOM() {
// Get updates from OM and apply to local Recon OM DB and update task status in table
reconTaskUpdater.recordRunStart();
- getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
- omdbUpdatesHandler);
-
- reconTaskUpdater.setLastTaskRunStatus(0);
- reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
- reconTaskUpdater.recordRunCompletion();
- // Pass on DB update events to tasks that are listening.
- reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
- omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
+ int loopCount = 0;
+ long fromSequenceNumber = currentSequenceNumber;
+ long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
+ /**
+ * This loop will continue to fetch and apply OM DB updates and with every
+ * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates.
+ * It continues to fetch from OM till the lag, between OM DB WAL sequence number
+ * and Recon OM DB snapshot WAL sequence number, is less than this lag threshold value.
+ * In high OM write TPS cluster, this simulates continuous pull from OM without any delay.
+ */
+ while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
+ diffBetweenOMDbAndReconDBSeqNumber =
+ getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler);
+ reconTaskUpdater.setLastTaskRunStatus(0);
+ reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
+ reconTaskUpdater.recordRunCompletion();
+ // Pass on DB update events to tasks that are listening.
+ reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
+ omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
+ currentSequenceNumber = getCurrentOMDBSequenceNumber();
+ LOG.debug("Updated current sequence number: {}", currentSequenceNumber);
+ loopCount++;
+ }
+ LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
+ getCurrentOMDBSequenceNumber() - fromSequenceNumber);
} catch (InterruptedException intEx) {
LOG.error("OM DB Delta update sync thread was interrupted.");
// We are updating the table even if it didn't run i.e. got interrupted beforehand
@@ -594,8 +601,7 @@ public boolean syncDataFromOM() {
metrics.incrNumDeltaRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
- LOG.warn("Unable to get and apply delta updates from OM.",
- e.getMessage());
+ LOG.warn("Unable to get and apply delta updates from OM: {}", e.getMessage());
fullSnapshot = true;
}
}
@@ -659,7 +665,7 @@ public boolean syncDataFromOM() {
isSyncDataFromOMRunning.set(false);
}
} else {
- LOG.info("OM DB sync is already running.");
+ LOG.info("OM DB sync is already running in syncDataFromOM.");
return false;
}
return true;
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index 0d5050a934ef..5731056e67c2 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -25,8 +25,8 @@
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
@@ -71,7 +71,6 @@
import org.apache.hadoop.ozone.recon.common.CommonUtils;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
@@ -363,17 +362,19 @@ public void testGetAndApplyDeltaUpdatesFromOM(
OMMetadataManager omMetadataManager =
initializeNewOmMetadataManager(dirOmMetadata);
+ OzoneConfiguration withLimitConfiguration =
+ new OzoneConfiguration(configuration);
+ withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10);
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration,
getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata),
getMockTaskController(), new ReconUtils(), getMockOzoneManagerClient(dbUpdatesWrapper),
reconContext, getMockTaskStatusUpdaterManager());
- OMDBUpdatesHandler updatesHandler =
- new OMDBUpdatesHandler(omMetadataManager);
- ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
- 0L, updatesHandler);
+ long currentReconDBSequenceNumber = ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber();
+ dbUpdatesWrapper.setLatestSequenceNumber(currentReconDBSequenceNumber + 4);
+ ozoneManagerServiceProvider.syncDataFromOM();
OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(4.0,
metrics.getAverageNumUpdatesInDeltaRequest(), 0.0);
@@ -383,7 +384,7 @@ public void testGetAndApplyDeltaUpdatesFromOM(
// Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
// events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
- assertEquals(4, updatesHandler.getEvents().size());
+ assertEquals(currentReconDBSequenceNumber + 4, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber());
// Assert APPLY path --> Verify if the OM service provider's RocksDB got
// the changes.
@@ -430,8 +431,8 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit(
OzoneConfiguration withLimitConfiguration =
new OzoneConfiguration(configuration);
- withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1);
- withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 3);
+ withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 3);
+ withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD, 1);
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(withLimitConfiguration,
getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata),
@@ -440,15 +441,17 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit(
dbUpdatesWrapper[1], dbUpdatesWrapper[2], dbUpdatesWrapper[3]),
reconContext, getMockTaskStatusUpdaterManager());
+ long currentReconDBSequenceNumber = ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber();
+ dbUpdatesWrapper[0].setLatestSequenceNumber(currentReconDBSequenceNumber + 4);
+ dbUpdatesWrapper[1].setLatestSequenceNumber(currentReconDBSequenceNumber + 4);
+ dbUpdatesWrapper[2].setLatestSequenceNumber(currentReconDBSequenceNumber + 4);
+
assertTrue(dbUpdatesWrapper[0].isDBUpdateSuccess());
assertTrue(dbUpdatesWrapper[1].isDBUpdateSuccess());
assertTrue(dbUpdatesWrapper[2].isDBUpdateSuccess());
assertTrue(dbUpdatesWrapper[3].isDBUpdateSuccess());
- OMDBUpdatesHandler updatesHandler =
- new OMDBUpdatesHandler(omMetadataManager);
- ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
- 0L, updatesHandler);
+ ozoneManagerServiceProvider.syncDataFromOM();
OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(1.0,
@@ -459,7 +462,7 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit(
// Assert GET path --> verify if the OMDBUpdatesHandler picked up the first
// 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
- assertEquals(3, updatesHandler.getEvents().size());
+ assertEquals(currentReconDBSequenceNumber + 3, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber());
// Assert APPLY path --> Verify if the OM service provider's RocksDB got
// the first 3 changes, last change not applied.