-
Notifications
You must be signed in to change notification settings - Fork 3.4k
[HBASE-25536] Remove 0 length wal file from logQueue if it belongs to… #2908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FSDataOutputStream; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.hbase.Cell; | ||
|
|
@@ -652,4 +653,33 @@ public void testReadBeyondCommittedLength() throws IOException, InterruptedExcep | |
| assertFalse(entryStream.hasNext()); | ||
| } | ||
| } | ||
|
|
||
| /* | ||
| Test removal of 0 length log from logQueue if the source is a recovered source and | ||
| size of logQueue is only 1. | ||
| */ | ||
| @Test | ||
| public void testEOFExceptionForRecoveredQueue() throws Exception { | ||
| PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>(); | ||
| // Create a 0 length log. | ||
| Path emptyLog = new Path("emptyLog"); | ||
| FSDataOutputStream fsdos = fs.create(emptyLog); | ||
| fsdos.close(); | ||
| assertEquals(0, fs.getFileStatus(emptyLog).getLen()); | ||
| queue.add(emptyLog); | ||
|
|
||
| Configuration conf = new Configuration(CONF); | ||
| // Override the max retries multiplier to fail fast. | ||
| conf.setInt("replication.source.maxretriesmultiplier", 1); | ||
| conf.setBoolean("replication.source.eof.autorecovery", true); | ||
| // Create a reader thread with source as recovered source. | ||
| ReplicationSource source = mockReplicationSource(true, conf); | ||
| when(source.isPeerEnabled()).thenReturn(true); | ||
| ReplicationSourceWALReader reader = | ||
| new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source); | ||
| reader.run(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| // ReplicationSourceWALReaderThread#handleEofException method will | ||
| // remove empty log from logQueue. | ||
| assertEquals(0, queue.size()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, If we use It's upto you, this is anyways trivial. |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice find, mind adding a quick comment since this is a subtle behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a related note, should we add a source and global metric counter to track the number of 0 size files dequeued? Seems more common than we thought, easy to monitor may be..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bharathv Good idea. Could you please create a separate jira to track this. Thank you !