From 63665779c75025fbeaaa26c06bb4a92b13ada854 Mon Sep 17 00:00:00 2001 From: Hernan Gelaf-Romer Date: Thu, 4 Apr 2024 15:13:15 -0400 Subject: [PATCH] Supports intercluster replication to a different target table --- .../ReplicationPeerConfigUtil.java | 25 +++++++++++ .../replication/ReplicationPeerConfig.java | 22 ++++++++++ .../ReplicationPeerConfigBuilder.java | 11 +++++ .../ReplicationPeerConfigTestUtil.java | 15 ++++++- .../protobuf/server/master/Replication.proto | 6 +++ .../HBaseInterClusterReplicationEndpoint.java | 41 +++++++++++++++++-- .../replication/TestReplicationEndpoint.java | 32 +++++++++++++++ 7 files changed, 147 insertions(+), 5 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 2fc5fa3c1152..3ed7a994128b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -332,6 +332,19 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe if (peer.hasRemoteWALDir()) { builder.setRemoteWALDir(peer.getRemoteWALDir()); } + + List sourceToTargetTables = + peer.getSourceTablesToTargetTablesList(); + if (sourceToTargetTables != null) { + Map map = new HashMap<>(sourceToTargetTables.size()); + for (ReplicationProtos.SourceTablesToTargetTables tables : sourceToTargetTables) { + map.put(ProtobufUtil.toTableName(tables.getSource()), + ProtobufUtil.toTableName(tables.getTarget())); + } + + builder.setSourceTablesToTargetTable(map); + } + return builder.build(); } @@ -389,6 +402,18 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe if (peerConfig.getRemoteWALDir() != null) { builder.setRemoteWALDir(peerConfig.getRemoteWALDir()); } + + Map 
sourceTablesToTargetTables = + peerConfig.getSourceTablesToTargetTables(); + if (sourceTablesToTargetTables != null) { + for (Map.Entry entry : sourceTablesToTargetTables.entrySet()) { + HBaseProtos.TableName source = ProtobufUtil.toProtoTableName(entry.getKey()); + HBaseProtos.TableName target = ProtobufUtil.toProtoTableName(entry.getValue()); + builder.addSourceTablesToTargetTables(ReplicationProtos.SourceTablesToTargetTables + .newBuilder().setSource(source).setTarget(target).build()); + } + } + return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 58c3a2e59cdf..b242b2cd493e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -46,6 +46,7 @@ public class ReplicationPeerConfig { // Default value is true, means replicate all user tables to peer cluster. private boolean replicateAllUserTables = true; private Map> excludeTableCFsMap = null; + private Map sourceTablesToTargetTables = null; private Set excludeNamespaces = null; private long bandwidth = 0; private final boolean serial; @@ -65,6 +66,9 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.excludeTableCFsMap = builder.excludeTableCFsMap != null ? unmodifiableTableCFsMap(builder.excludeTableCFsMap) : null; + this.sourceTablesToTargetTables = builder.sourceTablesToTargetTables != null + ? Collections.unmodifiableMap(builder.sourceTablesToTargetTables) + : null; this.excludeNamespaces = builder.excludeNamespaces != null ? 
Collections.unmodifiableSet(builder.excludeNamespaces) : null; @@ -117,6 +121,10 @@ public Map> getExcludeTableCFsMap() { return (Map>) excludeTableCFsMap; } + public Map getSourceTablesToTargetTables() { + return sourceTablesToTargetTables; + } + public Set getExcludeNamespaces() { return this.excludeNamespaces; } @@ -148,6 +156,7 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces()) .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) + .setSourceTablesToTargetTable(peerConfig.getSourceTablesToTargetTables()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()) .setRemoteWALDir(peerConfig.getRemoteWALDir()); @@ -172,6 +181,7 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu private boolean replicateAllUserTables = true; private Map> excludeTableCFsMap = null; + private Map sourceTablesToTargetTables = null; private Set excludeNamespaces = null; @@ -236,6 +246,13 @@ public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateA return this; } + @Override + public ReplicationPeerConfigBuilder + setSourceTablesToTargetTable(Map sourceTablesToTargetTable) { + this.sourceTablesToTargetTables = sourceTablesToTargetTable; + return this; + } + @Override public ReplicationPeerConfigBuilder setExcludeNamespaces(Set excludeNamespaces) { this.excludeNamespaces = excludeNamespaces; @@ -288,6 +305,11 @@ public String toString() { builder.append("tableCFs=").append(tableCFsMap.toString()).append(","); } } + + if (sourceTablesToTargetTables != null) { + builder.append("sourceTablesToTargetTables=").append(sourceTablesToTargetTables).append(","); + } + builder.append("bandwidth=").append(bandwidth).append(","); builder.append("serial=").append(serial); if 
(this.remoteWALDir != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 95256d128b46..e4f9690a4d6e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -129,6 +129,17 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map peerData */ ReplicationPeerConfigBuilder setExcludeTableCFsMap(Map> tableCFsMap); + /** + * Sets the mapping of source table names to target replication table names. Source tables + * without an entry in the map are replicated to a target table of the same name. + * @param sourceTablesToTargetTable A mapping of source table names to target table names. By + * default, edits will be replicated to the same target table as + * the source. Only applies to intercluster replication. + * @return {@code this} + */ + ReplicationPeerConfigBuilder + setSourceTablesToTargetTable(Map sourceTablesToTargetTable); + /** * Sets the collection of namespaces which should not be replicated when all user tables are * configured to be replicated. 
This method sets state which is mutually exclusive to diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java index ea2cb536a053..0131ffeb4055 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java @@ -60,6 +60,17 @@ private static Map> randTableCFs(Random rand) { return map; } + private static Map randSourceToTargetTables(Random rand) { + int size = rand.nextInt(5); + Map map = new HashMap<>(size); + for (int i = 0; i < size; i++) { + TableName source = TableName.valueOf(Long.toHexString(rand.nextLong())); + TableName target = TableName.valueOf(Long.toHexString(rand.nextLong())); + map.put(source, target); + } + return map; + } + public static ReplicationPeerConfig getConfig(int seed) { RNG.setSeed(seed); return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) @@ -67,7 +78,8 @@ public static ReplicationPeerConfig getConfig(int seed) { .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) - .setBandwidth(RNG.nextInt(1000)).build(); + .setSourceTablesToTargetTable(randSourceToTargetTables(RNG)).setBandwidth(RNG.nextInt(1000)) + .build(); } private static void assertSetEquals(Set expected, Set actual) { @@ -112,5 +124,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected, assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); assertEquals(expected.getBandwidth(), actual.getBandwidth()); + 
assertEquals(expected.getSourceTablesToTargetTables(), actual.getSourceTablesToTargetTables()); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto index 262a26985587..af4b13d9f3ae 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto @@ -51,6 +51,7 @@ message ReplicationPeer { repeated bytes exclude_namespaces = 10; optional bool serial = 11; optional string remoteWALDir = 12; + repeated SourceTablesToTargetTables source_tables_to_target_tables = 13; } /** @@ -196,3 +197,8 @@ message IsReplicationPeerModificationEnabledRequest { message IsReplicationPeerModificationEnabledResponse { required bool enabled = 1; } + +message SourceTablesToTargetTables { + required TableName source = 1; + required TableName target = 2; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index d895920a51a8..4e360f9311a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.util.FutureUtils; 
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -542,10 +544,9 @@ protected CompletableFuture replicateEntries(List entries, int b assert sinkPeer != null; AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); final SinkPeer sinkPeerToUse = sinkPeer; - FutureUtils.addListener( - ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout), - (response, exception) -> { + final Entry[] preparedEntries = prepareEntries(entries); + FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin, preparedEntries, + replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout), (response, exception) -> { if (exception != null) { onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse); resultCompletableFuture.completeExceptionally(exception); @@ -632,4 +633,36 @@ protected CompletableFuture asyncReplicate(List entries, int bat private String logPeerId() { return "[Source for peer " + this.ctx.getPeerId() + "]:"; } + + private Entry[] prepareEntries(List entries) { + Entry[] results = new Entry[entries.size()]; + ReplicationPeerConfig peerConfig = ctx.getPeerConfig(); + Map sourceToTargetTables = peerConfig.getSourceTablesToTargetTables(); + + if (sourceToTargetTables == null) { + return entries.toArray(new Entry[0]); + } + + for (int i = 0; i < entries.size(); ++i) { + Entry entry = entries.get(i); + + TableName sourceTable = entry.getKey().getTableName(); + TableName targetTable = sourceToTargetTables.get(sourceTable); + + if (targetTable == null) { + results[i] = entry; + continue; + } + + WALKeyImpl current = entry.getKey(); + WALKeyImpl updatedKey = + new WALKeyImpl(current.getEncodedRegionName(), targetTable, 
current.getWriteTime(), + current.getClusterIds(), current.getNonceGroup(), current.getNonce(), current.getMvcc(), + current.getReplicationScopes(), current.getExtendedAttributes()); + + results[i] = new Entry(updatedKey, entry.getEdit()); + } + + return results; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 9bc632e223be..c1a4a5521d48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; @@ -399,6 +400,37 @@ public void testMetricsSourceBaseSourcePassThrough() { } + @Test + public void testMapsTablesCorrectly() throws Exception { + TableName targetTable = TableName.valueOf("test-target"); + Map tableMappings = new HashMap<>(1); + tableMappings.put(tableName, targetTable); + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) + .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()) + .setSourceTablesToTargetTable(tableMappings).build(); + String peer = "testMapsTablesCorrectly"; + + createTable(targetTable); + // Remove existing intercluster replication so we can verify we didn't replicate to the + // target cluster for table `tableName` + hbaseAdmin.removeReplicationPeer(PEER_ID2); + hbaseAdmin.addReplicationPeer(peer, rpc); + + byte[] row = Bytes.toBytes("row1"); + try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 
+ doPut(connection, row); + } + + htable2 = connection2.getTable(targetTable); + final Get get = new Get(row); + Waiter.waitFor(CONF1, 60000, (Waiter.Predicate) () -> !htable2.get(get).isEmpty()); + + htable2 = connection2.getTable(tableName); + Assert.assertTrue(htable2.get(get).isEmpty()); + hbaseAdmin.removeReplicationPeer(peer); + } + private List> createWALEntriesWithSize(String tableName) { List> walEntriesWithSize = new ArrayList<>(); byte[] a = new byte[] { 'a' };