Skip to content

Commit

Permalink
fix wal test
Browse files Browse the repository at this point in the history
  • Loading branch information
skyahead committed Feb 1, 2017
1 parent 4d51138 commit 06cca38
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
6 changes: 5 additions & 1 deletion src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class FSWAL implements WAL {
private String logFile = null;
private Configuration conf = null;
private Storage storage = null;
private long sleepIntervalMs = 1000L;

public FSWAL(String logsDir, TopicPartition topicPart, Storage storage)
throws ConnectException {
Expand All @@ -65,8 +66,11 @@ public void append(String tempFile, String committedFile) throws ConnectExceptio
}
}

public long getSleepIntervalMs() {
return sleepIntervalMs;
}

public void acquireLease() throws ConnectException {
long sleepIntervalMs = 1000L;
long MAX_SLEEP_INTERVAL_MS = 16000L;
while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
try {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/confluent/connect/hdfs/wal/WAL.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public interface WAL {
void truncate() throws ConnectException;
void close() throws ConnectException;
String getLogFile();
long getSleepIntervalMs();
}
9 changes: 5 additions & 4 deletions src/test/java/io/confluent/connect/hdfs/wal/WALTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class WALTest extends TestWithMiniDFSCluster {
private static final String extension = ".avro";

@Test
public void testWALMultiClient() throws Exception {
public void testMultiWALFromOneDFSClient() throws Exception {
fs.delete(new Path(FileUtils.directoryName(url, topicsDir, TOPIC_PARTITION)), true);

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -62,8 +62,8 @@ public void testWALMultiClient() throws Exception {
@Override
public void run() {
try {
// holding the lease for awhile
Thread.sleep(3000);
// holding the lease for time that is less than wal2's retry interval, which is 1000 ms.
Thread.sleep(wal2.getSleepIntervalMs()-100);
closed = true;
wal1.close();
} catch (ConnectException | InterruptedException e) {
Expand All @@ -73,7 +73,8 @@ public void run() {
});
thread.start();

Thread.sleep(3001);
// AcquireLease will try to acquire the same lease that wal1 is holding and fail
// It will then retry after waiting for 1000 ms.
wal2.acquireLease();
assertTrue(closed);
wal2.apply();
Expand Down

0 comments on commit 06cca38

Please sign in to comment.