Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,19 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe
if (peer.hasRemoteWALDir()) {
builder.setRemoteWALDir(peer.getRemoteWALDir());
}

List<ReplicationProtos.SourceTablesToTargetTables> sourceToTargetTables =
peer.getSourceTablesToTargetTablesList();
if (sourceToTargetTables != null) {
Map<TableName, TableName> map = new HashMap<>(sourceToTargetTables.size());
for (ReplicationProtos.SourceTablesToTargetTables tables : sourceToTargetTables) {
map.put(ProtobufUtil.toTableName(tables.getSource()),
ProtobufUtil.toTableName(tables.getTarget()));
}

builder.setSourceTablesToTargetTable(map);
}

return builder.build();
}

Expand Down Expand Up @@ -389,6 +402,18 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe
if (peerConfig.getRemoteWALDir() != null) {
builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
}

Map<TableName, TableName> sourceTablesToTargetTables =
peerConfig.getSourceTablesToTargetTables();
if (sourceTablesToTargetTables != null) {
for (Map.Entry<TableName, TableName> entry : sourceTablesToTargetTables.entrySet()) {
HBaseProtos.TableName source = ProtobufUtil.toProtoTableName(entry.getKey());
HBaseProtos.TableName target = ProtobufUtil.toProtoTableName(entry.getValue());
builder.addSourceTablesToTargetTables(ReplicationProtos.SourceTablesToTargetTables
.newBuilder().setSource(source).setTarget(target).build());
}
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ReplicationPeerConfig {
// Default value is true, means replicate all user tables to peer cluster.
private boolean replicateAllUserTables = true;
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
private Map<TableName, TableName> sourceTablesToTargetTables = null;
private Set<String> excludeNamespaces = null;
private long bandwidth = 0;
private final boolean serial;
Expand All @@ -65,6 +66,9 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.excludeTableCFsMap = builder.excludeTableCFsMap != null
? unmodifiableTableCFsMap(builder.excludeTableCFsMap)
: null;
this.sourceTablesToTargetTables = builder.sourceTablesToTargetTables != null
? Collections.unmodifiableMap(builder.sourceTablesToTargetTables)
: null;
this.excludeNamespaces = builder.excludeNamespaces != null
? Collections.unmodifiableSet(builder.excludeNamespaces)
: null;
Expand Down Expand Up @@ -117,6 +121,10 @@ public Map<TableName, List<String>> getExcludeTableCFsMap() {
return (Map<TableName, List<String>>) excludeTableCFsMap;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the best practice here is, but would it make sense to add a deprecated setter to ReplicationPeerConfig?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplicationPeerConfig is designed to be immutable. So you'd better add a setter in the ReplicationPeerConfigBuilder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this method to the builder in the initial commit. I asked this question because I noticed ReplicationPeerConfig has some deprecated setter methods. I wasn't sure if it would make sense to introduce a deprecated setter for this new field to follow the existing code, or if I should leave it out for this new field.

/**
 * Returns the configured mapping from source table names to the table names their edits are
 * replicated to on the peer cluster, or {@code null} when no mapping was configured. The
 * returned map is unmodifiable (wrapped in the constructor).
 */
public Map<TableName, TableName> getSourceTablesToTargetTables() {
  return this.sourceTablesToTargetTables;
}

/**
 * Returns the namespaces excluded from replication, or {@code null} when none were configured.
 */
public Set<String> getExcludeNamespaces() {
  return excludeNamespaces;
}
Expand Down Expand Up @@ -148,6 +156,7 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer
.setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
.setSourceTablesToTargetTable(peerConfig.getSourceTablesToTargetTables())
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial())
.setRemoteWALDir(peerConfig.getRemoteWALDir());
Expand All @@ -172,6 +181,7 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu
private boolean replicateAllUserTables = true;

private Map<TableName, List<String>> excludeTableCFsMap = null;
private Map<TableName, TableName> sourceTablesToTargetTables = null;

private Set<String> excludeNamespaces = null;

Expand Down Expand Up @@ -236,6 +246,13 @@ public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateA
return this;
}

/**
 * Records the source-to-target table name mapping on this builder; it is applied to the
 * immutable config when {@code build()} is called.
 */
@Override
public ReplicationPeerConfigBuilder
  setSourceTablesToTargetTable(Map<TableName, TableName> sourceToTargetTables) {
  this.sourceTablesToTargetTables = sourceToTargetTables;
  return this;
}

@Override
public ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> excludeNamespaces) {
this.excludeNamespaces = excludeNamespaces;
Expand Down Expand Up @@ -288,6 +305,11 @@ public String toString() {
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
}
}

