From a246725f9bea4a52443d1fd552df8e7b559e19af Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 13 Sep 2018 17:07:46 -0700 Subject: [PATCH 1/3] KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean --- .../processor/internals/StateDirectory.java | 9 +++- .../internals/StateDirectoryTest.java | 41 +++++-------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a3227ca0c7d5b..b8de6302ae43c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -103,6 +103,10 @@ public File directoryForTask(final TaskId taskId) { return taskDir; } + File stateDir() { + return stateDir; + } + /** * Get or create the directory for the global stores. * @return directory for the global stores @@ -141,7 +145,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException { } try { - lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + lockFile = new File(stateDir, taskId + LOCK_FILE_NAME); } catch (final ProcessorStateException e) { // directoryForTask could be throwing an exception if another thread // has concurrently deleted the directory @@ -331,7 +335,8 @@ public boolean accept(final File pathname) { private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException { if (!channels.containsKey(taskId)) { - channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)); + channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, + StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE)); } return channels.get(taskId); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index e14d01077ac9d..da8c03acc15dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -88,27 +88,6 @@ public void shouldCreateTaskStateDirectory() { assertTrue(taskDirectory.isDirectory()); } - @Test - public void shouldLockTaskStateDirectory() throws IOException { - final TaskId taskId = new TaskId(0, 0); - final File taskDirectory = directory.directoryForTask(taskId); - - directory.lock(taskId); - - try ( - final FileChannel channel = FileChannel.open( - new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), - StandardOpenOption.CREATE, StandardOpenOption.WRITE) - ) { - channel.tryLock(); - fail("shouldn't be able to lock already locked directory"); - } catch (final OverlappingFileLockException e) { - // pass - } finally { - directory.unlock(taskId); - } - } - @Test public void shouldBeTrueIfAlreadyHoldsLock() throws IOException { final TaskId taskId = new TaskId(0, 0); @@ -140,17 +119,18 @@ public void shouldNotLockDeletedDirectory() throws IOException { @Test public void shouldLockMulitpleTaskDirectories() throws IOException { final TaskId taskId = new TaskId(0, 0); - final File task1Dir = directory.directoryForTask(taskId); + final File task1Dir = directory.stateDir(); final TaskId taskId2 = new TaskId(1, 0); - final File task2Dir = directory.directoryForTask(taskId2); + final File task2Dir = directory.stateDir(); try ( final FileChannel channel1 = FileChannel.open( - new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(), + new File(task1Dir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); - final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(), + final FileChannel channel2 = FileChannel.open( + new File(task2Dir, taskId2 + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE) ) { @@ -196,15 +176,14 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws directory.directoryForTask(new TaskId(2, 0)); List files = Arrays.asList(appDir.listFiles()); - assertEquals(3, files.size()); + assertEquals(1, files.size()); time.sleep(1000); directory.cleanRemovedTasks(0); - files = Arrays.asList(appDir.listFiles()); - assertEquals(2, files.size()); - assertTrue(files.contains(new File(appDir, task0.toString()))); - assertTrue(files.contains(new File(appDir, task1.toString()))); + files = Arrays.asList(directory.stateDir().listFiles()); + // lock files have been cleaned + assertEquals(0, files.size()); } finally { directory.unlock(task0); directory.unlock(task1); @@ -358,4 +337,4 @@ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() { files = Arrays.asList(appDir.listFiles()); assertEquals(0, files.size()); } -} \ No newline at end of file +} From 4ed75bc81690ebb141b8a8b1cafcd9534621d7c7 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 4 Nov 2018 13:59:05 -0800 Subject: [PATCH 2/3] drop StandardOpenOption.DELETE_ON_CLOSE --- .../processor/internals/StateDirectory.java | 6 ++++- .../internals/StateDirectoryTest.java | 24 +++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index b8de6302ae43c..b1341baffaf88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -226,6 +226,10 @@ synchronized void unlock(final TaskId taskId) throws IOException { final FileChannel fileChannel = channels.remove(taskId); if (fileChannel != null) { fileChannel.close(); + final File lockFile = new File(stateDir, taskId + LOCK_FILE_NAME); + if (!lockFile.delete()) { + log.debug("{} was not deleted", lockFile.toString()); + } } } } @@ -336,7 +340,7 @@ private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException { if (!channels.containsKey(taskId)) { channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, - StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE)); + StandardOpenOption.WRITE)); } return channels.get(taskId); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index da8c03acc15dd..6d4d06c2ff239 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -88,6 +88,26 @@ public void shouldCreateTaskStateDirectory() { assertTrue(taskDirectory.isDirectory()); } + @Test + public void shouldLockTaskStateDirectory() throws IOException { + final TaskId taskId = new TaskId(0, 0); + + directory.lock(taskId); + + try ( + final FileChannel channel = FileChannel.open( + new File(appDir, taskId+StateDirectory.LOCK_FILE_NAME).toPath(), + StandardOpenOption.CREATE, StandardOpenOption.WRITE) + ) { + channel.tryLock(); + fail("shouldn't be able to lock already locked directory"); + } catch (final OverlappingFileLockException e) { + // pass + } finally { + directory.unlock(taskId); + } + } + @Test public void shouldBeTrueIfAlreadyHoldsLock() throws IOException { final TaskId taskId = new TaskId(0, 0); @@ -176,14 +196,14 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws directory.directoryForTask(new TaskId(2, 0)); List files = Arrays.asList(appDir.listFiles()); - assertEquals(1, files.size()); + assertEquals(3, files.size()); time.sleep(1000); directory.cleanRemovedTasks(0); files = Arrays.asList(directory.stateDir().listFiles()); // lock files have been cleaned - assertEquals(0, files.size()); + assertEquals(2, files.size()); } finally { directory.unlock(task0); directory.unlock(task1); From 0fd3096979cb32d6b1b44c8445bb90ab177520f6 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 4 Nov 2018 19:02:34 -0800 Subject: [PATCH 3/3] Address checkstyle --- .../kafka/streams/processor/internals/StateDirectoryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 6d4d06c2ff239..c78ec50a4eb44 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -96,7 +96,7 @@ public void shouldLockTaskStateDirectory() throws IOException { try ( final FileChannel channel = FileChannel.open( - new File(appDir, taskId+StateDirectory.LOCK_FILE_NAME).toPath(), + new File(appDir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE) ) { channel.tryLock();