Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2569,6 +2569,24 @@
Request to flush the OM DB before taking checkpoint snapshot.
</description>
</property>
<property>
<name>recon.om.delta.update.limit</name>
<value>2000</value>
<tag>OZONE, RECON</tag>
<description>
Recon each time get a limited delta updates from OM.
The actual fetched data might be larger than this limit.
</description>
</property>
<property>
<name>recon.om.delta.update.loop.limit</name>
<value>10</value>
<tag>OZONE, RECON</tag>
<description>
The sync between Recon and OM consists of several small
fetch loops.
</description>
</property>
<property>
<name>ozone.recon.scm.container.threshold</name>
<value>100</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ public final class ReconServerConfigKeys {
public static final String RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM =
"recon.om.snapshot.task.flush.param";

public static final String RECON_OM_DELTA_UPDATE_LIMIT =
"recon.om.delta.update.limit";
public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT = 2000;
public static final String RECON_OM_DELTA_UPDATE_LOOP_LIMIT =
"recon.om.delta.update.loop.limit";
public static final int RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT = 10;

public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
"ozone.recon.task.thread.count";
public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT;
import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
Expand Down Expand Up @@ -105,6 +109,9 @@ public class OzoneManagerServiceProviderImpl
private ReconUtils reconUtils;
private OzoneManagerSyncMetrics metrics;

private long deltaUpdateLimit;
private int deltaUpdateLoopLimit;

/**
* OM Snapshot related task names.
*/
Expand Down Expand Up @@ -145,6 +152,12 @@ public OzoneManagerServiceProviderImpl(
String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
.OZONE_OM_HTTPS_ADDRESS_KEY);

long deltaUpdateLimits = configuration.getLong(RECON_OM_DELTA_UPDATE_LIMIT,
RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT);
int deltaUpdateLoopLimits = configuration.getInt(
RECON_OM_DELTA_UPDATE_LOOP_LIMIT,
RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT);

omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
OZONE_RECON_OM_SNAPSHOT_DB_DIR);

Expand Down Expand Up @@ -176,6 +189,8 @@ public OzoneManagerServiceProviderImpl(
this.ozoneManagerClient = ozoneManagerClient;
this.configuration = configuration;
this.metrics = OzoneManagerSyncMetrics.create();
this.deltaUpdateLimit = deltaUpdateLimits;
this.deltaUpdateLoopLimit = deltaUpdateLoopLimits;
}

public void registerOMDBTasks() {
Expand Down Expand Up @@ -356,8 +371,38 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException {
void getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
int loopCount = 0;
long originalFromSequenceNumber = fromSequenceNumber;
long resultCount = Long.MAX_VALUE;
while (loopCount < deltaUpdateLoopLimit &&
resultCount >= deltaUpdateLimit) {
resultCount = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber,
omdbUpdatesHandler);
fromSequenceNumber += resultCount;
loopCount++;
}
LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
fromSequenceNumber - originalFromSequenceNumber
);
}

