diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index f569e47bc8d1..c5dcd762e96f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -60,6 +61,8 @@ public final class ReplicationPeerConfigUtil { private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class); + public static final String HBASE_REPLICATION_PEER_BASE_CONFIG = + "hbase.replication.peer.base.config"; private ReplicationPeerConfigUtil() {} @@ -450,6 +453,41 @@ public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig( return builder.build(); } + /** + * Helper method to add base peer configs from Configuration to ReplicationPeerConfig + * if not present in latter. + * + * This merges the user supplied peer configuration + * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs + * provided as property hbase.replication.peer.base.configs in hbase configuration. + * Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value + * of conf is retained if already present in ReplicationPeerConfig. + * + * @param conf Configuration + * @return ReplicationPeerConfig containing updated configs. + */ + public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf, + ReplicationPeerConfig receivedPeerConfig) { + String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, ""); + ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig. + newBuilder(receivedPeerConfig); + Map receivedPeerConfigMap = receivedPeerConfig.getConfiguration(); + + if (basePeerConfigs.length() != 0) { + Map basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings() + .withKeyValueSeparator("=").split(basePeerConfigs); + for (Map.Entry entry : basePeerConfigMap.entrySet()) { + String configName = entry.getKey(); + String configValue = entry.getValue(); + // Only override if base config does not exist in existing peer configs + if (!receivedPeerConfigMap.containsKey(configName)) { + copiedPeerConfigBuilder.putConfiguration(configName, configValue); + } + } + } + return copiedPeerConfigBuilder.build(); + } + public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( Map> excludeTableCfs, ReplicationPeerConfig peerConfig) throws ReplicationException { diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index 0e7cd74048c3..e7ee1e7c4835 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -34,9 +35,12 @@ import java.util.Random; import java.util.Set; import java.util.stream.Stream; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -215,4 +219,47 @@ public void testNoSyncReplicationState() assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId))); } + + @Test + public void testBaseReplicationPeerConfig() { + String customPeerConfigKey = "hbase.xxx.custom_config"; + String customPeerConfigValue = "test"; + String customPeerConfigUpdatedValue = "testUpdated"; + + String customPeerConfigSecondKey = "hbase.xxx.custom_second_config"; + String customPeerConfigSecondValue = "testSecond"; + String customPeerConfigSecondUpdatedValue = "testSecondUpdated"; + + ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); + + // custom config not present + assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); + + Configuration conf = UTIL.getConfiguration(); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";"). + concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue)); + + ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil. + addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig); + + // validates base configs are present in replicationPeerConfig + assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration(). + get(customPeerConfigKey)); + assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration(). + get(customPeerConfigSecondKey)); + + // validates base configs does not override value if config already present + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";"). + concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue)); + + ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil. + addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig); + + assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate. + getConfiguration().get(customPeerConfigKey)); + assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate. + getConfiguration().get(customPeerConfigSecondKey)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 52060aeb36cd..14f7e9355ed9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -232,6 +233,7 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena // this should be a retry, just return return; } + peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig); ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE @@ -546,6 +548,9 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, St ConcurrentMap peers = new ConcurrentHashMap<>(); for (String peerId : peerStorage.listPeerIds()) { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); + + peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig); + peerStorage.updatePeerConfig(peerId, peerConfig); boolean enabled = peerStorage.isPeerEnabled(peerId); SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 7210fc5d7ff2..b7e5edd649b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.CountDownLatch; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -71,6 +72,7 @@ import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -441,6 +443,84 @@ public void testCyclicReplication3() throws Exception { } } + /** + * Tests that base replication peer configs are applied on peer creation + * and the configs are overriden if updated as part of updateReplicationPeerConfig() + * + */ + @Test + public void testBasePeerConfigsForPeerMutations() + throws Exception { + LOG.info("testBasePeerConfigsForPeerMutations"); + String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; + String firstCustomPeerConfigValue = "test"; + String firstCustomPeerConfigUpdatedValue = "test_updated"; + + String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config"; + String secondCustomPeerConfigValue = "testSecond"; + String secondCustomPeerConfigUpdatedValue = "testSecondUpdated"; + try { + baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue)); + startMiniClusters(2); + addPeer("1", 0, 1); + addPeer("2", 0, 1); + Admin admin = utilities[0].getAdmin(); + + // Validates base configs 1 is present for both peer. + Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1"). + getConfiguration().get(firstCustomPeerConfigKey)); + Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2"). + getConfiguration().get(firstCustomPeerConfigKey)); + + // override value of configuration 1 for peer "1". + ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig. + newBuilder(admin.getReplicationPeerConfig("1")). + putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build(); + + // add configuration 2 for peer "2". + ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig. + newBuilder(admin.getReplicationPeerConfig("2")). + putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build(); + + admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1); + admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2); + + // validates configuration is overridden by updateReplicationPeerConfig + Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1"). + getConfiguration().get(firstCustomPeerConfigKey)); + Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2"). + getConfiguration().get(secondCustomPeerConfigKey)); + + // Add second config to base config and perform restart. + utilities[0].getConfiguration().set(ReplicationPeerConfigUtil. + HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("="). + concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey) + .concat("=").concat(secondCustomPeerConfigValue)); + + utilities[0].shutdownMiniHBaseCluster(); + utilities[0].restartHBaseCluster(1); + admin = utilities[0].getAdmin(); + + // Both retains the value of base configuration 1 value as before restart. + // Peer 1 (Update value), Peer 2 (Base Value) + Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1"). + getConfiguration().get(firstCustomPeerConfigKey)); + Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2"). + getConfiguration().get(firstCustomPeerConfigKey)); + + // Peer 1 gets new base config as part of restart. + Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1"). + getConfiguration().get(secondCustomPeerConfigKey)); + // Peer 2 retains the updated value as before restart. + Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2"). + getConfiguration().get(secondCustomPeerConfigKey)); + } finally { + shutDownMiniClusters(); + baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); + } + } + @After public void tearDown() throws IOException { configurations = null;