From 7ac707fc52fe1221d5fc0dd9a520e2eb9913039c Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Tue, 4 Feb 2025 20:23:01 +0530 Subject: [PATCH 1/7] HDDS-11953. Ozone Recon - Improve Recon OM sync process based on continuous pull of OM data. --- .../src/main/resources/ozone-default.xml | 13 +-- .../apache/hadoop/hdds/utils/db/RDBStore.java | 2 +- .../recon/TestReconWithOzoneManager.java | 2 - .../ozone/recon/ReconServerConfigKeys.java | 3 - .../impl/OzoneManagerServiceProviderImpl.java | 96 +++++++++---------- .../TestOzoneManagerServiceProviderImpl.java | 16 ++-- 6 files changed, 59 insertions(+), 73 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 4693392a217e..248e1f21bfd0 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3358,7 +3358,7 @@ 5s OZONE, RECON, OM - Interval in MINUTES by Recon to request OM DB Snapshot. + Interval in SECONDS by Recon to request OM DB Snapshot. @@ -3371,7 +3371,7 @@ recon.om.delta.update.limit - 50000 + 1000 OZONE, RECON Recon each time get a limited delta updates from OM. @@ -3390,15 +3390,6 @@ as limiting factor of default 1 GB while preparing DB updates object. - - recon.om.delta.update.loop.limit - 50 - OZONE, RECON - - The sync between Recon and OM consists of several small - fetch loops. - - ozone.recon.scm.container.threshold 100 diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index 99924f724d54..e39bcfb4b7c3 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -414,7 +414,7 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) } // If the above condition was not satisfied, then it is OK to reset // the flag. - checkValidStartingSeqNumber = false; +// checkValidStartingSeqNumber = false; if (currSequenceNumber <= sequenceNumber) { logIterator.get().next(); continue; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java index daf506bc7c32..bf8836854fb1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java @@ -25,7 +25,6 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest; import static org.assertj.core.api.Assertions.assertThat; @@ -108,7 +107,6 @@ public static void init() throws Exception { TimeUnit.MILLISECONDS ); conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2); - conf.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 10); RequestConfig config = RequestConfig.custom() .setConnectTimeout(socketTimeout) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index 02060c03ef8a..a1ee39162c6a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -99,9 +99,6 @@ public final class ReconServerConfigKeys { public static final String RECON_OM_DELTA_UPDATE_LIMIT = "recon.om.delta.update.limit"; public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 50000; - public static final String RECON_OM_DELTA_UPDATE_LOOP_LIMIT = - "recon.om.delta.update.loop.limit"; - public static final int RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT = 50; public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY = "ozone.recon.task.thread.count"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 5a49e55b06bd..02b192aafe83 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -40,6 +40,7 @@ import com.google.common.collect.Iterators; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hdds.recon.ReconConfigKeys; import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdds.utils.db.Table; @@ -94,8 +95,6 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconUtils.convertNumericToSymbolic; import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER; @@ -127,8 +126,7 @@ public class OzoneManagerServiceProviderImpl private ReconUtils reconUtils; private OzoneManagerSyncMetrics metrics; - private long deltaUpdateLimit; - private int deltaUpdateLoopLimit; + private final long deltaUpdateLimit; private AtomicBoolean isSyncDataFromOMRunning; private final String threadNamePrefix; @@ -181,9 +179,6 @@ public OzoneManagerServiceProviderImpl( long deltaUpdateLimits = configuration.getLong(RECON_OM_DELTA_UPDATE_LIMIT, RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT); - int deltaUpdateLoopLimits = configuration.getInt( - RECON_OM_DELTA_UPDATE_LOOP_LIMIT, - RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFAULT); omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration, OZONE_RECON_OM_SNAPSHOT_DB_DIR); @@ -218,7 +213,6 @@ public OzoneManagerServiceProviderImpl( this.configuration = configuration; this.metrics = OzoneManagerSyncMetrics.create(); this.deltaUpdateLimit = deltaUpdateLimits; - this.deltaUpdateLoopLimit = deltaUpdateLoopLimits; this.isSyncDataFromOMRunning = new AtomicBoolean(); this.threadNamePrefix = reconUtils.getReconNodeDetails(configuration).threadNamePrefix(); @@ -256,7 +250,7 @@ public void start() { LOG.error("Failed fetching a full snapshot from Ozone Manager"); } } catch (IOException e) { - LOG.error("Unexpected IOException occurred while trying to fetch a full snapshot: {}", e); + LOG.error("Unexpected IOException occurred while trying to fetch a full snapshot", e); throw new RuntimeException(runtimeException); } } @@ -444,54 +438,45 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException { /** * Get Delta updates from OM through RPC call and apply to local OM DB as * well as accumulate in a buffer. + * * @param fromSequenceNumber from sequence number to request from. * @param omdbUpdatesHandler OM DB updates handler to buffer updates. - * @throws IOException when OM RPC request fails. + * @return lag count which tells how much Recon OM DB snapshot is lagging from OM DB. + * @throws IOException when OM RPC request fails. * @throws RocksDBException when writing to RocksDB fails. */ @VisibleForTesting - void getAndApplyDeltaUpdatesFromOM( + Long getAndApplyDeltaUpdatesFromOM( long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException { - int loopCount = 0; LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber); - long deltaUpdateCnt = Long.MAX_VALUE; - long inLoopStartSequenceNumber = fromSequenceNumber; - long inLoopLatestSequenceNumber; - while (loopCount < deltaUpdateLoopLimit && - deltaUpdateCnt >= deltaUpdateLimit) { - if (!innerGetAndApplyDeltaUpdatesFromOM( - inLoopStartSequenceNumber, omdbUpdatesHandler)) { - LOG.error( - "Retrieve OM DB delta update failed for sequence number : {}, " + - "so falling back to full snapshot.", inLoopStartSequenceNumber); - throw new RocksDBException( - "Unable to get delta updates since sequenceNumber - " + - inLoopStartSequenceNumber); - } - inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber(); - deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber; - inLoopStartSequenceNumber = inLoopLatestSequenceNumber; - loopCount++; + ImmutablePair dbUpdatesLatestSeqNumOfOMDB = + innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber, omdbUpdatesHandler); + if (!dbUpdatesLatestSeqNumOfOMDB.getLeft()) { + LOG.error( + "Retrieve OM DB delta update failed for sequence number : {}, " + + "so falling back to full snapshot.", fromSequenceNumber); + throw new RocksDBException( + "Unable to get delta updates since sequenceNumber - " + + fromSequenceNumber); } - - omdbUpdatesHandler.setLatestSequenceNumber(getCurrentOMDBSequenceNumber()); - LOG.info("Delta updates received from OM : {} loops, {} records", loopCount, - getCurrentOMDBSequenceNumber() - fromSequenceNumber - ); + LOG.info("Delta updates received from OM : {} records", getCurrentOMDBSequenceNumber() - fromSequenceNumber); + return dbUpdatesLatestSeqNumOfOMDB.getRight(); } /** * Get Delta updates from OM through RPC call and apply to local OM DB as * well as accumulate in a buffer. + * * @param fromSequenceNumber from sequence number to request from. * @param omdbUpdatesHandler OM DB updates handler to buffer updates. - * @throws IOException when OM RPC request fails. + * @return Pair of dbUpdatesSuccess, lag (lag between OM and Recom) + * @throws IOException when OM RPC request fails. * @throws RocksDBException when writing to RocksDB fails. */ @VisibleForTesting - boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, - OMDBUpdatesHandler omdbUpdatesHandler) + ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, + OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException { DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder() .setSequenceNumber(fromSequenceNumber) @@ -528,7 +513,7 @@ boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, "isDBUpdateSuccess: {}", numUpdates, getCurrentOMDBSequenceNumber() - fromSequenceNumber, lag, null != dbUpdates && dbUpdates.isDBUpdateSuccess()); - return null != dbUpdates && dbUpdates.isDBUpdateSuccess(); + return new ImmutablePair<>(null != dbUpdates && dbUpdates.isDBUpdateSuccess(), lag); } /** @@ -572,15 +557,27 @@ public boolean syncDataFromOM() { // Get updates from OM and apply to local Recon OM DB and update task status in table reconTaskUpdater.recordRunStart(); - getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, - omdbUpdatesHandler); - - reconTaskUpdater.setLastTaskRunStatus(0); - reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); - reconTaskUpdater.recordRunCompletion(); - // Pass on DB update events to tasks that are listening. - reconTaskController.consumeOMEvents(new OMUpdateEventBatch( - omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager); + int loopCount = 0; + long fromSequenceNumber = currentSequenceNumber; + long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1; + // This loop will continue to fetch and apply OM DB updates till the OM fetch + // request will fetch less than deltaUpdateLimit (default value of 1000), so in + // high OM write TPS cluster, this simulates continuous pull from OM without any delay. + while (diffBetweenOMDbAndReconDBSeqNumber > deltaUpdateLimit) { + diffBetweenOMDbAndReconDBSeqNumber = + getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler); + reconTaskUpdater.setLastTaskRunStatus(0); + reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber()); + reconTaskUpdater.recordRunCompletion(); + // Pass on DB update events to tasks that are listening. + reconTaskController.consumeOMEvents(new OMUpdateEventBatch( + omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager); + currentSequenceNumber = getCurrentOMDBSequenceNumber(); + LOG.error("Updated current sequence number: {}", currentSequenceNumber); + loopCount++; + } + LOG.info("Delta updates received from OM : {} loops, {} records", loopCount, + getCurrentOMDBSequenceNumber() - fromSequenceNumber); } catch (InterruptedException intEx) { LOG.error("OM DB Delta update sync thread was interrupted."); // We are updating the table even if it didn't run i.e. got interrupted beforehand @@ -594,8 +591,7 @@ public boolean syncDataFromOM() { metrics.incrNumDeltaRequestsFailed(); reconTaskUpdater.setLastTaskRunStatus(-1); reconTaskUpdater.recordRunCompletion(); - LOG.warn("Unable to get and apply delta updates from OM.", - e.getMessage()); + LOG.warn("Unable to get and apply delta updates from OM: {}", e.getMessage()); fullSnapshot = true; } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 0d5050a934ef..f865879ae5d2 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -26,7 +26,6 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT; import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest; @@ -431,7 +430,6 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( OzoneConfiguration withLimitConfiguration = new OzoneConfiguration(configuration); withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1); - withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 3); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(withLimitConfiguration, getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata), @@ -445,21 +443,26 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( assertTrue(dbUpdatesWrapper[2].isDBUpdateSuccess()); assertTrue(dbUpdatesWrapper[3].isDBUpdateSuccess()); - OMDBUpdatesHandler updatesHandler = + /*OMDBUpdatesHandler updatesHandler = new OMDBUpdatesHandler(omMetadataManager); ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM( - 0L, updatesHandler); + 0L, updatesHandler);*/ + // ReconOM snapshot DB initialized with Volume and bucket creation events already + assertEquals(2, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); + + ozoneManagerServiceProvider.syncDataFromOM(); + OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics(); assertEquals(1.0, metrics.getAverageNumUpdatesInDeltaRequest(), 0.0); - assertEquals(3, metrics.getNumNonZeroDeltaRequests()); + assertEquals(1, metrics.getNumNonZeroDeltaRequests()); // In this method, we have to assert the "GET" path and the "APPLY" path. // Assert GET path --> verify if the OMDBUpdatesHandler picked up the first // 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs). - assertEquals(3, updatesHandler.getEvents().size()); + assertEquals(3, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); // Assert APPLY path --> Verify if the OM service provider's RocksDB got // the first 3 changes, last change not applied. @@ -571,6 +574,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( verify(reconTaskControllerMock, times(1)) .reInitializeTasks(omMetadataManager); assertEquals(1, metrics.getNumSnapshotRequests()); + omMetadataManager.getStore().close(); } private ReconTaskController getMockTaskController() { From 051a74b68038d5cc53a7b9ba5a01a0e32ddf6178 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Wed, 5 Feb 2025 15:02:32 +0530 Subject: [PATCH 2/7] HDDS-11953. Ozone Recon - Improve Recon OM sync process based on continuous pull of OM data. --- .../java/org/apache/hadoop/hdds/utils/db/RDBStore.java | 2 +- .../hadoop/ozone/recon/TestReconWithOzoneManager.java | 2 +- .../spi/impl/TestOzoneManagerServiceProviderImpl.java | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index e39bcfb4b7c3..99924f724d54 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -414,7 +414,7 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) } // If the above condition was not satisfied, then it is OK to reset // the flag. -// checkValidStartingSeqNumber = false; + checkValidStartingSeqNumber = false; if (currSequenceNumber <= sequenceNumber) { logIterator.get().next(); continue; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java index bf8836854fb1..0d1e072ad938 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java @@ -106,7 +106,7 @@ public static void init() throws Exception { OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT), TimeUnit.MILLISECONDS ); - conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2); + conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10); RequestConfig config = RequestConfig.custom() .setConnectTimeout(socketTimeout) diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index f865879ae5d2..5f7886e55600 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -417,6 +417,7 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( result.writeBatch().markWalTerminationPoint(); WriteBatch writeBatch = result.writeBatch(); dbUpdatesWrapper[index] = new DBUpdates(); + dbUpdatesWrapper[index].setLatestSequenceNumber(dbUpdatesWrapper.length); dbUpdatesWrapper[index].addWriteBatch(writeBatch.data(), result.sequenceNumber()); index++; @@ -429,7 +430,7 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( OzoneConfiguration withLimitConfiguration = new OzoneConfiguration(configuration); - withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1); + withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(withLimitConfiguration, getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata), @@ -443,15 +444,14 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( assertTrue(dbUpdatesWrapper[2].isDBUpdateSuccess()); assertTrue(dbUpdatesWrapper[3].isDBUpdateSuccess()); - /*OMDBUpdatesHandler updatesHandler = + OMDBUpdatesHandler updatesHandler = new OMDBUpdatesHandler(omMetadataManager); ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM( - 0L, updatesHandler);*/ + 0L, updatesHandler); // ReconOM snapshot DB initialized with Volume and bucket creation events already assertEquals(2, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); - ozoneManagerServiceProvider.syncDataFromOM(); - + //ozoneManagerServiceProvider.syncDataFromOM(); OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics(); assertEquals(1.0, From ebed5999d89858a91e3f11a8a5cc1812c5ff4bc5 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Wed, 5 Feb 2025 17:23:50 +0530 Subject: [PATCH 3/7] HDDS-11953. Fixed test cases. --- .../TestOzoneManagerServiceProviderImpl.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 5f7886e55600..5d7c9b5fad1e 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -70,7 +70,6 @@ import org.apache.hadoop.ozone.recon.common.CommonUtils; import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; -import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler; import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch; import org.apache.hadoop.ozone.recon.tasks.ReconTaskController; @@ -362,17 +361,19 @@ public void testGetAndApplyDeltaUpdatesFromOM( OMMetadataManager omMetadataManager = initializeNewOmMetadataManager(dirOmMetadata); + OzoneConfiguration withLimitConfiguration = + new OzoneConfiguration(configuration); + withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 0); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(configuration, getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata), getMockTaskController(), new ReconUtils(), getMockOzoneManagerClient(dbUpdatesWrapper), reconContext, getMockTaskStatusUpdaterManager()); - OMDBUpdatesHandler updatesHandler = - new OMDBUpdatesHandler(omMetadataManager); - ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM( - 0L, updatesHandler); + long currentReconDBSequenceNumber = ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber(); + dbUpdatesWrapper.setLatestSequenceNumber(currentReconDBSequenceNumber + 4); + ozoneManagerServiceProvider.syncDataFromOM(); OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics(); assertEquals(4.0, metrics.getAverageNumUpdatesInDeltaRequest(), 0.0); @@ -382,7 +383,7 @@ public void testGetAndApplyDeltaUpdatesFromOM( // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4 // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs). - assertEquals(4, updatesHandler.getEvents().size()); + assertEquals(currentReconDBSequenceNumber + 4, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); // Assert APPLY path --> Verify if the OM service provider's RocksDB got // the changes. @@ -417,7 +418,6 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( result.writeBatch().markWalTerminationPoint(); WriteBatch writeBatch = result.writeBatch(); dbUpdatesWrapper[index] = new DBUpdates(); - dbUpdatesWrapper[index].setLatestSequenceNumber(dbUpdatesWrapper.length); dbUpdatesWrapper[index].addWriteBatch(writeBatch.data(), result.sequenceNumber()); index++; @@ -430,7 +430,7 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( OzoneConfiguration withLimitConfiguration = new OzoneConfiguration(configuration); - withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10); + withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(withLimitConfiguration, getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata), @@ -439,30 +439,28 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( dbUpdatesWrapper[1], dbUpdatesWrapper[2], dbUpdatesWrapper[3]), reconContext, getMockTaskStatusUpdaterManager()); + long currentReconDBSequenceNumber = ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber(); + dbUpdatesWrapper[0].setLatestSequenceNumber(currentReconDBSequenceNumber + 4); + dbUpdatesWrapper[1].setLatestSequenceNumber(currentReconDBSequenceNumber + 4); + dbUpdatesWrapper[2].setLatestSequenceNumber(currentReconDBSequenceNumber + 4); + assertTrue(dbUpdatesWrapper[0].isDBUpdateSuccess()); assertTrue(dbUpdatesWrapper[1].isDBUpdateSuccess()); assertTrue(dbUpdatesWrapper[2].isDBUpdateSuccess()); assertTrue(dbUpdatesWrapper[3].isDBUpdateSuccess()); - OMDBUpdatesHandler updatesHandler = - new OMDBUpdatesHandler(omMetadataManager); - ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM( - 0L, updatesHandler); - // ReconOM snapshot DB initialized with Volume and bucket creation events already - assertEquals(2, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); - - //ozoneManagerServiceProvider.syncDataFromOM(); + ozoneManagerServiceProvider.syncDataFromOM(); OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics(); assertEquals(1.0, metrics.getAverageNumUpdatesInDeltaRequest(), 0.0); - assertEquals(1, metrics.getNumNonZeroDeltaRequests()); + assertEquals(3, metrics.getNumNonZeroDeltaRequests()); // In this method, we have to assert the "GET" path and the "APPLY" path. // Assert GET path --> verify if the OMDBUpdatesHandler picked up the first // 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs). - assertEquals(3, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); + assertEquals(currentReconDBSequenceNumber + 3, ozoneManagerServiceProvider.getCurrentOMDBSequenceNumber()); // Assert APPLY path --> Verify if the OM service provider's RocksDB got // the first 3 changes, last change not applied. @@ -574,7 +572,6 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( verify(reconTaskControllerMock, times(1)) .reInitializeTasks(omMetadataManager); assertEquals(1, metrics.getNumSnapshotRequests()); - omMetadataManager.getStore().close(); } private ReconTaskController getMockTaskController() { From 0b3c92dbeb1c6ea2c92de21b458dc407b823703f Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Wed, 5 Feb 2025 17:26:41 +0530 Subject: [PATCH 4/7] HDDS-11953. Updated default value of deltaupdate limit. --- hadoop-hdds/common/src/main/resources/ozone-default.xml | 2 +- .../org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 248e1f21bfd0..7b9490f8c80a 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3371,7 +3371,7 @@ recon.om.delta.update.limit - 1000 + 0 OZONE, RECON Recon each time get a limited delta updates from OM. diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index a1ee39162c6a..b8abc88787bb 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -98,7 +98,7 @@ public final class ReconServerConfigKeys { public static final String RECON_OM_DELTA_UPDATE_LIMIT = "recon.om.delta.update.limit"; - public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 50000; + public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 0; public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY = "ozone.recon.task.thread.count"; From 0b32f5d66fc0e88e4a4bbcd981d5d72ad5696513 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Thu, 6 Feb 2025 10:48:51 +0530 Subject: [PATCH 5/7] HDDS-11953. Fixed test cases failures. --- .../src/main/resources/ozone-default.xml | 12 +++++++++++- .../ozone/recon/ReconServerConfigKeys.java | 6 +++++- .../impl/OzoneManagerServiceProviderImpl.java | 18 ++++++++++++++---- .../TestOzoneManagerServiceProviderImpl.java | 6 ++++-- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 7b9490f8c80a..f47f2ab440c7 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3371,7 +3371,7 @@ recon.om.delta.update.limit - 0 + 50000 OZONE, RECON Recon each time get a limited delta updates from OM. @@ -3390,6 +3390,16 @@ as limiting factor of default 1 GB while preparing DB updates object. + + recon.om.delta.update.lag.threshold + 0 + OZONE, RECON + + At every Recon OM sync, recon starts fetching OM DB updates, and it continues to + fetch from OM till the lag, between OM DB WAL sequence number and Recon OM DB + snapshot WAL sequence number, is less than this lag threshold value. + + ozone.recon.scm.container.threshold 100 diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index b8abc88787bb..72efe1ffea52 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -98,7 +98,11 @@ public final class ReconServerConfigKeys { public static final String RECON_OM_DELTA_UPDATE_LIMIT = "recon.om.delta.update.limit"; - public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 0; + public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT = 50000; + + public static final String RECON_OM_DELTA_UPDATE_LAG_THRESHOLD = + "recon.om.delta.update.lag.threshold"; + public static final long RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT = 0; public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY = "ozone.recon.task.thread.count"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 02b192aafe83..339bccd32203 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -93,6 +93,8 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconUtils.convertNumericToSymbolic; @@ -101,6 +103,7 @@ import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.sqlite.core.DB; /** * Implementation of the OzoneManager Service provider. @@ -127,6 +130,7 @@ public class OzoneManagerServiceProviderImpl private OzoneManagerSyncMetrics metrics; private final long deltaUpdateLimit; + private final long omDBLagThreshold; private AtomicBoolean isSyncDataFromOMRunning; private final String threadNamePrefix; @@ -221,6 +225,8 @@ public OzoneManagerServiceProviderImpl( .build(); this.reconContext = reconContext; this.taskStatusUpdaterManager = taskStatusUpdaterManager; + this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD, + RECON_OM_DELTA_UPDATE_LAG_THRESHOLD_DEFAULT); } @Override @@ -560,10 +566,14 @@ public boolean syncDataFromOM() { int loopCount = 0; long fromSequenceNumber = currentSequenceNumber; long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1; - // This loop will continue to fetch and apply OM DB updates till the OM fetch - // request will fetch less than deltaUpdateLimit (default value of 1000), so in - // high OM write TPS cluster, this simulates continuous pull from OM without any delay. - while (diffBetweenOMDbAndReconDBSeqNumber > deltaUpdateLimit) { + /** + * This loop will continue to fetch and apply OM DB updates and with every + * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates. + * It continues to fetch from OM till the lag, between OM DB WAL sequence number + * and Recon OM DB snapshot WAL sequence number, is less than this lag threshold value. + * In high OM write TPS cluster, this simulates continuous pull from OM without any delay. + */ + while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) { diffBetweenOMDbAndReconDBSeqNumber = getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler); reconTaskUpdater.setLastTaskRunStatus(0); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 5d7c9b5fad1e..5731056e67c2 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LAG_THRESHOLD; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT; import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest; @@ -363,7 +364,7 @@ public void testGetAndApplyDeltaUpdatesFromOM( OzoneConfiguration withLimitConfiguration = new OzoneConfiguration(configuration); - withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 0); + withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 10); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(configuration, getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata), @@ -430,7 +431,8 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit( OzoneConfiguration withLimitConfiguration = new OzoneConfiguration(configuration); - withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1); + withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 3); + withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD, 1); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(withLimitConfiguration, getTestReconOmMetadataManager(omMetadataManager, dirReconMetadata), From 70811cfd24c123214b7d463663aa5db6647e4541 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Thu, 6 Feb 2025 11:45:43 +0530 Subject: [PATCH 6/7] HDDS-11953. Fixed checkstyle issues. --- .../ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 339bccd32203..cdf41a3489ea 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -103,7 +103,6 @@ import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.sqlite.core.DB; /** * Implementation of the OzoneManager Service provider. From 9cb0ec20ce3109a94cb517da03e2237e100738c2 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Fri, 7 Feb 2025 12:48:37 +0530 Subject: [PATCH 7/7] HDDS-11953. Fixed robot test failures. --- .../dist/src/main/smoketest/recon/recon-taskstatus.robot | 2 ++ .../recon/spi/impl/OzoneManagerServiceProviderImpl.java | 7 ++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot b/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot index 3b5b011523ee..f04a3ce741ed 100644 --- a/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot +++ b/hadoop-ozone/dist/src/main/smoketest/recon/recon-taskstatus.robot @@ -105,6 +105,8 @@ Validate All Tasks Updated After Sync Validate Sequence number is updated after sync Log To Console Triggering OM DB sync for updates + Log To Console Wait for few seconds to let previous OM DB Sync thread to finish + Sleep 2s # Waits for 2 seconds Sync OM Data ${tasks} = Fetch Task Status Should Not Be Empty ${tasks} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index cdf41a3489ea..72ce5100194b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -310,7 +310,7 @@ public boolean triggerSyncDataFromOMImmediately() { startSyncDataFromOM(0L); return true; } else { - LOG.debug("OM DB sync is already running."); + LOG.info("OM DB sync is already running when trying to trigger OM DB sync manually."); } return false; } @@ -465,6 +465,7 @@ Long getAndApplyDeltaUpdatesFromOM( "Unable to get delta updates since sequenceNumber - " + fromSequenceNumber); } + omdbUpdatesHandler.setLatestSequenceNumber(getCurrentOMDBSequenceNumber()); LOG.info("Delta updates received from OM : {} records", getCurrentOMDBSequenceNumber() - fromSequenceNumber); return dbUpdatesLatestSeqNumOfOMDB.getRight(); } @@ -582,7 +583,7 @@ public boolean syncDataFromOM() { reconTaskController.consumeOMEvents(new OMUpdateEventBatch( omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager); currentSequenceNumber = getCurrentOMDBSequenceNumber(); - LOG.error("Updated current sequence number: {}", currentSequenceNumber); + LOG.debug("Updated current sequence number: {}", currentSequenceNumber); loopCount++; } LOG.info("Delta updates received from OM : {} loops, {} records", loopCount, @@ -664,7 +665,7 @@ public boolean syncDataFromOM() { isSyncDataFromOMRunning.set(false); } } else { - LOG.info("OM DB sync is already running."); + LOG.info("OM DB sync is already running in syncDataFromOM."); return false; } return true;