-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. #2284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
8510b99
4b83e2c
b50ceab
283a180
68b769e
d815e13
560154a
6d5ac0f
df48cb0
a65f3e4
e2f08b0
20e3350
9147c93
eb9c543
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ | |
| import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; | ||
| import org.apache.hadoop.hbase.replication.SyncReplicationState; | ||
| import org.apache.hadoop.hbase.util.Bytes; | ||
| import org.apache.hbase.thirdparty.com.google.common.base.Splitter; | ||
| import org.apache.yetus.audience.InterfaceAudience; | ||
| import org.apache.yetus.audience.InterfaceStability; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -60,6 +61,8 @@ | |
| public final class ReplicationPeerConfigUtil { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class); | ||
| public static final String HBASE_REPLICATION_PEER_BASE_CONFIG = | ||
| "hbase.replication.peer.base.config"; | ||
|
|
||
| private ReplicationPeerConfigUtil() {} | ||
|
|
||
|
|
@@ -450,6 +453,45 @@ public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig( | |
| return builder.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Helper method to add base peer configs from Configuration to ReplicationPeerConfig | ||
| * if not present in latter. | ||
| * | ||
| * This merges the user supplied peer configuration | ||
| * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs | ||
| * provided as property hbase.replication.peer.base.configs in hbase configuration. | ||
| * Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value | ||
| * of conf is retained if already present in ReplicationPeerConfig. | ||
| * | ||
| * @param conf Configuration | ||
| * @return ReplicationPeerConfig containing updated configs. | ||
| */ | ||
| public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf, | ||
| ReplicationPeerConfig receivedPeerConfig) { | ||
| String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, null); | ||
|
||
|
|
||
| ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig. | ||
| newBuilder(receivedPeerConfig); | ||
| Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration(); | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Remove multiple extraneous new lines in this method. |
||
| if (basePeerConfigs != null && basePeerConfigs.length() != 0) { | ||
|
|
||
| Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings() | ||
| .withKeyValueSeparator("=").split(basePeerConfigs); | ||
|
|
||
| for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) { | ||
| String configName = entry.getKey(); | ||
| String configValue = entry.getValue(); | ||
| // Only override if base config does not exist in existing peer configs | ||
| if (!receivedPeerConfigMap.containsKey(configName)) { | ||
| copiedPeerConfigBuilder.putConfiguration(configName,configValue); | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| return copiedPeerConfigBuilder.build(); | ||
| } | ||
|
|
||
| public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( | ||
| Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig) | ||
| throws ReplicationException { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| import org.apache.hadoop.hbase.HBaseConfiguration; | ||
| import org.apache.hadoop.hbase.ServerName; | ||
| import org.apache.hadoop.hbase.TableName; | ||
| import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; | ||
| import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; | ||
| import org.apache.hadoop.hbase.replication.ReplicationEndpoint; | ||
| import org.apache.hadoop.hbase.replication.ReplicationException; | ||
|
|
@@ -232,6 +233,9 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena | |
| // this should be a retry, just return | ||
| return; | ||
| } | ||
| ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil. | ||
| addBasePeerConfigsIfNotPresent(conf,peerConfig); | ||
|
||
| peerConfig = updatedPeerConfig; | ||
|
||
| ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); | ||
| SyncReplicationState syncReplicationState = | ||
| copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE | ||
|
|
@@ -546,9 +550,13 @@ public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, St | |
| ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); | ||
| for (String peerId : peerStorage.listPeerIds()) { | ||
| ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); | ||
|
|
||
| ReplicationPeerConfig updatedPeerConfig = ReplicationPeerConfigUtil. | ||
|
||
| addBasePeerConfigsIfNotPresent(conf,peerConfig); | ||
| peerStorage.updatePeerConfig(peerId,updatedPeerConfig); | ||
|
||
| boolean enabled = peerStorage.isPeerEnabled(peerId); | ||
| SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId); | ||
| peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); | ||
| peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, updatedPeerConfig, state)); | ||
| } | ||
| return new ReplicationPeerManager(peerStorage, | ||
| ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.