diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2f9d01da2b92..92ebc6b2e6e4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2569,6 +2569,24 @@
Request to flush the OM DB before taking checkpoint snapshot.
+
+ recon.om.delta.update.limit
+ 2000
+ OZONE, RECON
+
+ Recon each time get a limited delta updates from OM.
+ The actual fetched data might be larger than this limit.
+
+
+
+ recon.om.delta.update.loop.limit
+ 10
+ OZONE, RECON
+
+ The sync between Recon and OM consists of several small
+ fetch loops.
+
+
ozone.recon.scm.container.threshold
100
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index dd9f0c9aaf52..d23ffe913f09 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -96,6 +96,13 @@ public final class ReconServerConfigKeys {
public static final String RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM =
"recon.om.snapshot.task.flush.param";
+ public static final String RECON_OM_DELTA_UPDATE_LIMIT =
+ "recon.om.delta.update.limit";
+ public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT = 2000;
+ public static final String RECON_OM_DELTA_UPDATE_LOOP_LIMIT =
+ "recon.om.delta.update.loop.limit";
+ public static final int RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT = 10;
+
public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
"ozone.recon.task.thread.count";
public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 021d0147a018..288b26fba4c7 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -71,6 +71,10 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT;
import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
@@ -105,6 +109,9 @@ public class OzoneManagerServiceProviderImpl
private ReconUtils reconUtils;
private OzoneManagerSyncMetrics metrics;
+ private long deltaUpdateLimit;
+ private int deltaUpdateLoopLimit;
+
/**
* OM Snapshot related task names.
*/
@@ -145,6 +152,12 @@ public OzoneManagerServiceProviderImpl(
String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
.OZONE_OM_HTTPS_ADDRESS_KEY);
+ long deltaUpdateLimits = configuration.getLong(RECON_OM_DELTA_UPDATE_LIMIT,
+ RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT);
+ int deltaUpdateLoopLimits = configuration.getInt(
+ RECON_OM_DELTA_UPDATE_LOOP_LIMIT,
+ RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT);
+
omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
OZONE_RECON_OM_SNAPSHOT_DB_DIR);
@@ -176,6 +189,8 @@ public OzoneManagerServiceProviderImpl(
this.ozoneManagerClient = ozoneManagerClient;
this.configuration = configuration;
this.metrics = OzoneManagerSyncMetrics.create();
+ this.deltaUpdateLimit = deltaUpdateLimits;
+ this.deltaUpdateLoopLimit = deltaUpdateLoopLimits;
}
public void registerOMDBTasks() {
@@ -356,8 +371,38 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException {
void getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
+ int loopCount = 0;
+ long originalFromSequenceNumber = fromSequenceNumber;
+ long resultCount = Long.MAX_VALUE;
+ while (loopCount < deltaUpdateLoopLimit &&
+ resultCount >= deltaUpdateLimit) {
+ resultCount = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber,
+ omdbUpdatesHandler);
+ fromSequenceNumber += resultCount;
+ loopCount++;
+ }
+ LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
+ fromSequenceNumber - originalFromSequenceNumber
+ );
+ }
+
+ /**
+ * Get Delta updates from OM through RPC call and apply to local OM DB as
+ * well as accumulate in a buffer.
+ * @param fromSequenceNumber from sequence number to request from.
+ * @param omdbUpdatesHandler OM DB updates handler to buffer updates.
+ * @throws IOException when OM RPC request fails.
+ * @throws RocksDBException when writing to RocksDB fails.
+ */
+ @VisibleForTesting
+ long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
+ OMDBUpdatesHandler omdbUpdatesHandler)
+ throws IOException, RocksDBException {
+ int recordCount = 0;
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
- .setSequenceNumber(fromSequenceNumber).build();
+ .setSequenceNumber(fromSequenceNumber)
+ .setLimitCount(deltaUpdateLimit)
+ .build();
DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
if (null != dbUpdates) {
RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
@@ -366,6 +411,7 @@ void getAndApplyDeltaUpdatesFromOM(
LOG.info("Number of updates received from OM : {}", numUpdates);
if (numUpdates > 0) {
metrics.incrNumUpdatesInDeltaTotal(numUpdates);
+ recordCount = numUpdates;
}
for (byte[] data : dbUpdates.getData()) {
try (WriteBatch writeBatch = new WriteBatch(data)) {
@@ -379,6 +425,7 @@ void getAndApplyDeltaUpdatesFromOM(
}
}
}
+ return recordCount;
}
/**
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index c8d2544c71f2..161c03536d8d 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -25,12 +25,15 @@
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
@@ -245,7 +248,7 @@ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);
assertEquals(1, metrics.getNumNonZeroDeltaRequests().value());
- // In this method, we have to assert the "GET" part and the "APPLY" path.
+ // In this method, we have to assert the "GET" path and the "APPLY" path.
// Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
// events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
@@ -263,6 +266,76 @@ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
.getKeyTable(getBucketLayout()).isExist(fullKey));
}
+ @Test
+ public void testGetAndApplyDeltaUpdatesFromOMWithLimit() throws Exception {
+
+ // Writing 2 Keys into a source OM DB and collecting it in a
+ // DBUpdatesWrapper.
+ OMMetadataManager sourceOMMetadataMgr =
+ initializeNewOmMetadataManager(temporaryFolder.newFolder());
+ writeDataToOm(sourceOMMetadataMgr, "key_one");
+ writeDataToOm(sourceOMMetadataMgr, "key_two");
+
+ RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
+ TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L);
+ DBUpdates[] dbUpdatesWrapper = new DBUpdates[4];
+ int index = 0;
+ while(transactionLogIterator.isValid()) {
+ TransactionLogIterator.BatchResult result =
+ transactionLogIterator.getBatch();
+ result.writeBatch().markWalTerminationPoint();
+ WriteBatch writeBatch = result.writeBatch();
+ dbUpdatesWrapper[index] = new DBUpdates();
+ dbUpdatesWrapper[index].addWriteBatch(writeBatch.data(),
+ result.sequenceNumber());
+ index++;
+ transactionLogIterator.next();
+ }
+
+ // OM Service Provider's Metadata Manager.
+ OMMetadataManager omMetadataManager =
+ initializeNewOmMetadataManager(temporaryFolder.newFolder());
+
+ OzoneConfiguration withLimitConfiguration =
+ new OzoneConfiguration(configuration);
+ withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1);
+ withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 3);
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(withLimitConfiguration,
+ getTestReconOmMetadataManager(omMetadataManager,
+ temporaryFolder.newFolder()),
+ getMockTaskController(), new ReconUtils(),
+ getMockOzoneManagerClientWith4Updates(dbUpdatesWrapper[0],
+ dbUpdatesWrapper[1], dbUpdatesWrapper[2], dbUpdatesWrapper[3]));
+
+ OMDBUpdatesHandler updatesHandler =
+ new OMDBUpdatesHandler(omMetadataManager);
+ ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
+ 0L, updatesHandler);
+
+ OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
+ assertEquals(1.0,
+ metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);
+ assertEquals(3, metrics.getNumNonZeroDeltaRequests().value());
+
+ // In this method, we have to assert the "GET" path and the "APPLY" path.
+
+ // Assert GET path --> verify if the OMDBUpdatesHandler picked up the first
+ // 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
+ assertEquals(3, updatesHandler.getEvents().size());
+
+ // Assert APPLY path --> Verify if the OM service provider's RocksDB got
+ // the first 3 changes, last change not applied.
+ String fullKey = omMetadataManager.getOzoneKey("sampleVol",
+ "bucketOne", "key_one");
+ assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+ .getKeyTable(getBucketLayout()).isExist(fullKey));
+ fullKey = omMetadataManager.getOzoneKey("sampleVol",
+ "bucketOne", "key_two");
+ assertFalse(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+ .getKeyTable(getBucketLayout()).isExist(fullKey));
+ }
+
@Test
public void testSyncDataFromOMFullSnapshot() throws Exception {
@@ -364,6 +437,17 @@ private OzoneManagerProtocol getMockOzoneManagerClient(
return ozoneManagerProtocolMock;
}
+ private OzoneManagerProtocol getMockOzoneManagerClientWith4Updates(
+ DBUpdates updates1, DBUpdates updates2, DBUpdates updates3,
+ DBUpdates updates4) throws IOException {
+ OzoneManagerProtocol ozoneManagerProtocolMock =
+ mock(OzoneManagerProtocol.class);
+ when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+ .DBUpdatesRequest.class))).thenReturn(updates1, updates2, updates3,
+ updates4);
+ return ozoneManagerProtocolMock;
+ }
+
private BucketLayout getBucketLayout() {
return BucketLayout.DEFAULT;
}