BackupAdmin.java
@@ -42,7 +42,7 @@ public interface BackupAdmin extends Closeable {
* @return the backup Id
*/

-  String backupTables(final BackupRequest userRequest) throws IOException;
+  BackupInfo backupTables(final BackupRequest userRequest) throws IOException;

/**
* Restore backup
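With this change, `backupTables` returns the full `BackupInfo` of the completed session instead of only its id. A minimal caller sketch of the new contract follows; `withBackupType` and `withTableList` are assumed `BackupRequest.Builder` methods (only `withTargetRootDir` and friends appear in this diff), and error handling is elided:

```java
import java.util.Arrays;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class BackupTablesSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection()) {
      BackupAdmin admin = new BackupAdminImpl(conn);
      BackupRequest request = new BackupRequest.Builder()
        .withBackupType(BackupType.FULL) // assumed builder method
        .withTableList(Arrays.asList(TableName.valueOf("table1"))) // assumed builder method
        .withTargetRootDir("/backupUT") // setter shown in this diff
        .build();
      // The whole BackupInfo is now returned; callers needing only the id
      // chain a getter, as the command-line path does below.
      BackupInfo info = admin.backupTables(request);
      System.out.println("Backup " + info.getBackupId() + " finished.");
    }
  }
}
```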
BackupAdminImpl.java
@@ -510,7 +510,7 @@ public void restore(RestoreRequest request) throws IOException {
}

@Override
-  public String backupTables(BackupRequest request) throws IOException {
+  public BackupInfo backupTables(BackupRequest request) throws IOException {
BackupType type = request.getBackupType();
String targetRootDir = request.getTargetRootDir();
List<TableName> tableList = request.getTableList();
@@ -593,7 +593,7 @@ public String backupTables(BackupRequest request) throws IOException {

client.execute();

-    return backupId;
+    return client.backupInfo;
}

private List<TableName> excludeNonExistingTables(List<TableName> tableList,
BackupCommands.java
@@ -347,7 +347,7 @@ public void execute() throws IOException {
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
.withBandwidthPerTasks(bandwidth).withNoChecksumVerify(ignoreChecksum)
.withBackupSetName(setName).build();
-      String backupId = admin.backupTables(request);
+      String backupId = admin.backupTables(request).getBackupId();
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
} catch (IOException e) {
System.out.println("Backup session finished. Status: FAILURE");
IncrementalBackupManager.java
@@ -20,8 +20,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -33,7 +36,9 @@
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -93,13 +98,36 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
}
newTimestamps = readRegionServerLastLogRollResult();

-    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode,
+      getParticipatingServerNames(backupInfo.getTables()));
logList = excludeProcV2WALs(logList);
backupInfo.setIncrBackupFileList(logList);

return newTimestamps;
}

+  private Set<String> getParticipatingServerNames(Set<TableName> tables) throws IOException {
+    Set<Address> participatingServers = new HashSet<>();
+    boolean flag = false;
+    for (TableName table : tables) {
+      RSGroupInfo rsGroupInfo = conn.getAdmin().getRSGroup(table);
+      if (rsGroupInfo != null && !rsGroupInfo.getServers().isEmpty()) {
+        LOG.info("Participating servers for table {}, rsgroup Name: {} are: {}", table,
+          rsGroupInfo.getName(), rsGroupInfo.getServers());
+        participatingServers.addAll(rsGroupInfo.getServers());
+      } else {
+        LOG.warn(
+          "Rsgroup isn't available for table {}, all servers in the cluster will be participating ",
+          table);
+        flag = true;
+      }
+    }
+
+    return flag
+      ? new HashSet<>()
+      : participatingServers.stream().map(a -> a.toString()).collect(Collectors.toSet());
+  }
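A note on the helper above: `flag` is a sentinel meaning "at least one requested table has no rsgroup". In that case the method returns an empty set, and the call sites patched below treat an empty set as "no filtering, WALs from every server participate". A small sketch of the effective per-WAL predicate, mirroring the patched checks in `getLogFilesForNewBackup`:

```java
import java.util.Set;

final class WalFilterSketch {
  // A WAL from `host` is kept when no rsgroup filter applies (empty set) or
  // when the host belongs to a participating rsgroup server; WAL paths whose
  // host cannot be parsed (null) are always skipped.
  static boolean participates(String host, Set<String> servers) {
    return host != null && (servers.isEmpty() || servers.contains(host));
  }
}
```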

private List<String> excludeProcV2WALs(List<String> logList) {
List<String> list = new ArrayList<>();
for (int i = 0; i < logList.size(); i++) {
@@ -126,8 +154,8 @@ private List<String> excludeProcV2WALs(List<String> logList) {
* @throws IOException exception
*/
private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
-    Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
-    throws IOException {
+    Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode,
+    Set<String> servers) throws IOException {
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+ "\n newestTimestamps: " + newestTimestamps);

@@ -160,7 +188,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
for (FileStatus rs : rss) {
p = rs.getPath();
host = BackupUtils.parseHostNameFromLogFile(p);
-        if (host == null) {
+        if (host == null || (!servers.isEmpty() && !servers.contains(host))) {
continue;
}
FileStatus[] logs;
@@ -215,7 +243,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
continue;
}
host = BackupUtils.parseHostFromOldLog(p);
-      if (host == null) {
+      if (host == null || (!servers.isEmpty() && !servers.contains(host))) {
continue;
}
currentLogTS = BackupUtils.getCreationTime(p);
BackupLogCleaner.java
@@ -21,8 +21,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +43,7 @@
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -98,13 +101,29 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)

// This map tracks, for every backup root, the most recent created backup (= highest timestamp)
Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
+    Set<Address> servers = new HashSet<>();
for (BackupInfo backup : backups) {
BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
}
}

+    for (BackupInfo backup : backups) {
+      for (TableName table : backup.getTables()) {
+        RSGroupInfo rsGroupInfo = conn.getAdmin().getRSGroup(table);
+        if (
+          rsGroupInfo != null && rsGroupInfo.getServers() != null
+            && !rsGroupInfo.getServers().isEmpty()
+        ) {
+          servers.addAll(rsGroupInfo.getServers());
+        } else {
+          servers.addAll(conn.getAdmin().getRegionServers().stream().map(s -> s.getAddress())
+            .collect(Collectors.toList()));
+        }
+      }
+    }

if (LOG.isDebugEnabled()) {
LOG.debug("WAL cleanup time-boundary using info from: {}. ",
newestBackupPerRootDir.entrySet().stream()
@@ -124,7 +143,7 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
.entrySet()) {
Address address = Address.fromString(entry.getKey());
Long storedTs = boundaries.get(address);
-      if (storedTs == null || entry.getValue() < storedTs) {
+      if ((storedTs == null || entry.getValue() < storedTs) && servers.contains(address)) {
boundaries.put(address, entry.getValue());
}
}
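The cleaner-side effect of the new `servers.contains(address)` guard: a preservation boundary is only recorded for servers that belong to the rsgroup of some backed-up table, so WALs from unrelated servers get no boundary and remain deletable. Note the fallback differs from `IncrementalBackupManager` above: when a table has no rsgroup, this loop adds every region server to `servers`, which makes the guard a no-op, rather than returning the empty-set sentinel. A sketch of the resulting decision, assuming (as in the existing cleaner logic, which is not shown in this diff) that a WAL is preserved only when a boundary exists for its server and the WAL is not older than that boundary:

```java
import java.util.Map;
import org.apache.hadoop.hbase.net.Address;

final class WalCleanupSketch {
  // Hedged reconstruction: servers outside every relevant rsgroup never get a
  // boundary entry, so their WALs always come out deletable here.
  static boolean deletable(Map<Address, Long> boundaries, Address server, long walTs) {
    Long boundary = boundaries.get(server);
    return boundary == null || walTs < boundary;
  }
}
```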
TestBackupBase.java
@@ -17,15 +17,19 @@
*/
package org.apache.hadoop.hbase.backup;

+import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +62,10 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
+import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
+import org.apache.hadoop.hbase.rsgroup.RSGroupUtil;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@@ -87,6 +94,15 @@ public class TestBackupBase {
protected static Configuration conf1;
protected static Configuration conf2;

+  protected static final int RSGROUP_RS_NUM = 5;
+  protected static final int NUM_REGIONSERVERS = 3;
+  protected static final String RSGROUP_NAME = "rsgroup1";
+  protected static final String RSGROUP_NAMESPACE = "rsgroup_ns";
+  protected static final TableName RSGROUP_TABLE_1 =
+    TableName.valueOf(RSGROUP_NAMESPACE + ":rsgroup_table1");
+  protected static final TableName RSGROUP_TABLE_2 =
+    TableName.valueOf(RSGROUP_NAMESPACE + ":rsgroup_table2");

protected static TableName table1 = TableName.valueOf("table1");
protected static TableDescriptor table1Desc;
protected static TableName table2 = TableName.valueOf("table2");
@@ -108,6 +124,7 @@ public class TestBackupBase {

protected static boolean autoRestoreOnFailure;
protected static boolean useSecondCluster;
+  protected static boolean enableRSgroup;

static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient {
public IncrementalTableBackupClientForTest() {
@@ -292,6 +309,22 @@ public void execute() throws IOException {
}
}

+  private static RSGroupInfo addGroup(String groupName, int serverCount) throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    RSGroupInfo defaultInfo = admin.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
+    admin.addRSGroup(groupName);
+    Set<Address> set = new HashSet<>();
+    for (Address server : defaultInfo.getServers()) {
+      if (set.size() == serverCount) {
+        break;
+      }
+      set.add(server);
+    }
+    admin.moveServersToRSGroup(set, groupName);
+    RSGroupInfo result = admin.getRSGroup(groupName);
+    return result;
+  }

public static void setUpHelper() throws Exception {
BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";
@@ -314,7 +347,13 @@ public static void setUpHelper() throws Exception {

// Set MultiWAL (with 2 default WAL files per RS)
conf1.set(WALFactory.WAL_PROVIDER, provider);
-    TEST_UTIL.startMiniCluster();
+    if (enableRSgroup) {
+      conf1.setBoolean(RSGroupUtil.RS_GROUP_ENABLED, true);
+      TEST_UTIL.startMiniCluster(RSGROUP_RS_NUM + NUM_REGIONSERVERS);
+      addGroup(RSGROUP_NAME, RSGROUP_RS_NUM);
+    } else {
+      TEST_UTIL.startMiniCluster();
+    }

if (useSecondCluster) {
conf2 = HBaseConfiguration.create(conf1);
@@ -352,6 +391,7 @@ public static void setUpHelper() throws Exception {
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtil();
conf1 = TEST_UTIL.getConfiguration();
+    enableRSgroup = false;
autoRestoreOnFailure = true;
useSecondCluster = false;
setUpHelper();
@@ -377,6 +417,7 @@ public static void tearDown() throws Exception {
}
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.shutdownMiniMapReduceCluster();
+    enableRSgroup = false;
autoRestoreOnFailure = true;
useSecondCluster = false;
}
@@ -406,16 +447,16 @@ protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path)
return request;
}

-  protected String backupTables(BackupType type, List<TableName> tables, String path)
+  protected BackupInfo backupTables(BackupType type, List<TableName> tables, String path)
throws IOException {
Connection conn = null;
BackupAdmin badmin = null;
-    String backupId;
+    BackupInfo backupInfo;
try {
conn = ConnectionFactory.createConnection(conf1);
badmin = new BackupAdminImpl(conn);
BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
-      backupId = badmin.backupTables(request);
+      backupInfo = badmin.backupTables(request);
} finally {
if (badmin != null) {
badmin.close();
@@ -424,14 +465,14 @@ protected String backupTables(BackupType type, List<TableName> tables, String path)
conn.close();
}
}
-    return backupId;
+    return backupInfo;
}

-  protected String fullTableBackup(List<TableName> tables) throws IOException {
+  protected BackupInfo fullTableBackup(List<TableName> tables) throws IOException {
return backupTables(BackupType.FULL, tables, BACKUP_ROOT_DIR);
}

-  protected String incrementalTableBackup(List<TableName> tables) throws IOException {
+  protected BackupInfo incrementalTableBackup(List<TableName> tables) throws IOException {
return backupTables(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
}

@@ -479,6 +520,23 @@ protected static void createTables() throws Exception {
table.close();
ha.close();
conn.close();

+    if (enableRSgroup) {
+      ha.createNamespace(NamespaceDescriptor.create(RSGROUP_NAMESPACE)
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, RSGROUP_NAME).build());
+
+      ha.createTable(TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_1)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build());
+      table = ConnectionFactory.createConnection(conf1).getTable(RSGROUP_TABLE_1);
+      loadTable(table);
+      table.close();
+
+      ha.createTable(TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_2)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build());
+      table = ConnectionFactory.createConnection(conf1).getTable(RSGROUP_TABLE_2);
+      loadTable(table);
+      table.close();
+    }
}

protected boolean checkSucceeded(String backupId) throws IOException {
Expand All @@ -501,7 +559,7 @@ protected boolean checkFailed(String backupId) throws IOException {
return status.getState() == BackupState.FAILED;
}

-  private BackupInfo getBackupInfo(String backupId) throws IOException {
+  protected BackupInfo getBackupInfo(String backupId) throws IOException {
try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
BackupInfo status = table.readBackupInfo(backupId);
return status;
@@ -538,6 +596,26 @@ protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException
return logFiles;
}

+  protected Set<Address> getRsgroupServers(String rsgroupName) throws IOException {
+    RSGroupInfo rsGroupInfo = TEST_UTIL.getAdmin().getRSGroup(rsgroupName);
+    if (
+      rsGroupInfo != null && rsGroupInfo.getServers() != null && !rsGroupInfo.getServers().isEmpty()
+    ) {
+      return new HashSet<>(rsGroupInfo.getServers());
+    }
+    return new HashSet<>();
+  }
+
+  protected void checkIfWALFilesBelongToRsgroup(List<String> walFiles, String rsgroupName)
+    throws IOException {
+    for (String file : walFiles) {
+      Address walServerAddress =
+        Address.fromString(BackupUtils.parseHostNameFromLogFile(new Path(file)));
+      assertTrue("Backed WAL files should be from RSGroup " + rsgroupName,
+        getRsgroupServers(rsgroupName).contains(walServerAddress));
+    }
+  }
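A sketch of how the two helpers above could combine in an rsgroup-aware test. This is a hypothetical subclass, not part of this PR; it assumes `BackupInfo` exposes `getIncrBackupFileList()` matching the `setIncrBackupFileList` call patched into `IncrementalBackupManager`, and it mirrors the base class's `setUp` with `enableRSgroup = true` to get the rsgroup mini-cluster path:

```java
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestRsgroupBackupSketch extends TestBackupBase {
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    conf1 = TEST_UTIL.getConfiguration();
    enableRSgroup = true; // opt in to the rsgroup cluster set up in setUpHelper()
    autoRestoreOnFailure = true;
    useSecondCluster = false;
    setUpHelper();
  }

  @Test
  public void testIncrementalBackupRespectsRsgroup() throws Exception {
    List<TableName> tables = Arrays.asList(RSGROUP_TABLE_1, RSGROUP_TABLE_2);
    BackupInfo full = fullTableBackup(tables);
    assertTrue(checkSucceeded(full.getBackupId()));

    // (A real test would write more data here before the incremental backup.)
    // The incremental backup should capture WALs only from rsgroup1 servers.
    BackupInfo incremental = incrementalTableBackup(tables);
    assertTrue(checkSucceeded(incremental.getBackupId()));
    checkIfWALFilesBelongToRsgroup(incremental.getIncrBackupFileList(), RSGROUP_NAME);
  }
}
```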

protected void dumpBackupDir() throws IOException {
// Dump Backup Dir
FileSystem fs = FileSystem.get(conf1);