Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
public final class ReplicationPeerConfigUtil {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
public static final String HBASE_REPLICATION_PEER_DEFAULT_CONFIG= "hbase.replication.peer.default.config";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it private?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also getting referenced in other classes for testing, so kept it public.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this 'default' or 'base' configuration for all peers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in the latest commit.


private ReplicationPeerConfigUtil() {}

Expand Down Expand Up @@ -450,6 +451,45 @@ public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
return builder.build();
}

/**
Comment thread
bharathv marked this conversation as resolved.
Sample Configuration
<property>
<name>hbase.replication.peer.default.configs</name>
<value>hbase.replication.source.custom.walentryfilters=x,y,z;hbase.xxx.custom_property=123</value>
</property>
*/

/**
* Helper method to add default peer configs from HBase Configuration to ReplicationPeerConfig
* @param conf Configuration
* @return true if new configurations was added.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return value javadoc seems wrong.

*/
public static ReplicationPeerConfig addDefaultPeerConfigsIfNotPresent(Configuration conf, ReplicationPeerConfig receivedPeerConfig){

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind fixing the check-style issues from precommit? Bunch of overflows.


ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.newBuilder(receivedPeerConfig);
String defaultPeerConfigs = conf.get(HBASE_REPLICATION_PEER_DEFAULT_CONFIG);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: get(CONFIG, default)


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Remove multiple extraneous new lines in this method.

Map<String,String> peerConfigurations = receivedPeerConfig.getConfiguration();

if(defaultPeerConfigs != null && defaultPeerConfigs.length() != 0){

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See how rest of code has space between keyword and brackets as in 'if (' rather than 'if('.... you do this a few times in this PR.

String[] defaultPeerConfigList = defaultPeerConfigs.split(";");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

';' is safe character to use as delimiter for sure? Will never be part of a config value? Are the constraint on peer values at all so you could choose a delimiter that was outside of the constraint set?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you could choose a delimiter that was outside of the constraint set
Are there any current constraints regarding the convention on peer values that I can refer to?

I looked around and since there can be multiple values of a particular peer config that are generally delimited by , so I decided to use ; for delimiting different peer configs. But I am happy to change this if it can cause any problems in your opinion. Thanks

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Splitter from Guava provides a clean API to do this parsing (couple of lines of code).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you missed this, this code can be condensed into

Splitter.on(';').withKeyValueSeparator('=').split(value).trimResults();


for(String defaultPeerConfig : defaultPeerConfigList){
String[] configSplit = defaultPeerConfig.split("=");
if(configSplit != null && configSplit.length == 2){
String configName = configSplit[0];
String configValue = configSplit[1];

// Only override if default property does not exist in existing peer configs or its value is different.
if(!peerConfigurations.containsKey(configName) || !peerConfigurations.get(configName).equalsIgnoreCase(configValue)){
copiedPeerConfigBuilder.putConfiguration(configName,configValue);
}
}
}
}
return copiedPeerConfigBuilder.build();
}

public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
throws ReplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -144,7 +145,11 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
SyncReplicationState newSyncReplicationState =
peerStorage.getPeerNewSyncReplicationState(peerId);

ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(this.conf, peerConfig);
peerStorage.updatePeerConfig(peerId,updatedPeerConfig);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking this happens only in the master code paths. Ex: ReplicationPeerManager#create (for existing peers) or addPeer() for new peers etc. That way the configuration in storage remains consistent.

Doing from the RS code paths (ReplicationPeers) means that if different RS run with different configs it can result in a different final state (depending which RS does this RPC last). Also doing this from HMaster seems logical since this is more like an admin operation whereas RS based codepaths are just consumers of this config.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that completely makes sense. Updated in the latest commit.


return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
peerId, updatedPeerConfig, enabled, syncReplicationState, newSyncReplicationState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
Expand Down Expand Up @@ -215,4 +218,40 @@ public void testNoSyncReplicationState()
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
STORAGE.getNewSyncReplicationStateNode(peerId)));
}

@Test
public void testDefaultReplicationPeerConfigIsAppliedIfNotAlreadySet(){

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two tests can be merged into one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since they were testing different behaviors I kept them as different, do you think we should still merge them?

String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";

ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);

// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);

Configuration conf = UTIL.getConfiguration();
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_DEFAULT_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigValue));

ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long lines? 100chars is max.

assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
}

@Test
public void testDefaultReplicationPeerConfigOverrideIfAlreadySet(){

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need more coverage for the following cases.

  • Existing peer config gets the config override
  • Admin code paths (for getPeerConfig, updatePeerConfig, etc) work well with the overlays (updating an existing / non-existing config etc)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the tests for admin code paths.

The behavior after this patch is that a peer config will only be updated by configuration object if that new configuration was not present in ReplicationPeerConfig. If it was already present then old value is retained and values from configuration object won't make any changes.


String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
String customPeerConfigUpdatedValue = "test_updated";

ReplicationPeerConfig existingReplicationPeerConfig = ReplicationPeerConfig.newBuilder(getConfig(1))
.putConfiguration(customPeerConfigKey,customPeerConfigValue).build();

Configuration conf = UTIL.getConfiguration();
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_DEFAULT_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue));

ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
assertEquals(customPeerConfigUpdatedValue, updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
Expand Down Expand Up @@ -119,6 +120,8 @@ void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
if (peerId.contains("-")) {
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
}
ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil.addDefaultPeerConfigsIfNotPresent(conf,peerConfig);
peerConfig = updatedPeerConfig;
checkPeerConfig(peerConfig);
if (peerConfig.isSyncReplication()) {
checkSyncReplicationPeerConfigConflict(peerConfig);
Expand Down