not trying to hold leases on WAL files if we are holding them already. #142
Conversation
Can one of the admins verify this patch?
@skyahead We ran into the same issue recently. ERROR Error closing hdfs://192.168.101.55:8020/logs/*_/1/log.
@lakeofsand I do not see exactly the same exceptions, but I do see lots of similar ones :-) After all those 'Error closing' exceptions, there is a 'topicPartitionWriters.remove(tp)' call in DataWriter.java's close() method, which removes this writer so that a new one is recreated for the current partition. I.e., the connect code should survive these exceptions. Does your code keep generating these errors forever? Also, it seems you are using a version that is older than this commit: b2b1c61#diff-d4f63c72e615f6185c4d472918ba1e95, in which the close() method is called.
Our issue seems to be caused by a bug in the HDFS client: @skyahead org.apache.hadoop.hdfs.DFSOutputStream.java:
@lakeofsand When your errors happen, do you see anything wrong in your HDFS namenode log file?
@skyahead "Neither the process itself nor another process's operations on this file will fail, because the lease is still owned by the previous process."
@skyahead it's definitely dangerous to parse an exception message (these can change without warning if we, say, upgrade the HDFS client dependency). However, the symptom you describe in issue #141 looks like the writer isn't set to null, which happens here. It seems possible that the writer didn't close properly in your scenario because you cut the network. In that case we throw an exception and never null out the writer, so we don't ever create a new writer (and thus never obtain a new lease). Maybe a better approach is to null out the writer and reader in a finally block.
@skyahead left you a few comments and suggestions. I think there are a couple of accidental changes in here, and I think we can be a bit cleaner by modifying close() instead of adding a hard reset option.
@@ -96,6 +97,11 @@ public void acquireLease() throws ConnectException {
  }
}

private void reset() {
Instead of adding a new method, could we just do this in the close() method in a finally block, so that we attempt to close the reader/writer gracefully first before nulling them out? Then replace calls to reset() with close() in this file instead?
Great point! Will do.
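The close()-in-a-finally pattern suggested above can be sketched in isolation like this. Note this is only a sketch of the idea: the fields are stand-in Closeables, not the real WALFile.Writer/Reader, and the class is not the actual FSWAL code.

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of the close()-with-finally pattern discussed above. Writer and
// reader are stand-in Closeables, not the real WALFile.Writer/Reader.
class WalCloseSketch {
  private Closeable writer;
  private Closeable reader;

  WalCloseSketch(Closeable writer, Closeable reader) {
    this.writer = writer;
    this.reader = reader;
  }

  // Try to close gracefully first; null out the fields no matter what, so
  // the next use recreates them (and thereby obtains a fresh lease).
  void close() throws IOException {
    try {
      if (writer != null) {
        writer.close();
      }
      if (reader != null) {
        reader.close();
      }
    } finally {
      writer = null;
      reader = null;
    }
  }

  // Reports whether both fields were nulled out.
  boolean isReset() {
    return writer == null && reader == null;
  }
}
```

Even when close() throws, the finally block guarantees both fields are nulled, which is what makes a later retry recreate the writer and request a new lease.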
out = streamOption.getValue();

init(conf, out, ownStream);
} catch (RemoteException re) {
Can you explain this part a bit more? I'm not sure why we need to look for this exception here but I might just be missing it.
This is how the leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException" can be caught. See here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java#L78.
When this exception is seen, it means we are creating a new lease from the same DFSClient. Previously, we gave up and kept the original lease, which could lead to the forever-waiting issue. The change closes open files and clears the FileSystem cache; a new lease will be regenerated by future writing/reading.
Yeah, I can't remember the details but I think the issue is which jar the class is defined in or something like that. So we had to go down the hacky route of checking the class name.
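Since the server-side exception class only arrives as a string, the check being discussed is a string comparison against getClassName(). A minimal stand-alone sketch of that pattern (the RemoteException here is a stand-in for Hadoop's org.apache.hadoop.ipc.RemoteException, not the real class):

```java
// Minimal stand-in for Hadoop's RemoteException: the server-side exception
// class is carried only as a string, so we cannot catch it by Java type.
class RemoteException extends java.io.IOException {
  private final String className;

  RemoteException(String className, String msg) {
    super(msg);
    this.className = className;
  }

  String getClassName() {
    return className;
  }
}

class LeaseRecoverySketch {
  static final String LEASE_EXCEPTION =
      "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";

  // True if the remote error is the lease conflict the PR recovers from by
  // closing the cached FileSystem and letting a retry obtain a new lease.
  static boolean isLeaseConflict(RemoteException re) {
    return LEASE_EXCEPTION.equals(re.getClassName());
  }
}
```

As the reviewer notes, matching on a class-name string is fragile, but it is the only handle the wire protocol exposes for the server-side exception type.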
@@ -73,6 +73,7 @@ public void run() {
});
thread.start();

Thread.sleep(3001);
this seems unrelated?
I was hoping to fix the test but it seems I was wrong. Will commit something new for this.
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
initialize(filename, file, start, len, conf, headerOnly != null);
} catch (RemoteException re) {
Same as the other RemoteException, can you explain why we need to catch this?
WALTest fixed, please review
@@ -60,12 +61,16 @@ public void append(String tempFile, String committedFile) throws ConnectException {
writer.append(key, value);
writer.hsync();
} catch (IOException e) {
close();
Hmm, so just for the sake of debuggability, if we're doing work in the exception handler that could itself throw exceptions, I think we might want to at least log that something went wrong. Because with the current code, if close() throws an exception, then all the info about the original exception is lost.
This is true for a couple of the other similar changes below. The only alternative I could think of is allowing close() to take an extra parameter that's the original cause of calling close (or null if there isn't one) and attaching that as the cause if it fails, but then we also lose info about the other exception. I think logging this one proactively is probably fine since these should all be exceptional conditions.
Please have a look now.
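The "log first, then clean up, then rethrow" shape being discussed could look roughly like this. This is a generic sketch using java.util.logging rather than the connector's slf4j logger, and the Resource interface is a stand-in for the WAL writer:

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of "log the root cause before cleanup that may itself throw",
// per the review discussion. Not the actual connector code.
class LogThenCleanupSketch {
  private static final Logger log =
      Logger.getLogger(LogThenCleanupSketch.class.getName());

  interface Resource {
    void write() throws IOException;
    void close() throws IOException;
  }

  // Record the original failure before attempting cleanup, so that if
  // close() itself throws, the root cause is not lost.
  static void appendSafely(Resource r) throws IOException {
    try {
      r.write();
    } catch (IOException e) {
      log.log(Level.SEVERE, "Append failed, closing resource", e);
      r.close(); // may throw, but the original error is already recorded
      throw e;
    }
  }
}
```

The alternative the reviewer mentions, threading the original exception through close() as a cause, would still drop one of the two exceptions, which is why proactive logging is the simpler choice here.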
init(conf, out, ownStream);
} catch (RemoteException re) {
log.error("Failed creating a WAL Writer: " + re.getMessage());
Can we do log.error("Failed creating a WAL Writer: ", re); instead, so the log will include stack trace info?
In fact, is the log even going to be useful since it gets logged again by FSWAL, which calls this constructor? I think this might be the case in the other case below as well, though I haven't extensively checked all callers. I guess worst case we're just logging a bit more information, so perhaps being conservative about logging the details isn't a bad idea.
Yes, I was hoping to log a bit more information so that it is easier to read the debug log, which is VERY long. So re.getMessage() is the short version that I like :-) If we log the stack trace, the same info will be logged again all the way up at https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324.
log.error("Failed creating a WAL Writer: " + re.getMessage());
if (re.getClassName().equals(leaseException)) {
if (fs != null) {
fs.close();
Does this need protection from a possible IOException?
Nope. I think IOException will be caught here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324.
@@ -27,4 +27,5 @@
void truncate() throws ConnectException;
void close() throws ConnectException;
String getLogFile();
long getSleepIntervalMs();
Hmm, a bit of a nit, but since this is only used for testing I'm not sure about making it public for testing. In fact, we separately have some work going on to refactor some of this code to make it more reusable and this seems like an odd addition to the interface given that it's really just public for testing. /cc @kkonstantine
I'm wondering if just casting to the more specific FSWAL class in the test would be a better solution (and label the FSWAL public API as public for testing)? Keeps the interface clean but allows the test to access the info that it needs.
great idea
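The "cast in the test" approach agreed on above can be sketched like this. These are stand-in types for illustration, not the real kafka-connect-hdfs classes, and the interval value is assumed:

```java
// Keep the WAL interface clean; reach the test-only accessor by casting to
// the concrete class. Stand-in types, not the real kafka-connect-hdfs ones.
interface WAL {
  void acquireLease();
  void close();
}

class FSWAL implements WAL {
  // Assumed default retry interval, for illustration only.
  private final long sleepIntervalMs = 1000;

  @Override public void acquireLease() { /* retry loop elided */ }
  @Override public void close() { }

  // public for testing
  public long getSleepIntervalMs() {
    return sleepIntervalMs;
  }
}

class WalCastSketch {
  static WAL create() {
    return new FSWAL();
  }

  // A test downcasts to FSWAL to read the interval without widening the
  // WAL interface itself.
  static long sleepIntervalOf(WAL wal) {
    return ((FSWAL) wal).getSleepIntervalMs();
  }
}
```

This keeps the public interface free of test-only members while still letting the test read the retry interval it needs.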
I don't have any other comments, what do you think @ewencp?
Thanks for your PR. I got a pointer to review an addition to the WAL interface that's not present any more, so I thought I'd give it a look.
Looks good in general. I've added some nitpicks that would help clean up the code a little bit.
throw new ConnectException(e);
  }
}

public long getSleepIntervalMs() {
nit: Could have "package private" scope instead and a non-javadoc comment above to show it's destined for testing.
@@ -143,6 +151,8 @@ public void truncate() throws ConnectException {
close();
Redundant call, since close has been added in the finally block.
@@ -159,6 +169,9 @@ public void close() throws ConnectException {
  }
} catch (IOException e) {
  throw new ConnectException("Error closing " + logFile, e);
} finally {
  writer = null;
I can't point to the actual lines, but nullifying reader and writer above is redundant now that it's done within the finally block.
@@ -56,6 +57,8 @@
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash

private static final String leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
nit: It'd be nice for variable naming to be consistent with our code style for static final member fields. Thus, it should be something like: LEASE_EXCEPTION_CLASS_NAME (I added more to the name to show it's a class name string and not the actual class. Your call).
I know we don't run checkstyle currently, but we will soon.
@@ -43,7 +43,7 @@ public void testWALMultiClient() throws Exception {
Storage storage = StorageFactory.createStorage(storageClass, conf, url);

final WAL wal1 = storage.wal(topicsDir, TOPIC_PARTITION);
-final WAL wal2 = storage.wal(topicsDir, TOPIC_PARTITION);
+final FSWAL wal2 = (FSWAL)storage.wal(topicsDir, TOPIC_PARTITION);
Single white space needed between the cast and storage.
-// holding the lease for awhile
-Thread.sleep(3000);
+// holding the lease for time that is less than wal2's retry interval, which is 1000 ms.
+Thread.sleep(wal2.getSleepIntervalMs()-100);
White space needed around - (again, checkstyle coming soon :) ).
@@ -73,6 +73,8 @@ public void run() {
});
thread.start();

// AcquireLease will try to acquire the same lease that wal1 is holding and fail
nit: I'd use the actual method name (acquireLease) if I wanted to refer to it, instead of the concept ("acquiring the lease").
@kkonstantine Thanks for the review! How about now?
Great, thanks. I see you also pushed the declaration of the initial and max intervals as constants outside the method; I forgot to mention that. OK by me, I'd wait for final comments from @ewencp.
@kkonstantine thanks for replying so promptly :-)
@ewencp Can you have a final look at this PR please?
@ewencp @kkonstantine
Good day to everyone!
Can one of the admins verify this patch?
ok to test |
This LGTM. I merged w/ master to clean up the conflicts. @kkonstantine any further thoughts?
Seems ok overall, but one change needs justification (see comment)
private WALFile.Writer writer = null;
private WALFile.Reader reader = null;
private String logFile = null;
private HdfsSinkConnectorConfig conf = null;
private HdfsStorage storage = null;
private long sleepIntervalMs = WALConstants.INITIAL_SLEEP_INTERVAL_MS;
Why is this one upgraded from a local variable to a member field? The behavior definitely changes on repeated calls on the same object. Notice that this variable is mutated iteratively as part of the while loop in acquireLease. Even if there's only one call of acquireLease per FSWAL object, I think it's better practice to keep it local, if that's what we intend to do here.
Good catch!
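The point about keeping the backoff interval local to acquireLease can be sketched as below. The constant values and the attempt-counting return are assumptions for illustration, the actual Thread.sleep is elided, and this is not the real FSWAL loop:

```java
import java.util.function.BooleanSupplier;

// Sketch of keeping the backoff interval local to acquireLease(), per the
// review comment. Constants are stand-ins for WALConstants, values assumed.
class AcquireLeaseSketch {
  static final long INITIAL_SLEEP_INTERVAL_MS = 1000;
  static final long MAX_SLEEP_INTERVAL_MS = 16000;

  // Returns the number of attempts it took to acquire the lease. Because
  // sleepIntervalMs is a local, every call starts the backoff from the
  // initial interval instead of resuming a previous call's state (the
  // hazard of promoting it to a member field).
  static int acquireLease(BooleanSupplier tryAcquire) {
    long sleepIntervalMs = INITIAL_SLEEP_INTERVAL_MS;
    int attempts = 0;
    while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
      attempts++;
      if (tryAcquire.getAsBoolean()) {
        return attempts;
      }
      sleepIntervalMs *= 2; // exponential backoff; sleep omitted in sketch
    }
    throw new RuntimeException("could not acquire lease");
  }
}
```

With a member field, a second acquireLease call on the same object would start at whatever interval the previous call reached, silently shortening (or exhausting) its retry budget; the local variable avoids that.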
@@ -0,0 +1,23 @@
/**
 * Copyright 2015 Confluent Inc.
nit: If you apply any changes based on the comment above, you may change this one too, to be 2017 instead. Otherwise, never mind.
Great! Thanks for the fix @skyahead!
LGTM
This PR is for: #141