Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;

Expand Down Expand Up @@ -109,6 +111,9 @@ public static void init() throws Exception {
OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
TimeUnit.MILLISECONDS
);
conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2);
conf.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 10);

RequestConfig config = RequestConfig.custom()
.setConnectTimeout(socketTimeout)
.setConnectionRequestTimeout(connectionTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,17 +372,21 @@ void getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
int loopCount = 0;
long originalFromSequenceNumber = fromSequenceNumber;
long resultCount = Long.MAX_VALUE;
LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber);
long deltaUpdateCnt = Long.MAX_VALUE;
long inLoopStartSequenceNumber = fromSequenceNumber;
long inLoopLatestSequenceNumber;
while (loopCount < deltaUpdateLoopLimit &&
resultCount >= deltaUpdateLimit) {
resultCount = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber,
deltaUpdateCnt >= deltaUpdateLimit) {
innerGetAndApplyDeltaUpdatesFromOM(inLoopStartSequenceNumber,
omdbUpdatesHandler);
fromSequenceNumber += resultCount;
inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber();
deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber;
inLoopStartSequenceNumber = inLoopLatestSequenceNumber;
loopCount++;
}
LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
fromSequenceNumber - originalFromSequenceNumber
getCurrentOMDBSequenceNumber() - fromSequenceNumber
);
}

Expand All @@ -395,23 +399,21 @@ void getAndApplyDeltaUpdatesFromOM(
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
int recordCount = 0;
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber)
.setLimitCount(deltaUpdateLimit)
.build();
DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
if (null != dbUpdates) {
int numUpdates = 0;
if (null != dbUpdates && dbUpdates.getCurrentSequenceNumber() != -1) {
RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
RocksDB rocksDB = rocksDBStore.getDb();
int numUpdates = dbUpdates.getData().size();
LOG.info("Number of updates received from OM : {}", numUpdates);
numUpdates = dbUpdates.getData().size();
if (numUpdates > 0) {
metrics.incrNumUpdatesInDeltaTotal(numUpdates);
recordCount = numUpdates;
}
for (byte[] data : dbUpdates.getData()) {
try (WriteBatch writeBatch = new WriteBatch(data)) {
Expand All @@ -425,7 +427,8 @@ long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
}
}
}
return recordCount;
LOG.info("Number of updates received from OM : {}, SequenceNumber diff: {}",
numUpdates, getCurrentOMDBSequenceNumber() - fromSequenceNumber);
}

/**
Expand Down