/**
* Get Delta updates from OM through RPC call and apply to local OM DB as
* well as accumulate in a buffer.
* @param fromSequenceNumber from sequence number to request from.
* @param omdbUpdatesHandler OM DB updates handler to buffer updates.
* @throws IOException when OM RPC request fails.
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
int recordCount = 0;
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber).build();
.setSequenceNumber(fromSequenceNumber)
.setLimitCount(deltaUpdateLimit)
.build();
DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
if (null != dbUpdates) {
RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
Expand All @@ -366,6 +411,7 @@ void getAndApplyDeltaUpdatesFromOM(
LOG.info("Number of updates received from OM : {}", numUpdates);
if (numUpdates > 0) {
metrics.incrNumUpdatesInDeltaTotal(numUpdates);
recordCount = numUpdates;
}
for (byte[] data : dbUpdates.getData()) {
try (WriteBatch writeBatch = new WriteBatch(data)) {
Expand All @@ -379,6 +425,7 @@ void getAndApplyDeltaUpdatesFromOM(
}
}
}
return recordCount;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -245,7 +248,7 @@ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);
assertEquals(1, metrics.getNumNonZeroDeltaRequests().value());

// In this method, we have to assert the "GET" part and the "APPLY" path.
// In this method, we have to assert the "GET" path and the "APPLY" path.

// Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
// events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
Expand All @@ -263,6 +266,76 @@ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
.getKeyTable(getBucketLayout()).isExist(fullKey));
}

@Test
public void testGetAndApplyDeltaUpdatesFromOMWithLimit() throws Exception {

// Writing 2 Keys into a source OM DB and collecting it in a
// DBUpdatesWrapper.
OMMetadataManager sourceOMMetadataMgr =
initializeNewOmMetadataManager(temporaryFolder.newFolder());
writeDataToOm(sourceOMMetadataMgr, "key_one");
writeDataToOm(sourceOMMetadataMgr, "key_two");

RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L);
DBUpdates[] dbUpdatesWrapper = new DBUpdates[4];
int index = 0;
while(transactionLogIterator.isValid()) {
TransactionLogIterator.BatchResult result =
transactionLogIterator.getBatch();
result.writeBatch().markWalTerminationPoint();
WriteBatch writeBatch = result.writeBatch();
dbUpdatesWrapper[index] = new DBUpdates();
dbUpdatesWrapper[index].addWriteBatch(writeBatch.data(),
result.sequenceNumber());
index++;
transactionLogIterator.next();
}

// OM Service Provider's Metadata Manager.
OMMetadataManager omMetadataManager =
initializeNewOmMetadataManager(temporaryFolder.newFolder());

OzoneConfiguration withLimitConfiguration =
new OzoneConfiguration(configuration);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1);
withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 3);
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(withLimitConfiguration,
getTestReconOmMetadataManager(omMetadataManager,
temporaryFolder.newFolder()),
getMockTaskController(), new ReconUtils(),
getMockOzoneManagerClientWith4Updates(dbUpdatesWrapper[0],
dbUpdatesWrapper[1], dbUpdatesWrapper[2], dbUpdatesWrapper[3]));

OMDBUpdatesHandler updatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
0L, updatesHandler);

OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
assertEquals(1.0,
metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);
assertEquals(3, metrics.getNumNonZeroDeltaRequests().value());

// In this method, we have to assert the "GET" path and the "APPLY" path.

// Assert GET path --> verify if the OMDBUpdatesHandler picked up the first
// 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
assertEquals(3, updatesHandler.getEvents().size());

// Assert APPLY path --> Verify if the OM service provider's RocksDB got
// the first 3 changes, last change not applied.
String fullKey = omMetadataManager.getOzoneKey("sampleVol",
"bucketOne", "key_one");
assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
.getKeyTable(getBucketLayout()).isExist(fullKey));
fullKey = omMetadataManager.getOzoneKey("sampleVol",
"bucketOne", "key_two");
assertFalse(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
.getKeyTable(getBucketLayout()).isExist(fullKey));
}

@Test
public void testSyncDataFromOMFullSnapshot() throws Exception {

Expand Down Expand Up @@ -364,6 +437,17 @@ private OzoneManagerProtocol getMockOzoneManagerClient(
return ozoneManagerProtocolMock;
}

private OzoneManagerProtocol getMockOzoneManagerClientWith4Updates(
DBUpdates updates1, DBUpdates updates2, DBUpdates updates3,
DBUpdates updates4) throws IOException {
OzoneManagerProtocol ozoneManagerProtocolMock =
mock(OzoneManagerProtocol.class);
when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
.DBUpdatesRequest.class))).thenReturn(updates1, updates2, updates3,
updates4);
return ozoneManagerProtocolMock;
}

private BucketLayout getBucketLayout() {
return BucketLayout.DEFAULT;
}
Expand Down