@@ -968,6 +968,9 @@ public enum OperationStatusCode {
   public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
     "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
   public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
+  public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator";
+  public static final String REPLICATION_SINK_TRANSLATOR_DEFAULT =
+    "org.apache.hadoop.hbase.replication.regionserver.DefaultReplicationSinkTranslator";
   public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
   /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
   public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
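The new hbase.replication.sink.translator key selects which ReplicationSinkTranslator implementation a sink region server uses. As a hedged sketch of how an operator-supplied translator could be wired in from code (com.example.CustomSinkTranslator is hypothetical, and the sink presumably instantiates the configured class reflectively; this PR excerpt does not show the loading code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class SinkTranslatorConfigExample {
  public static Configuration sinkConf() {
    // Hypothetical wiring: choose a custom translator on the sink cluster.
    // com.example.CustomSinkTranslator is illustrative, not part of this PR.
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.REPLICATION_SINK_TRANSLATOR, "com.example.CustomSinkTranslator");
    return conf;
  }
}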
@@ -80,7 +80,7 @@ private ReplicationSinkTrackerTableCreator() {

   /*
    * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is
-   * enabled and table doesn't exists already.
+   * enabled and table doesn't exist already.
    */
   public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
     throws IOException {
@@ -94,18 +94,20 @@ public class HFileReplicator implements Closeable {
   private int maxCopyThreads;
   private int copiesPerThread;
   private List<String> sourceClusterIds;
+  private ReplicationSinkTranslator translator;

   public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath,
     String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap,
-    Configuration conf, AsyncClusterConnection connection, List<String> sourceClusterIds)
-    throws IOException {
+    Configuration conf, AsyncClusterConnection connection, List<String> sourceClusterIds,
+    ReplicationSinkTranslator translator) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
     this.sourceClusterIds = sourceClusterIds;
+    this.translator = translator;

     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
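With the widened constructor, the caller (presumably the replication sink when it services bulk-load entries) now supplies the translator. A minimal usage fragment, assuming every argument has already been resolved by the surrounding sink code:

// Sketch only: all arguments (sourceClusterConf, tableQueueMap, connection,
// translator, ...) are assumed to be in scope in the calling sink code.
// HFileReplicator implements Closeable, so try-with-resources releases its
// staging resources once the copy and bulk load complete.
try (HFileReplicator replicator = new HFileReplicator(sourceClusterConf,
    sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, tableQueueMap, conf, connection,
    sourceClusterIds, translator)) {
  replicator.replicate();
}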
@@ -131,29 +133,30 @@ public void close() throws IOException {

   public Void replicate() throws IOException {
     // Copy all the hfiles to the local file system
-    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();
+    Map<String, Path> tableToSinkStagingDir = copySourceHFilesToSinkStagingDir();

     int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

-    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
-      String tableNameString = tableStagingDir.getKey();
-      Path stagingDir = tableStagingDir.getValue();
-      TableName tableName = TableName.valueOf(tableNameString);
+    for (Entry<String, Path> tableStagingDir : tableToSinkStagingDir.entrySet()) {
+      String tableNameStr = tableStagingDir.getKey();
+      TableName tableName = TableName.valueOf(tableNameStr);
+      TableName sinkTableName = translator.getSinkTableName(tableName);
+      Path sinkStagingDir = tableStagingDir.getValue();

       // Prepare collection of queue of hfiles to be loaded(replicated)
       Deque<LoadQueueItem> queue = new LinkedList<>();
-      BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false,
-        false);
+      BulkLoadHFilesTool.prepareHFileQueue(conf, connection, sinkTableName, sinkStagingDir, queue,
+        false, false);

       if (queue.isEmpty()) {
-        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
+        LOG.warn("Did not find any files to replicate in directory {}", sinkStagingDir.toUri());
         return null;
       }
       fsDelegationToken.acquireDelegationToken(sinkFs);
       try {
-        doBulkLoad(conf, tableName, stagingDir, queue, maxRetries);
+        doBulkLoad(conf, sinkTableName, sinkStagingDir, queue, maxRetries);
       } finally {
-        cleanup(stagingDir);
+        cleanup(sinkStagingDir);
       }
     }
     return null;
@@ -194,12 +197,12 @@ private void cleanup(Path stagingDir) {
     // Do not close the file system
   }

-  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
+  private Map<String, Path> copySourceHFilesToSinkStagingDir() throws IOException {
     Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
     Pair<byte[], List<String>> familyHFilePathsPair;
     List<String> hfilePaths;
     byte[] family;
-    Path familyStagingDir;
+    Path sinkFamilyStagingDir;
     int familyHFilePathsPairsListSize;
     int totalNoOfHFiles;
     List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
@@ -224,32 +227,33 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
       // For each table name in the map
       for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
         .entrySet()) {
-        String tableName = tableEntry.getKey();
+        String tableNameStr = tableEntry.getKey();
+        TableName tableName = TableName.valueOf(tableNameStr);

         // Create staging directory for each table
-        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));
+        Path sinkStagingDir = createSinkStagingDir(hbaseStagingDir, user, tableName);

         familyHFilePathsPairsList = tableEntry.getValue();
         familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

-        // For each list of family hfile paths pair in the table
+        // For each (family, hfile paths) pair in the table
         for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
           familyHFilePathsPair = familyHFilePathsPairsList.get(i);

           family = familyHFilePathsPair.getFirst();
           hfilePaths = familyHFilePathsPair.getSecond();

-          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
+          sinkFamilyStagingDir = getSinkFamilyStagingDir(sinkStagingDir, tableName, family);
           totalNoOfHFiles = hfilePaths.size();

-          // For each list of hfile paths for the family
+          // For each hfile path in the family
           List<Future<Void>> futures = new ArrayList<>();
           Callable<Void> c;
           Future<Void> future;
           int currentCopied = 0;
-          // Copy the hfiles parallely
+          // Copy the hfiles in parallel
           while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
-            c = new Copier(sourceFs, familyStagingDir,
+            c = new Copier(sourceFs, sinkFamilyStagingDir,
               hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
             future = exec.submit(c);
             futures.add(future);
@@ -258,7 +262,7 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {

           int remaining = totalNoOfHFiles - currentCopied;
           if (remaining > 0) {
-            c = new Copier(sourceFs, familyStagingDir,
+            c = new Copier(sourceFs, sinkFamilyStagingDir,
               hfilePaths.subList(currentCopied, currentCopied + remaining));
             future = exec.submit(c);
             futures.add(future);
@@ -281,7 +285,7 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
         }
         // Add the staging directory to this table. Staging directory contains all the hfiles
         // belonging to this table
-        mapOfCopiedHFiles.put(tableName, stagingDir);
+        mapOfCopiedHFiles.put(tableNameStr, sinkStagingDir);
       }
       return mapOfCopiedHFiles;
     } finally {
@@ -294,12 +298,14 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
     }
   }

-  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
-    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
+  private Path createSinkStagingDir(Path baseDir, User user, TableName tableName)
+    throws IOException {
+    TableName sinkTableName = translator.getSinkTableName(tableName);
+    String sinkTableNameStr = sinkTableName.getNameAsString().replace(":", UNDERSCORE);
     int RANDOM_WIDTH = 320;
     int RANDOM_RADIX = 32;
     String doubleUnderScore = UNDERSCORE + UNDERSCORE;
-    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
+    String randomDir = user.getShortName() + doubleUnderScore + sinkTableNameStr + doubleUnderScore
       + (new BigInteger(RANDOM_WIDTH, ThreadLocalRandom.current()).toString(RANDOM_RADIX));
     return createStagingDir(baseDir, user, randomDir);
   }
@@ -311,50 +317,55 @@ private Path createStagingDir(Path baseDir, User user, String randomDir) throws
     return p;
   }

+  private Path getSinkFamilyStagingDir(Path baseDir, TableName tableName, byte[] family) {
+    byte[] sinkFamily = translator.getSinkFamily(tableName, family);
+    return new Path(baseDir, Bytes.toString(sinkFamily));
+  }
+
   /**
    * This class will copy the given hfiles from the given source file system to the given local file
    * system staging directory.
    */
   private class Copier implements Callable<Void> {
     private FileSystem sourceFs;
-    private Path stagingDir;
-    private List<String> hfiles;
+    private Path sinkStagingDir;
+    private List<String> hfilePaths;

-    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
+    public Copier(FileSystem sourceFs, final Path sinkStagingDir, final List<String> hfilePaths)
       throws IOException {
       this.sourceFs = sourceFs;
-      this.stagingDir = stagingDir;
-      this.hfiles = hfiles;
+      this.sinkStagingDir = sinkStagingDir;
+      this.hfilePaths = hfilePaths;
     }

     @Override
     public Void call() throws IOException {
       Path sourceHFilePath;
-      Path localHFilePath;
-      int totalHFiles = hfiles.size();
+      Path sinkHFilePath;
+      int totalHFiles = hfilePaths.size();
       for (int i = 0; i < totalHFiles; i++) {
-        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
-        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
+        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfilePaths.get(i));
+        sinkHFilePath = new Path(sinkStagingDir, sourceHFilePath.getName());
         try {
-          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf);
           // If any other exception other than FNFE then we will fail the replication requests and
           // source will retry to replicate these data.
         } catch (FileNotFoundException e) {
-          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath
             + ". Trying to copy from hfile archive directory.", e);
-          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));
+          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfilePaths.get(i));

           try {
-            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf);
           } catch (FileNotFoundException e1) {
             // This will mean that the hfile does not exists any where in source cluster FS. So we
             // cannot do anything here just log and continue.
-            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath
               + ". Hence ignoring this hfile from replication..", e1);
             continue;
           }
         }
-        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
+        sinkFs.setPermission(sinkHFilePath, PERM_ALL_ACCESS);
       }
       return null;
     }
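The ReplicationSinkTranslator interface itself is not among the files shown in this excerpt. Inferred from the call sites above (getSinkTableName, getSinkFamily) and from the identity implementation in the next file, a plausible sketch of the contract (not the PR's actual source) is:

package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Inferred sketch: maps source-cluster coordinates (table, row, family,
 * qualifier, cell) to their sink-cluster equivalents before replicated
 * edits and bulk-loaded hfiles are applied on the sink.
 */
@InterfaceAudience.Public
public interface ReplicationSinkTranslator {
  TableName getSinkTableName(TableName tableName);

  byte[] getSinkRowKey(TableName tableName, byte[] rowKey);

  byte[] getSinkFamily(TableName tableName, byte[] family);

  byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier);

  ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell);
}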
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Public
+public class IdentityReplicationSinkTranslator implements ReplicationSinkTranslator {
+  @Override
+  public TableName getSinkTableName(TableName tableName) {
+    return tableName;
+  }
+
+  @Override
+  public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) {
+    return rowKey;
+  }
+
+  @Override
+  public byte[] getSinkFamily(TableName tableName, byte[] family) {
+    return family;
+  }
+
+  @Override
+  public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) {
+    return qualifier;
+  }
+
+  @Override
+  public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) {
+    return cell;
+  }
+}
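To make the hook concrete, a hypothetical non-identity translator (illustrative only, not part of this PR) could route every replicated table into a fixed sink namespace while inheriting identity behavior for rows, families, qualifiers, and cells:

package com.example.replication;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.IdentityReplicationSinkTranslator;

// Hypothetical: override only the table mapping, e.g. ns:events -> replica:events.
// The "replica" namespace is assumed to already exist on the sink cluster.
public class NamespaceRewritingSinkTranslator extends IdentityReplicationSinkTranslator {
  @Override
  public TableName getSinkTableName(TableName tableName) {
    return TableName.valueOf("replica", tableName.getQualifierAsString());
  }
}

With such a translator configured, HFileReplicator above would stage and bulk load copied hfiles against the rewritten table name rather than the source name.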