diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c882e71e877a..2eb28b56f294 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -968,6 +968,9 @@ public enum OperationStatusCode { public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT = "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl"; public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; + public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator"; + public static final String REPLICATION_SINK_TRANSLATOR_DEFAULT = + "org.apache.hadoop.hbase.replication.regionserver.IdentityReplicationSinkTranslator"; public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java index 38cf33090d9b..7df00b546c33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java @@ -80,7 +80,7 @@ private ReplicationSinkTrackerTableCreator() { /* * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is - * enabled and table doesn't exists already. + * enabled and table doesn't exist already.
*/ public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 2d0b4e32ced0..326f9cbf47a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -94,11 +94,12 @@ public class HFileReplicator implements Closeable { private int maxCopyThreads; private int copiesPerThread; private List sourceClusterIds; + private ReplicationSinkTranslator translator; public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath, Map>>> tableQueueMap, - Configuration conf, AsyncClusterConnection connection, List sourceClusterIds) - throws IOException { + Configuration conf, AsyncClusterConnection connection, List sourceClusterIds, + ReplicationSinkTranslator translator) throws IOException { this.sourceClusterConf = sourceClusterConf; this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath; this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath; @@ -106,6 +107,7 @@ public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespa this.conf = conf; this.connection = connection; this.sourceClusterIds = sourceClusterIds; + this.translator = translator; userProvider = UserProvider.instantiate(conf); fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); @@ -131,29 +133,30 @@ public void close() throws IOException { public Void replicate() throws IOException { // Copy all the hfiles to the local file system - Map tableStagingDirsMap = copyHFilesToStagingDir(); + Map tableToSinkStagingDir = copySourceHFilesToSinkStagingDir(); int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); - for (Entry tableStagingDir : tableStagingDirsMap.entrySet()) { - String tableNameString = tableStagingDir.getKey(); - Path stagingDir = tableStagingDir.getValue(); - TableName tableName = TableName.valueOf(tableNameString); + for (Entry tableStagingDir : tableToSinkStagingDir.entrySet()) { + String tableNameStr = tableStagingDir.getKey(); + TableName tableName = TableName.valueOf(tableNameStr); + TableName sinkTableName = translator.getSinkTableName(tableName); + Path sinkStagingDir = tableStagingDir.getValue(); // Prepare collection of queue of hfiles to be loaded(replicated) Deque queue = new LinkedList<>(); - BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false, - false); + BulkLoadHFilesTool.prepareHFileQueue(conf, connection, sinkTableName, sinkStagingDir, queue, + false, false); if (queue.isEmpty()) { - LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri()); + LOG.warn("Did not find any files to replicate in directory {}", sinkStagingDir.toUri()); return null; } fsDelegationToken.acquireDelegationToken(sinkFs); try { - doBulkLoad(conf, tableName, stagingDir, queue, maxRetries); + doBulkLoad(conf, sinkTableName, sinkStagingDir, queue, maxRetries); } finally { - cleanup(stagingDir); + cleanup(sinkStagingDir); } } return null; @@ -194,12 +197,12 @@ private void cleanup(Path stagingDir) { // Do not close the file system } - private Map copyHFilesToStagingDir() throws IOException { + private Map copySourceHFilesToSinkStagingDir() 
throws IOException { Map mapOfCopiedHFiles = new HashMap<>(); Pair> familyHFilePathsPair; List hfilePaths; byte[] family; - Path familyStagingDir; + Path sinkFamilyStagingDir; int familyHFilePathsPairsListSize; int totalNoOfHFiles; List>> familyHFilePathsPairsList; @@ -224,32 +227,33 @@ private Map copyHFilesToStagingDir() throws IOException { // For each table name in the map for (Entry>>> tableEntry : bulkLoadHFileMap .entrySet()) { - String tableName = tableEntry.getKey(); + String tableNameStr = tableEntry.getKey(); + TableName tableName = TableName.valueOf(tableNameStr); // Create staging directory for each table - Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName)); + Path sinkStagingDir = createSinkStagingDir(hbaseStagingDir, user, tableName); familyHFilePathsPairsList = tableEntry.getValue(); familyHFilePathsPairsListSize = familyHFilePathsPairsList.size(); - // For each list of family hfile paths pair in the table + // For each (family, hfile paths) pair in the table for (int i = 0; i < familyHFilePathsPairsListSize; i++) { familyHFilePathsPair = familyHFilePathsPairsList.get(i); family = familyHFilePathsPair.getFirst(); hfilePaths = familyHFilePathsPair.getSecond(); - familyStagingDir = new Path(stagingDir, Bytes.toString(family)); + sinkFamilyStagingDir = getSinkFamilyStagingDir(sinkStagingDir, tableName, family); totalNoOfHFiles = hfilePaths.size(); - // For each list of hfile paths for the family + // For each hfile path in the family List> futures = new ArrayList<>(); Callable c; Future future; int currentCopied = 0; - // Copy the hfiles parallely + // Copy the hfiles in parallel while (totalNoOfHFiles > currentCopied + this.copiesPerThread) { - c = new Copier(sourceFs, familyStagingDir, + c = new Copier(sourceFs, sinkFamilyStagingDir, hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread)); future = exec.submit(c); futures.add(future); @@ -258,7 +262,7 @@ private Map copyHFilesToStagingDir() throws IOException { int remaining = totalNoOfHFiles - currentCopied; if (remaining > 0) { - c = new Copier(sourceFs, familyStagingDir, + c = new Copier(sourceFs, sinkFamilyStagingDir, hfilePaths.subList(currentCopied, currentCopied + remaining)); future = exec.submit(c); futures.add(future); @@ -281,7 +285,7 @@ private Map copyHFilesToStagingDir() throws IOException { } // Add the staging directory to this table. 
Staging directory contains all the hfiles // belonging to this table - mapOfCopiedHFiles.put(tableName, stagingDir); + mapOfCopiedHFiles.put(tableNameStr, sinkStagingDir); } return mapOfCopiedHFiles; } finally { @@ -294,12 +298,14 @@ private Map copyHFilesToStagingDir() throws IOException { } } - private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException { - String tblName = tableName.getNameAsString().replace(":", UNDERSCORE); + private Path createSinkStagingDir(Path baseDir, User user, TableName tableName) + throws IOException { + TableName sinkTableName = translator.getSinkTableName(tableName); + String sinkTableNameStr = sinkTableName.getNameAsString().replace(":", UNDERSCORE); int RANDOM_WIDTH = 320; int RANDOM_RADIX = 32; String doubleUnderScore = UNDERSCORE + UNDERSCORE; - String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore + String randomDir = user.getShortName() + doubleUnderScore + sinkTableNameStr + doubleUnderScore + (new BigInteger(RANDOM_WIDTH, ThreadLocalRandom.current()).toString(RANDOM_RADIX)); return createStagingDir(baseDir, user, randomDir); } @@ -311,50 +317,55 @@ private Path createStagingDir(Path baseDir, User user, String randomDir) throws return p; } + private Path getSinkFamilyStagingDir(Path baseDir, TableName tableName, byte[] family) { + byte[] sinkFamily = translator.getSinkFamily(tableName, family); + return new Path(baseDir, Bytes.toString(sinkFamily)); + } + /** * This class will copy the given hfiles from the given source file system to the given local file * system staging directory. */ private class Copier implements Callable { private FileSystem sourceFs; - private Path stagingDir; - private List hfiles; + private Path sinkStagingDir; + private List hfilePaths; - public Copier(FileSystem sourceFs, final Path stagingDir, final List hfiles) + public Copier(FileSystem sourceFs, final Path sinkStagingDir, final List hfilePaths) throws IOException { this.sourceFs = sourceFs; - this.stagingDir = stagingDir; - this.hfiles = hfiles; + this.sinkStagingDir = sinkStagingDir; + this.hfilePaths = hfilePaths; } @Override public Void call() throws IOException { Path sourceHFilePath; - Path localHFilePath; - int totalHFiles = hfiles.size(); + Path sinkHFilePath; + int totalHFiles = hfilePaths.size(); for (int i = 0; i < totalHFiles; i++) { - sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i)); - localHFilePath = new Path(stagingDir, sourceHFilePath.getName()); + sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfilePaths.get(i)); + sinkHFilePath = new Path(sinkStagingDir, sourceHFilePath.getName()); try { - FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf); // If any other exception other than FNFE then we will fail the replication requests and // source will retry to replicate these data. } catch (FileNotFoundException e) { - LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath + ". 
Trying to copy from hfile archive directory.", e); - sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i)); + sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfilePaths.get(i)); try { - FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf); } catch (FileNotFoundException e1) { // This will mean that the hfile does not exists any where in source cluster FS. So we // cannot do anything here just log and continue. - LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath + ". Hence ignoring this hfile from replication..", e1); continue; } } - sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS); + sinkFs.setPermission(sinkHFilePath, PERM_ALL_ACCESS); } return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java new file mode 100644 index 000000000000..cf1496eddf36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public class IdentityReplicationSinkTranslator implements ReplicationSinkTranslator { + @Override + public TableName getSinkTableName(TableName tableName) { + return tableName; + } + + @Override + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) { + return rowKey; + } + + @Override + public byte[] getSinkFamily(TableName tableName, byte[] family) { + return family; + } + + @Override + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) { + return qualifier; + } + + @Override + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) { + return cell; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 508ace390565..81689a457133 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -72,6 +72,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -181,6 +182,19 @@ private void decorateConf() { } } + private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException { + Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, null); + if (translatorClass == null) { + return new IdentityReplicationSinkTranslator(); + } + try { + return (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new IOException( + "Failed to instantiate replication sink translator " + translatorClass, e); + } + } + /** * Replicate this array of entries directly into the local cluster using the native client. Only * operates against raw protobuf type saving on a conversion from pb to pojo. @@ -202,18 +216,22 @@ public void replicateEntries(List entries, final ExtendedCellScanner c // Very simple optimization where we batch sequences of rows going // to the same table. try { + ReplicationSinkTranslator translator = getReplicationSinkTranslator(); long totalReplicated = 0; - // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per - // invocation of this method per table and cluster id. - Map, List>> rowMap = new TreeMap<>(); + Map, Map>>>> bulkLoadsPerClusters = + new HashMap<>(); - Map, Map>>>> bulkLoadsPerClusters = null; - Pair, List> mutationsToWalEntriesPairs = + // Map of tableName => list of Rows, grouped by source cluster id.
+ // In each call to this method, we only want to flushCommits once per table per source + // cluster. + Map, List>> sinkRowMap = new TreeMap<>(); + Pair, List> sinkMutationsToWalEntriesPairs = new Pair<>(new ArrayList<>(), new ArrayList<>()); + for (WALEntry entry : entries) { - TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); + TableName tableName = TableName.valueOf(entry.getKey().getTableName().toByteArray()); if (this.walEntrySinkFilter != null) { - if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) { + if (this.walEntrySinkFilter.filter(tableName, entry.getKey().getWriteTime())) { // Skip Cells in CellScanner associated with this entry. int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { @@ -226,8 +244,8 @@ public void replicateEntries(List entries, final ExtendedCellScanner c continue; } } - ExtendedCell previousCell = null; - Mutation mutation = null; + ExtendedCell sinkCellPrev = null; + Mutation sinkMutation = null; int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off @@ -236,81 +254,71 @@ public void replicateEntries(List entries, final ExtendedCellScanner c throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } ExtendedCell cell = cells.current(); - // Handle bulk load hfiles replication if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + // Bulk load events BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); if (bld.getReplicate()) { - if (bulkLoadsPerClusters == null) { - bulkLoadsPerClusters = new HashMap<>(); - } - // Map of table name Vs list of pair of family and list of - // hfile paths from its namespace + // Map of tableNameStr to (family, hfile paths) pairs Map>>> bulkLoadHFileMap = bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); - buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); + buildBulkLoadHFileMap(bulkLoadHFileMap, bld); } } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { Mutation put = processReplicationMarkerEntry(cell); if (put == null) { continue; } - table = REPLICATION_SINK_TRACKER_TABLE_NAME; - List clusterIds = new ArrayList<>(); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { - clusterIds.add(toUUID(clusterId)); - } + List clusterIds = getSourceClusterIds(entry); put.setClusterIds(clusterIds); - addToHashMultiMap(rowMap, table, clusterIds, put); + addToHashMultiMap(sinkRowMap, REPLICATION_SINK_TRACKER_TABLE_NAME, clusterIds, put); } else { - // Handle wal replication - if (isNewRowOrType(previousCell, cell)) { - // Create new mutation - mutation = CellUtil.isDelete(cell) - ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) - : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { - clusterIds.add(toUUID(clusterId)); - } - mutation.setClusterIds(clusterIds); - mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, + TableName sinkTableName = translator.getSinkTableName(tableName); + ExtendedCell sinkCell = translator.getSinkExtendedCell(tableName, cell); + if (isNewRowOrType(sinkCellPrev, sinkCell)) { + sinkMutation = CellUtil.isDelete(sinkCell) + ?
new Delete(sinkCell.getRowArray(), sinkCell.getRowOffset(), + sinkCell.getRowLength()) + : new Put(sinkCell.getRowArray(), sinkCell.getRowOffset(), sinkCell.getRowLength()); + List clusterIds = getSourceClusterIds(entry); + sinkMutation.setClusterIds(clusterIds); + sinkMutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, HConstants.EMPTY_BYTE_ARRAY); if (rsServerHost != null) { - rsServerHost.preReplicationSinkBatchMutate(entry, mutation); - mutationsToWalEntriesPairs.getFirst().add(mutation); - mutationsToWalEntriesPairs.getSecond().add(entry); + rsServerHost.preReplicationSinkBatchMutate(entry, sinkMutation); + sinkMutationsToWalEntriesPairs.getFirst().add(sinkMutation); + sinkMutationsToWalEntriesPairs.getSecond().add(entry); } - addToHashMultiMap(rowMap, table, clusterIds, mutation); + addToHashMultiMap(sinkRowMap, sinkTableName, clusterIds, sinkMutation); } - if (CellUtil.isDelete(cell)) { - ((Delete) mutation).add(cell); + if (CellUtil.isDelete(sinkCell)) { + ((Delete) sinkMutation).add(sinkCell); } else { - ((Put) mutation).add(cell); + ((Put) sinkMutation).add(sinkCell); } - previousCell = cell; + sinkCellPrev = sinkCell; } } totalReplicated++; } // TODO Replicating mutations and bulk loaded data can be made parallel - if (!rowMap.isEmpty()) { + if (!sinkRowMap.isEmpty()) { LOG.debug("Started replicating mutations."); - for (Entry, List>> entry : rowMap.entrySet()) { + for (Entry, List>> entry : sinkRowMap.entrySet()) { batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold); } LOG.debug("Finished replicating mutations."); - } - if (rsServerHost != null) { - List mutations = mutationsToWalEntriesPairs.getFirst(); - List walEntries = mutationsToWalEntriesPairs.getSecond(); - for (int i = 0; i < mutations.size(); i++) { - rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i)); + if (rsServerHost != null) { + List sinkMutations = sinkMutationsToWalEntriesPairs.getFirst(); + List walEntries = sinkMutationsToWalEntriesPairs.getSecond(); + for (int i = 0; i < sinkMutations.size(); i++) { + rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), sinkMutations.get(i)); + } } } - if (bulkLoadsPerClusters != null) { + if (!bulkLoadsPerClusters.isEmpty()) { for (Entry, Map>>>> entry : bulkLoadsPerClusters.entrySet()) { Map>>> bulkLoadHFileMap = entry.getValue(); @@ -319,7 +327,7 @@ public void replicateEntries(List entries, final ExtendedCellScanner c Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId); try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf, sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, - getConnection(), entry.getKey())) { + getConnection(), entry.getKey(), translator)) { hFileReplicator.replicate(); LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString()); } @@ -338,6 +346,14 @@ public void replicateEntries(List entries, final ExtendedCellScanner c } } + private List getSourceClusterIds(WALEntry entry) { + List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + return clusterIds; + } + /* * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. * If false, then ignore this cell. 
If set to true, de-serialize value into @@ -366,11 +382,10 @@ private Put processReplicationMarkerEntry(Cell cell) throws IOException { } private void buildBulkLoadHFileMap( - final Map>>> bulkLoadHFileMap, TableName table, - BulkLoadDescriptor bld) throws IOException { + final Map>>> bulkLoadHFileMap, BulkLoadDescriptor bld) + throws IOException { List storesList = bld.getStoresList(); - int storesSize = storesList.size(); - for (int j = 0; j < storesSize; j++) { + for (int j = 0; j < storesList.size(); j++) { StoreDescriptor storeDescriptor = storesList.get(j); List storeFileList = storeDescriptor.getStoreFileList(); int storeFilesSize = storeFileList.size(); @@ -378,10 +393,11 @@ private void buildBulkLoadHFileMap( for (int k = 0; k < storeFilesSize; k++) { byte[] family = storeDescriptor.getFamilyName().toByteArray(); - // Build hfile relative path from its namespace - String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family); - String tableName = table.getNameWithNamespaceInclAsString(); - List>> familyHFilePathsList = bulkLoadHFileMap.get(tableName); + // Build relative hfile path starting with its namespace dir + TableName tableName = ProtobufUtil.toTableName(bld.getTableName()); + String pathToHfileFromNS = getHFilePath(tableName, bld, storeFileList.get(k), family); + String tableNameStr = tableName.getNameWithNamespaceInclAsString(); + List>> familyHFilePathsList = bulkLoadHFileMap.get(tableNameStr); if (familyHFilePathsList != null) { boolean foundFamily = false; for (Pair> familyHFilePathsPair : familyHFilePathsList) { @@ -393,12 +409,12 @@ private void buildBulkLoadHFileMap( } } if (!foundFamily) { - // Family not found, add this family and its hfile paths pair to the list + // Family not found, add this (family, hfile paths) pair to the list addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList); } } else { // Add this table entry into the map - addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName); + addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableNameStr); } } } @@ -422,10 +438,10 @@ private void addNewTableEntryInMap( bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList); } - private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile, + private String getHFilePath(TableName tableName, BulkLoadDescriptor bld, String storeFile, byte[] family) { - return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR) - .append(table.getQualifierAsString()).append(Path.SEPARATOR) + return new StringBuilder(100).append(tableName.getNamespaceAsString()).append(Path.SEPARATOR) + .append(tableName.getQualifierAsString()).append(Path.SEPARATOR) .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR) .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java new file mode 100644 index 000000000000..5b0168d51b81 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public interface ReplicationSinkTranslator { + /** Returns the sink-side table that entries replicated from tableName are applied to. */ + TableName getSinkTableName(TableName tableName); + /** Returns the sink-side row key for a row replicated from tableName. */ + byte[] getSinkRowKey(TableName tableName, byte[] rowKey); + /** Returns the sink-side column family for the given family of tableName. */ + byte[] getSinkFamily(TableName tableName, byte[] family); + /** Returns the sink-side qualifier for the given family and qualifier of tableName. */ + byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier); + /** Returns the cell to apply on the sink for a cell replicated from tableName. */ + ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell); +}
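Example (illustration only, not part of the patch): a minimal sketch of a custom translator a downstream deployment might plug in once this interface lands. The package org.example.replication, the class PrefixingReplicationSinkTranslator, and the backup namespace are hypothetical; only the ReplicationSinkTranslator interface and the hbase.replication.sink.translator property come from the diff above. The class needs a no-arg constructor because ReplicationSink instantiates it reflectively, and it is enabled by setting hbase.replication.sink.translator to its fully qualified class name in hbase-site.xml on the sink cluster's region servers.

package org.example.replication; // hypothetical package for this sketch

import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkTranslator;

/**
 * Sketch of a translator that redirects every replicated edit into a "backup"
 * namespace on the sink cluster while leaving the data itself untouched.
 */
public class PrefixingReplicationSinkTranslator implements ReplicationSinkTranslator {

  @Override
  public TableName getSinkTableName(TableName tableName) {
    // default:t1 -> backup:t1, ns1:t1 -> backup:ns1_t1 (source namespace folded into the qualifier)
    String qualifier = "default".equals(tableName.getNamespaceAsString())
      ? tableName.getQualifierAsString()
      : tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
    return TableName.valueOf("backup", qualifier);
  }

  @Override
  public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) {
    return rowKey; // row keys are left untouched
  }

  @Override
  public byte[] getSinkFamily(TableName tableName, byte[] family) {
    return family; // families are left untouched
  }

  @Override
  public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) {
    return qualifier; // qualifiers are left untouched
  }

  @Override
  public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) {
    return cell; // cell contents are left untouched
  }
}

The translated sink tables (backup:* here) must already exist on the peer with the translated families; the sink applies mutations and bulk loads to them but does not create them. Note also that in the hunks above only getSinkTableName, getSinkFamily, and getSinkExtendedCell are consulted by ReplicationSink and HFileReplicator; getSinkRowKey and getSinkQualifier are declared by the interface but not yet called.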