Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

not trying to hold leases on WAL files if we are holding them already. #142

Merged
merged 8 commits into from
Aug 17, 2017
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<!-- TODO: fix all of these -->
<suppress
checks="AbbreviationAsWordInName"
files="(DataWriter|FSWAL|TopicPartitionWriter|WAL|WALEntry|WALFile).java"
files="(DataWriter|FSWAL|TopicPartitionWriter|WAL|WALEntry|WALFile|WALConstants).java"
/>

<suppress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public void write() {
closeTempFile();
appendToWAL();
commitFile();
} catch (DataException e) {
} catch (ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = System.currentTimeMillis();
setRetryTimeout(timeoutMs);
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@
public class FSWAL implements WAL {

private static final Logger log = LoggerFactory.getLogger(FSWAL.class);
private static final String leaseException =
"org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
private static final long MAX_SLEEP_INTERVAL_MS = 16000L;

private WALFile.Writer writer = null;
private WALFile.Reader reader = null;
private String logFile = null;
private HdfsSinkConnectorConfig conf = null;
private HdfsStorage storage = null;
private long sleepIntervalMs = WALConstants.INITIAL_SLEEP_INTERVAL_MS;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this one upgraded from local variable to member field variable? The behavior definitely changes on repeated calls on the same object. Notice that this variable is mutated iteratively as part of the while loop in acquiredLease. Even if it happens and there's only one call of acquireLease per FSWAL object I think it's a better practice to keep it local if that's what we intend to do here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!


public FSWAL(String logsDir, TopicPartition topicPart, HdfsStorage storage)
throws ConnectException {
Expand All @@ -62,13 +60,14 @@ public void append(String tempFile, String committedFile) throws ConnectExceptio
writer.append(key, value);
writer.hsync();
} catch (IOException e) {
log.error("Error appending WAL file: {}, {}", logFile, e);
close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so just for the sake of debuggability, if we're doing work in the exception handler that could itself throw exceptions, I think we might want to at least log that something went wrong. Because with the current code, if close() throws an exception, then all the info about the original exception is lost.

This is true for a couple of the other similar changes below. The only alternative I could think of is allowing close() to take an extra parameter that's the original cause of calling close (or null if there isn't one) and attaching that as the cause if it fails, but then we also lose info about the other exception. I think logging this one proactively is probably fine since these should all be exceptional conditions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please have a look now.

throw new DataException(e);
}
}

