not trying to hold leases on WAL files if we are holding them already. #142

Merged 8 commits on Aug 17, 2017
src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -337,7 +337,7 @@ public void write() {
closeTempFile();
appendToWAL();
commitFile();
} catch (IOException e) {
} catch (IOException | ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = System.currentTimeMillis();
setRetryTimeout(timeoutMs);
15 changes: 14 additions & 1 deletion src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
@@ -42,6 +42,7 @@ public class FSWAL implements WAL {
private String logFile = null;
private Configuration conf = null;
private Storage storage = null;
private long sleepIntervalMs = 1000L;

public FSWAL(String logsDir, TopicPartition topicPart, Storage storage)
throws ConnectException {
@@ -60,12 +61,17 @@ public void append(String tempFile, String committedFile) throws ConnectException {
writer.append(key, value);
writer.hsync();
} catch (IOException e) {
log.error("Error appending WAL file: {}, {}", logFile, e);
close();
Contributor:
Hmm, so just for the sake of debuggability, if we're doing work in the exception handler that could itself throw exceptions, I think we might want to at least log that something went wrong. Because with the current code, if close() throws an exception, then all the info about the original exception is lost.

This is true for a couple of the other similar changes below. The only alternative I could think of is allowing close() to take an extra parameter that's the original cause of calling close (or null if there isn't one) and attaching that as the cause if it fails, but then we also lose info about the other exception. I think logging this one proactively is probably fine since these should all be exceptional conditions.
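
For illustration, here is a minimal sketch of the close-with-cause alternative described above; it is not what the PR implements, and the close(Throwable) overload and the use of addSuppressed are assumptions:

// Sketch only: a hypothetical FSWAL.close(Throwable) overload that keeps the
// original failure visible if cleanup itself throws.
public void close(Throwable originalCause) throws ConnectException {
  try {
    if (writer != null) {
      writer.close();
    }
    if (reader != null) {
      reader.close();
    }
  } catch (IOException e) {
    ConnectException wrapped = new ConnectException("Error closing " + logFile, e);
    if (originalCause != null) {
      // Keep both the cleanup failure and the exception that triggered close().
      wrapped.addSuppressed(originalCause);
    }
    throw wrapped;
  } finally {
    writer = null;
    reader = null;
  }
}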

Contributor Author:
Please have a look now.

throw new ConnectException(e);
}
}

public long getSleepIntervalMs() {
Member:
nit: Could have "package private" scope instead and a non-javadoc comment above to show it's destined for testing.

return sleepIntervalMs;
}
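
For reference, a sketch of what the package-private suggestion above would look like; illustrative only, not part of the change:

// Visible for testing.
long getSleepIntervalMs() {
  return sleepIntervalMs;
}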

public void acquireLease() throws ConnectException {
long sleepIntervalMs = 1000L;
long MAX_SLEEP_INTERVAL_MS = 16000L;
while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
try {
@@ -128,6 +134,8 @@ public void apply() throws ConnectException {
}
}
} catch (IOException e) {
log.error("Error applying WAL file: {}, {}", logFile, e);
close();
throw new ConnectException(e);
}
}
@@ -143,6 +151,8 @@ public void truncate() throws ConnectException {
close();
Member (@kkonstantine, Feb 8, 2017):
Redundant call, since close has been added in the finally block.
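
To illustrate the suggestion, a sketch of truncate() with the redundant call dropped, relying only on the finally block added in this PR; the body is approximated, and renameLogFile() is a placeholder for the actual storage rename/delete calls (assumed to throw IOException):

public void truncate() throws ConnectException {
  try {
    renameLogFile();  // placeholder for the real file rotation
  } catch (IOException e) {
    throw new ConnectException(e);
  } finally {
    close();  // single close, runs on both the success and failure paths
  }
}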

} catch (IOException e) {
throw new ConnectException(e);
} finally {
close();
}
}

@@ -159,6 +169,9 @@ public void close() throws ConnectException {
}
} catch (IOException e) {
throw new ConnectException("Error closing " + logFile, e);
} finally {
writer = null;
Member:
I can't point to the actual lines, but nullifying reader and writer above is redundant now that it's done within the finally block.

reader = null;
}
}

105 changes: 67 additions & 38 deletions src/main/java/io/confluent/connect/hdfs/wal/WALFile.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Time;

Expand All @@ -56,6 +57,8 @@ public class WALFile {
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash

private static final String leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
Member:
nit: It'd be nice for variable naming to be consistent with our code style for static final member fields.
Thus, it should be something like: LEASE_EXCEPTION_CLASS_NAME
(I added more to the name to show it's a class name string and not the actual class. Your call).

I know we don't run checkstyle currently, but we will soon.


/**
* The number of bytes between sync points.
*/
@@ -194,40 +197,52 @@ static class AppendIfExistsOption extends Options.BooleanOption implements Option
"compatible with stream");
}

FileSystem fs = null;
FSDataOutputStream out;
boolean ownStream = fileOption != null;
if (ownStream) {
Path p = fileOption.getValue();
FileSystem fs;
fs = p.getFileSystem(conf);
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
bufferSizeOption.getValue();
short replication = replicationOption == null ?
fs.getDefaultReplication(p) :
(short) replicationOption.getValue();
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
blockSizeOption.getValue();

if (appendIfExistsOption != null && appendIfExistsOption.getValue()

try {
if (ownStream) {
Path p = fileOption.getValue();
fs = p.getFileSystem(conf);
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
bufferSizeOption.getValue();
short replication = replicationOption == null ?
fs.getDefaultReplication(p) :
(short) replicationOption.getValue();
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
blockSizeOption.getValue();

if (appendIfExistsOption != null && appendIfExistsOption.getValue()
&& fs.exists(p)) {
// Read the file and verify header details
try (WALFile.Reader reader =
// Read the file and verify header details
try (WALFile.Reader reader =
new WALFile.Reader(conf, WALFile.Reader.file(p), new Reader.OnlyHeaderOption())){
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3], reader.getVersion());
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3], reader.getVersion());
}
sync = reader.getSync();
}
sync = reader.getSync();
out = fs.append(p, bufferSize);
this.appendMode = true;
} else {
out = fs.create(p, true, bufferSize, replication, blockSize);
}
out = fs.append(p, bufferSize);
this.appendMode = true;
} else {
out = fs.create(p, true, bufferSize, replication, blockSize);
out = streamOption.getValue();
}
} else {
out = streamOption.getValue();

init(conf, out, ownStream);
} catch (RemoteException re) {
Comment:
Can you explain this part a bit more? I'm not sure why we need to look for this exception here but I might just be missing it.

Contributor Author:
This is how the leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException" can be caught. See here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java#L78.

When this exception is seen, it means we are trying to create a new lease from the same DFSClient. Previously, we gave up and kept the original lease, which could lead to waiting forever. The change closes the open files and clears the FileSystem cache; a new lease will be regenerated by future writes/reads.

Contributor:
Yeah, I can't remember the details but I think the issue is which jar the class is defined in or something like that. So we had to go down the hacky route of checking the class name.
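
For context, a rough sketch of the retry loop in FSWAL.acquireLease() that relies on this RemoteException being rethrown; the details are approximated from the linked code rather than copied, and createWriter() is a placeholder for the actual WALFile writer creation call:

public void acquireLease() throws ConnectException {
  long sleepIntervalMs = this.sleepIntervalMs;          // starts at 1000 ms
  while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {     // capped at 16000 ms
    try {
      if (writer == null) {
        writer = createWriter();                        // placeholder; may throw RemoteException
      }
      break;                                            // lease acquired
    } catch (RemoteException e) {
      if (e.getClassName().equals(leaseException)) {
        // The lease is still held, possibly by a cached DFSClient in this same
        // process: back off and retry instead of failing immediately.
        try {
          Thread.sleep(sleepIntervalMs);
        } catch (InterruptedException ie) {
          throw new ConnectException(ie);
        }
        sleepIntervalMs *= 2;
      } else {
        throw new ConnectException(e);
      }
    } catch (IOException e) {
      throw new ConnectException("Error creating writer for WAL file " + logFile, e);
    }
  }
}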

log.error("Failed creating a WAL Writer: " + re.getMessage());
Contributor:
Can we do

log.error("Failed creating a WAL Writer: ", re);

instead so the log will include stack trace info?

Contributor:
In fact, is the log even going to be useful since it gets logged again by FSWAL which calls this constructor? I think this might be the case in the other case below as well, though I haven't extensively checked all callers. I guess worst case we're just logging a bit more information, so perhaps being conservative about logging the details isn't a bad idea.

Contributor Author:
Yes, I was hoping to log a bit more information so that it is easier to read the debug log which is VERY long.

So re.getMessage() is the short version that I like :-) If we log the stack trace, the same info will be logged again all the way up at https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324.

if (re.getClassName().equals(leaseException)) {
if (fs != null) {
fs.close();
Contributor:
Does this need protection from a possible IOException?
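
One possible way to guard that cleanup, sketched here as an illustration rather than as part of the PR:

// Sketch only: keep an IOException from fs.close() from masking the original
// RemoteException.
if (re.getClassName().equals(leaseException)) {
  if (fs != null) {
    try {
      fs.close();
    } catch (IOException closeEx) {
      log.warn("Failed to close FileSystem after lease error", closeEx);
    }
  }
}
throw re;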

}
}
throw re;
}

init(conf, out, ownStream);
}

void init(Configuration conf, FSDataOutputStream out, boolean ownStream)
@@ -499,6 +514,7 @@ public Reader(Configuration conf, Option... opts) throws IOException {
BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts);
OnlyHeaderOption headerOnly =
Options.getOption(OnlyHeaderOption.class, opts);

// check for consistency
if ((fileOpt == null) == (streamOpt == null)) {
throw new
Expand All @@ -508,25 +524,38 @@ public Reader(Configuration conf, Option... opts) throws IOException {
throw new IllegalArgumentException("buffer size can only be set when" +
" a file is specified.");
}

// figure out the real values
Path filename = null;
FSDataInputStream file;
final long len;
if (fileOpt != null) {
filename = fileOpt.getValue();
FileSystem fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
FileSystem fs = null;

try {
if (fileOpt != null) {
filename = fileOpt.getValue();
fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
}
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
initialize(filename, file, start, len, conf, headerOnly != null);
} catch (RemoteException re) {
Comment:
Same as the other RemoteException, can you explain why we need to catch this?

log.error("Failed creating a WAL Reader: " + re.getMessage());
if (re.getClassName().equals(leaseException)) {
if (fs != null) {
fs.close();
}
}
throw re;
}
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
initialize(filename, file, start, len, conf, headerOnly != null);
}

/**
10 changes: 6 additions & 4 deletions src/test/java/io/confluent/connect/hdfs/wal/WALTest.java
@@ -34,7 +34,7 @@ public class WALTest extends TestWithMiniDFSCluster {
private static final String extension = ".avro";

@Test
public void testWALMultiClient() throws Exception {
public void testMultiWALFromOneDFSClient() throws Exception {
fs.delete(new Path(FileUtils.directoryName(url, topicsDir, TOPIC_PARTITION)), true);

@SuppressWarnings("unchecked")
@@ -43,7 +43,7 @@ public void testWALMultiClient() throws Exception {
Storage storage = StorageFactory.createStorage(storageClass, conf, url);

final WAL wal1 = storage.wal(topicsDir, TOPIC_PARTITION);
final WAL wal2 = storage.wal(topicsDir, TOPIC_PARTITION);
final FSWAL wal2 = (FSWAL)storage.wal(topicsDir, TOPIC_PARTITION);
Member:
Single white space needed between the cast and storage.


String directory = TOPIC + "/" + String.valueOf(PARTITION);
final String tempfile = FileUtils.tempFileName(url, topicsDir, directory, extension);
@@ -62,8 +62,8 @@ public void testWALMultiClient() throws Exception {
@Override
public void run() {
try {
// holding the lease for awhile
Thread.sleep(3000);
// holding the lease for time that is less than wal2's retry interval, which is 1000 ms.
Thread.sleep(wal2.getSleepIntervalMs()-100);
Member:
White space needed around -
(Again checkstyle coming soon :) )

closed = true;
wal1.close();
} catch (ConnectException | InterruptedException e) {
@@ -73,6 +73,8 @@ public void run() {
});
thread.start();

// AcquireLease will try to acquire the same lease that wal1 is holding and fail
Member:
nit: I'd use the actual method name (acquireLease) if I wanted to refer to it, instead of the concept ("Acquiring the lease")

// It will then retry after waiting for 1000 ms.
wal2.acquireLease();
assertTrue(closed);
wal2.apply();