@@ -25,8 +25,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -38,7 +44,9 @@
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,21 +152,21 @@ public void execute() throws IOException {
// logs while we do the backup.
backupManager.writeBackupStartCode(0L);
}
// We roll log here before we do the snapshot. It is possible there is duplicate data
// in the log that is already in the snapshot. But if we do it after the snapshot, we
// could have data loss.
// A better approach is to do the roll log on each RS in the same global procedure as
// the snapshot.
LOG.info("Execute roll log procedure for full backup ...");

// Gather the bulk loads being tracked by the system, which can be deleted (since their data
// will be part of the snapshot being taken). We gather this list before taking the actual
// snapshots for the same reason as the log rolls.
List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);
Map<String, Long> previousLogRollsByHost = backupManager.readRegionServerLastLogRollResult();

// We roll log here before we do the snapshot. It is possible there is duplicate data
// in the log that is already in the snapshot. But if we do it after the snapshot, we
// could have data loss.
// A better approach is to do the roll log on each RS in the same global procedure as
// the snapshot.
LOG.info("Execute roll log procedure for full backup ...");
BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);

newTimestamps = backupManager.readRegionServerLastLogRollResult();
Map<String, Long> latestLogRollsByHost = backupManager.readRegionServerLastLogRollResult();

// SNAPSHOT_TABLES:
backupInfo.setPhase(BackupPhase.SNAPSHOT);
@@ -181,6 +189,50 @@ public void execute() throws IOException {
// set overall backup status: complete. Here we make sure to complete the backup.
// After this checkpoint, even if entering cancel process, will let the backup finished
backupInfo.setState(BackupState.COMPLETE);

// Scan the WAL and oldWALs directories for dead/decommissioned hosts and add their max WAL
// timestamps to newTimestamps. This ensures subsequent incremental backups won't try to back
// up WALs that are already covered by this full backup's snapshot.
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem fs = walRootDir.getFileSystem(conf);

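// Collect every WAL currently sitting under the per-host WAL directories, plus everything
// that has already been archived to the oldWALs directory.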
List<FileStatus> allLogs = new ArrayList<>();
for (FileStatus hostLogDir : fs.listStatus(logDir)) {
String host = BackupUtils.parseHostNameFromLogFile(hostLogDir.getPath());
if (host == null) {
continue;
}
allLogs.addAll(Arrays.asList(fs.listStatus(hostLogDir.getPath())));
}
allLogs.addAll(Arrays.asList(fs.listStatus(oldLogDir)));

newTimestamps = new HashMap<>();

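// For each host, pick the boundary for later incremental backups: an active host (one whose
// log-roll result changed during the roll above) keeps its latest log-roll timestamp, while
// an inactive host (dead or decommissioned, so its roll result never changed) gets the newest
// WAL creation time found on disk.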
for (FileStatus log : allLogs) {
if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
continue;
}
String host = BackupUtils.parseHostNameFromLogFile(log.getPath());
if (host == null) {
continue;
}
long timestamp = BackupUtils.getCreationTime(log.getPath());
Long previousLogRoll = previousLogRollsByHost.get(host);
Long latestLogRoll = latestLogRollsByHost.get(host);
boolean isInactive = latestLogRoll == null || latestLogRoll.equals(previousLogRoll);

if (isInactive) {
long currentTs = newTimestamps.getOrDefault(host, 0L);
if (timestamp > currentTs) {
newTimestamps.put(host, timestamp);
}
} else {
newTimestamps.put(host, latestLogRoll);
}
}

// The table list in backupInfo is good for both full backup and incremental backup.
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
@@ -58,7 +58,6 @@ public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException {
*/
public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
List<String> logList;
Map<String, Long> newTimestamps;
Map<String, Long> previousTimestampMins;

String savedStartCode = readBackupStartCode();
@@ -83,12 +82,48 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
LOG.info("Execute roll log procedure for incremental backup ...");
BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);

newTimestamps = readRegionServerLastLogRollResult();
Map<String, Long> newTimestamps = readRegionServerLastLogRollResult();

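// If a host did not roll its WAL since the previous backup (its last recorded roll is older
// than the boundary we already hold), it is effectively inactive; keep the existing boundary
// so it never moves backwards.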
Map<String, Long> latestLogRollByHost = readRegionServerLastLogRollResult();
for (Map.Entry<String, Long> entry : latestLogRollByHost.entrySet()) {
String host = entry.getKey();
long latestLogRoll = entry.getValue();
Long earliestTimestampToIncludeInBackup = previousTimestampMins.get(host);

boolean isInactive = earliestTimestampToIncludeInBackup != null
&& earliestTimestampToIncludeInBackup > latestLogRoll;

long latestTimestampToIncludeInBackup;
if (isInactive) {
LOG.debug("Avoided resetting latest timestamp boundary for {} from {} to {}", host,
earliestTimestampToIncludeInBackup, latestLogRoll);
latestTimestampToIncludeInBackup = earliestTimestampToIncludeInBackup;
} else {
latestTimestampToIncludeInBackup = latestLogRoll;
}
newTimestamps.put(host, latestTimestampToIncludeInBackup);
}

logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
logList = excludeProcV2WALs(logList);
backupInfo.setIncrBackupFileList(logList);

// Update boundaries based on WALs that will be backed up
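// A WAL in oldWALs can belong to an obsolete region server with no recorded log roll, so its
// host may be missing from newTimestamps; advance (or create) that host's boundary to cover
// every file included in this backup so later backups do not pick these files up again.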
for (String logFile : logList) {
Path logPath = new Path(logFile);
String logHost = BackupUtils.parseHostFromOldLog(logPath);
if (logHost == null) {
logHost = BackupUtils.parseHostNameFromLogFile(logPath.getParent());
}
if (logHost != null) {
long logTs = BackupUtils.getCreationTime(logPath);
Long latestTimestampToIncludeInBackup = newTimestamps.get(logHost);
if (latestTimestampToIncludeInBackup == null || logTs > latestTimestampToIncludeInBackup) {
LOG.info("Updating backup boundary for inactive host {}: timestamp={}", logHost, logTs);
newTimestamps.put(logHost, logTs);
}
}
}
return newTimestamps;
}

@@ -228,15 +263,6 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
} else if (currentLogTS > oldTimeStamp) {
resultLogFiles.add(currentLogFile);
}

// It is possible that a host in .oldlogs is an obsolete region server
// so newestTimestamps.get(host) here can be null.
// Even if these logs belong to a obsolete region server, we still need
// to include they to avoid loss of edits for backup.
Long newTimestamp = newestTimestamps.get(host);
if (newTimestamp == null || currentLogTS > newTimestamp) {
newestLogs.add(currentLogFile);
}
}
// remove newest log per host because they are still in use
resultLogFiles.removeAll(newestLogs);
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
* Tests that WAL files from offline/inactive RegionServers are handled correctly during backup.
* Specifically verifies that WALs from an offline RS are:
* <ol>
* <li>Backed up once in the first backup after the RS goes offline</li>
* <li>NOT re-backed up in subsequent backups</li>
* </ol>
*/
@Category(LargeTests.class)
public class TestBackupOfflineRS extends TestBackupBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBackupOfflineRS.class);

private static final Logger LOG = LoggerFactory.getLogger(TestBackupOfflineRS.class);

@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtil();
conf1 = TEST_UTIL.getConfiguration();
conf1.setInt("hbase.regionserver.info.port", -1);
autoRestoreOnFailure = true;
useSecondCluster = false;
setUpHelper();
// Start an additional RS so we have at least 2
TEST_UTIL.getMiniHBaseCluster().startRegionServer();
TEST_UTIL.waitTableAvailable(table1);
}

/**
* Tests that when a full backup is taken while an RS is offline (with WALs in oldlogs), the
* offline host's timestamps are recorded so subsequent incremental backups don't re-include those
* WALs.
*/
@Test
public void testBackupWithOfflineRS() throws Exception {
LOG.info("Starting testFullBackupWithOfflineRS");

SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
List<TableName> tables = Lists.newArrayList(table1);

if (cluster.getNumLiveRegionServers() < 2) {
cluster.startRegionServer();
Thread.sleep(2000);
}

LOG.info("Inserting data to generate WAL entries");
try (Connection conn = ConnectionFactory.createConnection(conf1)) {
insertIntoTable(conn, table1, famName, 2, 100);
}

int rsToStop = 0;
HRegionServer rsBeforeStop = cluster.getRegionServer(rsToStop);
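// Build the host key in the same "hostname:port" form used by the log timestamp map read from
// the backup system table, so it can be looked up below.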
String offlineHost =
rsBeforeStop.getServerName().getHostname() + ":" + rsBeforeStop.getServerName().getPort();
LOG.info("Stopping RS: {}", offlineHost);

cluster.stopRegionServer(rsToStop);
// Wait for WALs to be moved to oldlogs
Thread.sleep(5000);

LOG.info("Taking full backup (with offline RS WALs in oldlogs)");
String fullBackupId = fullTableBackup(tables);
assertTrue("Full backup should succeed", checkSucceeded(fullBackupId));

try (BackupSystemTable sysTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
Map<TableName, Map<String, Long>> timestamps = sysTable.readLogTimestampMap(BACKUP_ROOT_DIR);
Map<String, Long> rsTimestamps = timestamps.get(table1);
LOG.info("RS timestamps after full backup: {}", rsTimestamps);

Long tsAfterFullBackup = rsTimestamps.get(offlineHost);
assertNotNull("Offline host should have timestamp recorded in trslm after full backup",
tsAfterFullBackup);

LOG.info("Taking incremental backup (should NOT include offline RS WALs)");
String incrBackupId = incrementalTableBackup(tables);
assertTrue("Incremental backup should succeed", checkSucceeded(incrBackupId));

timestamps = sysTable.readLogTimestampMap(BACKUP_ROOT_DIR);
rsTimestamps = timestamps.get(table1);
assertFalse("Offline host should not have a boundary ",
rsTimestamps.containsKey(offlineHost));
}
}
}