diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 72ce5100194b..cb714c59a1e9 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -454,7 +454,7 @@ boolean updateReconOmDBWithNewSnapshot() throws IOException { Long getAndApplyDeltaUpdatesFromOM( long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) throws IOException, RocksDBException { - LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber); + LOG.debug("OriginalFromSequenceNumber : {} ", fromSequenceNumber); ImmutablePair dbUpdatesLatestSeqNumOfOMDB = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber, omdbUpdatesHandler); if (!dbUpdatesLatestSeqNumOfOMDB.getLeft()) { @@ -466,7 +466,6 @@ Long getAndApplyDeltaUpdatesFromOM( fromSequenceNumber); } omdbUpdatesHandler.setLatestSequenceNumber(getCurrentOMDBSequenceNumber()); - LOG.info("Delta updates received from OM : {} records", getCurrentOMDBSequenceNumber() - fromSequenceNumber); return dbUpdatesLatestSeqNumOfOMDB.getRight(); } @@ -514,11 +513,10 @@ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc long lag = latestSequenceNumberOfOM == -1 ? 0 : latestSequenceNumberOfOM - getCurrentOMDBSequenceNumber(); metrics.setSequenceNumberLag(lag); - LOG.info("Number of updates received from OM : {}, " + - "SequenceNumber diff: {}, SequenceNumber Lag from OM {}, " + - "isDBUpdateSuccess: {}", numUpdates, getCurrentOMDBSequenceNumber() - - fromSequenceNumber, lag, - null != dbUpdates && dbUpdates.isDBUpdateSuccess()); + LOG.info("From Sequence Number:{}, Recon DB Sequence Number: {}, Number of updates received from OM : {}, " + + "SequenceNumber diff: {}, SequenceNumber Lag from OM {}, isDBUpdateSuccess: {}", + fromSequenceNumber, getCurrentOMDBSequenceNumber(), numUpdates, + getCurrentOMDBSequenceNumber() - fromSequenceNumber, lag, null != dbUpdates && dbUpdates.isDBUpdateSuccess()); return new ImmutablePair<>(null != dbUpdates && dbUpdates.isDBUpdateSuccess(), lag); } @@ -540,9 +538,8 @@ public boolean syncDataFromOM() { ReconTaskStatusUpdater reconTaskUpdater; if (isSyncDataFromOMRunning.compareAndSet(false, true)) { try { - LOG.info("Syncing data from Ozone Manager."); long currentSequenceNumber = getCurrentOMDBSequenceNumber(); - LOG.debug("Seq number of Recon's OM DB : {}", currentSequenceNumber); + LOG.info("Seq number of Recon's OM DB : {}", currentSequenceNumber); boolean fullSnapshot = false; if (currentSequenceNumber <= 0) { @@ -551,29 +548,27 @@ public boolean syncDataFromOM() { reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater( OmSnapshotTaskName.OmDeltaRequest.name()); - try (OMDBUpdatesHandler omdbUpdatesHandler = - new OMDBUpdatesHandler(omMetadataManager)) { - LOG.info("Obtaining delta updates from Ozone Manager"); - - // If interrupt was previously signalled, - // we should check for it before starting delta update sync. - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("Thread interrupted during delta update."); - } - - // Get updates from OM and apply to local Recon OM DB and update task status in table - reconTaskUpdater.recordRunStart(); - int loopCount = 0; - long fromSequenceNumber = currentSequenceNumber; - long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1; - /** - * This loop will continue to fetch and apply OM DB updates and with every - * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates. - * It continues to fetch from OM till the lag, between OM DB WAL sequence number - * and Recon OM DB snapshot WAL sequence number, is less than this lag threshold value. - * In high OM write TPS cluster, this simulates continuous pull from OM without any delay. - */ - while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) { + // Get updates from OM and apply to local Recon OM DB and update task status in table + reconTaskUpdater.recordRunStart(); + int loopCount = 0; + long fromSequenceNumber = currentSequenceNumber; + long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1; + /** + * This loop will continue to fetch and apply OM DB updates and with every + * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count of DB updates. + * It continues to fetch from OM till the lag, between OM DB WAL sequence number + * and Recon OM DB snapshot WAL sequence number, is less than this lag threshold value. + * In high OM write TPS cluster, this simulates continuous pull from OM without any delay. + */ + while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) { + try (OMDBUpdatesHandler omdbUpdatesHandler = + new OMDBUpdatesHandler(omMetadataManager)) { + + // If interrupt was previously signalled, + // we should check for it before starting delta update sync. + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Thread interrupted during delta update."); + } diffBetweenOMDbAndReconDBSeqNumber = getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, omdbUpdatesHandler); reconTaskUpdater.setLastTaskRunStatus(0); @@ -585,25 +580,30 @@ public boolean syncDataFromOM() { currentSequenceNumber = getCurrentOMDBSequenceNumber(); LOG.debug("Updated current sequence number: {}", currentSequenceNumber); loopCount++; + } catch (InterruptedException intEx) { + LOG.error("OM DB Delta update sync thread was interrupted and delta sync failed."); + // We are updating the table even if it didn't run i.e. got interrupted beforehand + // to indicate that a task was supposed to run, but it didn't. + reconTaskUpdater.setLastTaskRunStatus(-1); + reconTaskUpdater.recordRunCompletion(); + Thread.currentThread().interrupt(); + // Since thread is interrupted, we do not fall back to snapshot sync. + // Return with sync failed status. + return false; + } catch (Exception e) { + metrics.incrNumDeltaRequestsFailed(); + reconTaskUpdater.setLastTaskRunStatus(-1); + reconTaskUpdater.recordRunCompletion(); + LOG.warn("Unable to get and apply delta updates from OM: {}, falling back to full snapshot", + e.getMessage()); + fullSnapshot = true; + } + if (fullSnapshot) { + break; } - LOG.info("Delta updates received from OM : {} loops, {} records", loopCount, - getCurrentOMDBSequenceNumber() - fromSequenceNumber); - } catch (InterruptedException intEx) { - LOG.error("OM DB Delta update sync thread was interrupted."); - // We are updating the table even if it didn't run i.e. got interrupted beforehand - // to indicate that a task was supposed to run, but it didn't. - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); - Thread.currentThread().interrupt(); - // Since thread is interrupted, we do not fall back to snapshot sync. Return with sync failed status. - return false; - } catch (Exception e) { - metrics.incrNumDeltaRequestsFailed(); - reconTaskUpdater.setLastTaskRunStatus(-1); - reconTaskUpdater.recordRunCompletion(); - LOG.warn("Unable to get and apply delta updates from OM: {}", e.getMessage()); - fullSnapshot = true; } + LOG.info("Delta updates received from OM : {} loops, {} records", loopCount, + getCurrentOMDBSequenceNumber() - fromSequenceNumber); } if (fullSnapshot) { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java index d586718ffb46..be0de984ebd9 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java @@ -175,7 +175,7 @@ private void processEvent(int cfIndex, byte[] keyBytes, byte[] "event is on {} table which is not useful for Recon to " + "capture.", tableName); } - LOG.warn("Old Value of Key: {} in table: {} should not be null " + + LOG.debug("Old Value of Key: {} in table: {} should not be null " + "for DELETE event ", keyStr, tableName); return; }