Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException {
Long getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber);
LOG.debug("OriginalFromSequenceNumber : {} ", fromSequenceNumber);
ImmutablePair<Boolean, Long> dbUpdatesLatestSeqNumOfOMDB =
innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber, omdbUpdatesHandler);
if (!dbUpdatesLatestSeqNumOfOMDB.getLeft()) {
Expand All @@ -466,7 +466,6 @@ Long getAndApplyDeltaUpdatesFromOM(
fromSequenceNumber);
}
omdbUpdatesHandler.setLatestSequenceNumber(getCurrentOMDBSequenceNumber());
LOG.info("Delta updates received from OM : {} records", getCurrentOMDBSequenceNumber() - fromSequenceNumber);
return dbUpdatesLatestSeqNumOfOMDB.getRight();
}

Expand Down Expand Up @@ -514,11 +513,10 @@ ImmutablePair<Boolean, Long> innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc
long lag = latestSequenceNumberOfOM == -1 ? 0 :
latestSequenceNumberOfOM - getCurrentOMDBSequenceNumber();
metrics.setSequenceNumberLag(lag);
LOG.info("Number of updates received from OM : {}, " +
"SequenceNumber diff: {}, SequenceNumber Lag from OM {}, " +
"isDBUpdateSuccess: {}", numUpdates, getCurrentOMDBSequenceNumber()
- fromSequenceNumber, lag,
null != dbUpdates && dbUpdates.isDBUpdateSuccess());
LOG.info("From Sequence Number:{}, Recon DB Sequence Number: {}, Number of updates received from OM : {}, " +
"SequenceNumber diff: {}, SequenceNumber Lag from OM {}, isDBUpdateSuccess: {}",
fromSequenceNumber, getCurrentOMDBSequenceNumber(), numUpdates,
getCurrentOMDBSequenceNumber() - fromSequenceNumber, lag, null != dbUpdates && dbUpdates.isDBUpdateSuccess());
return new ImmutablePair<>(null != dbUpdates && dbUpdates.isDBUpdateSuccess(), lag);
}

Expand All @@ -540,9 +538,8 @@ public boolean syncDataFromOM() {
ReconTaskStatusUpdater reconTaskUpdater;
if (isSyncDataFromOMRunning.compareAndSet(false, true)) {
try {
LOG.info("Syncing data from Ozone Manager.");
long currentSequenceNumber = getCurrentOMDBSequenceNumber();
LOG.debug("Seq number of Recon's OM DB : {}", currentSequenceNumber);
LOG.info("Seq number of Recon's OM DB : {}", currentSequenceNumber);
boolean fullSnapshot = false;

if (currentSequenceNumber <= 0) {
Expand All @@ -551,29 +548,27 @@ public boolean syncDataFromOM() {
reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
OmSnapshotTaskName.OmDeltaRequest.name());

try (OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(omMetadataManager)) {
LOG.info("Obtaining delta updates from Ozone Manager");

// If interrupt was previously signalled,
// we should check for it before starting delta update sync.
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted during delta update.");
}

// Get updates from OM and apply to local Recon OM DB and update task status in table
reconTaskUpdater.recordRunStart();
int loopCount = 0;
long fromSequenceNumber = currentSequenceNumber;
long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
/**
* This loop will continue to fetch and apply OM DB updates and with every
* OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates.
* It continues to fetch from OM till the lag, between OM DB WAL sequence number
* and Recon OM DB snapshot WAL sequence number, is less than this lag threshold value.
* In high OM write TPS cluster, this simulates continuous pull from OM without any delay.
*/
while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
// Get updates from OM and apply to local Recon OM DB and update task status in table
reconTaskUpdater.recordRunStart();
int loopCount = 0;
long fromSequenceNumber = currentSequenceNumber;
long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
/**
* This loop will continue to fetch and apply OM DB updates and with every
* OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates.
* It continues to fetch from OM till the lag, between OM DB WAL sequence number
* and Recon OM DB snapshot WAL sequence number, is less than this lag threshold value.
* In high OM write TPS cluster, this simulates continuous pull from OM without any delay.
*/
while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
try (OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(omMetadataManager)) {

// If interrupt was previously signalled,
// we should check for it before starting delta update sync.
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted during delta update.");
}
diffBetweenOMDbAndReconDBSeqNumber =
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler);
reconTaskUpdater.setLastTaskRunStatus(0);
Expand All @@ -585,25 +580,30 @@ public boolean syncDataFromOM() {
currentSequenceNumber = getCurrentOMDBSequenceNumber();
LOG.debug("Updated current sequence number: {}", currentSequenceNumber);
loopCount++;
} catch (InterruptedException intEx) {
LOG.error("OM DB Delta update sync thread was interrupted and delta sync failed.");
// We are updating the table even if it didn't run i.e. got interrupted beforehand
// to indicate that a task was supposed to run, but it didn't.
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
Thread.currentThread().interrupt();
// Since thread is interrupted, we do not fall back to snapshot sync.
// Return with sync failed status.
return false;
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
LOG.warn("Unable to get and apply delta updates from OM: {}, falling back to full snapshot",
e.getMessage());
fullSnapshot = true;
}
if (fullSnapshot) {
break;
}
LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
getCurrentOMDBSequenceNumber() - fromSequenceNumber);
} catch (InterruptedException intEx) {
LOG.error("OM DB Delta update sync thread was interrupted.");
// We are updating the table even if it didn't run i.e. got interrupted beforehand
// to indicate that a task was supposed to run, but it didn't.
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
Thread.currentThread().interrupt();
// Since thread is interrupted, we do not fall back to snapshot sync. Return with sync failed status.
return false;
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
LOG.warn("Unable to get and apply delta updates from OM: {}", e.getMessage());
fullSnapshot = true;
}
LOG.info("Delta updates received from OM : {} loops, {} records", loopCount,
getCurrentOMDBSequenceNumber() - fromSequenceNumber);
}

if (fullSnapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private void processEvent(int cfIndex, byte[] keyBytes, byte[]
"event is on {} table which is not useful for Recon to " +
"capture.", tableName);
}
LOG.warn("Old Value of Key: {} in table: {} should not be null " +
LOG.debug("Old Value of Key: {} in table: {} should not be null " +
"for DELETE event ", keyStr, tableName);
return;
}
Expand Down
Loading