From 5bedce212439663dd744267b10bb4e8349328d05 Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Thu, 16 Jan 2025 12:55:08 -0600 Subject: [PATCH 1/2] HBASE-29064. Use 3-state replication result in ReplicationEndpoint --- .../replication/ReplicationEndpoint.java | 2 +- .../hbase/replication/ReplicationResult.java | 34 +++++++++++++++++++ .../VerifyWALEntriesReplicationEndpoint.java | 4 +-- .../HBaseInterClusterReplicationEndpoint.java | 13 +++---- .../ReplicationSourceShipper.java | 33 ++++++++++-------- .../VisibilityReplicationEndpoint.java | 3 +- .../replication/DummyReplicationEndpoint.java | 4 +-- .../SerialReplicationTestBase.java | 4 +-- .../TestHBaseReplicationEndpoint.java | 4 +-- .../TestNonHBaseReplicationEndpoint.java | 4 +-- .../replication/TestReplicationBase.java | 2 +- .../replication/TestReplicationEndpoint.java | 22 ++++++------ .../TestVerifyCellsReplicationEndpoint.java | 2 +- ...TestRaceWhenCreatingReplicationSource.java | 5 +-- .../TestReplicationSourceManager.java | 5 +-- .../regionserver/TestReplicator.java | 3 +- .../TestVisibilityLabelsReplication.java | 5 +-- 17 files changed, 96 insertions(+), 53 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 5edd5b3e8c92..e155b3797b2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -208,7 +208,7 @@ public int getTimeout() { * the context are assumed to be persisted in the target cluster. * @param replicateContext a context where WAL entries and other parameters can be obtained. */ - boolean replicate(ReplicateContext replicateContext); + ReplicationResult replicate(ReplicateContext replicateContext); // The below methods are inspired by Guava Service. See // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java new file mode 100644 index 000000000000..bc6290809c84 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public enum ReplicationResult { + /* Batch has been replicated and persisted successfully. */ + COMMITTED, + + /* Batch has been submitted for replication, but not persisted yet. */ + SUBMITTED, + + /* Batch replicaton failed, should be re-tried */ + FAILED +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java index 229cec57e976..a9674407bd2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java @@ -59,10 +59,10 @@ private void checkCell(Cell cell) { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream()) .forEach(this::checkCell); - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 6bdc97732644..4f9a4909d784 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -424,7 +425,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List> batches = createBatches(replicateContext.getEntries()); @@ -458,7 +459,7 @@ public boolean replicate(ReplicateContext replicateContext) { try { // replicate the batches to sink side. parallelReplicate(replicateContext, batches); - return true; + return ReplicationResult.COMMITTED; } catch (IOException ioe) { if (ioe instanceof RemoteException) { if (dropOnDeletedTables && isTableNotFoundException(ioe)) { @@ -467,14 +468,14 @@ public boolean replicate(ReplicateContext replicateContext) { batches = filterNotExistTableEdits(batches); if (batches.isEmpty()) { LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return"); - return true; + return ReplicationResult.COMMITTED; } } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) { batches = filterNotExistColumnFamilyEdits(batches); if (batches.isEmpty()) { LOG.warn("After filter not exist column family's edits, 0 edits to replicate, " + "just return"); - return true; + return ReplicationResult.COMMITTED; } } else { LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(), @@ -506,7 +507,7 @@ public boolean replicate(ReplicateContext replicateContext) { } } } - return false; // in case we exited before replicating + return ReplicationResult.FAILED; // in case we exited before replicating } protected boolean isPeerEnabled() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 6d0730d76b6e..b111a33c9ee3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -155,7 +156,7 @@ private void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - updateLogPosition(entryBatch); + updateLogPosition(entryBatch, ReplicationResult.COMMITTED); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -182,21 +183,23 @@ private void shipEdits(WALEntryBatch entryBatch) { long startTimeNs = System.nanoTime(); // send the edits to the endpoint. Will block until the edits are shipped and acknowledged - boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); + ReplicationResult replicated = source.getReplicationEndpoint().replicate(replicateContext); long endTimeNs = System.nanoTime(); - if (!replicated) { + if (replicated == ReplicationResult.FAILED) { continue; } else { sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } - // Clean up hfile references - for (Entry entry : entries) { - cleanUpHFileRefs(entry.getEdit()); - LOG.trace("shipped entry {}: ", entry); + if (replicated == ReplicationResult.COMMITTED) { + // Clean up hfile references + for (Entry entry : entries) { + cleanUpHFileRefs(entry.getEdit()); + LOG.trace("shipped entry {}: ", entry); + } } // Log and clean up WAL logs - updateLogPosition(entryBatch); + updateLogPosition(entryBatch, replicated); // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) // this sizeExcludeBulkLoad has to use same calculation that when calling @@ -253,18 +256,18 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException { } } - private boolean updateLogPosition(WALEntryBatch batch) { + private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and // there is no entry in the batch. It is OK because that the queue storage will ignore the zero // position and the file will be removed soon in cleanOldLogs. - if ( - batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) - || batch.getLastWalPosition() != currentPosition - ) { - source.logPositionAndCleanOldLogs(batch); - updated = true; + if (replicated == ReplicationResult.COMMITTED) { + if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) + || batch.getLastWalPosition() != currentPosition) { + source.logPositionAndCleanOldLogs(batch); + updated = true; + } } // if end of file is true, then we can just skip to the next file in queue. // the only exception is for recovered queue, if we reach the end of the queue, then there will diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index b97a08c01c38..a32ce78b0c78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -63,7 +64,7 @@ public void peerConfigUpdated(ReplicationPeerConfig rpc) { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { if (!delegator.canReplicateToSameCluster()) { // Only when the replication is inter cluster replication we need to // convert the visibility tags to diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java index e6a39e7fede1..f0e627316cd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java @@ -42,8 +42,8 @@ public WALEntryFilter getWALEntryfilter() { } @Override - public boolean replicate(ReplicateContext replicateContext) { - return true; + public ReplicationResult replicate(ReplicateContext replicateContext) { + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index f54c39316997..a8c76033d02d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -81,7 +81,7 @@ public UUID getPeerUUID() { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { synchronized (WRITER) { try { for (Entry entry : replicateContext.getEntries()) { @@ -92,7 +92,7 @@ public boolean replicate(ReplicateContext replicateContext) { throw new UncheckedIOException(e); } } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 058564dc0ecf..e3a9e46b8e4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -199,8 +199,8 @@ protected Collection fetchPeerAddresses() { } @Override - public boolean replicate(ReplicateContext replicateContext) { - return false; + public ReplicationResult replicate(ReplicateContext replicateContext) { + return ReplicationResult.FAILED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java index 70cae18b4561..c98b46c8e4be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java @@ -127,9 +127,9 @@ public WALEntryFilter getWALEntryfilter() { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { REPLICATED.set(true); - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 70a6d73c6202..f53d9acc24f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -415,7 +415,7 @@ public ReplicationEndpointTest() { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); replicatedEntries.addAll(replicateContext.getEntries()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 057a9f3567f5..77cd5da8de0a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -463,10 +463,10 @@ public UUID getPeerUUID() { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); lastEntries = new ArrayList<>(replicateContext.entries); - return true; + return ReplicationResult.COMMITTED; } @Override @@ -526,12 +526,12 @@ public void init(Context context) throws IOException { } @Override - public boolean replicate(ReplicateContext context) { + public ReplicationResult replicate(ReplicateContext context) { try { Thread.sleep(duration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return false; + return ReplicationResult.FAILED; } return super.replicate(context); } @@ -548,9 +548,9 @@ public InterClusterReplicationEndpointForTest() { } @Override - public boolean replicate(ReplicateContext replicateContext) { - boolean success = super.replicate(replicateContext); - if (success) { + public ReplicationResult replicate(ReplicateContext replicateContext) { + ReplicationResult success = super.replicate(replicateContext); + if (success == ReplicationResult.COMMITTED) { replicateCount.addAndGet(replicateContext.entries.size()); } return success; @@ -577,7 +577,7 @@ public static class ReplicationEndpointReturningFalse extends ReplicationEndpoin static AtomicBoolean replicated = new AtomicBoolean(false); @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { try { // check row doAssert(row); @@ -589,7 +589,7 @@ public boolean replicate(ReplicateContext replicateContext) { LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false - return replicated.get(); + return replicated.get() ? ReplicationResult.COMMITTED : ReplicationResult.FAILED; } } @@ -598,14 +598,14 @@ public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEnd static AtomicReference ex = new AtomicReference<>(null); @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { try { super.replicate(replicateContext); doAssert(row); } catch (Exception e) { ex.set(e); } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java index b990916ae75f..50b0911970a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java @@ -71,7 +71,7 @@ public class TestVerifyCellsReplicationEndpoint { public static final class EndpointForTest extends VerifyWALEntriesReplicationEndpoint { @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { LOG.info(replicateContext.getEntries().toString()); replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells) .forEachOrdered(CELLS::addAll); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index 66f04dca36d5..d7b5bcdcccbf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -94,7 +95,7 @@ public UUID getPeerUUID() { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { synchronized (WRITER) { try { for (Entry entry : replicateContext.getEntries()) { @@ -105,7 +106,7 @@ public boolean replicate(ReplicateContext replicateContext) { throw new UncheckedIOException(e); } } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 663b444dc4e4..c99f25380de4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; @@ -94,13 +95,13 @@ public static final class ReplicationEndpointForTest extends DummyReplicationEnd private String clusterKey; @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { // if you want to block the replication, for example, do not want the recovered source to be // removed if (clusterKey.endsWith("error")) { throw new RuntimeException("Inject error"); } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index 979db712ef34..cdbd1c73a2a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -218,7 +219,7 @@ public static void setEntriesCount(int i) { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { try { await(); } catch (InterruptedException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 7d5a5627d2c0..ffbc0d2cee5a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.SecurityTests; @@ -473,8 +474,8 @@ public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint, } @Override - public boolean replicate(ReplicateContext replicateContext) { - boolean ret = super.replicate(replicateContext); + public ReplicationResult replicate(ReplicateContext replicateContext) { + ReplicationResult ret = super.replicate(replicateContext); lastEntries = replicateContext.getEntries(); replicateCount.incrementAndGet(); return ret; From a9a12c3ca59e382bdaddc07875ff9d9f48ab9323 Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Thu, 16 Jan 2025 15:03:55 -0600 Subject: [PATCH 2/2] HBASE-29064. Spotless apply --- .../apache/hadoop/hbase/replication/ReplicationResult.java | 1 - .../replication/regionserver/ReplicationSourceShipper.java | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java index bc6290809c84..03ed0ce6799f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.replication; import org.apache.hadoop.hbase.HBaseInterfaceAudience; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index b111a33c9ee3..f57f76c4441e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -263,8 +263,10 @@ private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replica // there is no entry in the batch. It is OK because that the queue storage will ignore the zero // position and the file will be removed soon in cleanOldLogs. if (replicated == ReplicationResult.COMMITTED) { - if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) - || batch.getLastWalPosition() != currentPosition) { + if ( + batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) + || batch.getLastWalPosition() != currentPosition + ) { source.logPositionAndCleanOldLogs(batch); updated = true; }