-
Notifications
You must be signed in to change notification settings - Fork 398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
not trying to hold leases on WAL files if we are holding them already. #142
Changes from 4 commits
cbf52b3
20184d5
4d51138
06cca38
d76ad1d
c10caff
943a089
e28ec4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ public class FSWAL implements WAL { | |
private String logFile = null; | ||
private Configuration conf = null; | ||
private Storage storage = null; | ||
private long sleepIntervalMs = 1000L; | ||
|
||
public FSWAL(String logsDir, TopicPartition topicPart, Storage storage) | ||
throws ConnectException { | ||
|
@@ -60,12 +61,16 @@ public void append(String tempFile, String committedFile) throws ConnectExceptio | |
writer.append(key, value); | ||
writer.hsync(); | ||
} catch (IOException e) { | ||
close(); | ||
throw new ConnectException(e); | ||
} | ||
} | ||
|
||
public long getSleepIntervalMs() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Could have "package private" scope instead and a non-javadoc comment above to show it's destined for testing. |
||
return sleepIntervalMs; | ||
} | ||
|
||
public void acquireLease() throws ConnectException { | ||
long sleepIntervalMs = 1000L; | ||
long MAX_SLEEP_INTERVAL_MS = 16000L; | ||
while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) { | ||
try { | ||
|
@@ -128,6 +133,7 @@ public void apply() throws ConnectException { | |
} | ||
} | ||
} catch (IOException e) { | ||
close(); | ||
throw new ConnectException(e); | ||
} | ||
} | ||
|
@@ -143,6 +149,8 @@ public void truncate() throws ConnectException { | |
close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant call, since |
||
} catch (IOException e) { | ||
throw new ConnectException(e); | ||
} finally { | ||
close(); | ||
} | ||
} | ||
|
||
|
@@ -159,6 +167,9 @@ public void close() throws ConnectException { | |
} | ||
} catch (IOException e) { | ||
throw new ConnectException("Error closing " + logFile, e); | ||
} finally { | ||
writer = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't point to the actual lines, but nullifying reader and writer above, is redundant now that it's done within the |
||
reader = null; | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,4 +27,5 @@ public interface WAL { | |
void truncate() throws ConnectException; | ||
void close() throws ConnectException; | ||
String getLogFile(); | ||
long getSleepIntervalMs(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, a bit of a nit, but since this is only used for testing I'm not sure about making it public for testing. In fact, we separately have some work going on to refactor some of this code to make it more reusable and this seems like an odd addition to the interface given that it's really just public for testing. /cc @kkonstantine I'm wondering if just casting to the more specific FSWAL class in the test would be a better solution (and label the FSWAL public API as public for testing)? Keeps the interface clean but allows the test to access the info that it needs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. great idea |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
import org.apache.hadoop.io.serializer.Deserializer; | ||
import org.apache.hadoop.io.serializer.SerializationFactory; | ||
import org.apache.hadoop.io.serializer.Serializer; | ||
import org.apache.hadoop.ipc.RemoteException; | ||
import org.apache.hadoop.util.Options; | ||
import org.apache.hadoop.util.Time; | ||
|
||
|
@@ -56,6 +57,8 @@ public class WALFile { | |
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash | ||
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash | ||
|
||
private static final String leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: It'd be nice for variable naming to be consistent with our code style for I know we don't run checkstyle currently, but we will soon. |
||
|
||
/** | ||
* The number of bytes between sync points. | ||
*/ | ||
|
@@ -194,40 +197,52 @@ static class AppendIfExistsOption extends Options.BooleanOption implements Optio | |
"compatible with stream"); | ||
} | ||
|
||
FileSystem fs = null; | ||
FSDataOutputStream out; | ||
boolean ownStream = fileOption != null; | ||
if (ownStream) { | ||
Path p = fileOption.getValue(); | ||
FileSystem fs; | ||
fs = p.getFileSystem(conf); | ||
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : | ||
bufferSizeOption.getValue(); | ||
short replication = replicationOption == null ? | ||
fs.getDefaultReplication(p) : | ||
(short) replicationOption.getValue(); | ||
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : | ||
blockSizeOption.getValue(); | ||
|
||
if (appendIfExistsOption != null && appendIfExistsOption.getValue() | ||
|
||
try { | ||
if (ownStream) { | ||
Path p = fileOption.getValue(); | ||
fs = p.getFileSystem(conf); | ||
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : | ||
bufferSizeOption.getValue(); | ||
short replication = replicationOption == null ? | ||
fs.getDefaultReplication(p) : | ||
(short) replicationOption.getValue(); | ||
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : | ||
blockSizeOption.getValue(); | ||
|
||
if (appendIfExistsOption != null && appendIfExistsOption.getValue() | ||
&& fs.exists(p)) { | ||
// Read the file and verify header details | ||
try (WALFile.Reader reader = | ||
// Read the file and verify header details | ||
try (WALFile.Reader reader = | ||
new WALFile.Reader(conf, WALFile.Reader.file(p), new Reader.OnlyHeaderOption())){ | ||
if (reader.getVersion() != VERSION[3]) { | ||
throw new VersionMismatchException(VERSION[3], reader.getVersion()); | ||
if (reader.getVersion() != VERSION[3]) { | ||
throw new VersionMismatchException(VERSION[3], reader.getVersion()); | ||
} | ||
sync = reader.getSync(); | ||
} | ||
sync = reader.getSync(); | ||
out = fs.append(p, bufferSize); | ||
this.appendMode = true; | ||
} else { | ||
out = fs.create(p, true, bufferSize, replication, blockSize); | ||
} | ||
out = fs.append(p, bufferSize); | ||
this.appendMode = true; | ||
} else { | ||
out = fs.create(p, true, bufferSize, replication, blockSize); | ||
out = streamOption.getValue(); | ||
} | ||
} else { | ||
out = streamOption.getValue(); | ||
|
||
init(conf, out, ownStream); | ||
} catch (RemoteException re) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain this part a bit more? I'm not sure why we need to look for this exception here but I might just be missing it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the how the leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException" can be caught. See here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java#L78. When this exception is seen, it means we are creating a new lease from a same DFSClient. Previously, we give up and keep the original lease which may lead to the forever waiting issue. The change close open files and clear the FileSystem cache. A new lease will be regenerated by future writing/reading. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I can't remember the details but I think the issue is which jar the class is defined in or something like that. So we had to go down the hacky route of checking the class name. |
||
log.error("Failed creating a WAL Writer: " + re.getMessage()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we do
instead so the log will include stack trace info? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, is the log even going to be useful since it gets logged again by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I was hoping to log a bit more information so that it is easier to read the debug log which is VERY long. So re.getMessage() is the short version that I like :-) If we log the stack trace, the same info will be logged again all the way up at https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324. |
||
if (re.getClassName().equals(leaseException)) { | ||
if (fs != null) { | ||
fs.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need protection from a possible There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope. I think IOException will be caught here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324. |
||
} | ||
} | ||
throw re; | ||
} | ||
|
||
init(conf, out, ownStream); | ||
} | ||
|
||
void init(Configuration conf, FSDataOutputStream out, boolean ownStream) | ||
|
@@ -499,6 +514,7 @@ public Reader(Configuration conf, Option... opts) throws IOException { | |
BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts); | ||
OnlyHeaderOption headerOnly = | ||
Options.getOption(OnlyHeaderOption.class, opts); | ||
|
||
// check for consistency | ||
if ((fileOpt == null) == (streamOpt == null)) { | ||
throw new | ||
|
@@ -508,25 +524,38 @@ public Reader(Configuration conf, Option... opts) throws IOException { | |
throw new IllegalArgumentException("buffer size can only be set when" + | ||
" a file is specified."); | ||
} | ||
|
||
// figure out the real values | ||
Path filename = null; | ||
FSDataInputStream file; | ||
final long len; | ||
if (fileOpt != null) { | ||
filename = fileOpt.getValue(); | ||
FileSystem fs = filename.getFileSystem(conf); | ||
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue(); | ||
len = null == lenOpt | ||
? fs.getFileStatus(filename).getLen() | ||
: lenOpt.getValue(); | ||
file = openFile(fs, filename, bufSize, len); | ||
} else { | ||
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); | ||
file = streamOpt.getValue(); | ||
FileSystem fs = null; | ||
|
||
try { | ||
if (fileOpt != null) { | ||
filename = fileOpt.getValue(); | ||
fs = filename.getFileSystem(conf); | ||
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue(); | ||
len = null == lenOpt | ||
? fs.getFileStatus(filename).getLen() | ||
: lenOpt.getValue(); | ||
file = openFile(fs, filename, bufSize, len); | ||
} else { | ||
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); | ||
file = streamOpt.getValue(); | ||
} | ||
long start = startOpt == null ? 0 : startOpt.getValue(); | ||
// really set up | ||
initialize(filename, file, start, len, conf, headerOnly != null); | ||
} catch (RemoteException re) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as the other RemoteException, can you explain why we need to catch this? |
||
log.error("Failed creating a WAL Reader: " + re.getMessage()); | ||
if (re.getClassName().equals(leaseException)) { | ||
if (fs != null) { | ||
fs.close(); | ||
} | ||
} | ||
throw re; | ||
} | ||
long start = startOpt == null ? 0 : startOpt.getValue(); | ||
// really set up | ||
initialize(filename, file, start, len, conf, headerOnly != null); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ public class WALTest extends TestWithMiniDFSCluster { | |
private static final String extension = ".avro"; | ||
|
||
@Test | ||
public void testWALMultiClient() throws Exception { | ||
public void testMultiWALFromOneDFSClient() throws Exception { | ||
fs.delete(new Path(FileUtils.directoryName(url, topicsDir, TOPIC_PARTITION)), true); | ||
|
||
@SuppressWarnings("unchecked") | ||
|
@@ -62,8 +62,8 @@ public void testWALMultiClient() throws Exception { | |
@Override | ||
public void run() { | ||
try { | ||
// holding the lease for awhile | ||
Thread.sleep(3000); | ||
// holding the lease for time that is less than wal2's retry interval, which is 1000 ms. | ||
Thread.sleep(wal2.getSleepIntervalMs()-100); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. White space needed around |
||
closed = true; | ||
wal1.close(); | ||
} catch (ConnectException | InterruptedException e) { | ||
|
@@ -73,6 +73,8 @@ public void run() { | |
}); | ||
thread.start(); | ||
|
||
// AcquireLease will try to acquire the same lease that wal1 is holding and fail | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I'd use the actual method name ( |
||
// It will then retry after waiting for 1000 ms. | ||
wal2.acquireLease(); | ||
assertTrue(closed); | ||
wal2.apply(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, so just for the sake of debuggability, if we're doing work in the exception handler that could itself throw exceptions, I think we might want to at least log that something went wrong. Because with the current code, if
close()
throws an exception, then all the info about the original exception is lost.This is true for a couple of the other similar changes below. The only alternative I could think of is allowing
close()
to take an extra parameter that's the original cause of calling close (or null if there isn't one) and attaching that as the cause if it fails, but then we also lose info about the other exception. I think logging this one proactively is probably fine since these should all be exceptional conditions.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please have a look now.