if (sourceTablesToTargetTables != null) {
builder.append("sourceTablesToTargetTables").append(sourceTablesToTargetTables).append(",");
}

builder.append("bandwidth=").append(bandwidth).append(",");
builder.append("serial=").append(serial);
if (this.remoteWALDir != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData
*/
ReplicationPeerConfigBuilder setExcludeTableCFsMap(Map<TableName, List<String>> tableCFsMap);

/**
 * Sets the mapping of source table names to target replication table names. By default (no
 * mapping set, or a table absent from the map) edits are replicated to the identically named
 * table on the peer. Only applies to intercluster replication.
 * <p>
 * NOTE(review): the original Javadoc said this method "sets state which is mutually exclusive
 * to {@link #setSourceTablesToTargetTable(Map)}" — a self-reference, clearly copied from a
 * neighboring setter. Confirm which setter (if any) this is actually exclusive with and fix the
 * link accordingly.
 * @param sourceTablesToTargetTable A mapping of source table names to target table names. By
 *                                  default, edits will be replicated to the same target table as
 *                                  the source. Only applies to intercluster replication.
 * @return {@code this}
 */
ReplicationPeerConfigBuilder
  setSourceTablesToTargetTable(Map<TableName, TableName> sourceTablesToTargetTable);

/**
* Sets the collection of namespaces which should not be replicated when all user tables are
* configured to be replicated. This method sets state which is mutually exclusive to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,26 @@ private static Map<TableName, List<String>> randTableCFs(Random rand) {
return map;
}

/**
 * Builds a random source-to-target table mapping containing 0 to 4 entries, with both table
 * names derived from {@code rand}. Used to exercise config serialization round-trips.
 */
private static Map<TableName, TableName> randSourceToTargetTables(Random rand) {
  int entryCount = rand.nextInt(5);
  Map<TableName, TableName> mapping = new HashMap<>(entryCount);
  for (int added = 0; added < entryCount; added++) {
    // Left-to-right evaluation keeps the source drawn before the target, matching the
    // RNG consumption order of the original implementation.
    mapping.put(TableName.valueOf(Long.toHexString(rand.nextLong())),
      TableName.valueOf(Long.toHexString(rand.nextLong())));
  }
  return mapping;
}

public static ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
.setBandwidth(RNG.nextInt(1000)).build();
.setSourceTablesToTargetTable(randSourceToTargetTables(RNG)).setBandwidth(RNG.nextInt(1000))
.build();
}

