From 9db5f51d58ee9549e98ab2854913eedc19c1c5ee Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Wed, 15 Oct 2025 22:03:56 +0530 Subject: [PATCH 01/10] Scan WALs to identify bulkload operations for incremental backup --- .../hadoop/hbase/backup/BackupObserver.java | 3 +- .../impl/IncrementalTableBackupClient.java | 171 ++++++++++-------- .../hbase/backup/impl/TableBackupClient.java | 7 + .../mapreduce/BulkLoadCollectorJob.java | 2 +- .../hadoop/hbase/backup/TestBackupBase.java | 4 +- 5 files changed, 103 insertions(+), 84 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java index 392e27710911..d67a648e44bb 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java @@ -96,8 +96,9 @@ private void registerBulkLoad(ObserverContext fullyBackedUpTables = tbl.getTablesIncludedInBackups(); + Map continuousBackupTableSet = tbl.getContinuousBackupTableSet(); - if (fullyBackedUpTables.contains(tableName)) { + if (fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)) { tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths); } else { if (LOG.isTraceEnabled()) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index c2aa0aa17fd1..8e6d77383ae4 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.TimeZone; +import java.util.stream.Collectors; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; @@ -44,6 +45,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupCopyJob; import org.apache.hadoop.hbase.backup.BackupInfo; @@ -52,9 +54,11 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.mapreduce.BulkLoadCollectorJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.backup.util.BulkFilesCollector; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; @@ -83,8 +87,7 @@ /** * Incremental backup implementation. See the {@link #execute() execute} method. 
*/ -@InterfaceAudience.Private -public class IncrementalTableBackupClient extends TableBackupClient { +@InterfaceAudience.Private public class IncrementalTableBackupClient extends TableBackupClient { private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class); protected IncrementalTableBackupClient() { @@ -110,6 +113,7 @@ protected List filterMissingFiles(List incrBackupFileList) throw /** * Check if a given path is belongs to active WAL directory + * * @param p path * @return true, if yes */ @@ -135,84 +139,87 @@ protected static int getIndex(TableName tbl, List sTableList) { * bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT * clean up the entries in the bulk load system table. Those entries should not be cleaned until * the backup is marked as complete. + * * @param tablesToBackup list of tables to be backed up */ - protected List handleBulkLoad(List tablesToBackup) throws IOException { + protected List handleBulkLoad(List tablesToBackup, + Map> tablesToWALFileList, + Map tablesToPrevBackupTs) throws IOException { Map toBulkload = new HashMap<>(); - List bulkLoads; - if (backupInfo.isContinuousBackupEnabled()) { - bulkLoads = - backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs()); - } else { - bulkLoads = backupManager.readBulkloadRows(tablesToBackup); - } + List bulkLoads = new ArrayList<>(); + FileSystem tgtFs; try { tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); } catch (URISyntaxException use) { throw new IOException("Unable to get FileSystem", use); } + Path rootdir = CommonFSUtils.getRootDir(conf); Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); - for (BulkLoad bulkLoad : bulkLoads) { - TableName srcTable = bulkLoad.getTableName(); - MergeSplitBulkloadInfo bulkloadInfo = - toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new); - String regionName = bulkLoad.getRegion(); - String fam = bulkLoad.getColumnFamily(); - String filename = FilenameUtils.getName(bulkLoad.getHfilePath()); + if (!backupInfo.isContinuousBackupEnabled()) { + bulkLoads = backupManager.readBulkloadRows(tablesToBackup); + for (BulkLoad bulkLoad : bulkLoads) { + TableName srcTable = bulkLoad.getTableName(); + MergeSplitBulkloadInfo bulkloadInfo = + toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new); + String regionName = bulkLoad.getRegion(); + String fam = bulkLoad.getColumnFamily(); + String filename = FilenameUtils.getName(bulkLoad.getHfilePath()); + + if (!tablesToBackup.contains(srcTable)) { + LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable); + continue; + } + Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); + Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); + + String srcTableQualifier = srcTable.getQualifierAsString(); + String srcTableNs = srcTable.getNamespaceAsString(); + Path tgtFam = new Path(tgtRoot, + srcTableNs + Path.SEPARATOR + srcTableQualifier + Path.SEPARATOR + regionName + + Path.SEPARATOR + fam); + if (!tgtFs.mkdirs(tgtFam)) { + throw new IOException("couldn't create " + tgtFam); + } + Path tgt = new Path(tgtFam, filename); + + Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); + Path archive = new Path(archiveDir, filename); - if (!tablesToBackup.contains(srcTable)) { - LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable); - continue; - } - Path tblDir = 
CommonFSUtils.getTableDir(rootdir, srcTable); - Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - - // For continuous backup: bulkload files are copied from backup directory defined by - // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster. - String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); - if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) { - String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp()); - Path bulkLoadBackupPath = - new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName); - Path bulkLoadDir = new Path(bulkLoadBackupPath, - srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString()); - FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf); - Path fullBulkLoadBackupPath = - new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - if (backupFs.exists(fullBulkLoadBackupPath)) { - LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath); - p = fullBulkLoadBackupPath; - } else { - LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath); + if (fs.exists(p)) { + if (LOG.isTraceEnabled()) { + LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(), + srcTableQualifier); + LOG.trace("copying {} to {}", p, tgt); + } + bulkloadInfo.addActiveFile(p.toString()); + } else if (fs.exists(archive)) { + LOG.debug("copying archive {} to {}", archive, tgt); + bulkloadInfo.addArchiveFiles(archive.toString()); } } + } else { + // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs + Path collectorOutput = new Path(getBulkOutputDir(), "bulkload-collector-output"); + for (TableName table : tablesToBackup) { + String walDirsCsv = String.join(",", tablesToWALFileList.get(table)); - String srcTableQualifier = srcTable.getQualifierAsString(); - String srcTableNs = srcTable.getNamespaceAsString(); - Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier - + Path.SEPARATOR + regionName + Path.SEPARATOR + fam); - if (!tgtFs.mkdirs(tgtFam)) { - throw new IOException("couldn't create " + tgtFam); - } - Path tgt = new Path(tgtFam, filename); + List bulkloadPaths = BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, + collectorOutput, table, table, tablesToPrevBackupTs.get(table), + backupInfo.getIncrCommittedWalTs()); - Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); - Path archive = new Path(archiveDir, filename); + List bulkLoadFiles = bulkloadPaths.stream() + .map(Path::toString) + .collect(Collectors.toList()); - if (fs.exists(p)) { - if (LOG.isTraceEnabled()) { - LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(), - srcTableQualifier); - LOG.trace("copying {} to {}", p, tgt); - } - bulkloadInfo.addActiveFile(p.toString()); - } else if (fs.exists(archive)) { - LOG.debug("copying archive {} to {}", archive, tgt); - bulkloadInfo.addArchiveFiles(archive.toString()); + mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs); } + if (fs.exists(collectorOutput)) { + fs.delete(collectorOutput, true); + } + } for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) { @@ -304,8 +311,10 @@ private void updateFileLists(List activeFiles, List archiveFiles * the column families for the last full backup. 
In which * case, a full backup should be taken */ - @Override - public void execute() throws IOException, ColumnFamilyMismatchException { + @Override public void execute() throws IOException, ColumnFamilyMismatchException { + // tablesToWALFileList and tablesToPrevBackupTs are needed for "continuous" Incremental backup + Map> tablesToWALFileList = new HashMap<>(); + Map tablesToPrevBackupTs = new HashMap<>(); try { Map tablesToFullBackupIds = getFullBackupIds(); verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds); @@ -339,7 +348,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException { BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); setupRegionLocator(); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT - convertWALsToHFiles(); + convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs); incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, backupInfo.getBackupRootDir()); } catch (Exception e) { @@ -371,7 +380,9 @@ public void execute() throws IOException, ColumnFamilyMismatchException { backupManager.writeBackupStartCode(newStartCode); } - List bulkLoads = handleBulkLoad(backupInfo.getTableNames()); + List bulkLoads = handleBulkLoad(backupInfo.getTableNames(), + tablesToWALFileList, + tablesToPrevBackupTs); // backup complete completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf); @@ -425,10 +436,13 @@ protected void deleteBulkLoadDirectory() throws IOException { } } - protected void convertWALsToHFiles() throws IOException { + protected void convertWALsToHFiles(Map> tablesToWALFileList, + Map tablesToPrevBackupTs) throws IOException { long previousBackupTs = 0L; + long currentBackupTs = 0L; if (backupInfo.isContinuousBackupEnabled()) { Set tableSet = backupInfo.getTables(); + currentBackupTs = backupInfo.getIncrCommittedWalTs(); List backupInfos = backupManager.getBackupHistory(true); for (TableName table : tableSet) { for (BackupInfo backup : backupInfos) { @@ -442,7 +456,9 @@ protected void convertWALsToHFiles() throws IOException { } else { previousBackupTs = backup.getIncrCommittedWalTs(); } - walBackupFileList = getBackupLogs(previousBackupTs); + walBackupFileList = getBackupLogs(previousBackupTs, currentBackupTs); + tablesToWALFileList.put(table, walBackupFileList); + tablesToPrevBackupTs.put(table, previousBackupTs); walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()), previousBackupTs); break; @@ -469,7 +485,7 @@ protected void convertWALsToHFiles() throws IOException { } } - private List getBackupLogs(long startTs) throws IOException { + private List getBackupLogs(long startTs, long endTs) throws IOException { // get log files from backup dir String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); if (Strings.isNullOrEmpty(walBackupDir)) { @@ -494,7 +510,7 @@ private List getBackupLogs(long startTs) throws IOException { long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) - if (dirEndTime >= startTs) { + if (dirEndTime >= startTs && dirStartTime <= endTs) { Path dirPath = dayDir.getPath(); FileStatus[] logs = backupFs.listStatus(dirPath); for (FileStatus log : logs) { @@ -533,11 +549,7 @@ protected void walToHFiles(List dirPaths, List tableList, long p conf.set(JOB_NAME_CONF_KEY, jobname); if (backupInfo.isContinuousBackupEnabled()) { conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs)); - // committedWALsTs is needed only for 
Incremental backups with continuous backup - // since these do not depend on log roll ts - long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn); - backupInfo.setIncrCommittedWalTs(committedWALsTs); - conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs)); + conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs())); } String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; @@ -653,8 +665,9 @@ private void verifyCfCompatibility(Set tables, ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies(); String snapshotName = fullBackupInfo.getSnapshotName(tn); - Path root = HBackupFileSystem.getTableBackupPath(tn, - new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId()); + Path root = + HBackupFileSystem.getTableBackupPath(tn, new Path(fullBackupInfo.getBackupRootDir()), + fullBackupInfo.getBackupId()); Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root); FileSystem fs; @@ -668,10 +681,8 @@ private void verifyCfCompatibility(Set tables, SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir); SnapshotManifest manifest = SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription); - if ( - SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDescription.getTtl(), - snapshotDescription.getCreationTime(), EnvironmentEdgeManager.currentTime()) - ) { + if (SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDescription.getTtl(), + snapshotDescription.getCreationTime(), EnvironmentEdgeManager.currentTime())) { throw new SnapshotTTLExpiredException( ProtobufUtil.createSnapshotDesc(snapshotDescription)); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index 9e31ca409ada..b1f45d8b6813 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -113,6 +114,12 @@ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo) // set the start timestamp of the overall backup long startTs = EnvironmentEdgeManager.currentTime(); backupInfo.setStartTs(startTs); + if (backupInfo.getType() == BackupType.INCREMENTAL && backupInfo.isContinuousBackupEnabled()) { + // committedWALsTs is needed only for Incremental backups with continuous backup + // since these do not depend on log roll ts + long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn); + backupInfo.setIncrCommittedWalTs(committedWALsTs); + } // set overall backup status: ongoing backupInfo.setState(BackupState.RUNNING); backupInfo.setPhase(BackupPhase.REQUEST); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java index b752c7f78e01..cf19d2622216 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java +++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java @@ -75,7 +75,7 @@ public class BulkLoadCollectorJob extends Configured implements Tool { public BulkLoadCollectorJob() { } - protected BulkLoadCollectorJob(final Configuration c) { + public BulkLoadCollectorJob(final Configuration c) { super(c); } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 159514bd45b1..e32e1b8f920a 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -175,7 +175,7 @@ public void execute() throws IOException { // copy out the table and region info files for each table BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT - convertWALsToHFiles(); + convertWALsToHFiles(new HashMap<>(), new HashMap<>()); incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, backupInfo.getBackupRootDir()); failStageIf(Stage.stage_2); @@ -200,7 +200,7 @@ public void execute() throws IOException { BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); - handleBulkLoad(backupInfo.getTableNames()); + handleBulkLoad(backupInfo.getTableNames(), new HashMap<>(), new HashMap<>()); failStageIf(Stage.stage_4); // backup complete From fa71d8db802a32318435ec27471f64aba6ce7b5a Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 16 Oct 2025 10:55:02 +0530 Subject: [PATCH 02/10] Update unit test --- .../hadoop/hbase/backup/BackupObserver.java | 4 +- .../impl/IncrementalTableBackupClient.java | 61 +++++++++---------- .../TestIncrementalBackupWithContinuous.java | 9 ++- 3 files changed, 38 insertions(+), 36 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java index d67a648e44bb..c8e2e9543a61 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java @@ -98,7 +98,9 @@ private void registerBulkLoad(ObserverContext fullyBackedUpTables = tbl.getTablesIncludedInBackups(); Map continuousBackupTableSet = tbl.getContinuousBackupTableSet(); - if (fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)) { + if ( + fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName) + ) { tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths); } else { if (LOG.isTraceEnabled()) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 8e6d77383ae4..2f2ec5cf5e2f 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR; import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; @@ -45,7 +44,6 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupCopyJob; import org.apache.hadoop.hbase.backup.BackupInfo; @@ -54,7 +52,6 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.mapreduce.BulkLoadCollectorJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; import org.apache.hadoop.hbase.backup.util.BackupUtils; @@ -87,7 +84,8 @@ /** * Incremental backup implementation. See the {@link #execute() execute} method. */ -@InterfaceAudience.Private public class IncrementalTableBackupClient extends TableBackupClient { +@InterfaceAudience.Private +public class IncrementalTableBackupClient extends TableBackupClient { private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class); protected IncrementalTableBackupClient() { @@ -113,7 +111,6 @@ protected List filterMissingFiles(List incrBackupFileList) throw /** * Check if a given path is belongs to active WAL directory - * * @param p path * @return true, if yes */ @@ -139,12 +136,11 @@ protected static int getIndex(TableName tbl, List sTableList) { * bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT * clean up the entries in the bulk load system table. Those entries should not be cleaned until * the backup is marked as complete. 
- * * @param tablesToBackup list of tables to be backed up */ protected List handleBulkLoad(List tablesToBackup, - Map> tablesToWALFileList, - Map tablesToPrevBackupTs) throws IOException { + Map> tablesToWALFileList, Map tablesToPrevBackupTs) + throws IOException { Map toBulkload = new HashMap<>(); List bulkLoads = new ArrayList<>(); @@ -177,9 +173,8 @@ protected List handleBulkLoad(List tablesToBackup, String srcTableQualifier = srcTable.getQualifierAsString(); String srcTableNs = srcTable.getNamespaceAsString(); - Path tgtFam = new Path(tgtRoot, - srcTableNs + Path.SEPARATOR + srcTableQualifier + Path.SEPARATOR + regionName - + Path.SEPARATOR + fam); + Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier + + Path.SEPARATOR + regionName + Path.SEPARATOR + fam); if (!tgtFs.mkdirs(tgtFam)) { throw new IOException("couldn't create " + tgtFam); } @@ -200,21 +195,27 @@ protected List handleBulkLoad(List tablesToBackup, bulkloadInfo.addArchiveFiles(archive.toString()); } } + + for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) { + mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(), + bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs); + } } else { // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs Path collectorOutput = new Path(getBulkOutputDir(), "bulkload-collector-output"); for (TableName table : tablesToBackup) { String walDirsCsv = String.join(",", tablesToWALFileList.get(table)); - List bulkloadPaths = BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, - collectorOutput, table, table, tablesToPrevBackupTs.get(table), - backupInfo.getIncrCommittedWalTs()); + List bulkloadPaths = + BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table, + tablesToPrevBackupTs.get(table), backupInfo.getIncrCommittedWalTs()); - List bulkLoadFiles = bulkloadPaths.stream() - .map(Path::toString) - .collect(Collectors.toList()); + List bulkLoadFiles = + bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList()); - mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs); + if (!bulkLoadFiles.isEmpty()) { + mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs); + } } if (fs.exists(collectorOutput)) { fs.delete(collectorOutput, true); @@ -222,11 +223,6 @@ protected List handleBulkLoad(List tablesToBackup, } - for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) { - mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(), - bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs); - } - return bulkLoads; } @@ -311,7 +307,8 @@ private void updateFileLists(List activeFiles, List archiveFiles * the column families for the last full backup. 
In which * case, a full backup should be taken */ - @Override public void execute() throws IOException, ColumnFamilyMismatchException { + @Override + public void execute() throws IOException, ColumnFamilyMismatchException { // tablesToWALFileList and tablesToPrevBackupTs are needed for "continuous" Incremental backup Map> tablesToWALFileList = new HashMap<>(); Map tablesToPrevBackupTs = new HashMap<>(); @@ -380,9 +377,8 @@ private void updateFileLists(List activeFiles, List archiveFiles backupManager.writeBackupStartCode(newStartCode); } - List bulkLoads = handleBulkLoad(backupInfo.getTableNames(), - tablesToWALFileList, - tablesToPrevBackupTs); + List bulkLoads = + handleBulkLoad(backupInfo.getTableNames(), tablesToWALFileList, tablesToPrevBackupTs); // backup complete completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf); @@ -665,9 +661,8 @@ private void verifyCfCompatibility(Set tables, ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies(); String snapshotName = fullBackupInfo.getSnapshotName(tn); - Path root = - HBackupFileSystem.getTableBackupPath(tn, new Path(fullBackupInfo.getBackupRootDir()), - fullBackupInfo.getBackupId()); + Path root = HBackupFileSystem.getTableBackupPath(tn, + new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId()); Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root); FileSystem fs; @@ -681,8 +676,10 @@ private void verifyCfCompatibility(Set tables, SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir); SnapshotManifest manifest = SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription); - if (SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDescription.getTtl(), - snapshotDescription.getCreationTime(), EnvironmentEdgeManager.currentTime())) { + if ( + SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDescription.getTtl(), + snapshotDescription.getCreationTime(), EnvironmentEdgeManager.currentTime()) + ) { throw new SnapshotTTLExpiredException( ProtobufUtil.createSnapshotDesc(snapshotDescription)); } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index 54f3842f463b..3ba88a72035f 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -163,12 +163,14 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws performBulkLoad("bulkPreIncr", methodName, tableName1); expectedRowCount += ROWS_IN_BULK_LOAD; assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); - assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + //assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); loadTable(TEST_UTIL.getConnection().getTable(tableName1)); Thread.sleep(15000); performBulkLoad("bulkPostIncr", methodName, tableName1); - assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size()); + //assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size()); + assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); // Incremental backup String backup2 = @@ -176,7 +178,8 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws 
assertTrue(checkSucceeded(backup2)); // bulkPostIncr Bulkload entry should not be deleted post incremental backup - assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + //assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); TEST_UTIL.truncateTable(tableName1); // Restore incremental backup From 12f1d3981203fd80a137546690139514d309d046 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Mon, 20 Oct 2025 16:10:53 +0530 Subject: [PATCH 03/10] Info log --- .../hbase/backup/impl/IncrementalTableBackupClient.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 2f2ec5cf5e2f..7e371bded820 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -213,16 +213,13 @@ protected List handleBulkLoad(List tablesToBackup, List bulkLoadFiles = bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList()); - if (!bulkLoadFiles.isEmpty()) { + if (bulkLoadFiles.isEmpty()) { + LOG.info("No bulk-load files found for table {}", table); + } else { mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs); } } - if (fs.exists(collectorOutput)) { - fs.delete(collectorOutput, true); - } - } - return bulkLoads; } From d218c576da618e4a44fd9e16573949ff96388464 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Mon, 20 Oct 2025 16:26:52 +0530 Subject: [PATCH 04/10] Minor test fix --- .../hbase/backup/impl/IncrementalTableBackupClient.java | 3 ++- .../hbase/backup/TestIncrementalBackupWithContinuous.java | 5 ----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 7e371bded820..0e0ccfbe421a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -87,6 +87,7 @@ @InterfaceAudience.Private public class IncrementalTableBackupClient extends TableBackupClient { private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class); + private static final String BULKLOAD_COLLECTOR_OUTPUT = "bulkload-collector-output"; protected IncrementalTableBackupClient() { } @@ -202,7 +203,7 @@ protected List handleBulkLoad(List tablesToBackup, } } else { // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs - Path collectorOutput = new Path(getBulkOutputDir(), "bulkload-collector-output"); + Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT); for (TableName table : tablesToBackup) { String walDirsCsv = String.join(",", tablesToWALFileList.get(table)); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index 3ba88a72035f..72867da95f17 100644 --- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -163,22 +163,17 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws performBulkLoad("bulkPreIncr", methodName, tableName1); expectedRowCount += ROWS_IN_BULK_LOAD; assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); - //assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); loadTable(TEST_UTIL.getConnection().getTable(tableName1)); Thread.sleep(15000); performBulkLoad("bulkPostIncr", methodName, tableName1); - //assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size()); assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); // Incremental backup String backup2 = backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true); assertTrue(checkSucceeded(backup2)); - - // bulkPostIncr Bulkload entry should not be deleted post incremental backup - //assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); TEST_UTIL.truncateTable(tableName1); From 0bc69a0c4185e22e8dc18463d72ece7f0e7bc4fd Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Wed, 22 Oct 2025 20:20:55 +0530 Subject: [PATCH 05/10] Address review comments --- .../hadoop/hbase/backup/BackupObserver.java | 5 +- .../impl/AbstractPitrRestoreHandler.java | 37 +-------- .../impl/IncrementalTableBackupClient.java | 77 ++++++------------- .../hbase/backup/impl/TableBackupClient.java | 6 -- .../hadoop/hbase/backup/util/BackupUtils.java | 36 +++++++++ .../hbase/backup/util/BulkFilesCollector.java | 3 + 6 files changed, 69 insertions(+), 95 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java index c8e2e9543a61..48ae6ebd0c2a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java @@ -98,13 +98,16 @@ private void registerBulkLoad(ObserverContext fullyBackedUpTables = tbl.getTablesIncludedInBackups(); Map continuousBackupTableSet = tbl.getContinuousBackupTableSet(); + // Tables in continuousBackupTableSet do not rely on BackupSystemTable but rather + // scan on WAL backup directory to identify bulkload operation HBASE-29656 if ( fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName) ) { tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths); } else { if (LOG.isTraceEnabled()) { - LOG.trace("Table {} has not gone through full backup - skipping.", tableName); + LOG.trace("Table {} has either not gone through full backup or is " + + "part of continuousBackupTableSet - skipping.", tableName); } } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java index ce6c4d4dc683..fcc712608490 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -380,7 +380,7 @@ private 
void replayWal(TableName sourceTable, TableName targetTable, long startT sourceTable, targetTable, startTime, endTime, walDirPath); List validDirs = - getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); + BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); if (validDirs.isEmpty()) { LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime, endTime); @@ -400,7 +400,7 @@ private List collectBulkFiles(TableName sourceTable, TableName targetTable sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir); List validDirs = - getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); + BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); if (validDirs.isEmpty()) { LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.", startTime, endTime); @@ -413,39 +413,6 @@ private List collectBulkFiles(TableName sourceTable, TableName targetTable walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime); } - /** - * Fetches valid WAL directories based on the given time range. - */ - private List getValidWalDirs(Configuration conf, Path walBackupDir, long startTime, - long endTime) throws IOException { - FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf); - FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR)); - - List validDirs = new ArrayList<>(); - SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); - - for (FileStatus dayDir : dayDirs) { - if (!dayDir.isDirectory()) { - continue; // Skip files, only process directories - } - - String dirName = dayDir.getPath().getName(); - try { - Date dirDate = dateFormat.parse(dirName); - long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) - long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) - - // Check if this day's WAL files overlap with the required time range - if (dirEndTime >= startTime && dirStartTime <= endTime) { - validDirs.add(dayDir.getPath().toString()); - } - } catch (ParseException e) { - LOG.warn("Skipping invalid directory name: {}", dirName, e); - } - } - return validDirs; - } - /** * Executes WAL replay using WALPlayer. 
*/ diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 0e0ccfbe421a..5415f4a47236 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -159,19 +159,18 @@ protected List handleBulkLoad(List tablesToBackup, bulkLoads = backupManager.readBulkloadRows(tablesToBackup); for (BulkLoad bulkLoad : bulkLoads) { TableName srcTable = bulkLoad.getTableName(); + if (!tablesToBackup.contains(srcTable)) { + LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable); + continue; + } + MergeSplitBulkloadInfo bulkloadInfo = toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new); String regionName = bulkLoad.getRegion(); String fam = bulkLoad.getColumnFamily(); String filename = FilenameUtils.getName(bulkLoad.getHfilePath()); - - if (!tablesToBackup.contains(srcTable)) { - LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable); - continue; - } Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable); Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename); - String srcTableQualifier = srcTable.getQualifierAsString(); String srcTableNs = srcTable.getNamespaceAsString(); Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier @@ -179,8 +178,8 @@ protected List handleBulkLoad(List tablesToBackup, if (!tgtFs.mkdirs(tgtFam)) { throw new IOException("couldn't create " + tgtFam); } - Path tgt = new Path(tgtFam, filename); + Path tgt = new Path(tgtFam, filename); Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); Path archive = new Path(archiveDir, filename); @@ -205,20 +204,21 @@ protected List handleBulkLoad(List tablesToBackup, // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT); for (TableName table : tablesToBackup) { - String walDirsCsv = String.join(",", tablesToWALFileList.get(table)); + String walDirsCsv = String.join(",", tablesToWALFileList.getOrDefault(table, new ArrayList())); List bulkloadPaths = BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table, - tablesToPrevBackupTs.get(table), backupInfo.getIncrCommittedWalTs()); + tablesToPrevBackupTs.getOrDefault(table, 0L), backupInfo.getIncrCommittedWalTs()); List bulkLoadFiles = bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList()); if (bulkLoadFiles.isEmpty()) { LOG.info("No bulk-load files found for table {}", table); - } else { - mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs); + continue; } + + mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs); } } return bulkLoads; @@ -315,6 +315,12 @@ public void execute() throws IOException, ColumnFamilyMismatchException { verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds); // case PREPARE_INCREMENTAL: + if (backupInfo.isContinuousBackupEnabled()) { + // committedWALsTs is needed only for Incremental backups with continuous backup + // since these do not depend on log roll ts + long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn); + backupInfo.setIncrCommittedWalTs(committedWALsTs); + } beginBackup(backupManager, backupInfo); 
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); // Non-continuous Backup incremental backup is controlled by 'incremental backup table set' @@ -435,6 +441,12 @@ protected void convertWALsToHFiles(Map> tablesToWALFileL long previousBackupTs = 0L; long currentBackupTs = 0L; if (backupInfo.isContinuousBackupEnabled()) { + String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); + if (Strings.isNullOrEmpty(walBackupDir)) { + throw new IOException( + "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR); + } + Path walBackupPath = new Path(walBackupDir); Set tableSet = backupInfo.getTables(); currentBackupTs = backupInfo.getIncrCommittedWalTs(); List backupInfos = backupManager.getBackupHistory(true); @@ -450,7 +462,7 @@ protected void convertWALsToHFiles(Map> tablesToWALFileL } else { previousBackupTs = backup.getIncrCommittedWalTs(); } - walBackupFileList = getBackupLogs(previousBackupTs, currentBackupTs); + walBackupFileList = BackupUtils.getValidWalDirs(conf, walBackupPath, previousBackupTs, currentBackupTs); tablesToWALFileList.put(table, walBackupFileList); tablesToPrevBackupTs.put(table, previousBackupTs); walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()), @@ -479,47 +491,6 @@ protected void convertWALsToHFiles(Map> tablesToWALFileL } } - private List getBackupLogs(long startTs, long endTs) throws IOException { - // get log files from backup dir - String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); - if (Strings.isNullOrEmpty(walBackupDir)) { - throw new IOException( - "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR); - } - List resultLogFiles = new ArrayList<>(); - Path walBackupPath = new Path(walBackupDir); - FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf); - FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR)); - SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - - for (FileStatus dayDir : dayDirs) { - if (!dayDir.isDirectory()) { - continue; // Skip files, only process directories - } - - String dirName = dayDir.getPath().getName(); - try { - Date dirDate = dateFormat.parse(dirName); - long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) - long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) - - if (dirEndTime >= startTs && dirStartTime <= endTs) { - Path dirPath = dayDir.getPath(); - FileStatus[] logs = backupFs.listStatus(dirPath); - for (FileStatus log : logs) { - String filepath = log.getPath().toString(); - LOG.debug("Found WAL file: {}", filepath); - resultLogFiles.add(filepath); - } - } - } catch (ParseException e) { - LOG.warn("Skipping invalid directory name: " + dirName, e); - } - } - return resultLogFiles; - } - protected boolean tableExists(TableName table, Connection conn) throws IOException { try (Admin admin = conn.getAdmin()) { return admin.tableExists(table); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index b1f45d8b6813..e0808597d7a9 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -114,12 +114,6 @@ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo) // set the start 
timestamp of the overall backup long startTs = EnvironmentEdgeManager.currentTime(); backupInfo.setStartTs(startTs); - if (backupInfo.getType() == BackupType.INCREMENTAL && backupInfo.isContinuousBackupEnabled()) { - // committedWALsTs is needed only for Incremental backups with continuous backup - // since these do not depend on log roll ts - long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn); - backupInfo.setIncrCommittedWalTs(committedWALsTs); - } // set overall backup status: ongoing backupInfo.setState(BackupState.RUNNING); backupInfo.setPhase(BackupPhase.REQUEST); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index bf309104775c..4641263c800a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.backup.util; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URLDecoder; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; @@ -945,4 +948,37 @@ public static String formatToDateString(long dayInMillis) { dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); return dateFormat.format(new Date(dayInMillis)); } + + /** + * Fetches valid WAL directories based on the given time range. 
+ */ + public static List getValidWalDirs(Configuration conf, Path walBackupDir, long startTime, + long endTime) throws IOException { + FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf); + FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR)); + + List validDirs = new ArrayList<>(); + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + + for (FileStatus dayDir : dayDirs) { + if (!dayDir.isDirectory()) { + continue; // Skip files, only process directories + } + + String dirName = dayDir.getPath().getName(); + try { + Date dirDate = dateFormat.parse(dirName); + long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00) + long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59) + + // Check if this day's WAL files overlap with the required time range + if (dirEndTime >= startTime && dirStartTime <= endTime) { + validDirs.add(dayDir.getPath().toString()); + } + } catch (ParseException e) { + LOG.warn("Skipping invalid directory name: {}", dirName, e); + } + } + return validDirs; + } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java index 718a662abb7b..2415fc70edaa 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java @@ -70,6 +70,9 @@ public static List collectFromWalDirs(Configuration conf, String walDirsCs Path restoreRootDir, TableName sourceTable, TableName targetTable, long startTime, long endTime) throws IOException { + LOG.info("Called collectFromWalDirs for source table {}, target table {}, startTime {}, endTime" + + " {}, restoreRootDir {}", sourceTable, targetTable, startTime, endTime, restoreRootDir); + // prepare job Tool Configuration jobConf = new Configuration(conf); if (startTime > 0) jobConf.setLong(WALInputFormat.START_TIME_KEY, startTime); From a2dd167eafdfd1d113d6e2a22eb5d7f5cbe3de28 Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Wed, 22 Oct 2025 20:27:59 +0530 Subject: [PATCH 06/10] Spotless apply --- .../backup/impl/AbstractPitrRestoreHandler.java | 9 --------- .../backup/impl/IncrementalTableBackupClient.java | 14 ++++---------- .../hbase/backup/impl/TableBackupClient.java | 1 - 3 files changed, 4 insertions(+), 20 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java index fcc712608490..0ebc83649c3a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -20,25 +20,16 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static 
org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 5415f4a47236..70189624138b 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -19,27 +19,19 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; -import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; -import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TimeZone; import java.util.stream.Collectors; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -204,7 +196,8 @@ protected List handleBulkLoad(List tablesToBackup, // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT); for (TableName table : tablesToBackup) { - String walDirsCsv = String.join(",", tablesToWALFileList.getOrDefault(table, new ArrayList())); + String walDirsCsv = + String.join(",", tablesToWALFileList.getOrDefault(table, new ArrayList())); List bulkloadPaths = BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table, @@ -462,7 +455,8 @@ protected void convertWALsToHFiles(Map> tablesToWALFileL } else { previousBackupTs = backup.getIncrCommittedWalTs(); } - walBackupFileList = BackupUtils.getValidWalDirs(conf, walBackupPath, previousBackupTs, currentBackupTs); + walBackupFileList = + BackupUtils.getValidWalDirs(conf, walBackupPath, previousBackupTs, currentBackupTs); tablesToWALFileList.put(table, walBackupFileList); tablesToPrevBackupTs.put(table, previousBackupTs); walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()), diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index e0808597d7a9..9e31ca409ada 
100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.CommonFSUtils; From 4f414d233bb0a3f78a64f821cef2bf4e4550ac3b Mon Sep 17 00:00:00 2001 From: Ankit Solomon Date: Thu, 23 Oct 2025 15:43:11 +0530 Subject: [PATCH 07/10] Addressed review comment --- .../impl/AbstractPitrRestoreHandler.java | 27 ++------------ .../impl/IncrementalTableBackupClient.java | 8 ++--- .../hadoop/hbase/backup/util/BackupUtils.java | 36 +++++++++++++++++++ 3 files changed, 43 insertions(+), 28 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java index 0ebc83649c3a..981a3c258a3d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -334,7 +335,8 @@ private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf); List bulkloadFiles = - collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir)); + BackupUtils.collectBulkFiles(conn, sourceTable, targetTable, startTime, endTime, + new Path(restoreRootDir), new ArrayList()); if (bulkloadFiles.isEmpty()) { LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.", @@ -381,29 +383,6 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime); } - private List collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime, - long endTime, Path restoreRootDir) throws IOException { - - String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR); - Path walDirPath = new Path(walBackupDir); - LOG.info( - "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}", - sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir); - - List validDirs = - BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime); - if (validDirs.isEmpty()) { - LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.", - startTime, endTime); - return Collections.emptyList(); - } - - String walDirsCsv = String.join(",", validDirs); - - return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()), - walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime); - } - /** * Executes WAL replay using WALPlayer. 
   */
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 70189624138b..4cb62473c157 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -196,12 +196,12 @@ protected List handleBulkLoad(List tablesToBackup,
       // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
       Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
       for (TableName table : tablesToBackup) {
-        String walDirsCsv =
-          String.join(",", tablesToWALFileList.getOrDefault(table, new ArrayList()));
+        long startTs = tablesToPrevBackupTs.getOrDefault(table, 0L);
+        long endTs = backupInfo.getIncrCommittedWalTs();
+        List walDirs = tablesToWALFileList.getOrDefault(table, new ArrayList());

         List bulkloadPaths =
-          BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table,
-            tablesToPrevBackupTs.getOrDefault(table, 0L), backupInfo.getIncrCommittedWalTs());
+          BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs, collectorOutput, walDirs);

         List bulkLoadFiles =
           bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList());
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 4641263c800a..c97406547ffb 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.backup.util;

