diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a3227ca0c7d5b..b1341baffaf88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -103,6 +103,10 @@ public File directoryForTask(final TaskId taskId) { return taskDir; } + File stateDir() { + return stateDir; + } + /** * Get or create the directory for the global stores. * @return directory for the global stores @@ -141,7 +145,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException { } try { - lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + lockFile = new File(stateDir, taskId + LOCK_FILE_NAME); } catch (final ProcessorStateException e) { // directoryForTask could be throwing an exception if another thread // has concurrently deleted the directory @@ -222,6 +226,10 @@ synchronized void unlock(final TaskId taskId) throws IOException { final FileChannel fileChannel = channels.remove(taskId); if (fileChannel != null) { fileChannel.close(); + final File lockFile = new File(stateDir, taskId + LOCK_FILE_NAME); + if (!lockFile.delete()) { + log.debug("{} was not deleted", lockFile.toString()); + } } } } @@ -331,7 +339,8 @@ public boolean accept(final File pathname) { private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException { if (!channels.containsKey(taskId)) { - channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)); + channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, + StandardOpenOption.WRITE)); } return channels.get(taskId); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index e14d01077ac9d..c78ec50a4eb44 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -91,13 +91,12 @@ public void shouldCreateTaskStateDirectory() { @Test public void shouldLockTaskStateDirectory() throws IOException { final TaskId taskId = new TaskId(0, 0); - final File taskDirectory = directory.directoryForTask(taskId); directory.lock(taskId); try ( final FileChannel channel = FileChannel.open( - new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), + new File(appDir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE) ) { channel.tryLock(); @@ -140,17 +139,18 @@ public void shouldNotLockDeletedDirectory() throws IOException { @Test public void shouldLockMulitpleTaskDirectories() throws IOException { final TaskId taskId = new TaskId(0, 0); - final File task1Dir = directory.directoryForTask(taskId); + final File task1Dir = directory.stateDir(); final TaskId taskId2 = new TaskId(1, 0); - final File task2Dir = directory.directoryForTask(taskId2); + final File task2Dir = directory.stateDir(); try ( final FileChannel channel1 = FileChannel.open( - new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(), + new File(task1Dir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); - final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(), + final FileChannel channel2 = FileChannel.open( + new File(task2Dir, taskId2 + StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE) ) { @@ -201,10 +201,9 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws time.sleep(1000); directory.cleanRemovedTasks(0); - files = Arrays.asList(appDir.listFiles()); + files = Arrays.asList(directory.stateDir().listFiles()); + // lock files have been cleaned assertEquals(2, files.size()); - assertTrue(files.contains(new File(appDir, task0.toString()))); - assertTrue(files.contains(new File(appDir, task1.toString()))); } finally { directory.unlock(task0); directory.unlock(task1); @@ -358,4 +357,4 @@ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() { files = Arrays.asList(appDir.listFiles()); assertEquals(0, files.size()); } -} \ No newline at end of file +}