diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index eee521c8b770..e8a4f4d9c5cd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.replication; +import com.google.common.base.Splitter; + import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -43,6 +46,9 @@ public class ReplicationPeerConfig { private Map> tableCFsMap = null; private long bandwidth = 0; + public static final String HBASE_REPLICATION_PEER_BASE_CONFIG = + "hbase.replication.peer.default.config"; + public ReplicationPeerConfig() { this.peerData = new TreeMap(Bytes.BYTES_COMPARATOR); this.configuration = new HashMap(0); @@ -99,6 +105,35 @@ public ReplicationPeerConfig setBandwidth(long bandwidth) { return this; } + /** + * Helper method to add base peer configs from Configuration to ReplicationPeerConfig + * if not present in latter. + * + * This merges the user supplied peer configuration + * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs + * provided as property hbase.replication.peer.base.configs in hbase configuration. + * Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value + * of conf is retained if already present in ReplicationPeerConfig. + * + * @param conf Configuration + */ + public void addBasePeerConfigsIfNotPresent(Configuration conf) { + String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, ""); + + if (basePeerConfigs.length() != 0) { + Map basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings() + .withKeyValueSeparator("=").split(basePeerConfigs); + for (Map.Entry entry : basePeerConfigMap.entrySet()) { + String configName = entry.getKey(); + String configValue = entry.getValue(); + // Only override if base config does not exist in existing replication peer configs + if (!this.getConfiguration().containsKey(configName)) { + this.getConfiguration().put(configName, configValue); + } + } + } + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 6fefb367c9f0..79e73aee398c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -123,6 +123,7 @@ public void addPeer(String id, ReplicationPeerConfig peerConfig) checkQueuesDeleted(id); + peerConfig.addBasePeerConfigsIfNotPresent(this.conf); ZKUtil.createWithParents(this.zookeeper, this.peersZNode); List listOfOps = new ArrayList(); @@ -451,6 +452,9 @@ public boolean createAndAddPeer(String peerId) throws ReplicationException { } ReplicationPeerZKImpl previous = ((ConcurrentMap) peerClusters).putIfAbsent(peerId, peer); + ReplicationPeerConfig peerConfig = peerClusters.get(peerId).getPeerConfig(); + peerConfig.addBasePeerConfigsIfNotPresent(this.conf); + updatePeerConfig(peerId, peerConfig); if (previous == null) { LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey()); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index c6367cddabc9..31ba8817c879 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -493,6 +493,86 @@ public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception { } } + /** + * Tests that base replication peer configs are applied on peer creation + * and the configs are overridden if updated as part of updatePeerConfig() + * + */ + @Test + public void testBasePeerConfigsForPeerMutations() + throws Exception { + LOG.info("testBasePeerConfigsForPeerMutations"); + String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; + String firstCustomPeerConfigValue = "test"; + String firstCustomPeerConfigUpdatedValue = "test_updated"; + + String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config"; + String secondCustomPeerConfigValue = "testSecond"; + String secondCustomPeerConfigUpdatedValue = "testSecondUpdated"; + try { + baseConfiguration.set(ReplicationPeerConfig.HBASE_REPLICATION_PEER_BASE_CONFIG, + firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue)); + startMiniClusters(2); + addPeer("1", 0, 1); + addPeer("2", 0, 1); + + ReplicationAdmin replicationAdmin = new ReplicationAdmin(configurations[0]); + ReplicationPeerConfig replicationPeerConfig1 = replicationAdmin.getPeerConfig("1"); + ReplicationPeerConfig replicationPeerConfig2 = replicationAdmin.getPeerConfig("2"); + + // Validates base configs 1 is present for both peer. + assertEquals(firstCustomPeerConfigValue, replicationPeerConfig1. + getConfiguration().get(firstCustomPeerConfigKey)); + assertEquals(firstCustomPeerConfigValue, replicationPeerConfig2. + getConfiguration().get(firstCustomPeerConfigKey)); + + // override value of configuration 1 for peer "1". + replicationPeerConfig1.getConfiguration().put(firstCustomPeerConfigKey, + firstCustomPeerConfigUpdatedValue); + + // add configuration 2 for peer "2". + replicationPeerConfig2.getConfiguration().put(secondCustomPeerConfigKey, + secondCustomPeerConfigUpdatedValue); + + replicationAdmin.updatePeerConfig("1", replicationPeerConfig1); + replicationAdmin.updatePeerConfig("2", replicationPeerConfig2); + + // validates configuration is overridden by updateReplicationPeerConfig + assertEquals(firstCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("1"). + getConfiguration().get(firstCustomPeerConfigKey)); + assertEquals(secondCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("2"). + getConfiguration().get(secondCustomPeerConfigKey)); + + // Add second config to base config and perform restart. + utilities[0].getConfiguration().set(ReplicationPeerConfig. + HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("="). + concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey) + .concat("=").concat(secondCustomPeerConfigValue)); + + utilities[0].shutdownMiniHBaseCluster(); + utilities[0].restartHBaseCluster(1); + replicationAdmin = new ReplicationAdmin(configurations[0]); + + // Both retains the value of base configuration 1 value as before restart. + // Peer 1 (Update value), Peer 2 (Base Value) + assertEquals(firstCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("1"). + getConfiguration().get(firstCustomPeerConfigKey)); + assertEquals(firstCustomPeerConfigValue, replicationAdmin.getPeerConfig("2"). + getConfiguration().get(firstCustomPeerConfigKey)); + + // Peer 1 gets new base config as part of restart. + assertEquals(secondCustomPeerConfigValue, replicationAdmin.getPeerConfig("1"). + getConfiguration().get(secondCustomPeerConfigKey)); + // Peer 2 retains the updated value as before restart. + assertEquals(secondCustomPeerConfigUpdatedValue, replicationAdmin.getPeerConfig("2"). + getConfiguration().get(secondCustomPeerConfigKey)); + } finally { + shutDownMiniClusters(); + baseConfiguration.unset(ReplicationPeerConfig.HBASE_REPLICATION_PEER_BASE_CONFIG); + } + } + + @After public void tearDown() throws IOException { configurations = null;