diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index 630d8e30d..3c688c31b 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -18,7 +18,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.IllegalWorkerStateException; @@ -546,8 +545,11 @@ private void writeRecord(SinkRecord record) throws IOException { private void closeTempFile(String encodedPartition) throws IOException { if (writers.containsKey(encodedPartition)) { RecordWriter writer = writers.get(encodedPartition); - writer.close(); - writers.remove(encodedPartition); + try { + writer.close(); + } finally { + writers.remove(encodedPartition); + } } } diff --git a/src/main/java/io/confluent/connect/hdfs/storage/Storage.java b/src/main/java/io/confluent/connect/hdfs/storage/Storage.java index e1f6a1f1b..55951851a 100644 --- a/src/main/java/io/confluent/connect/hdfs/storage/Storage.java +++ b/src/main/java/io/confluent/connect/hdfs/storage/Storage.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.Path; import org.apache.kafka.common.TopicPartition; import java.io.IOException; @@ -37,4 +38,5 @@ public interface Storage { FileStatus[] listStatus(String path) throws IOException; String url(); Configuration conf(); + boolean recoverFileLease(Path f) throws IOException; } diff --git a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java index c4ce18c65..20659d490 100644 --- a/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java +++ b/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java @@ -64,6 +64,32 @@ public void append(String tempFile, String committedFile) throws ConnectExceptio } } + public boolean canRecover(String fileName) throws ConnectException { + boolean flag = false; + int MAX_RETRY = 10; + for (int i=0; i= MAX_SLEEP_INTERVAL_MS) { @@ -128,7 +169,10 @@ public void apply() throws ConnectException { } } } catch (IOException e) { - throw new ConnectException(e); + if (e.getMessage().contains("Cannot obtain block length") && !canRecover(logFile)) { + // this can happen when the WAL file is corrupted + throw new ConnectException(e); + } } }