From 25d095b942b66ab4b9d5e77492b8a8d3f86767da Mon Sep 17 00:00:00 2001 From: Hernan Gelaf-Romer Date: Mon, 29 Dec 2025 08:02:17 -0500 Subject: [PATCH] Log filtering in IncrementalBackupManager can lead to data loss --- .../backup/impl/FullTableBackupClient.java | 68 +++++++-- .../backup/impl/IncrementalBackupManager.java | 48 +++++-- .../hbase/backup/TestBackupOfflineRS.java | 129 ++++++++++++++++++ 3 files changed, 226 insertions(+), 19 deletions(-) create mode 100644 hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupOfflineRS.java diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index 2293fd4f8149..87e1250f8e35 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -25,8 +25,14 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupCopyJob; import org.apache.hadoop.hbase.backup.BackupInfo; @@ -38,7 +44,9 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,21 +152,21 @@ public void execute() throws IOException { // logs while we do the backup. backupManager.writeBackupStartCode(0L); } - // We roll log here before we do the snapshot. It is possible there is duplicate data - // in the log that is already in the snapshot. But if we do it after the snapshot, we - // could have data loss. - // A better approach is to do the roll log on each RS in the same global procedure as - // the snapshot. - LOG.info("Execute roll log procedure for full backup ..."); // Gather the bulk loads being tracked by the system, which can be deleted (since their data // will be part of the snapshot being taken). We gather this list before taking the actual // snapshots for the same reason as the log rolls. List bulkLoadsToDelete = backupManager.readBulkloadRows(tableList); + Map previousLogRollsByHost = backupManager.readRegionServerLastLogRollResult(); + // We roll log here before we do the snapshot. It is possible there is duplicate data + // in the log that is already in the snapshot. But if we do it after the snapshot, we + // could have data loss. + // A better approach is to do the roll log on each RS in the same global procedure as + // the snapshot. 
+ LOG.info("Execute roll log procedure for full backup ..."); BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); - - newTimestamps = backupManager.readRegionServerLastLogRollResult(); + Map latestLogRollsByHost = backupManager.readRegionServerLastLogRollResult(); // SNAPSHOT_TABLES: backupInfo.setPhase(BackupPhase.SNAPSHOT); @@ -181,6 +189,50 @@ public void execute() throws IOException { // set overall backup status: complete. Here we make sure to complete the backup. // After this checkpoint, even if entering cancel process, will let the backup finished backupInfo.setState(BackupState.COMPLETE); + + // Scan oldlogs for dead/decommissioned hosts and add their max WAL timestamps + // to newTimestamps. This ensures subsequent incremental backups won't try to back up + // WALs that are already covered by this full backup's snapshot. + Path walRootDir = CommonFSUtils.getWALRootDir(conf); + Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + FileSystem fs = walRootDir.getFileSystem(conf); + + List allLogs = new ArrayList<>(); + for (FileStatus hostLogDir : fs.listStatus(logDir)) { + String host = BackupUtils.parseHostNameFromLogFile(hostLogDir.getPath()); + if (host == null) { + continue; + } + allLogs.addAll(Arrays.asList(fs.listStatus(hostLogDir.getPath()))); + } + allLogs.addAll(Arrays.asList(fs.listStatus(oldLogDir))); + + newTimestamps = new HashMap<>(); + + for (FileStatus log : allLogs) { + if (AbstractFSWALProvider.isMetaFile(log.getPath())) { + continue; + } + String host = BackupUtils.parseHostNameFromLogFile(log.getPath()); + if (host == null) { + continue; + } + long timestamp = BackupUtils.getCreationTime(log.getPath()); + Long previousLogRoll = previousLogRollsByHost.get(host); + Long latestLogRoll = latestLogRollsByHost.get(host); + boolean isInactive = latestLogRoll == null || latestLogRoll.equals(previousLogRoll); + + if (isInactive) { + long currentTs = newTimestamps.getOrDefault(host, 0L); + if (timestamp > currentTs) { + newTimestamps.put(host, timestamp); + } + } else { + newTimestamps.put(host, latestLogRoll); + } + } + // The table list in backupInfo is good for both full backup and incremental backup. // For incremental backup, it contains the incremental backup table set. 
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java index 20884edf836e..de35eebbcdc2 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -58,7 +58,6 @@ public IncrementalBackupManager(Connection conn, Configuration conf) throws IOEx */ public Map getIncrBackupLogFileMap() throws IOException { List logList; - Map newTimestamps; Map previousTimestampMins; String savedStartCode = readBackupStartCode(); @@ -83,12 +82,48 @@ public Map getIncrBackupLogFileMap() throws IOException { LOG.info("Execute roll log procedure for incremental backup ..."); BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); - newTimestamps = readRegionServerLastLogRollResult(); + Map newTimestamps = readRegionServerLastLogRollResult(); + + Map latestLogRollByHost = readRegionServerLastLogRollResult(); + for (Map.Entry entry : latestLogRollByHost.entrySet()) { + String host = entry.getKey(); + long latestLogRoll = entry.getValue(); + Long earliestTimestampToIncludeInBackup = previousTimestampMins.get(host); + + boolean isInactive = earliestTimestampToIncludeInBackup != null + && earliestTimestampToIncludeInBackup > latestLogRoll; + + long latestTimestampToIncludeInBackup; + if (isInactive) { + LOG.debug("Avoided resetting latest timestamp boundary for {} from {} to {}", host, + earliestTimestampToIncludeInBackup, latestLogRoll); + latestTimestampToIncludeInBackup = earliestTimestampToIncludeInBackup; + } else { + latestTimestampToIncludeInBackup = latestLogRoll; + } + newTimestamps.put(host, latestTimestampToIncludeInBackup); + } logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); logList = excludeProcV2WALs(logList); backupInfo.setIncrBackupFileList(logList); + // Update boundaries based on WALs that will be backed up + for (String logFile : logList) { + Path logPath = new Path(logFile); + String logHost = BackupUtils.parseHostFromOldLog(logPath); + if (logHost == null) { + logHost = BackupUtils.parseHostNameFromLogFile(logPath.getParent()); + } + if (logHost != null) { + long logTs = BackupUtils.getCreationTime(logPath); + Long latestTimestampToIncludeInBackup = newTimestamps.get(logHost); + if (latestTimestampToIncludeInBackup == null || logTs > latestTimestampToIncludeInBackup) { + LOG.info("Updating backup boundary for inactive host {}: timestamp={}", logHost, logTs); + newTimestamps.put(logHost, logTs); + } + } + } return newTimestamps; } @@ -228,15 +263,6 @@ private List getLogFilesForNewBackup(Map olderTimestamps, } else if (currentLogTS > oldTimeStamp) { resultLogFiles.add(currentLogFile); } - - // It is possible that a host in .oldlogs is an obsolete region server - // so newestTimestamps.get(host) here can be null. - // Even if these logs belong to a obsolete region server, we still need - // to include they to avoid loss of edits for backup. 
- Long newTimestamp = newestTimestamps.get(host); - if (newTimestamp == null || currentLogTS > newTimestamp) { - newestLogs.add(currentLogFile); - } } // remove newest log per host because they are still in use resultLogFiles.removeAll(newestLogs); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupOfflineRS.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupOfflineRS.java new file mode 100644 index 000000000000..2a38ba6b7227 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupOfflineRS.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * Tests that WAL files from offline/inactive RegionServers are handled correctly during backup. + * Specifically verifies that WALs from an offline RS are: + *
+ * <ol>
+ *   <li>Backed up once in the first backup after the RS goes offline</li>
+ *   <li>NOT re-backed up in subsequent backups</li>
+ * </ol>
+ */
+@Category(LargeTests.class)
+public class TestBackupOfflineRS extends TestBackupBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupOfflineRS.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBackupOfflineRS.class);
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtil();
+    conf1 = TEST_UTIL.getConfiguration();
+    conf1.setInt("hbase.regionserver.info.port", -1);
+    autoRestoreOnFailure = true;
+    useSecondCluster = false;
+    setUpHelper();
+    // Start an additional RS so we have at least 2
+    TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+    TEST_UTIL.waitTableAvailable(table1);
+  }
+
+  /**
+   * Tests that when a full backup is taken while an RS is offline (with WALs in oldlogs), the
+   * offline host's timestamps are recorded so subsequent incremental backups don't re-include
+   * those WALs.
+   */
+  @Test
+  public void testBackupWithOfflineRS() throws Exception {
+    LOG.info("Starting testBackupWithOfflineRS");
+
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<TableName> tables = Lists.newArrayList(table1);
+
+    if (cluster.getNumLiveRegionServers() < 2) {
+      cluster.startRegionServer();
+      Thread.sleep(2000);
+    }
+
+    LOG.info("Inserting data to generate WAL entries");
+    try (Connection conn = ConnectionFactory.createConnection(conf1)) {
+      insertIntoTable(conn, table1, famName, 2, 100);
+    }
+
+    int rsToStop = 0;
+    HRegionServer rsBeforeStop = cluster.getRegionServer(rsToStop);
+    String offlineHost =
+      rsBeforeStop.getServerName().getHostname() + ":" + rsBeforeStop.getServerName().getPort();
+    LOG.info("Stopping RS: {}", offlineHost);
+
+    cluster.stopRegionServer(rsToStop);
+    // Wait for WALs to be moved to oldlogs
+    Thread.sleep(5000);
+
+    LOG.info("Taking full backup (with offline RS WALs in oldlogs)");
+    String fullBackupId = fullTableBackup(tables);
+    assertTrue("Full backup should succeed", checkSucceeded(fullBackupId));
+
+    try (BackupSystemTable sysTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      Map<TableName, Map<String, Long>> timestamps = sysTable.readLogTimestampMap(BACKUP_ROOT_DIR);
+      Map<String, Long> rsTimestamps = timestamps.get(table1);
+      LOG.info("RS timestamps after full backup: {}", rsTimestamps);
+
+      Long tsAfterFullBackup = rsTimestamps.get(offlineHost);
+      assertNotNull("Offline host should have timestamp recorded in trslm after full backup",
+        tsAfterFullBackup);
+
+      LOG.info("Taking incremental backup (should NOT include offline RS WALs)");
+      String incrBackupId = incrementalTableBackup(tables);
+      assertTrue("Incremental backup should succeed", checkSucceeded(incrBackupId));
+
+      timestamps = sysTable.readLogTimestampMap(BACKUP_ROOT_DIR);
+      rsTimestamps = timestamps.get(table1);
+      assertFalse("Offline host should not have a boundary",
+        rsTimestamps.containsKey(offlineHost));
+    }
+  }
+}
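The core of the fix is the per-host boundary decision in the FullTableBackupClient hunk above. The sketch below is not part of the patch and uses hypothetical names (BoundaryRuleSketch, pickBoundaries); it assumes the newest WAL creation time per host has already been collected (the patch derives it by scanning the WAL and oldWALs directories), and only the inactive-host comparison mirrors the patched code.

import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the per-host boundary rule applied after a full backup.
 * Hypothetical class; newestWalTsByHost is assumed to already hold the newest
 * WAL creation time observed for each host.
 */
public final class BoundaryRuleSketch {

  static Map<String, Long> pickBoundaries(Map<String, Long> previousRollsByHost,
    Map<String, Long> latestRollsByHost, Map<String, Long> newestWalTsByHost) {
    Map<String, Long> boundaries = new HashMap<>();
    for (Map.Entry<String, Long> entry : newestWalTsByHost.entrySet()) {
      String host = entry.getKey();
      Long previousRoll = previousRollsByHost.get(host);
      Long latestRoll = latestRollsByHost.get(host);
      // A host whose roll result is missing or unchanged since the previous backup is
      // inactive (dead or decommissioned): its boundary is the newest WAL timestamp it
      // left behind. An active host simply records its latest log roll result.
      boolean inactive = latestRoll == null || latestRoll.equals(previousRoll);
      boundaries.put(host, inactive ? entry.getValue() : latestRoll);
    }
    return boundaries;
  }

  private BoundaryRuleSketch() {
  }
}

Recording the newest already-archived WAL timestamp for inactive hosts is what keeps later incremental backups from re-collecting (or silently dropping) WALs that the full backup's snapshot already covers.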