-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-22380 break circle replication when doing bulkload #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
91a5daf
HBASE-22380 break circle replication when doing bulkload
wchevreuil b59ab27
fixed previous UT failure; Added IT to check this loop condition does…
wchevreuil b117b55
added IT.
wchevreuil 11a946e
Addressing review comment: 'nit. Do this a few lines higher and then …
wchevreuil f03dda6
Addressing latest checkstyle issue reported.
wchevreuil 436351c
Added a 3rd cluster in the test, with cyclic replication within clust…
wchevreuil f6db028
Changed bulk load RPC request to pass a list of cluster ids, so that …
wchevreuil 83f3eaf
Redesigned test to validate cyclic replication between 3 clusters.
wchevreuil 3bd3f48
addressing latest build checkstyle issues
wchevreuil 11d8ecf
addressing suggestion from Busbey to use entrySet and sort the findbu…
wchevreuil f108251
addressing latest checktyle issue.
wchevreuil File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -174,8 +174,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | |
| // invocation of this method per table and cluster id. | ||
| Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>(); | ||
|
|
||
| // Map of table name Vs list of pair of family and list of hfile paths from its namespace | ||
| Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null; | ||
| Map<String, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null; | ||
|
|
||
| for (WALEntry entry : entries) { | ||
| TableName table = | ||
|
|
@@ -204,10 +203,19 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | |
| Cell cell = cells.current(); | ||
| // Handle bulk load hfiles replication | ||
| if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { | ||
| BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); | ||
| if(bulkLoadsPerClusters == null) { | ||
| bulkLoadsPerClusters = new HashMap<>(); | ||
| } | ||
| // Map of table name Vs list of pair of family and list of | ||
| // hfile paths from its namespace | ||
| Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = | ||
| bulkLoadsPerClusters.get(bld.getClusterId()); | ||
| if (bulkLoadHFileMap == null) { | ||
| bulkLoadHFileMap = new HashMap<>(); | ||
| bulkLoadsPerClusters.put(bld.getClusterId(), bulkLoadHFileMap); | ||
| } | ||
| buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell); | ||
| buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); | ||
| } else { | ||
| // Handle wal replication | ||
| if (isNewRowOrType(previousCell, cell)) { | ||
|
|
@@ -243,14 +251,20 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | |
| LOG.debug("Finished replicating mutations."); | ||
| } | ||
|
|
||
| if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) { | ||
| LOG.debug("Started replicating bulk loaded data."); | ||
| HFileReplicator hFileReplicator = | ||
| new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId), | ||
| if(bulkLoadsPerClusters != null) { | ||
| for (String clusterId : bulkLoadsPerClusters.keySet()) { | ||
|
||
| Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = | ||
| bulkLoadsPerClusters.get(clusterId); | ||
| if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) { | ||
| LOG.debug("Started replicating bulk loaded data from cluster id: {}.", clusterId); | ||
| HFileReplicator hFileReplicator = | ||
| new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId), | ||
| sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, | ||
| getConnection()); | ||
| hFileReplicator.replicate(); | ||
| LOG.debug("Finished replicating bulk loaded data."); | ||
| getConnection(), clusterId); | ||
| hFileReplicator.replicate(); | ||
| LOG.debug("Finished replicating bulk loaded data from cluster id: {}", clusterId); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| int size = entries.size(); | ||
|
|
@@ -265,8 +279,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | |
|
|
||
| private void buildBulkLoadHFileMap( | ||
| final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, | ||
| Cell cell) throws IOException { | ||
| BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); | ||
| BulkLoadDescriptor bld) throws IOException { | ||
| List<StoreDescriptor> storesList = bld.getStoresList(); | ||
| int storesSize = storesList.size(); | ||
| for (int j = 0; j < storesSize; j++) { | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. Do this a few lines higher and then you can replace a few request.getClusterId() calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, have addressed it on the last changes.