public void acquireLease() throws ConnectException {
long sleepIntervalMs = 1000L;
while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
while (sleepIntervalMs < WALConstants.MAX_SLEEP_INTERVAL_MS) {
try {
if (writer == null) {
writer = WALFile.createWriter(conf, Writer.file(new Path(logFile)),
Expand All @@ -77,7 +76,7 @@ public void acquireLease() throws ConnectException {
}
break;
} catch (RemoteException e) {
if (e.getClassName().equals(leaseException)) {
if (e.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
log.info("Cannot acquire lease on WAL {}", logFile);
try {
Thread.sleep(sleepIntervalMs);
Expand All @@ -92,7 +91,7 @@ public void acquireLease() throws ConnectException {
throw new DataException("Error creating writer for log file " + logFile, e);
}
}
if (sleepIntervalMs >= MAX_SLEEP_INTERVAL_MS) {
if (sleepIntervalMs >= WALConstants.MAX_SLEEP_INTERVAL_MS) {
throw new ConnectException("Cannot acquire lease after timeout, will retry.");
}
}
Expand Down Expand Up @@ -129,33 +128,37 @@ public void apply() throws ConnectException {
}
}
} catch (IOException e) {
log.error("Error applying WAL file: {}, {}", logFile, e);
close();
throw new DataException(e);
}
}

@Override
public void truncate() throws ConnectException {
String oldLogFile = logFile + ".1";
storage.delete(oldLogFile);
storage.commit(logFile, oldLogFile);
// Clean out references to the current WAL file.
// Open a new one on the next lease acquisition.
close();
try {
String oldLogFile = logFile + ".1";
storage.delete(oldLogFile);
storage.commit(logFile, oldLogFile);
} finally {
close();
}
}

@Override
public void close() throws ConnectException {
try {
if (writer != null) {
writer.close();
writer = null;
}
if (reader != null) {
reader.close();
reader = null;
}
} catch (IOException e) {
throw new DataException("Error closing " + logFile, e);
} finally {
writer = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't point to the actual lines, but nullifying reader and writer above, is redundant now that it's done within the finally block.

reader = null;
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/wal/WALConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright 2015 Confluent Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If you apply any changes based on the comment above, you may change this one too to be 2017 instead. Otherwise, never mind.

*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
**/


package io.confluent.connect.hdfs.wal;

public class WALConstants {
protected static final String LEASE_EXCEPTION_CLASS_NAME
= "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
protected static final long MAX_SLEEP_INTERVAL_MS = 16000L;
protected static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;
}
119 changes: 74 additions & 45 deletions src/main/java/io/confluent/connect/hdfs/wal/WALFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Time;

Expand Down Expand Up @@ -132,42 +133,56 @@ public static class Writer implements Closeable, Syncable {
throw new IllegalArgumentException("file modifier options not compatible with stream");
}

FileSystem fs = null;
FSDataOutputStream out;
boolean ownStream = fileOption != null;
if (ownStream) {
Path p = fileOption.getValue();
FileSystem fs;
fs = p.getFileSystem(conf);
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
bufferSizeOption.getValue();
short replication = replicationOption == null
? fs.getDefaultReplication(p)
: (short) replicationOption.getValue();
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
blockSizeOption.getValue();

if (appendIfExistsOption != null && appendIfExistsOption.getValue()
&& fs.exists(p)) {
// Read the file and verify header details
try (
WALFile.Reader reader =
new WALFile.Reader(conf, WALFile.Reader.file(p), new Reader.OnlyHeaderOption())
) {
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3], reader.getVersion());

try {
if (ownStream) {
Path p = fileOption.getValue();
fs = p.getFileSystem(conf);
int bufferSize = bufferSizeOption == null
? getBufferSize(conf)
: bufferSizeOption.getValue();
short replication = replicationOption == null
? fs.getDefaultReplication(p)
: (short) replicationOption.getValue();
long blockSize = blockSizeOption == null
? fs.getDefaultBlockSize(p)
: blockSizeOption.getValue();

if (appendIfExistsOption != null && appendIfExistsOption.getValue() && fs.exists(p)) {
// Read the file and verify header details
try (WALFile.Reader reader = new WALFile.Reader(
connectorConfig.getHadoopConfiguration(),
WALFile.Reader.file(p),
new Reader.OnlyHeaderOption()
)) {
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3], reader.getVersion());
}
sync = reader.getSync();
}
sync = reader.getSync();
out = fs.append(p, bufferSize);
this.appendMode = true;
} else {
out = fs.create(p, true, bufferSize, replication, blockSize);
}
out = fs.append(p, bufferSize);
this.appendMode = true;
} else {
out = fs.create(p, true, bufferSize, replication, blockSize);
out = streamOption.getValue();
}
} else {
out = streamOption.getValue();

init(connectorConfig, out, ownStream);
} catch (RemoteException re) {
log.error("Failed creating a WAL Writer: " + re.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do

log.error("Failed creating a WAL Writer: ", re);

instead so the log will include stack trace info?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, is the log even going to be useful since it gets logged again by FSWAL which calls this constructor? I think this might be the case in the other case below as well, though I haven't extensively checked all callers. I guess worst case we're just logging a bit more information, so perhaps being conservative about logging the details isn't a bad idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was hoping to log a bit more information so that it is easier to read the debug log which is VERY long.

So re.getMessage() is the short version that I like :-) If we log the stack trace, the same info will be logged again all the way up at https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324.

if (re.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
if (fs != null) {
fs.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need protection from a possible IOException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
throw re;
}

init(connectorConfig, out, ownStream);
}

public static Option file(Path value) {
Expand Down Expand Up @@ -394,6 +409,7 @@ public Reader(Configuration conf, Option... opts) throws IOException {
InputStreamOption streamOpt = Options.getOption(InputStreamOption.class, opts);
LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts);

// check for consistency
if ((fileOpt == null) == (streamOpt == null)) {
throw new
Expand All @@ -402,27 +418,40 @@ public Reader(Configuration conf, Option... opts) throws IOException {
if (fileOpt == null && bufOpt != null) {
throw new IllegalArgumentException("buffer size can only be set when a file is specified.");
}

// figure out the real values
Path filename = null;
FSDataInputStream file;
final long len;
if (fileOpt != null) {
filename = fileOpt.getValue();
FileSystem fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
}
StartOption startOpt = Options.getOption(StartOption.class, opts);
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts);
initialize(filename, file, start, len, conf, headerOnly != null);
FileSystem fs = null;

try {
if (fileOpt != null) {
filename = fileOpt.getValue();
fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
}
StartOption startOpt = Options.getOption(StartOption.class, opts);
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts);
initialize(filename, file, start, len, conf, headerOnly != null);
} catch (RemoteException re) {
log.error("Failed creating a WAL Reader: " + re.getMessage());
if (re.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
if (fs != null) {
fs.close();
}
}
throw re;
}
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/test/java/io/confluent/connect/hdfs/wal/WALTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class WALTest extends TestWithMiniDFSCluster {
private static final String extension = ".avro";

@Test
public void testWALMultiClient() throws Exception {
public void testMultiWALFromOneDFSClient() throws Exception {
setUp();
fs.delete(new Path(FileUtils.directoryName(url, topicsDir, TOPIC_PARTITION)), true);

Expand All @@ -50,7 +50,7 @@ public void testWALMultiClient() throws Exception {
url
);
final WAL wal1 = storage.wal(topicsDir, TOPIC_PARTITION);
final WAL wal2 = storage.wal(topicsDir, TOPIC_PARTITION);
final FSWAL wal2 = (FSWAL) storage.wal(topicsDir, TOPIC_PARTITION);

String directory = TOPIC + "/" + String.valueOf(PARTITION);
final String tempfile = FileUtils.tempFileName(url, topicsDir, directory, extension);
Expand All @@ -75,8 +75,8 @@ public void testWALMultiClient() throws Exception {
@Override
public void run() {
try {
// holding the lease for awhile
Thread.sleep(3000);
// holding the lease for time that is less than wal2's initial retry interval, which is 1000 ms.
Thread.sleep(WALConstants.INITIAL_SLEEP_INTERVAL_MS - 100);
closed = true;
wal1.close();
} catch (ConnectException | InterruptedException e) {
Expand All @@ -86,6 +86,7 @@ public void run() {
});
thread.start();

// acquireLease() will try to acquire the lease that wal1 is holding and fail. It will retry after 1000 ms.
wal2.acquireLease();
assertTrue(closed);
wal2.apply();
Expand Down