not trying to hold leases on WAL files if we are holding them already. #142

Merged: 8 commits, Aug 17, 2017

Changes from all commits
checkstyle/suppressions.xml (2 changes: 1 addition & 1 deletion)

@@ -9,7 +9,7 @@
     <!-- TODO: fix all of these -->
     <suppress
         checks="AbbreviationAsWordInName"
-        files="(DataWriter|FSWAL|TopicPartitionWriter|WAL|WALEntry|WALFile).java"
+        files="(DataWriter|FSWAL|TopicPartitionWriter|WAL|WALEntry|WALFile|WALConstants).java"
     />

     <suppress
src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java

@@ -385,7 +385,7 @@ public void write() {
           closeTempFile();
           appendToWAL();
           commitFile();
-        } catch (DataException e) {
+        } catch (ConnectException e) {
           log.error("Exception on topic partition {}: ", tp, e);
           failureTime = System.currentTimeMillis();
           setRetryTimeout(timeoutMs);
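A note on this hunk: in Kafka Connect's exception hierarchy, DataException is a subclass of ConnectException, so the widened catch still handles everything the old one did, and now also the plain ConnectException that FSWAL.acquireLease() throws when it cannot obtain a lease in time. A minimal, self-contained illustration (class name and messages are mine):

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.errors.DataException;

    public class CatchWidening {
      public static void main(String[] args) {
        try {
          // FSWAL wraps WAL I/O failures in DataException and signals lease
          // contention with a plain ConnectException.
          throw new DataException("simulated WAL append failure");
        } catch (ConnectException e) {
          // DataException extends ConnectException, so one catch clause now
          // covers both failure modes and triggers the same retry/backoff path.
          System.out.println("retry scheduled: " + e.getMessage());
        }
      }
    }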
src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java (33 changes: 18 additions & 15 deletions)

@@ -35,9 +35,6 @@
 public class FSWAL implements WAL {

   private static final Logger log = LoggerFactory.getLogger(FSWAL.class);
-  private static final String leaseException =
-      "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
-  private static final long MAX_SLEEP_INTERVAL_MS = 16000L;

   private WALFile.Writer writer = null;
   private WALFile.Reader reader = null;
@@ -62,13 +59,15 @@ public void append(String tempFile, String committedFile) throws ConnectException
       writer.append(key, value);
       writer.hsync();
     } catch (IOException e) {
+      log.error("Error appending WAL file: {}, {}", logFile, e);
+      close();
       throw new DataException(e);
     }
   }

   public void acquireLease() throws ConnectException {
-    long sleepIntervalMs = 1000L;
-    while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
+    long sleepIntervalMs = WALConstants.INITIAL_SLEEP_INTERVAL_MS;
+    while (sleepIntervalMs < WALConstants.MAX_SLEEP_INTERVAL_MS) {
       try {
         if (writer == null) {
           writer = WALFile.createWriter(conf, Writer.file(new Path(logFile)),
@@ -77,7 +76,7 @@ public void acquireLease() throws ConnectException {
         }
         break;
       } catch (RemoteException e) {
-        if (e.getClassName().equals(leaseException)) {
+        if (e.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
           log.info("Cannot acquire lease on WAL {}", logFile);
           try {
             Thread.sleep(sleepIntervalMs);
@@ -92,7 +91,7 @@
         throw new DataException("Error creating writer for log file " + logFile, e);
       }
     }
-    if (sleepIntervalMs >= MAX_SLEEP_INTERVAL_MS) {
+    if (sleepIntervalMs >= WALConstants.MAX_SLEEP_INTERVAL_MS) {
       throw new ConnectException("Cannot acquire lease after timeout, will retry.");
     }
   }
@@ -129,33 +128,37 @@ public void apply() throws ConnectException {
         }
       }
     } catch (IOException e) {
+      log.error("Error applying WAL file: {}, {}", logFile, e);
+      close();
       throw new DataException(e);
     }
   }

   @Override
   public void truncate() throws ConnectException {
-    String oldLogFile = logFile + ".1";
-    storage.delete(oldLogFile);
-    storage.commit(logFile, oldLogFile);
-    // Clean out references to the current WAL file.
-    // Open a new one on the next lease acquisition.
-    close();
+    try {
+      String oldLogFile = logFile + ".1";
+      storage.delete(oldLogFile);
+      storage.commit(logFile, oldLogFile);
+    } finally {
+      close();
+    }
   }

   @Override
   public void close() throws ConnectException {
     try {
       if (writer != null) {
         writer.close();
-        writer = null;
       }
       if (reader != null) {
         reader.close();
-        reader = null;
       }
     } catch (IOException e) {
       throw new DataException("Error closing " + logFile, e);
+    } finally {
+      writer = null;
+      reader = null;
     }
   }
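The truncate() and close() rewrites above share one pattern: do the fallible work in a try block and release the handles in finally, so a failed delete, commit, or close cannot leave a stale writer pinning the lease. A generic sketch of the pattern, with illustrative names rather than the connector's actual classes:

    import java.io.Closeable;
    import java.io.IOException;

    public class ReleaseInFinally {
      private Closeable writer;  // stands in for WALFile.Writer

      void truncateLike(Runnable fallibleWork) throws IOException {
        try {
          fallibleWork.run();    // may throw; the finally block still runs
        } finally {
          close();               // the handle is released either way
        }
      }

      void close() throws IOException {
        try {
          if (writer != null) {
            writer.close();
          }
        } finally {
          writer = null;         // drop the reference even if close() threw
        }
      }
    }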
src/main/java/io/confluent/connect/hdfs/wal/WALConstants.java (23 changes: 23 additions & 0 deletions)

@@ -0,0 +1,23 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+
+package io.confluent.connect.hdfs.wal;
+
+public class WALConstants {
+  protected static final String LEASE_EXCEPTION_CLASS_NAME
+      = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
+  protected static final long MAX_SLEEP_INTERVAL_MS = 16000L;
+  protected static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;
+}
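Assuming the sleep interval doubles after each failed attempt (the doubling line itself is elided from the FSWAL hunk above, so that is an inference), these values mean a contended lease is retried after sleeps of 1000, 2000, 4000, and 8000 ms, about 15 seconds in total, before acquireLease() gives up. A runnable sketch of that schedule, with the constants copied in for self-containment:

    public class BackoffSchedule {
      // Values copied from WALConstants above.
      static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;
      static final long MAX_SLEEP_INTERVAL_MS = 16000L;

      public static void main(String[] args) {
        long total = 0;
        // Assumption: the interval doubles on each failed acquisition attempt.
        for (long ms = INITIAL_SLEEP_INTERVAL_MS; ms < MAX_SLEEP_INTERVAL_MS; ms *= 2) {
          total += ms;
          System.out.printf("sleep %d ms, then retry%n", ms);
        }
        // Prints 1000, 2000, 4000, 8000: about 15 s of waiting before
        // "Cannot acquire lease after timeout, will retry." is thrown.
        System.out.printf("total wait before giving up: %d ms%n", total);
      }
    }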
src/main/java/io/confluent/connect/hdfs/wal/WALFile.java (119 changes: 74 additions & 45 deletions)

@@ -34,6 +34,7 @@
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.Options;
 import org.apache.hadoop.util.Time;

@@ -132,42 +133,56 @@ public static class Writer implements Closeable, Syncable {
         throw new IllegalArgumentException("file modifier options not compatible with stream");
       }

+      FileSystem fs = null;
       FSDataOutputStream out;
       boolean ownStream = fileOption != null;
-      if (ownStream) {
-        Path p = fileOption.getValue();
-        FileSystem fs;
-        fs = p.getFileSystem(conf);
-        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
-            bufferSizeOption.getValue();
-        short replication = replicationOption == null
-            ? fs.getDefaultReplication(p)
-            : (short) replicationOption.getValue();
-        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
-            blockSizeOption.getValue();
-
-        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
-            && fs.exists(p)) {
-          // Read the file and verify header details
-          try (
-              WALFile.Reader reader =
-                  new WALFile.Reader(conf, WALFile.Reader.file(p), new Reader.OnlyHeaderOption())
-          ) {
-            if (reader.getVersion() != VERSION[3]) {
-              throw new VersionMismatchException(VERSION[3], reader.getVersion());
-            }
-            sync = reader.getSync();
-          }
-          out = fs.append(p, bufferSize);
-          this.appendMode = true;
-        } else {
-          out = fs.create(p, true, bufferSize, replication, blockSize);
-        }
-      } else {
-        out = streamOption.getValue();
-      }
-
-      init(connectorConfig, out, ownStream);
+
+      try {
+        if (ownStream) {
+          Path p = fileOption.getValue();
+          fs = p.getFileSystem(conf);
+          int bufferSize = bufferSizeOption == null
+              ? getBufferSize(conf)
+              : bufferSizeOption.getValue();
+          short replication = replicationOption == null
+              ? fs.getDefaultReplication(p)
+              : (short) replicationOption.getValue();
+          long blockSize = blockSizeOption == null
+              ? fs.getDefaultBlockSize(p)
+              : blockSizeOption.getValue();
+
+          if (appendIfExistsOption != null && appendIfExistsOption.getValue() && fs.exists(p)) {
+            // Read the file and verify header details
+            try (WALFile.Reader reader = new WALFile.Reader(
+                connectorConfig.getHadoopConfiguration(),
+                WALFile.Reader.file(p),
+                new Reader.OnlyHeaderOption()
+            )) {
+              if (reader.getVersion() != VERSION[3]) {
+                throw new VersionMismatchException(VERSION[3], reader.getVersion());
+              }
+              sync = reader.getSync();
+            }
+            out = fs.append(p, bufferSize);
+            this.appendMode = true;
+          } else {
+            out = fs.create(p, true, bufferSize, replication, blockSize);
+          }
+        } else {
+          out = streamOption.getValue();
+        }
+
+        init(connectorConfig, out, ownStream);
+      } catch (RemoteException re) {
+        log.error("Failed creating a WAL Writer: " + re.getMessage());
+        if (re.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
+          if (fs != null) {
+            fs.close();
+          }
+        }
+        throw re;
+      }
     }

     public static Option file(Path value) {
@@ -394,6 +409,7 @@ public Reader(Configuration conf, Option... opts) throws IOException {
       InputStreamOption streamOpt = Options.getOption(InputStreamOption.class, opts);
       LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
       BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts);
+
       // check for consistency
       if ((fileOpt == null) == (streamOpt == null)) {
         throw new
@@ -402,27 +418,40 @@ public Reader(Configuration conf, Option... opts) throws IOException {
       if (fileOpt == null && bufOpt != null) {
         throw new IllegalArgumentException("buffer size can only be set when a file is specified.");
       }
+
       // figure out the real values
       Path filename = null;
       FSDataInputStream file;
       final long len;
-      if (fileOpt != null) {
-        filename = fileOpt.getValue();
-        FileSystem fs = filename.getFileSystem(conf);
-        int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
-        len = null == lenOpt
-            ? fs.getFileStatus(filename).getLen()
-            : lenOpt.getValue();
-        file = openFile(fs, filename, bufSize, len);
-      } else {
-        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
-        file = streamOpt.getValue();
-      }
-      StartOption startOpt = Options.getOption(StartOption.class, opts);
-      long start = startOpt == null ? 0 : startOpt.getValue();
-      // really set up
-      OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts);
-      initialize(filename, file, start, len, conf, headerOnly != null);
+      FileSystem fs = null;
+
+      try {
+        if (fileOpt != null) {
+          filename = fileOpt.getValue();
+          fs = filename.getFileSystem(conf);
+          int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
+          len = null == lenOpt
+              ? fs.getFileStatus(filename).getLen()
+              : lenOpt.getValue();
+          file = openFile(fs, filename, bufSize, len);
+        } else {
+          len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
+          file = streamOpt.getValue();
+        }
+        StartOption startOpt = Options.getOption(StartOption.class, opts);
+        long start = startOpt == null ? 0 : startOpt.getValue();
+        // really set up
+        OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts);
+        initialize(filename, file, start, len, conf, headerOnly != null);
+      } catch (RemoteException re) {
+        log.error("Failed creating a WAL Reader: " + re.getMessage());
+        if (re.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
+          if (fs != null) {
+            fs.close();
+          }
+        }
+        throw re;
+      }
     }

     /**
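Both new catch blocks depend on RemoteException carrying the server-side exception class as a string, which lets the client recognize HDFS's AlreadyBeingCreatedException without a compile-time dependency on HDFS internals and close the half-initialized FileSystem handle before rethrowing. A small self-contained demonstration of the matching (the exception instance is constructed locally for illustration):

    import org.apache.hadoop.ipc.RemoteException;

    public class LeaseExceptionCheck {
      static final String LEASE_EXCEPTION_CLASS_NAME =
          "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";

      public static void main(String[] args) {
        // In production this arrives from the NameNode; here we build one by hand.
        RemoteException re =
            new RemoteException(LEASE_EXCEPTION_CLASS_NAME, "lease already held");
        if (re.getClassName().equals(LEASE_EXCEPTION_CLASS_NAME)) {
          System.out.println("another writer holds the WAL lease; cleaning up");
        }
      }
    }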
src/test/java/io/confluent/connect/hdfs/wal/WALTest.java (9 changes: 5 additions & 4 deletions)

@@ -36,7 +36,7 @@ public class WALTest extends TestWithMiniDFSCluster {
   private static final String extension = ".avro";

   @Test
-  public void testWALMultiClient() throws Exception {
+  public void testMultiWALFromOneDFSClient() throws Exception {
     setUp();
     fs.delete(new Path(FileUtils.directoryName(url, topicsDir, TOPIC_PARTITION)), true);

@@ -50,7 +50,7 @@
         url
     );
     final WAL wal1 = storage.wal(topicsDir, TOPIC_PARTITION);
-    final WAL wal2 = storage.wal(topicsDir, TOPIC_PARTITION);
+    final FSWAL wal2 = (FSWAL) storage.wal(topicsDir, TOPIC_PARTITION);

     String directory = TOPIC + "/" + String.valueOf(PARTITION);
     final String tempfile = FileUtils.tempFileName(url, topicsDir, directory, extension);
@@ -75,8 +75,8 @@
       @Override
       public void run() {
         try {
-          // holding the lease for awhile
-          Thread.sleep(3000);
+          // holding the lease for less than wal2's initial retry interval, which is 1000 ms
+          Thread.sleep(WALConstants.INITIAL_SLEEP_INTERVAL_MS - 100);
           closed = true;
           wal1.close();
         } catch (ConnectException | InterruptedException e) {
@@ -86,6 +86,7 @@
     });
     thread.start();

+    // acquireLease() will try to acquire the lease that wal1 is holding and fail; it will retry after 1000 ms.
     wal2.acquireLease();
     assertTrue(closed);
     wal2.apply();
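The timing in this test is deliberately tight: wal1 releases its lease just before wal2's first backoff sleep expires, so the assertion passes only if acquireLease() genuinely retries rather than failing outright. A sketch of the intended timeline (the sleep values come from the test; the margin arithmetic is mine):

    public class LeaseHandoffTimeline {
      public static void main(String[] args) {
        final long initialRetryMs = 1000L;             // WALConstants.INITIAL_SLEEP_INTERVAL_MS
        final long wal1HoldMs = initialRetryMs - 100;  // the test's Thread.sleep argument
        // t =    0 ms: wal2.acquireLease() hits AlreadyBeingCreatedException, sleeps 1000 ms
        // t =  900 ms: wal1.close() releases the lease and sets closed = true
        // t = 1000 ms: wal2 retries, acquires the lease; assertTrue(closed) holds
        System.out.printf("wal1 releases %d ms before wal2 retries%n",
            initialRetryMs - wal1HoldMs);
      }
    }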