+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
@@ -76,6 +78,7 @@
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -949,6 +952,39 @@ public static String formatToDateString(long dayInMillis) {
     return dateFormat.format(new Date(dayInMillis));
   }

+  /**
+   * Fetches bulkload filepaths based on the given time range from backup WAL directory.
+   */
+  public static List collectBulkFiles(Connection conn, TableName sourceTable,
+    TableName targetTable, long startTime, long endTime, Path restoreRootDir,
+    List walDirs) throws IOException {
+
+    if (walDirs.isEmpty()) {
+      String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      if (Strings.isNullOrEmpty(walBackupDir)) {
+        throw new IOException(
+          "WAL backup directory is not configured " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      }
+      Path walDirPath = new Path(walBackupDir);
+      walDirs = BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    }
+
+    if (walDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
+        startTime, endTime);
+      return Collections.emptyList();
+    }
+
+    LOG.info(
+      "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL "
+        + "backup dir: {}, restore root: {}", sourceTable, targetTable, startTime, endTime,
+      walDirs, restoreRootDir);
+    String walDirsCsv = String.join(",", walDirs);
+
+    return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
+      walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
+  }
+
   /**
    * Fetches valid WAL directories based on the given time range.
    */

From b4f88b3df93e0b43439b09cb123de54019a2ddbc Mon Sep 17 00:00:00 2001
From: Ankit Solomon
Date: Thu, 23 Oct 2025 15:44:50 +0530
Subject: [PATCH 08/10] spotless

