diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java index 9e21311936dc..ed903b3e8902 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest; import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest; @@ -109,6 +111,9 @@ public static void init() throws Exception { OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT), TimeUnit.MILLISECONDS ); + conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2); + conf.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 10); + RequestConfig config = RequestConfig.custom() .setConnectTimeout(socketTimeout) .setConnectionRequestTimeout(connectionTimeout) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 08612e3f8ae2..ff730036281b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -372,17 +372,21 @@ void getAndApplyDeltaUpdatesFromOM( long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException { int loopCount = 0; - long originalFromSequenceNumber = fromSequenceNumber; - long resultCount = Long.MAX_VALUE; + LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber); + long deltaUpdateCnt = Long.MAX_VALUE; + long inLoopStartSequenceNumber = fromSequenceNumber; + long inLoopLatestSequenceNumber; while (loopCount < deltaUpdateLoopLimit && - resultCount >= deltaUpdateLimit) { - resultCount = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber, + deltaUpdateCnt >= deltaUpdateLimit) { + innerGetAndApplyDeltaUpdatesFromOM(inLoopStartSequenceNumber, omdbUpdatesHandler); - fromSequenceNumber += resultCount; + inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber(); + deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber; + inLoopStartSequenceNumber = inLoopLatestSequenceNumber; loopCount++; } LOG.info("Delta updates received from OM : {} loops, {} records", loopCount, - fromSequenceNumber - originalFromSequenceNumber + getCurrentOMDBSequenceNumber() - fromSequenceNumber ); } @@ -395,23 +399,21 @@ void getAndApplyDeltaUpdatesFromOM( * @throws RocksDBException when writing to RocksDB fails. */ @VisibleForTesting - long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, + void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException { - int recordCount = 0; DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder() .setSequenceNumber(fromSequenceNumber) .setLimitCount(deltaUpdateLimit) .build(); DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest); - if (null != dbUpdates) { + int numUpdates = 0; + if (null != dbUpdates && dbUpdates.getCurrentSequenceNumber() != -1) { RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore(); RocksDB rocksDB = rocksDBStore.getDb(); - int numUpdates = dbUpdates.getData().size(); - LOG.info("Number of updates received from OM : {}", numUpdates); + numUpdates = dbUpdates.getData().size(); if (numUpdates > 0) { metrics.incrNumUpdatesInDeltaTotal(numUpdates); - recordCount = numUpdates; } for (byte[] data : dbUpdates.getData()) { try (WriteBatch writeBatch = new WriteBatch(data)) { @@ -425,7 +427,8 @@ long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber, } } } - return recordCount; + LOG.info("Number of updates received from OM : {}, SequenceNumber diff: {}", + numUpdates, getCurrentOMDBSequenceNumber() - fromSequenceNumber); } /**