-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF #2975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -237,6 +237,10 @@ public void enqueueLog(Path log) { | |
| } | ||
| } | ||
|
|
||
| public Map<String, PriorityBlockingQueue<Path>> getQueues() { | ||
| return queues; | ||
| } | ||
|
|
||
| @Override | ||
| public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) | ||
| throws ReplicationException { | ||
|
|
@@ -840,7 +844,7 @@ public void uncaughtException(final Thread t, final Throwable e) { | |
|
|
||
| // If this is a recovered queue, the queue is already full and the first log | ||
| // normally has a position (unless the RS failed between 2 logs) | ||
| private long getRecoveredQueueStartPos(long startPosition) { | ||
| public long getRecoveredQueueStartPos(long startPosition) { | ||
|
||
| try { | ||
| startPosition = | ||
| (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName())); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,7 +50,8 @@ | |
| import org.apache.hadoop.hbase.wal.WALKey; | ||
|
|
||
| /** | ||
| * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue | ||
| * Reads and filters WAL entries, groups the filtered entries into batches, | ||
| * and puts the batches onto a queue | ||
| * | ||
| */ | ||
| @InterfaceAudience.Private | ||
|
|
@@ -87,7 +88,7 @@ public class ReplicationSourceWALReaderThread extends Thread { | |
| * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the | ||
| * entries, and puts them on a batch queue. | ||
| * @param manager replication manager | ||
| * @param replicationQueueInfo | ||
| * @param replicationQueueInfo replication queue info | ||
| * @param logQueue The WAL queue to read off of | ||
| * @param startPosition position in the first WAL to start reading from | ||
| * @param fs the files system to use | ||
|
|
@@ -133,71 +134,126 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, | |
| @Override | ||
| public void run() { | ||
| int sleepMultiplier = 1; | ||
| while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream | ||
| try (WALEntryStream entryStream = | ||
| new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) { | ||
| while (isReaderRunning()) { // loop here to keep reusing stream while we can | ||
| if (!source.isPeerEnabled()) { | ||
| Threads.sleep(sleepForRetries); | ||
| continue; | ||
| } | ||
| if (!checkQuota()) { | ||
| continue; | ||
| } | ||
| WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity); | ||
| boolean hasNext; | ||
| while ((hasNext = entryStream.hasNext()) == true) { | ||
| Entry entry = entryStream.next(); | ||
| entry = filterEntry(entry); | ||
| if (entry != null) { | ||
| WALEdit edit = entry.getEdit(); | ||
| if (edit != null && !edit.isEmpty()) { | ||
| long entrySize = getEntrySizeIncludeBulkLoad(entry); | ||
| long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); | ||
| batch.addEntry(entry, entrySize); | ||
| updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); | ||
| boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); | ||
| // Stop if too many entries or too big | ||
| if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity | ||
| WALEntryBatch batch = null; | ||
| WALEntryStream entryStream = | ||
| new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics); | ||
| try { | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| while (isReaderRunning()) { // we only loop back here if something fatal happens to stream | ||
| try { | ||
| entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics); | ||
| while (isReaderRunning()) { // loop here to keep reusing stream while we can | ||
| if (!source.isPeerEnabled()) { | ||
| Threads.sleep(sleepForRetries); | ||
| continue; | ||
| } | ||
| if (!checkQuota()) { | ||
| continue; | ||
| } | ||
| batch = new WALEntryBatch(replicationBatchCountCapacity); | ||
| boolean hasNext = entryStream.hasNext(); | ||
| while (hasNext) { | ||
| Entry entry = entryStream.next(); | ||
| entry = filterEntry(entry); | ||
| if (entry != null) { | ||
| WALEdit edit = entry.getEdit(); | ||
| if (edit != null && !edit.isEmpty()) { | ||
| long entrySize = getEntrySizeIncludeBulkLoad(entry); | ||
| long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); | ||
| batch.addEntry(entry, entrySize); | ||
| updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); | ||
| boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); | ||
| // Stop if too many entries or too big | ||
| if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity | ||
| || batch.getNbEntries() >= replicationBatchCountCapacity) { | ||
| break; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| hasNext = entryStream.hasNext(); | ||
| } | ||
| } | ||
|
|
||
| updateBatch(entryStream, batch, hasNext); | ||
| if (isShippable(batch)) { | ||
| sleepMultiplier = 1; | ||
| entryBatchQueue.put(batch); | ||
| if (!batch.hasMoreEntries()) { | ||
| // we're done with queue recovery, shut ourselves down | ||
| setReaderRunning(false); | ||
| // If the batch has data to max capacity or stream doesn't have anything | ||
| // try to ship it | ||
| if (isBatchQueuedToBeShipped(entryStream, batch, hasNext, false)) { | ||
| sleepMultiplier = 1; | ||
| } | ||
| } | ||
| } catch (IOException | WALEntryStreamRuntimeException e) { // stream related | ||
| if (handleEofException(e, entryStream, batch)) { | ||
| sleepMultiplier = 1; | ||
| } else { | ||
| Thread.sleep(sleepForRetries); | ||
| if (sleepMultiplier < maxRetriesMultiplier) { | ||
| LOG.debug("Failed to read stream of replication entries: " + e); | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sleepMultiplier++; | ||
| } else { | ||
| LOG.error("Failed to read stream of replication entries", e); | ||
| } | ||
| Threads.sleep(sleepForRetries * sleepMultiplier); | ||
| } | ||
| resetStream(entryStream); | ||
| } catch (InterruptedException e) { | ||
| LOG.trace("Interrupted while sleeping between WAL reads"); | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| entryStream.close(); | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } catch (IOException | WALEntryStreamRuntimeException e) { // stream related | ||
| if (sleepMultiplier < maxRetriesMultiplier) { | ||
| LOG.debug("Failed to read stream of replication entries: " + e); | ||
| sleepMultiplier++; | ||
| } else { | ||
| LOG.error("Failed to read stream of replication entries", e); | ||
| handleEofException(e); | ||
| } | ||
| Threads.sleep(sleepForRetries * sleepMultiplier); | ||
| } catch (InterruptedException e) { | ||
| LOG.trace("Interrupted while sleeping between WAL reads"); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } catch (IOException e) { | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (sleepMultiplier < maxRetriesMultiplier) { | ||
| LOG.debug("Failed to read stream of replication entries: " + e); | ||
| sleepMultiplier++; | ||
| } else { | ||
| LOG.error("Failed to read stream of replication entries", e); | ||
| } | ||
| Threads.sleep(sleepForRetries * sleepMultiplier); | ||
| } catch (InterruptedException e) { | ||
| LOG.trace("Interrupted while sleeping between WAL reads"); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
|
|
||
| private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) { | ||
| /** | ||
| * Update the batch try to ship and return true if shipped | ||
| * @param entryStream stream of the WALs | ||
| * @param batch Batch of entries to ship | ||
| * @param hasMoreData if the stream has more yet more data to read | ||
| * @param isEOFException if we have hit the EOF exception before this. For EOF exception, | ||
| * we do not want to reset the stream since entry stream doesn't | ||
| * have correct information. | ||
| * @return if batch is shipped successfully | ||
| * @throws InterruptedException throws interrupted exception | ||
| * @throws IOException throws io exception from stream | ||
| */ | ||
| private boolean isBatchQueuedToBeShipped(WALEntryStream entryStream, WALEntryBatch batch, | ||
|
||
| boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException { | ||
| updateBatch(entryStream, batch, hasMoreData, isEOFException); | ||
| boolean isDataQueued = false; | ||
| if (isShippable(batch)) { | ||
| isDataQueued = true; | ||
| entryBatchQueue.put(batch); | ||
| if (!batch.hasMoreEntries()) { | ||
| // we're done with queue recovery, shut ourselves down | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| setReaderRunning(false); | ||
| } | ||
| } else { | ||
| Thread.sleep(sleepForRetries); | ||
| } | ||
|
|
||
| if (!isEOFException) { | ||
| resetStream(entryStream); | ||
| } | ||
| return isDataQueued; | ||
| } | ||
|
|
||
| private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData, | ||
| boolean isEOFException) { | ||
| logMessage(batch); | ||
| batch.updatePosition(entryStream); | ||
| // In case of EOF exception we can utilize the last read path and position | ||
| // since we do not have the current information. | ||
| if (isEOFException) { | ||
| batch.updatePosition(lastReadPath, lastReadPosition); | ||
| } else { | ||
| batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition()); | ||
| } | ||
| batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData); | ||
| } | ||
|
|
||
|
|
@@ -227,10 +283,18 @@ private void resetStream(WALEntryStream stream) throws IOException { | |
| stream.reset(); // reuse stream | ||
| } | ||
|
|
||
| // if we get an EOF due to a zero-length log, and there are other logs in queue | ||
| // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is | ||
| // enabled, then dump the log | ||
| private void handleEofException(Exception e) { | ||
| /** | ||
| * This is to handle the EOFException from the WAL entry stream. EOFException should | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * be handled carefully because there are chances of data loss because of never replicating | ||
| * the data. | ||
| * If EOFException happens on the last log in recovered queue, we can safely stop | ||
| * the reader. | ||
| * If EOException doesn't happen on the last log in recovered queue, we should | ||
| * not stop the reader. | ||
| * @return true only the IOE can be handled | ||
| */ | ||
| private boolean handleEofException(Exception e, WALEntryStream entryStream, | ||
| WALEntryBatch batch) throws InterruptedException { | ||
| boolean isRecoveredSource = manager.getOldSources().contains(source); | ||
| // Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't | ||
| // add current log to recovered source queue so it is safe to remove. | ||
|
|
@@ -241,11 +305,22 @@ private void handleEofException(Exception e) { | |
| LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); | ||
| lastReadPath = logQueue.remove(); | ||
| lastReadPosition = 0; | ||
|
|
||
| // If it was on last log in the recovered queue, | ||
| // the stream doesn't have more data, we should stop the reader | ||
| boolean hasMoreData = !logQueue.isEmpty(); | ||
| // After we removed the WAL from the queue, we should | ||
| // try shipping the existing batch of entries, we do not want to reset | ||
| // stream since entry stream doesn't have the correct data at this point | ||
| isBatchQueuedToBeShipped(entryStream, batch, hasMoreData, true); | ||
|
||
| return true; | ||
| } | ||
| } catch (IOException ioe) { | ||
| LOG.warn("Couldn't get file length information about log " + logQueue.peek()); | ||
| } | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| public Path getCurrentPath() { | ||
|
|
@@ -295,7 +370,8 @@ public long getEntrySizeExcludeBulkLoad(Entry entry) { | |
| return edit.heapSize() + key.estimatedSerializedSizeOf(); | ||
| } | ||
|
|
||
| private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) { | ||
| private void updateBatchStats(WALEntryBatch batch, Entry entry, | ||
| long entryPosition, long entrySize) { | ||
| WALEdit edit = entry.getEdit(); | ||
| if (edit != null && !edit.isEmpty()) { | ||
| batch.incrementHeapSize(entrySize); | ||
|
|
@@ -405,7 +481,7 @@ public long getLastReadPosition() { | |
| * Holds a batch of WAL entries to replicate, along with some statistics | ||
| * | ||
| */ | ||
| static class WALEntryBatch { | ||
| final static class WALEntryBatch { | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private List<Pair<Entry, Long>> walEntriesWithSize; | ||
| // last WAL that was read | ||
| private Path lastWalPath; | ||
|
|
@@ -511,9 +587,9 @@ public boolean isEmpty() { | |
| return walEntriesWithSize.isEmpty(); | ||
| } | ||
|
|
||
| public void updatePosition(WALEntryStream entryStream) { | ||
| lastWalPath = entryStream.getCurrentPath(); | ||
| lastWalPosition = entryStream.getPosition(); | ||
| public void updatePosition(Path currentPath, long currentPosition) { | ||
sandeepvinayak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| lastWalPath = currentPath; | ||
| lastWalPosition = currentPosition; | ||
| } | ||
|
|
||
| public boolean hasMoreEntries() { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.