---
 .../backup/impl/AbstractPitrRestoreHandler.java    |  7 ++-----
 .../backup/impl/IncrementalTableBackupClient.java  |  5 ++---
 .../hadoop/hbase/backup/util/BackupUtils.java      | 13 +++++++------
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index 981a3c258a3d..3f31255d60f6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +38,6 @@
 import org.apache.hadoop.hbase.backup.RestoreJob;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -334,9 +332,8 @@ private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long
     RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);

-    List bulkloadFiles =
-      BackupUtils.collectBulkFiles(conn, sourceTable, targetTable, startTime, endTime,
-        new Path(restoreRootDir), new ArrayList());
+    List bulkloadFiles = BackupUtils.collectBulkFiles(conn, sourceTable, targetTable,
+      startTime, endTime, new Path(restoreRootDir), new ArrayList());

     if (bulkloadFiles.isEmpty()) {
       LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4cb62473c157..1bd3621b2945 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
@@ -200,8 +199,8 @@ protected List handleBulkLoad(List tablesToBackup,
         long endTs = backupInfo.getIncrCommittedWalTs();
         List walDirs = tablesToWALFileList.getOrDefault(table, new ArrayList());

-        List bulkloadPaths =
-          BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs, collectorOutput, walDirs);
+        List bulkloadPaths = BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs,
+          collectorOutput, walDirs);

         List bulkLoadFiles =
           bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList());
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index c97406547ffb..28bbfcf254ae 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -78,12 +78,12 @@
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
@@ -956,8 +956,8 @@ public static String formatToDateString(long dayInMillis) {
    * Fetches bulkload filepaths based on the given time range from backup WAL directory.
    */
   public static List collectBulkFiles(Connection conn, TableName sourceTable,
-    TableName targetTable, long startTime, long endTime, Path restoreRootDir,
-    List walDirs) throws IOException {
+    TableName targetTable, long startTime, long endTime, Path restoreRootDir, List walDirs)
+    throws IOException {

     if (walDirs.isEmpty()) {
       String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
@@ -966,7 +966,8 @@ public static List collectBulkFiles(Connection conn, TableName sourceTable
         "WAL backup directory is not configured " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
       }
       Path walDirPath = new Path(walBackupDir);
-      walDirs = BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+      walDirs =
+        BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
     }

     if (walDirs.isEmpty()) {
@@ -977,8 +978,8 @@ public static List collectBulkFiles(Connection conn, TableName sourceTable

     LOG.info(
       "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL "
-        + "backup dir: {}, restore root: {}", sourceTable, targetTable, startTime, endTime,
-      walDirs, restoreRootDir);
+        + "backup dir: {}, restore root: {}",
+      sourceTable, targetTable, startTime, endTime, walDirs, restoreRootDir);
     String walDirsCsv = String.join(",", walDirs);

     return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),

From 730dac3bfba86e20301c64941f5e35aa04107eb8 Mon Sep 17 00:00:00 2001
From: Ankit Solomon
Date: Fri, 24 Oct 2025 01:28:43 +0530
Subject: [PATCH 09/10] Remove log

---
 .../apache/hadoop/hbase/backup/util/BulkFilesCollector.java | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
index 2415fc70edaa..718a662abb7b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
@@ -70,9 +70,6 @@ public static List collectFromWalDirs(Configuration conf, String walDirsCs
     Path restoreRootDir, TableName sourceTable, TableName targetTable, long startTime,
     long endTime) throws IOException {

-    LOG.info("Called collectFromWalDirs for source table {}, target table {}, startTime {}, endTime"
-      + " {}, restoreRootDir {}", sourceTable, targetTable, startTime, endTime, restoreRootDir);
-
     // prepare job Tool
     Configuration jobConf = new Configuration(conf);
     if (startTime > 0) jobConf.setLong(WALInputFormat.START_TIME_KEY, startTime);

From 192353a2a2d482819d083dfca12e8ed8b33601c0 Mon Sep 17 00:00:00 2001
From: Ankit Solomon
Date: Sat, 25 Oct 2025 15:55:23 +0530
Subject: [PATCH 10/10] Retrigger CI

---
 .../java/org/apache/hadoop/hbase/backup/BackupObserver.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 48ae6ebd0c2a..c506d6dc6aed 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -107,7 +107,7 @@ private void registerBulkLoad(ObserverContext