From 49337688eb81ed05186dc122d4ed6d4f56f0a500 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 13 Mar 2018 11:25:46 -0700 Subject: [PATCH 1/5] KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean --- .../kafka/streams/processor/internals/StateDirectory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index c33ade6f36e85..763f8a2401d8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -329,7 +329,8 @@ public boolean accept(final File pathname) { private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException { if (!channels.containsKey(taskId)) { - channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)); + channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, + StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE)); } return channels.get(taskId); } From 5c8f746a8f47767519d97ebbdc79acc855f26b6b Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 13 Mar 2018 15:51:07 -0700 Subject: [PATCH 2/5] Put lock file above task dir --- .../kafka/streams/processor/internals/StateDirectory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 763f8a2401d8c..55974a4f9ccc0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -138,7 +138,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException { } try { - lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + lockFile = new File(stateDir, taskId+LOCK_FILE_NAME); } catch (ProcessorStateException e) { // directoryForTask could be throwing an exception if another thread // has concurrently deleted the directory From 587517ed5830ed22cb29c080d6c9815aa6946bd4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 13 Mar 2018 18:49:21 -0700 Subject: [PATCH 3/5] Address checkstyle error --- .../kafka/streams/processor/internals/StateDirectory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 55974a4f9ccc0..4daf0d47904a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -138,7 +138,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException { } try { - lockFile = new File(stateDir, taskId+LOCK_FILE_NAME); + lockFile = new File(stateDir, taskId + LOCK_FILE_NAME); } catch (ProcessorStateException e) { // directoryForTask could be throwing an exception if another thread // has concurrently deleted the directory From e7a5e47dc6b549ea519486b1b08a4abc28bf89ba Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 14 Mar 2018 11:58:43 -0700 Subject: [PATCH 4/5] Modify StateDirectoryTest --- .../processor/internals/StateDirectory.java | 5 +++++ .../internals/StateDirectoryTest.java | 18 +++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 4daf0d47904a4..9e3cf424faa56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -100,6 +100,10 @@ File directoryForTask(final TaskId taskId) { return taskDir; } + File stateDir() { + return stateDir; + } + /** * Get or create the directory for the global stores. * @return directory for the global stores @@ -160,6 +164,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException { if (lock != null) { locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock)); + System.out.println("locked " + lockFile); log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId); } return lock != null; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index e14d01077ac9d..5d839a78c2edd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; import java.util.Arrays; @@ -91,16 +92,18 @@ public void shouldCreateTaskStateDirectory() { @Test public void shouldLockTaskStateDirectory() throws IOException { final TaskId taskId = new TaskId(0, 0); - final File taskDirectory = directory.directoryForTask(taskId); + final File taskDirectory = directory.stateDir(); directory.lock(taskId); + File f = new File(taskDirectory, taskId + StateDirectory.LOCK_FILE_NAME); try ( final FileChannel channel = FileChannel.open( - new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), + f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE) ) { - channel.tryLock(); + FileLock lock = channel.tryLock(); + System.out.println("l ocked " + f + " " + lock); fail("shouldn't be able to lock already locked directory"); } catch (final OverlappingFileLockException e) { // pass @@ -140,17 +143,18 @@ public void shouldNotLockDeletedDirectory() throws IOException { @Test public void shouldLockMulitpleTaskDirectories() throws IOException { final TaskId taskId = new TaskId(0, 0); - final File task1Dir = directory.directoryForTask(taskId); + final File task1Dir = directory.stateDir(); final TaskId taskId2 = new TaskId(1, 0); - final File task2Dir = directory.directoryForTask(taskId2); + final File task2Dir = directory.stateDir(); try ( final FileChannel channel1 = FileChannel.open( - new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(), + new File(task1Dir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); - final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(), + final FileChannel channel2 = FileChannel.open( + new File(task2Dir, taskId2 + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE) ) { From 9e96cf4dc4dde6a94669f10a965c573a1d3b03b7 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 14 Mar 2018 13:09:35 -0700 Subject: [PATCH 5/5] Modify StateDirectoryTest --- .../processor/internals/StateDirectory.java | 1 - .../internals/StateDirectoryTest.java | 32 +++---------------- 2 files changed, 4 insertions(+), 29 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 9e3cf424faa56..22be4066f4741 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -164,7 +164,6 @@ synchronized boolean lock(final TaskId taskId) throws IOException { if (lock != null) { locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock)); - System.out.println("locked " + lockFile); log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId); } return lock != null; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 5d839a78c2edd..ea07543cab87d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -89,29 +89,6 @@ public void shouldCreateTaskStateDirectory() { assertTrue(taskDirectory.isDirectory()); } - @Test - public void shouldLockTaskStateDirectory() throws IOException { - final TaskId taskId = new TaskId(0, 0); - final File taskDirectory = directory.stateDir(); - - directory.lock(taskId); - - File f = new File(taskDirectory, taskId + StateDirectory.LOCK_FILE_NAME); - try ( - final FileChannel channel = FileChannel.open( - f.toPath(), - StandardOpenOption.CREATE, StandardOpenOption.WRITE) - ) { - FileLock lock = channel.tryLock(); - System.out.println("l ocked " + f + " " + lock); - fail("shouldn't be able to lock already locked directory"); - } catch (final OverlappingFileLockException e) { - // pass - } finally { - directory.unlock(taskId); - } - } - @Test public void shouldBeTrueIfAlreadyHoldsLock() throws IOException { final TaskId taskId = new TaskId(0, 0); @@ -200,15 +177,14 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws directory.directoryForTask(new TaskId(2, 0)); List files = Arrays.asList(appDir.listFiles()); - assertEquals(3, files.size()); + assertEquals(1, files.size()); time.sleep(1000); directory.cleanRemovedTasks(0); - files = Arrays.asList(appDir.listFiles()); - assertEquals(2, files.size()); - assertTrue(files.contains(new File(appDir, task0.toString()))); - assertTrue(files.contains(new File(appDir, task1.toString()))); + files = Arrays.asList(directory.stateDir().listFiles()); + // lock files have been cleaned + assertEquals(0, files.size()); } finally { directory.unlock(task0); directory.unlock(task1);