HBASE-29064 POC: Implement Dual Offset Tracking for WAL Replication #6608

Open · wants to merge 2 commits into base: HBASE-28957
@@ -208,7 +208,7 @@ public int getTimeout() {
* the context are assumed to be persisted in the target cluster.
* @param replicateContext a context where WAL entries and other parameters can be obtained.
*/
boolean replicate(ReplicateContext replicateContext);
ReplicationResult replicate(ReplicateContext replicateContext);

// The below methods are inspired by Guava Service. See
// https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public enum ReplicationResult {
/* Batch has been replicated and persisted successfully. */
COMMITTED,

/* Batch has been submitted for replication, but not persisted yet. */
SUBMITTED,

/* Batch replication failed; should be retried. */
FAILED
}
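
For illustration only, not part of this diff: a minimal sketch of how a custom endpoint might map its internal state onto the three results. BufferingEndpointSketch, FLUSH_THRESHOLD, and flushBufferToPeer are hypothetical names, not HBase APIs.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class BufferingEndpointSketch extends BaseReplicationEndpoint {
  private final List<Entry> buffer = new ArrayList<>(); // simplified local buffer
  private static final int FLUSH_THRESHOLD = 1000;      // hypothetical tuning knob

  @Override
  public ReplicationResult replicate(ReplicateContext context) {
    try {
      buffer.addAll(context.getEntries());
      if (buffer.size() < FLUSH_THRESHOLD) {
        // Accepted locally; the peer has not acknowledged the entries yet, so
        // the shipper must not persist the WAL offset for this batch.
        return ReplicationResult.SUBMITTED;
      }
      flushBufferToPeer(); // hypothetical helper; blocks until the peer acks
      buffer.clear();
      return ReplicationResult.COMMITTED;
    } catch (IOException e) {
      return ReplicationResult.FAILED; // the caller retries the same batch
    }
  }

  // getPeerUUID() and the Guava Service-style lifecycle methods are omitted.
}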
@@ -59,10 +59,10 @@ private void checkCell(Cell cell) {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
.forEach(this::checkCell);
return true;
return ReplicationResult.COMMITTED;
}

@Override
@@ -48,6 +48,7 @@
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -424,7 +425,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List<List<Entr
* Do the shipping logic
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
int sleepMultiplier = 1;
int initialTimeout = replicateContext.getTimeout();

@@ -444,7 +445,7 @@ public boolean replicate(ReplicateContext replicateContext) {
lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
}
sleepForRetries("No sinks available at peer", sleepMultiplier);
return false;
return ReplicationResult.FAILED;
}

List<List<Entry>> batches = createBatches(replicateContext.getEntries());
@@ -458,7 +459,7 @@ public boolean replicate(ReplicateContext replicateContext) {
try {
// replicate the batches to sink side.
parallelReplicate(replicateContext, batches);
return true;
return ReplicationResult.COMMITTED;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
@@ -467,14 +468,14 @@ public boolean replicate(ReplicateContext replicateContext) {
batches = filterNotExistTableEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
return true;
return ReplicationResult.COMMITTED;
}
} else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
batches = filterNotExistColumnFamilyEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist column family's edits, 0 edits to replicate, "
+ "just return");
return true;
return ReplicationResult.COMMITTED;
}
} else {
LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
@@ -506,7 +507,7 @@ public boolean replicate(ReplicateContext replicateContext) {
}
}
}
return false; // in case we exited before replicating
return ReplicationResult.FAILED; // in case we exited before replicating
}

protected boolean isPeerEnabled() {
@@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -155,7 +156,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(entryBatch);
updateLogPosition(entryBatch, ReplicationResult.COMMITTED);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -182,21 +183,23 @@ private void shipEdits(WALEntryBatch entryBatch) {

long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
ReplicationResult replicated = source.getReplicationEndpoint().replicate(replicateContext);
long endTimeNs = System.nanoTime();

if (!replicated) {
if (replicated == ReplicationResult.FAILED) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
if (replicated == ReplicationResult.COMMITTED) {
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
}
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);
updateLogPosition(entryBatch, replicated);

// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -253,18 +256,20 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException {
}
}

private boolean updateLogPosition(WALEntryBatch batch) {
private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) {
boolean updated = false;
// if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
// record on zk, so let's call it. The last wal position may be zero if end of file is true and
// there is no entry in the batch. It is OK because the queue storage will ignore the zero
// position and the file will be removed soon in cleanOldLogs.
if (
batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
|| batch.getLastWalPosition() != currentPosition
) {
source.logPositionAndCleanOldLogs(batch);
updated = true;
if (replicated == ReplicationResult.COMMITTED) {
if (
batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
|| batch.getLastWalPosition() != currentPosition
) {
source.logPositionAndCleanOldLogs(batch);
updated = true;
}
}
// if end of file is true, then we can just skip to the next file in queue.
// the only exception is for recovered queue, if we reach the end of the queue, then there will
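
The gating above is the core of the dual offset tracking this POC proposes: the shipper still advances its in-memory reading position for every shipped batch, but only a COMMITTED result lets the position be persisted to the replication queue storage. For illustration only, not part of this diff, a sketch of the two offsets; DualOffsetTracker and its members are hypothetical names.

import org.apache.hadoop.hbase.replication.ReplicationResult;

final class DualOffsetTracker {
  private long shippedOffset;   // last WAL position handed to the endpoint
  private long committedOffset; // last WAL position known durable on the peer

  synchronized void onBatchShipped(long walPosition, ReplicationResult result) {
    if (result == ReplicationResult.FAILED) {
      return; // the same batch is retried; neither offset moves
    }
    shippedOffset = Math.max(shippedOffset, walPosition);
    if (result == ReplicationResult.COMMITTED) {
      committedOffset = shippedOffset; // only committed data may advance this
    }
    // SUBMITTED: shippedOffset advances for flow control while committedOffset
    // waits until the endpoint later confirms the data is durable.
  }

  // Offset to persist to ZK/queue storage. After a crash, replay resumes from
  // here, re-shipping batches that were only SUBMITTED; nothing acknowledged by
  // the peer is lost, at the cost of possibly shipping some entries twice.
  synchronized long persistableOffset() {
    return committedOffset;
  }
}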
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -63,7 +64,7 @@ public void peerConfigUpdated(ReplicationPeerConfig rpc) {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
if (!delegator.canReplicateToSameCluster()) {
// Only when the replication is inter cluster replication we need to
// convert the visibility tags to
@@ -42,8 +42,8 @@ public WALEntryFilter getWALEntryfilter() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
return true;
public ReplicationResult replicate(ReplicateContext replicateContext) {
return ReplicationResult.COMMITTED;
}

@Override
@@ -81,7 +81,7 @@ public UUID getPeerUUID() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
@@ -92,7 +92,7 @@ public boolean replicate(ReplicateContext replicateContext) {
throw new UncheckedIOException(e);
}
}
return true;
return ReplicationResult.COMMITTED;
}

@Override
@@ -199,8 +199,8 @@ protected Collection<ServerName> fetchPeerAddresses() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
return false;
public ReplicationResult replicate(ReplicateContext replicateContext) {
return ReplicationResult.FAILED;
}

@Override
@@ -127,9 +127,9 @@ public WALEntryFilter getWALEntryfilter() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
REPLICATED.set(true);
return true;
return ReplicationResult.COMMITTED;
}

@Override
@@ -415,7 +415,7 @@ public ReplicationEndpointTest() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
replicatedEntries.addAll(replicateContext.getEntries());

@@ -463,10 +463,10 @@ public UUID getPeerUUID() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
lastEntries = new ArrayList<>(replicateContext.entries);
return true;
return ReplicationResult.COMMITTED;
}

@Override
@@ -526,12 +526,12 @@ public void init(Context context) throws IOException {
}

@Override
public boolean replicate(ReplicateContext context) {
public ReplicationResult replicate(ReplicateContext context) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
return ReplicationResult.FAILED;
}
return super.replicate(context);
}
@@ -548,9 +548,9 @@ public InterClusterReplicationEndpointForTest() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
boolean success = super.replicate(replicateContext);
if (success) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
ReplicationResult success = super.replicate(replicateContext);
if (success == ReplicationResult.COMMITTED) {
replicateCount.addAndGet(replicateContext.entries.size());
}
return success;
@@ -577,7 +577,7 @@ public static class ReplicationEndpointReturningFalse extends ReplicationEndpoin
static AtomicBoolean replicated = new AtomicBoolean(false);

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
// check row
doAssert(row);
@@ -589,7 +589,7 @@ public boolean replicate(ReplicateContext replicateContext) {
LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
return replicated.get();
return replicated.get() ? ReplicationResult.COMMITTED : ReplicationResult.FAILED;
}
}

@@ -598,14 +598,14 @@ public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEnd
static AtomicReference<Exception> ex = new AtomicReference<>(null);

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
super.replicate(replicateContext);
doAssert(row);
} catch (Exception e) {
ex.set(e);
}
return true;
return ReplicationResult.COMMITTED;
}

@Override
@@ -71,7 +71,7 @@ public class TestVerifyCellsReplicationEndpoint {
public static final class EndpointForTest extends VerifyWALEntriesReplicationEndpoint {

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
LOG.info(replicateContext.getEntries().toString());
replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells)
.forEachOrdered(CELLS::addAll);
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -94,7 +95,7 @@ public UUID getPeerUUID() {
}

@Override
public boolean replicate(ReplicateContext replicateContext) {
public ReplicationResult replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
Expand All @@ -105,7 +106,7 @@ public boolean replicate(ReplicateContext replicateContext) {
throw new UncheckedIOException(e);
}
}
return true;
return ReplicationResult.COMMITTED;
}

@Override