diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cc3a44f02c03..bde8537b9ca5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -822,7 +822,7 @@ protected void initializeZKBasedSystemTrackers() } this.rsGroupInfoManager = RSGroupInfoManager.create(this); - this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); + this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId); this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index d497c225651d..52060aeb36cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; @@ -50,9 +51,11 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; @@ -81,11 +84,17 @@ public class ReplicationPeerManager { // Only allow to add one sync replication peer concurrently private final Semaphore syncReplicationPeerLock = new Semaphore(1); + private final String clusterId; + + private final Configuration conf; + ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, - ConcurrentMap peers) { + ConcurrentMap peers, Configuration conf, String clusterId) { this.peerStorage = peerStorage; this.queueStorage = queueStorage; this.peers = peers; + this.conf = conf; + this.clusterId = clusterId; } private void checkQueuesDeleted(String peerId) @@ -337,11 +346,10 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationExcepti private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); - boolean checkClusterKey = true; + ReplicationEndpoint endpoint = null; if (!StringUtils.isBlank(replicationEndpointImpl)) { - // try creating a instance - ReplicationEndpoint endpoint; try { + // try creating a instance endpoint = Class.forName(replicationEndpointImpl) .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance(); } catch (Throwable e) { @@ -349,14 +357,15 @@ private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetry "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, e); } - // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint - if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) { - checkClusterKey = false; - } } - if (checkClusterKey) { + // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key + if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) { checkClusterKey(peerConfig.getClusterKey()); } + // Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster + if (endpoint == null || !endpoint.canReplicateToSameCluster()) { + checkClusterId(peerConfig.getClusterKey()); + } if (peerConfig.replicateAllUserTables()) { // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. @@ -501,6 +510,25 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { } } + private void checkClusterId(String clusterKey) throws DoNotRetryIOException { + String peerClusterId = ""; + try { + // Create the peer cluster config for get peer cluster id + Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); + try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { + peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); + } + } catch (IOException | KeeperException e) { + throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e); + } + // In rare case, zookeeper setting may be messed up. That leads to the incorrect + // peerClusterId value, which is the same as the source clusterId + if (clusterId.equals(peerClusterId)) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey + + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); + } + } + public List getSerialPeerIdsBelongsTo(TableName tableName) { return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) @@ -511,7 +539,7 @@ public ReplicationQueueStorage getQueueStorage() { return queueStorage; } - public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) + public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId) throws ReplicationException { ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); @@ -523,7 +551,7 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); } return new ReplicationPeerManager(peerStorage, - ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); + ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c4936e68e26e..bc1754904b89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -523,16 +523,6 @@ private void initialize() { if(!this.isSourceActive()) { return; } - - // In rare case, zookeeper setting may be messed up. That leads to the incorrect - // peerClusterId value, which is the same as the source clusterId - if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { - this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " - + peerClusterId + " which is not allowed by ReplicationEndpoint:" - + replicationEndpoint.getClass().getName(), null, false); - this.manager.removeSource(this); - return; - } LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};", logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 506f8e75f6fd..3defa80421e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); private final String ID_ONE = "1"; - private final String KEY_ONE = "127.0.0.1:2181:/hbase"; + private static String KEY_ONE; private final String ID_TWO = "2"; - private final String KEY_TWO = "127.0.0.1:2181:/hbase2"; + private static String KEY_TWO; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -82,6 +82,8 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.startMiniCluster(); + KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; + KEY_TWO = TEST_UTIL.getClusterKey() + "-test2"; ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java index 31fd53acda85..58c3e7a6a92d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java @@ -81,7 +81,7 @@ public void testAddPeerWithSameTable() throws Exception { Thread[] threads = new Thread[5]; for (int i = 0; i < 5; i++) { String peerId = "id" + i; - String clusterKey = "127.0.0.1:2181:/hbase" + i; + String clusterKey = TEST_UTIL.getClusterKey() + "-test" + i; int index = i; threads[i] = new Thread(() -> { try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 04cf392a7106..8c5c78cec437 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -113,6 +113,11 @@ protected void doStart() { protected void doStop() { notifyStopped(); } + + @Override + public boolean canReplicateToSameCluster() { + return true; + } } @BeforeClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index f546058bc00c..fff10345cb5d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -25,7 +25,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; -import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.Random; @@ -34,17 +33,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.ServerMetrics; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -72,9 +68,7 @@ import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -176,40 +170,16 @@ public void testCyclicReplication1() throws Exception { /** * Tests the replication scenario 0 -> 0. By default - * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the - * ReplicationSource should terminate, and no further logs should get enqueued + * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint}, + * the replication peer should not be added. */ - @Test - public void testLoopedReplication() throws Exception { + @Test(expected = DoNotRetryIOException.class) + public void testLoopedReplication() + throws Exception { LOG.info("testLoopedReplication"); startMiniClusters(1); createTableOnClusters(table); addPeer("1", 0, 0); - Thread.sleep(SLEEP_TIME); - - // wait for source to terminate - final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName(); - Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - ClusterMetrics clusterStatus = utilities[0].getAdmin() - .getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS)); - ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName); - List replicationLoadSourceList = - serverLoad.getReplicationLoadSourceList(); - return replicationLoadSourceList.isEmpty(); - } - }); - - Table[] htables = getHTablesOnClusters(tableName); - putAndWait(row, famName, htables[0], htables[0]); - rollWALAndWait(utilities[0], table.getTableName(), row); - ZKWatcher zkw = utilities[0].getZooKeeperWatcher(); - String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, - ZNodePaths.joinZNode("replication", "rs")); - List listChildrenNoWatch = - ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString())); - assertEquals(0, listChildrenNoWatch.size()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 1735f83564f3..4dd264cd5b2a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -490,6 +490,11 @@ protected void doStop() { stoppedCount.incrementAndGet(); notifyStopped(); } + + @Override + public boolean canReplicateToSameCluster() { + return true; + } } public static class InterClusterReplicationEndpointForTest diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index bd800a841f8e..522fb20543b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -127,6 +127,11 @@ protected void doStart() { protected void doStop() { notifyStopped(); } + + @Override + public boolean canReplicateToSameCluster() { + return true; + } } @BeforeClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java index 8c218d6cee8c..b312402fbc8e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java @@ -174,8 +174,9 @@ public void testCleanReplicationBarrierWithExistTable() throws Exception { } public static void createPeer() throws IOException { - ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL.getClusterKey()).setSerial(true).build(); + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") + .setSerial(true).build(); UTIL.getAdmin().addReplicationPeer(PEER_1, rpc); UTIL.getAdmin().addReplicationPeer(PEER_2, rpc); } diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index c669c2a4212b..f6f41d2f7eb9 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -33,6 +33,7 @@ class ReplicationAdminTest < Test::Unit::TestCase def setup @peer_id = '1' + @dummy_endpoint = 'org.apache.hadoop.hbase.replication.DummyReplicationEndpoint' setup_hbase @@ -73,7 +74,8 @@ def teardown define_test "add_peer: single zk cluster key" do cluster_key = "server1.cie.com:2181:/hbase" - command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} + command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) peer = command(:list_peers).get(0) @@ -88,7 +90,8 @@ def teardown define_test "add_peer: multiple zk cluster key" do cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" - command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} + command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) peer = command(:list_peers).get(0) @@ -106,8 +109,8 @@ def teardown table_cfs = { "ns3:table1" => [], "ns3:table2" => [], "ns3:table3" => [] } # add a new replication peer which serial flag is true - args = { CLUSTER_KEY => cluster_key, SERIAL => true, - TABLE_CFS => table_cfs} + args = {CLUSTER_KEY => cluster_key, SERIAL => true, + TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -126,8 +129,8 @@ def teardown remote_wal_dir = "hdfs://srv1:9999/hbase" table_cfs = { "ns3:table1" => [], "ns3:table2" => [], "ns3:table3" => [] } - args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir, - TABLE_CFS => table_cfs} + args = {CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir, + TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -144,7 +147,7 @@ def teardown define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -153,7 +156,8 @@ def teardown command(:remove_peer, @peer_id) - enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' } + enable_args = {CLUSTER_KEY => cluster_key, STATE => 'ENABLED', + ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, enable_args) assert_equal(1, command(:list_peers).length) @@ -162,7 +166,8 @@ def teardown command(:remove_peer, @peer_id) - disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' } + disable_args = {CLUSTER_KEY => cluster_key, STATE => 'DISABLED', + ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, disable_args) assert_equal(1, command(:list_peers).length) @@ -175,7 +180,7 @@ def teardown define_test "add_peer: multiple zk cluster key - peer config" do cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -193,7 +198,8 @@ def teardown namespaces = ["ns1", "ns2", "ns3"] namespaces_str = "ns1;ns2;ns3" - args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces } + args = {CLUSTER_KEY => cluster_key, NAMESPACES => namespaces, + ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -216,8 +222,8 @@ def teardown "ns3:table3" => ["cf1", "cf2"] } namespaces_str = "ns1;ns2" - args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces, - TABLE_CFS => table_cfs } + args = {CLUSTER_KEY => cluster_key, NAMESPACES => namespaces, + TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -253,7 +259,8 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) cluster_key = "zk4,zk5,zk6:11000:/hbase-test" table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } - args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } + args = {CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs, + ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -279,7 +286,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "set_peer_tableCFs: works with table-cfs map" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - args = { CLUSTER_KEY => cluster_key} + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) command(:set_peer_replicate_all, @peer_id, false) @@ -298,7 +305,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "append_peer_tableCFs: works with table-cfs map" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - args = { CLUSTER_KEY => cluster_key} + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) command(:set_peer_replicate_all, @peer_id, false) @@ -322,7 +329,8 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "remove_peer_tableCFs: works with table-cfs map" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } - args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } + args = {CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs, + ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -340,7 +348,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -363,7 +371,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "append_peer_exclude_tableCFs: works with exclude table-cfs map" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - args = {CLUSTER_KEY => cluster_key} + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) peer = command(:list_peers).get(0) @@ -398,7 +406,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test 'remove_peer_exclude_tableCFs: works with exclude table-cfs map' do cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' - args = {CLUSTER_KEY => cluster_key} + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) peer = command(:list_peers).get(0) @@ -436,7 +444,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) namespaces = ["ns1", "ns2"] namespaces_str = "ns1;ns2" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) command(:set_peer_replicate_all, @peer_id, false) @@ -457,7 +465,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) namespaces = ["ns1", "ns2"] namespaces_str = "ns1;ns2" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) command(:set_peer_replicate_all, @peer_id, false) @@ -496,7 +504,8 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) cluster_key = "zk4,zk5,zk6:11000:/hbase-test" namespaces = ["ns1", "ns2", "ns3"] - args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces } + args = {CLUSTER_KEY => cluster_key, NAMESPACES => namespaces, + ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) namespaces = ["ns1", "ns2"] @@ -537,7 +546,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) namespaces = ['ns1', 'ns2'] namespaces_str = '!ns1;ns2' - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) command(:set_peer_exclude_namespaces, @peer_id, namespaces) @@ -554,7 +563,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test 'set_peer_replicate_all' do cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -576,7 +585,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test 'set_peer_serial' do cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -599,7 +608,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) peer_config = command(:get_peer_config, @peer_id) @@ -617,8 +626,8 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) remote_wal_dir = "hdfs://srv1:9999/hbase" table_cfs = { "ns3:table1" => [], "ns3:table2" => [], "ns3:table3" => [] } - args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir, - TABLE_CFS => table_cfs} + args = {CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir, + TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) @@ -645,7 +654,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "get_peer_config: works with simple clusterKey peer" do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) peer_config = command(:get_peer_config, @peer_id) assert_equal(cluster_key, peer_config.get_cluster_key) @@ -655,14 +664,13 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "get_peer_config: works with replicationendpointimpl peer and config params" do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" - repl_impl = 'org.apache.hadoop.hbase.replication.DummyReplicationEndpoint' config_params = { "config1" => "value1", "config2" => "value2" } - args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => repl_impl, + args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params } command(:add_peer, @peer_id, args) peer_config = command(:get_peer_config, @peer_id) assert_equal(cluster_key, peer_config.get_cluster_key) - assert_equal(repl_impl, peer_config.get_replication_endpoint_impl) + assert_equal(@dummy_endpoint, peer_config.get_replication_endpoint_impl) assert_equal(2, peer_config.get_configuration.size) assert_equal("value1", peer_config.get_configuration.get("config1")) #cleanup @@ -671,29 +679,27 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "list_peer_configs: returns all peers' ReplicationPeerConfig objects" do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" - args = { CLUSTER_KEY => cluster_key } + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} peer_id_second = '2' command(:add_peer, @peer_id, args) - repl_impl = "org.apache.hadoop.hbase.replication.DummyReplicationEndpoint" config_params = { "config1" => "value1", "config2" => "value2" } - args2 = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params} + args2 = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params} command(:add_peer, peer_id_second, args2) peer_configs = command(:list_peer_configs) assert_equal(2, peer_configs.size) assert_equal(cluster_key, peer_configs.get(@peer_id).get_cluster_key) - assert_equal(repl_impl, peer_configs.get(peer_id_second).get_replication_endpoint_impl) + assert_equal(@dummy_endpoint, peer_configs.get(peer_id_second).get_replication_endpoint_impl) #cleanup command(:remove_peer, @peer_id) command(:remove_peer, peer_id_second) end define_test "update_peer_config: can update peer config and data" do - repl_impl = "org.apache.hadoop.hbase.replication.DummyReplicationEndpoint" config_params = { "config1" => "value1", "config2" => "value2" } data_params = {"data1" => "value1", "data2" => "value2"} - args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params} + args = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params, DATA => data_params} command(:add_peer, @peer_id, args) new_config_params = { "config1" => "new_value1" } @@ -713,7 +719,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "append_peer_exclude_namespaces: works with namespaces array" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - args = {CLUSTER_KEY => cluster_key} + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) command(:set_peer_replicate_all, @peer_id, true) @@ -749,7 +755,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) define_test "remove_peer_exclude_namespaces: works with namespaces array" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - args = {CLUSTER_KEY => cluster_key} + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) namespaces = ["ns1", "ns2", "ns3"]