diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d29a60b766d4..c8af65915240 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; @@ -436,14 +437,30 @@ public String getPeerClusterId() { } @Override + @VisibleForTesting public Path getCurrentPath() { - // only for testing for (ReplicationSourceShipperThread worker : workerThreads.values()) { if (worker.getCurrentPath() != null) return worker.getCurrentPath(); } return null; } + @VisibleForTesting + public Path getLastLoggedPath() { + for (ReplicationSourceShipperThread worker : workerThreads.values()) { + return worker.getLastLoggedPath(); + } + return null; + } + + @VisibleForTesting + public long getLastLoggedPosition() { + for (ReplicationSourceShipperThread worker : workerThreads.values()) { + return worker.getLastLoggedPosition(); + } + return 0; + } + private boolean isSourceActive() { return !this.stopper.isStopped() && this.sourceRunning; } @@ -478,8 +495,8 @@ public String getStats() { for (Map.Entry entry : workerThreads.entrySet()) { String walGroupId = entry.getKey(); ReplicationSourceShipperThread worker = entry.getValue(); - long position = worker.getCurrentPosition(); - Path currentPath = worker.getCurrentPath(); + long position = worker.getLastLoggedPosition(); + Path currentPath = worker.getLastLoggedPath(); sb.append("walGroup [").append(walGroupId).append("]: "); if (currentPath != null) { 
sb.append("currently replicating from: ").append(currentPath).append(" at position: ") @@ -513,7 +530,7 @@ public class ReplicationSourceShipperThread extends Thread { // Last position in the log that we sent to ZooKeeper private long lastLoggedPosition = -1; // Path of the current log - private volatile Path currentPath; + private volatile Path lastLoggedPath; // Current state of the worker thread private WorkerState state; ReplicationSourceWALReaderThread entryReader; @@ -553,13 +570,11 @@ public void run() { try { WALEntryBatch entryBatch = entryReader.take(); shipEdits(entryBatch); - if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty() - && entryBatch.getLastSeqIds().isEmpty()) { - LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " - + peerClusterZnode); + if (!entryBatch.hasMoreEntries()) { + LOG.debug("Finished recovering queue for group " + + walGroupId + " of peer " + peerClusterZnode); metrics.incrCompletedRecoveryQueue(); setWorkerState(WorkerState.FINISHED); - continue; } } catch (InterruptedException e) { LOG.trace("Interrupted while waiting for next replication entry batch", e); @@ -567,7 +582,7 @@ public void run() { } } - if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) { + if (getWorkerState() == WorkerState.FINISHED) { // use synchronize to make sure one last thread will clean the queue synchronized (this) { Threads.sleep(100);// wait a short while for other worker thread to fully exit @@ -635,15 +650,13 @@ private void checkBandwidthChangeAndResetThrottler() { protected void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); long lastReadPosition = entryBatch.getLastWalPosition(); - currentPath = entryBatch.getLastWalPath(); + lastLoggedPath = entryBatch.getLastWalPath(); int sleepMultiplier = 0; if (entries.isEmpty()) { - if (lastLoggedPosition != lastReadPosition) { - updateLogPosition(lastReadPosition); - // if there was 
nothing to ship and it's not an error - // set "ageOfLastShippedOp" to to indicate that we're current - metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId); - } + updateLogPosition(lastReadPosition); + // if there was nothing to ship and it's not an error + // set "ageOfLastShippedOp" to indicate that we're current + metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -727,8 +740,7 @@ protected void shipEdits(WALEntryBatch entryBatch) { } private void updateLogPosition(long lastReadPosition) { - manager.setPendingShipment(false); - manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition, + manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition, this.replicationQueueInfo.isQueueRecovered(), false); lastLoggedPosition = lastReadPosition; } @@ -740,7 +752,7 @@ public void startup() { public void uncaughtException(final Thread t, final Throwable e) { RSRpcServices.exitIfOOME(e); LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath=" - + getCurrentPath(), e); + + getLastLoggedPath(), e); stopper.stop("Unexpected exception in ReplicationSourceWorkerThread"); } }; @@ -881,8 +893,12 @@ public Path getCurrentPath() { return this.entryReader.getCurrentPath(); } - public long getCurrentPosition() { - return this.lastLoggedPosition; + public Path getLastLoggedPath() { + return lastLoggedPath; + } + + public long getLastLoggedPosition() { + return lastLoggedPosition; } private boolean isWorkerActive() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 071d2c57cf7a..3185e4796065 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -120,7 +120,6 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; private final boolean replicationForBulkLoadDataEnabled; - private boolean pendingShipment; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -188,19 +187,13 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues, * @param holdLogInZK if true then the log is retained in ZK */ public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position, - boolean queueRecovered, boolean holdLogInZK) { - if (!this.pendingShipment) { - String fileName = log.getName(); - this.replicationQueues.setLogPosition(id, fileName, position); - if (holdLogInZK) { - return; - } - cleanOldLogs(fileName, id, queueRecovered); + boolean queueRecovered, boolean holdLogInZK) { + String fileName = log.getName(); + this.replicationQueues.setLogPosition(id, fileName, position); + if (holdLogInZK) { + return; } - } - - public synchronized void setPendingShipment(boolean pendingShipment) { - this.pendingShipment = pendingShipment; + cleanOldLogs(fileName, id, queueRecovered); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index ddbf1c9f9407..5b7c179076a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -21,12 +21,11 @@ import java.io.EOFException; import java.io.IOException; import 
java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,7 +64,8 @@ public class ReplicationSourceWALReaderThread extends Thread { // max count of each batch - multiply by number of batches in queue to get total private int replicationBatchCountCapacity; // position in the WAL to start reading at - private long currentPosition; + private long lastReadPosition; + private Path lastReadPath; private WALEntryFilter filter; private long sleepForRetries; //Indicates whether this particular worker is running @@ -73,7 +73,6 @@ public class ReplicationSourceWALReaderThread extends Thread { private ReplicationQueueInfo replicationQueueInfo; private int maxRetriesMultiplier; private MetricsSource metrics; - private ReplicationSourceManager replicationSourceManager; /** * Creates a reader worker for a given WAL queue. 
Reads WAL entries off a given queue, batches the @@ -93,7 +92,8 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) { this.replicationQueueInfo = replicationQueueInfo; this.logQueue = logQueue; - this.currentPosition = startPosition; + this.lastReadPath = logQueue.peek(); + this.lastReadPosition = startPosition; this.fs = fs; this.conf = conf; this.filter = filter; @@ -103,7 +103,6 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - this.replicationSourceManager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -122,13 +121,11 @@ public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) { + new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can - WALEntryBatch batch = null; - while (entryStream.hasNext()) { - if (batch == null) { - batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); - } + WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity); + boolean hasNext; + while ((hasNext = entryStream.hasNext())) { Entry entry = entryStream.next(); entry = filterEntry(entry); if (entry != null) { @@ -136,7 +133,6 @@ public void run() { if (edit != null && !edit.isEmpty()) { long entrySize = getEntrySize(entry); batch.addEntry(entry); - replicationSourceManager.setPendingShipment(true); updateBatchStats(batch, entry, 
entryStream.getPosition(), entrySize); // Stop if too many entries or too big if (batch.getHeapSize() >= replicationBatchSizeCapacity @@ -144,34 +140,21 @@ public void run() { break; } } - } else { - replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(), - this.replicationQueueInfo.getPeerClusterZnode(), - entryStream.getPosition(), - this.replicationQueueInfo.isQueueRecovered(), false); } } - if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { - if (LOG.isTraceEnabled()) { - LOG.trace(String.format("Read %s WAL entries eligible for replication", - batch.getNbEntries())); - } - entryBatchQueue.put(batch); + + updateBatch(entryStream, batch, hasNext); + if (isShippable(batch)) { sleepMultiplier = 1; - } else { // got no entries and didn't advance position in WAL - LOG.trace("Didn't read any new entries from WAL"); - if (replicationQueueInfo.isQueueRecovered()) { - // we're done with queue recovery, shut ourself down + entryBatchQueue.put(batch); + if (!batch.hasMoreEntries()) { + // we're done with queue recovery, shut ourselves down setReaderRunning(false); - // shuts down shipper thread immediately - entryBatchQueue.put(batch != null ? 
batch - : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath())); - } else { - Thread.sleep(sleepForRetries); } + } else { + Thread.sleep(sleepForRetries); } - currentPosition = entryStream.getPosition(); - entryStream.reset(); // reuse stream + resetStream(entryStream); } } catch (IOException | WALEntryStreamRuntimeException e) { // stream related if (sleepMultiplier < maxRetriesMultiplier) { @@ -189,6 +172,38 @@ public void run() { } } + private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) { + logMessage(batch); + batch.updatePosition(entryStream); + batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData); + } + + private void logMessage(WALEntryBatch batch) { + if (LOG.isTraceEnabled()) { + if (batch.isEmpty()) { + LOG.trace("Didn't read any new entries from WAL"); + } else { + LOG.trace(String.format("Read %s WAL entries eligible for replication", + batch.getNbEntries())); + } + } + } + + private boolean isShippable(WALEntryBatch batch) { + return !batch.isEmpty() || checkIfWALRolled(batch) || !batch.hasMoreEntries(); + } + + private boolean checkIfWALRolled(WALEntryBatch batch) { + return lastReadPath == null && batch.lastWalPath != null + || lastReadPath != null && !lastReadPath.equals(batch.lastWalPath); + } + + private void resetStream(WALEntryStream stream) throws IOException { + lastReadPosition = stream.getPosition(); + lastReadPath = stream.getCurrentPath(); + stream.reset(); // reuse stream + } + // if we get an EOF due to a zero-length log, and there are other logs in queue // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is // enabled, then dump the log @@ -198,8 +213,8 @@ private void handleEofException(Exception e) { try { if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - logQueue.remove(); - currentPosition = 0; + lastReadPath = 
logQueue.remove(); + lastReadPosition = 0; } } catch (IOException ioe) { LOG.warn("Couldn't get file length information about log " + logQueue.peek()); @@ -208,12 +223,6 @@ private void handleEofException(Exception e) { } public Path getCurrentPath() { - // if we've read some WAL entries, get the Path we read from - WALEntryBatch batchQueueHead = entryBatchQueue.peek(); - if (batchQueueHead != null) { - return batchQueueHead.lastWalPath; - } - // otherwise, we must be currently reading from the head of the log queue return logQueue.peek(); } @@ -235,6 +244,10 @@ public WALEntryBatch take() throws InterruptedException { return entryBatchQueue.take(); } + public WALEntryBatch poll(long timeout) throws InterruptedException { + return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS); + } + private long getEntrySize(Entry entry) { WALEdit edit = entry.getEdit(); return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); @@ -334,6 +347,10 @@ public void setReaderRunning(boolean readerRunning) { this.isReaderRunning = readerRunning; } + public long getLastReadPosition() { + return this.lastReadPosition; + } + /** * Holds a batch of WAL entries to replicate, along with some statistics * @@ -350,17 +367,14 @@ static class WALEntryBatch { private int nbHFiles = 0; // heap size of data we need to replicate private long heapSize = 0; - // save the last sequenceid for each region if the table has serial-replication scope - private Map lastSeqIds = new HashMap<>(); + // whether more entries to read exist in WALs or not + private boolean moreEntries = true; /** - * @param walEntries - * @param lastWalPath Path of the WAL the last entry in this batch was read from - * @param lastWalPosition Position in the WAL the last entry in this batch was read from + * @param maxNbEntries the number of entries a batch can have */ - private WALEntryBatch(int maxNbEntries, Path lastWalPath) { + private WALEntryBatch(int maxNbEntries) { this.walEntries = new ArrayList<>(maxNbEntries); - 
this.lastWalPath = lastWalPath; } public void addEntry(Entry entry) { @@ -420,13 +434,6 @@ public long getHeapSize() { return heapSize; } - /** - * @return the last sequenceid for each region if the table has serial-replication scope - */ - public Map getLastSeqIds() { - return lastSeqIds; - } - private void incrementNbRowKeys(int increment) { nbRowKeys += increment; } @@ -439,8 +446,21 @@ private void incrementHeapSize(long increment) { heapSize += increment; } - private void setLastPosition(String region, Long sequenceId) { - getLastSeqIds().put(region, sequenceId); + public boolean isEmpty() { + return walEntries.isEmpty(); + } + + public void updatePosition(WALEntryStream entryStream) { + lastWalPath = entryStream.getCurrentPath(); + lastWalPosition = entryStream.getPosition(); + } + + public boolean hasMoreEntries() { + return moreEntries; + } + + public void setMoreEntries(boolean moreEntries) { + this.moreEntries = moreEntries; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 990c5fd81f59..e825a06d3f66 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -16,13 +16,30 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -33,34 +50,37 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; - -import static org.mockito.Mockito.mock; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; @Category(MediumTests.class) public class TestReplicationSource { @@ -90,6 +110,32 @@ public static void setUpBeforeClass() throws Exception { if (FS.exists(logDir)) FS.delete(logDir, true); } + @Before + public void setup() throws IOException { + if (!FS.exists(logDir)) { + FS.mkdirs(logDir); + } + if (!FS.exists(oldLogDir)) { + FS.mkdirs(oldLogDir); + } + + ReplicationEndpointForTest.contructedCount.set(0); + ReplicationEndpointForTest.startedCount.set(0); + ReplicationEndpointForTest.replicateCount.set(0); + ReplicationEndpointForTest.stoppedCount.set(0); + ReplicationEndpointForTest.lastEntries = null; + } + + @After + public void tearDown() throws 
IOException { + if (FS.exists(oldLogDir)) { + FS.delete(oldLogDir, true); + } + if (FS.exists(logDir)) { + FS.delete(logDir, true); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL_PEER.shutdownMiniHBaseCluster(); @@ -106,8 +152,6 @@ public static void tearDownAfterClass() throws Exception { @Test public void testLogMoving() throws Exception{ Path logPath = new Path(logDir, "log"); - if (!FS.exists(logDir)) FS.mkdirs(logDir); - if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); for(int i = 0; i < 3; i++) { @@ -183,10 +227,183 @@ public boolean evaluate() throws Exception { } + private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { + for (int i = 0; i < numEntries; i++) { + byte[] b = Bytes.toBytes(Integer.toString(i)); + KeyValue kv = new KeyValue(b,b,b); + WALEdit edit = new WALEdit(); + edit.add(kv); + WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0, + HConstants.DEFAULT_CLUSTER_ID); + NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); + key.setScopes(scopes); + writer.append(new WAL.Entry(key, edit)); + writer.sync(); + } + writer.close(); + } + + private long getPosition(WALFactory wals, Path log2, int numEntries) throws IOException { + WAL.Reader reader = wals.createReader(FS, log2); + for (int i = 0; i < numEntries; i++) { + reader.next(); + } + return reader.getPosition(); + } + + private static final class Mocks { + private final ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + private final ReplicationQueues queues = mock(ReplicationQueues.class); + private final ReplicationPeers peers = mock(ReplicationPeers.class); + private final MetricsSource metrics = mock(MetricsSource.class); + private final ReplicationPeer peer = mock(ReplicationPeer.class); + private final ReplicationEndpoint.Context 
context = mock(ReplicationEndpoint.Context.class); + + private Mocks() { + when(peers.getStatusOfPeer(anyString())).thenReturn(true); + when(context.getReplicationPeer()).thenReturn(peer); + } + + ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint) + throws IOException { + final ReplicationSource source = new ReplicationSource(); + endpoint.init(context); + source.init(conf, FS, manager, queues, peers, mock(Stoppable.class), + "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics); + return source; + } + } + + @Test + public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception { + final int numWALEntries = 5; + conf.setInt("replication.source.nb.capacity", numWALEntries); + + Mocks mocks = new Mocks(); + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() { + @Override + public WALEntryFilter getWALEntryfilter() { + return null; + } + }; + WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test"); + final Path log1 = new Path(logDir, "log.1"); + final Path log2 = new Path(logDir, "log.2"); + + WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()); + WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()); + + appendEntries(writer1, 3); + appendEntries(writer2, 2); + + long pos = getPosition(wals, log2, 2); + + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint); + source.run(); + + source.enqueueLog(log1); + // log rolled + source.enqueueLog(log2); + + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return endpoint.replicateCount.get() > 0; + } + }); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class); + verify(mocks.manager, times(1)) + .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), 
positionCaptor.capture(), + anyBoolean(), anyBoolean()); + assertTrue(endpoint.lastEntries.size() == 5); + assertThat(pathCaptor.getValue(), is(log2)); + assertThat(positionCaptor.getValue(), is(pos)); + } + + @Test + public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception { + Mocks mocks = new Mocks(); + + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest(); + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint); + WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test"); + + final Path log1 = new Path(logDir, "log.1"); + final Path log2 = new Path(logDir, "log.2"); + + WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()).close(); + WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()).close(); + final long startPos = getPosition(wals, log2, 0); + + source.run(); + source.enqueueLog(log1); + source.enqueueLog(log2); + + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return log2.equals(source.getLastLoggedPath()) + && source.getLastLoggedPosition() >= startPos; + } + }); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class); + + verify(mocks.manager, times(1)) + .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(), + anyBoolean(), anyBoolean()); + assertThat(pathCaptor.getValue(), is(log2)); + assertThat(positionCaptor.getValue(), is(startPos)); + } + + @Test + public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exception { + Mocks mocks = new Mocks(); + // set table cfs to filter all cells out + final TableName replicatedTable = TableName.valueOf("replicated_table"); + final Map> cfs = + Collections.singletonMap(replicatedTable, Collections.emptyList()); + when(mocks.peer.getTableCFs()).thenReturn(cfs); + + WALFactory wals = new 
WALFactory(TEST_UTIL.getConfiguration(), null, "test"); + final Path log1 = new Path(logDir, "log.1"); + final Path log2 = new Path(logDir, "log.2"); + + WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()); + WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()); + + appendEntries(writer1, 3); + appendEntries(writer2, 2); + final long pos = getPosition(wals, log2, 2); + + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest(); + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint); + source.enqueueLog(log1); + source.enqueueLog(log2); + source.run(); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + // wait until reader read all cells + return log2.equals(source.getLastLoggedPath()) && source.getLastLoggedPosition() >= pos; + } + }); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class); + + // all old wals should be removed by updating wal position, even if all cells are filtered out. + verify(mocks.manager, times(1)) + .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(), + anyBoolean(), anyBoolean()); + assertThat(pathCaptor.getValue(), is(log2)); + assertThat(positionCaptor.getValue(), is(pos)); + } + /** * Tests that recovered queues are preserved on a regionserver shutdown. 
* See HBASE-18192 - * @throws Exception */ @Test public void testServerShutdownRecoveredQueue() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 794fed2830fe..dc0204211d6c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -22,17 +22,18 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.TreeMap; @@ -42,16 +43,21 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import 
org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.TableCfWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -71,7 +77,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; @@ -371,36 +376,123 @@ public void testReplicationSourceWALReaderThread() throws Exception { } @Test - public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception { + public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception { appendEntriesToLog(3); - // get ending position + log.rollWriter(); + appendEntriesToLog(2); + long position; - try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue), + fs, conf, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); entryStream.next(); entryStream.next(); entryStream.next(); position = entryStream.getPosition(); } - // start up a readerThread with a WALEntryFilter that always filter the entries - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread( - mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() { - @Override - public Entry 
filter(Entry entry) { - return null; - } - }, new MetricsSource("1")); - readerThread.start(); - Thread.sleep(100); - ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class); - verify(mockSourceManager, times(3)) - .logPositionAndCleanOldLogs(any(Path.class), - anyString(), - positionCaptor.capture(), - anyBoolean(), - anyBoolean()); - assertEquals(position, positionCaptor.getValue().longValue()); + + ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + ReplicationSourceWALReaderThread reader = + new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(), + walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1")); + Path walPath = walQueue.toArray(new Path[2])[1]; + reader.start(); + WALEntryBatch entryBatch = reader.take(); + + assertNotNull(entryBatch); + assertEquals(5, entryBatch.getWalEntries().size()); + assertEquals(position, entryBatch.getLastWalPosition()); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertFalse(entryBatch.hasMoreEntries()); + } + + @Test + public void testReplicationSourceWALReaderThreadWithFilter() throws Exception { + final byte[] notReplicatedCf = Bytes.toBytes("notReplicated"); + final Map> tableCfs = new HashMap<>(); + tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family))); + ReplicationPeer peer = mock(ReplicationPeer.class); + when(peer.getTableCFs()).thenReturn(tableCfs); + WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + + // add filterable entries + appendToLogPlus(3, notReplicatedCf); + appendToLogPlus(3, notReplicatedCf); + appendToLogPlus(3, notReplicatedCf); + + // add non filterable entries + appendEntriesToLog(2); + + ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + final ReplicationSourceWALReaderThread reader = + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, + 0, fs, conf, filter, new MetricsSource("1")); + 
reader.start(); + + WALEntryBatch entryBatch = reader.take(); + + assertNotNull(entryBatch); + assertFalse(entryBatch.isEmpty()); + List walEntries = entryBatch.getWalEntries(); + assertEquals(2, walEntries.size()); + for (Entry entry : walEntries) { + ArrayList cells = entry.getEdit().getCells(); + assertTrue(cells.size() == 1); + assertTrue(CellUtil.matchingFamily(cells.get(0), family)); + } + } + + @Test + public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled() throws Exception { + final byte[] notReplicatedCf = Bytes.toBytes("notReplicated"); + final Map> tableCfs = new HashMap<>(); + tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family))); + ReplicationPeer peer = mock(ReplicationPeer.class); + when(peer.getTableCFs()).thenReturn(tableCfs); + WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + + appendToLogPlus(3, notReplicatedCf); + + Path firstWAL = walQueue.peek(); + final long eof = getPosition(firstWAL); + + ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + final ReplicationSourceWALReaderThread reader = + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, + 0, fs, conf, filter, new MetricsSource("1")); + reader.start(); + + // reader won't put any batch, even if EOF reached. 
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return reader.getLastReadPosition() >= eof; + } + }); + assertNull(reader.poll(0)); + + log.rollWriter(); + + // should get empty batch with current wal position, after wal rolled + WALEntryBatch entryBatch = reader.take(); + + Path lastWAL= walQueue.peek(); + long positionToBeLogged = getPosition(lastWAL); + + assertNotNull(entryBatch); + assertTrue(entryBatch.isEmpty()); + assertEquals(1, walQueue.size()); + assertNotEquals(firstWAL, entryBatch.getLastWalPath()); + assertEquals(lastWAL, entryBatch.getLastWalPath()); + assertEquals(positionToBeLogged, entryBatch.getLastWalPosition()); + } + + private long getPosition(Path walPath) throws IOException { + WALEntryStream entryStream = + new WALEntryStream(new PriorityBlockingQueue<>(Collections.singletonList(walPath)), + fs, conf, new MetricsSource("1")); + entryStream.hasNext(); + return entryStream.getPosition(); } private String getRow(WAL.Entry entry) { @@ -426,17 +518,25 @@ private void appendToLog() throws IOException { } private void appendToLogPlus(int count) throws IOException { + appendToLogPlus(count, family, qualifier); + } + + private void appendToLogPlus(int count, byte[] cf) throws IOException { + appendToLogPlus(count, cf, qualifier); + } + + private void appendToLogPlus(int count, byte[] cf, byte[] cq) throws IOException { final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc), - getWALEdits(count), true); + getWALEdits(count, cf, cq), true); log.sync(txid); } - private WALEdit getWALEdits(int count) { + private WALEdit getWALEdits(int count, byte[] cf, byte[] cq) { WALEdit edit = new WALEdit(); for (int i = 0; i < count; i++) { - edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, - System.currentTimeMillis(), qualifier)); + edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), cf, cq, + 
System.currentTimeMillis(), cq)); } return edit; } @@ -458,8 +558,16 @@ public Entry filter(Entry entry) { }; } + private ReplicationQueueInfo getRecoveredQueueInfo() { + return getQueueInfo("1-1"); + } + private ReplicationQueueInfo getQueueInfo() { - return new ReplicationQueueInfo("1"); + return getQueueInfo("1"); + } + + private ReplicationQueueInfo getQueueInfo(String znode) { + return new ReplicationQueueInfo(znode); } class PathWatcher extends WALActionsListener.Base { @@ -472,5 +580,4 @@ public void preLogRoll(Path oldPath, Path newPath) throws IOException { currentPath = newPath; } } - }