private static void assertSetEquals(Set<String> expected, Set<String> actual) {
Expand Down Expand Up @@ -112,5 +124,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected,
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
assertEquals(expected.getSourceTablesToTargetTables(), actual.getSourceTablesToTargetTables());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message ReplicationPeer {
repeated bytes exclude_namespaces = 10;
optional bool serial = 11;
optional string remoteWALDir = 12;
repeated SourceTablesToTargetTables source_tables_to_target_tables = 13;
}

/**
Expand Down Expand Up @@ -196,3 +197,8 @@ message IsReplicationPeerModificationEnabledRequest {
/** Reply to IsReplicationPeerModificationEnabledRequest: whether peer modification is enabled. */
message IsReplicationPeerModificationEnabledResponse {
  required bool enabled = 1;
}

/**
 * One entry of the source-table to target-table rename mapping carried in
 * ReplicationPeer.source_tables_to_target_tables: edits for 'source' are replicated to
 * 'target' on the peer cluster.
 *
 * NOTE(review): proto2 'required' fields can never be relaxed or removed once released;
 * consider 'optional' with application-level validation before this ships.
 */
message SourceTablesToTargetTables {
  required TableName source = 1;
  required TableName target = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -542,10 +544,9 @@ protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int b
assert sinkPeer != null;
AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
final SinkPeer sinkPeerToUse = sinkPeer;
FutureUtils.addListener(
ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
(response, exception) -> {
final Entry[] preparedEntries = prepareEntries(entries);
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin, preparedEntries,
replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout), (response, exception) -> {
if (exception != null) {
onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
resultCompletableFuture.completeExceptionally(exception);
Expand Down Expand Up @@ -632,4 +633,36 @@ protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int bat
private String logPeerId() {
return "[Source for peer " + this.ctx.getPeerId() + "]:";
}

/**
 * Rewrites WAL entry keys so that entries for tables present in the peer's source-to-target
 * table mapping are addressed to the mapped target table.
 * <p>
 * When the peer config has no mapping ({@code getSourceTablesToTargetTables() == null}) the
 * entries are returned unchanged. Entries whose table has no mapping entry are passed through
 * as-is. A mapped entry gets a new {@link WALKeyImpl} carrying the target table name and
 * copying every other field of the original key; the {@code WALEdit} itself is shared, not
 * copied.
 * @param entries WAL entries queued for replication; not mutated.
 * @return an array with the same size and order as {@code entries}, table names remapped.
 */
private Entry[] prepareEntries(List<Entry> entries) {
  Map<TableName, TableName> sourceToTargetTables =
    ctx.getPeerConfig().getSourceTablesToTargetTables();

  // Common case: no table renaming configured for this peer. Checking before allocating
  // avoids creating the results array just to throw it away.
  if (sourceToTargetTables == null) {
    return entries.toArray(new Entry[0]);
  }

  Entry[] results = new Entry[entries.size()];
  for (int i = 0; i < entries.size(); ++i) {
    Entry entry = entries.get(i);
    WALKeyImpl current = entry.getKey();
    TableName targetTable = sourceToTargetTables.get(current.getTableName());

    if (targetTable == null) {
      // No mapping for this table: replicate to the identically named table.
      results[i] = entry;
      continue;
    }

    WALKeyImpl updatedKey =
      new WALKeyImpl(current.getEncodedRegionName(), targetTable, current.getWriteTime(),
        current.getClusterIds(), current.getNonceGroup(), current.getNonce(), current.getMvcc(),
        current.getReplicationScopes(), current.getExtendedAttributes());
    results[i] = new Entry(updatedKey, entry.getEdit());
  }

  return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
Expand Down Expand Up @@ -399,6 +400,37 @@ public void testMetricsSourceBaseSourcePassThrough() {

}

@Test
public void testMapsTablesCorrectly() throws Exception {
  // Map the source test table to a differently named target table on the peer cluster.
  TableName targetTable = TableName.valueOf("test-target");
  Map<TableName, TableName> tableMappings = new HashMap<>(1);
  tableMappings.put(tableName, targetTable);
  ReplicationPeerConfig rpc =
    ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName())
      .setSourceTablesToTargetTable(tableMappings).build();
  String peer = "testMapsTablesCorrectly";

  // Target table must exist on the peer before replication can land edits in it.
  createTable(targetTable);
  // Remove existing intercluster replication so we can verify we didn't replicate to the
  // target cluster for table `tableName`
  hbaseAdmin.removeReplicationPeer(PEER_ID2);
  hbaseAdmin.addReplicationPeer(peer, rpc);

  // Write a row on the source cluster.
  byte[] row = Bytes.toBytes("row1");
  try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
    doPut(connection, row);
  }

  // The row should eventually appear in the MAPPED table on the peer cluster...
  htable2 = connection2.getTable(targetTable);
  final Get get = new Get(row);
  Waiter.waitFor(CONF1, 60000, (Waiter.Predicate<Exception>) () -> !htable2.get(get).isEmpty());

  // ...and must NOT appear in the identically named source table on the peer.
  htable2 = connection2.getTable(tableName);
  Assert.assertTrue(htable2.get(get).isEmpty());
  // Clean up the peer so later tests are unaffected.
  hbaseAdmin.removeReplicationPeer(peer);
}

private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
byte[] a = new byte[] { 'a' };
Expand Down