KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean #5650

Closed
wants to merge 3 commits into from
@@ -103,6 +103,10 @@ public File directoryForTask(final TaskId taskId) {
return taskDir;
}

File stateDir() {
return stateDir;
}

/**
* Get or create the directory for the global stores.
* @return directory for the global stores
@@ -141,7 +145,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
}

try {
lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
lockFile = new File(stateDir, taskId + LOCK_FILE_NAME);
} catch (final ProcessorStateException e) {
// directoryForTask could be throwing an exception if another thread
// has concurrently deleted the directory
@@ -331,7 +335,8 @@ public boolean accept(final File pathname) {
private FileChannel getOrCreateFileChannel(final TaskId taskId,
final Path lockPath) throws IOException {
if (!channels.containsKey(taskId)) {
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE,
StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE));
}
return channels.get(taskId);
}
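
The change above moves the lock file out of the task directory and places it next to it in the state directory. The following is a minimal standalone sketch of that layout, not the Kafka implementation: the class name, the helper `siblingLockFile`, the `".lock"` value, and the `"0_0"` task id string are assumptions made for illustration. It shows that opening the sibling lock file does not require the task directory to exist.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class SiblingLockFileSketch {
    // Assumed value for illustration; the real constant lives in StateDirectory.
    static final String LOCK_FILE_NAME = ".lock";

    // Hypothetical helper: builds the lock file path directly under the state
    // directory, named after the task, instead of inside the task directory.
    static File siblingLockFile(final File stateDir, final String taskId) {
        return new File(stateDir, taskId + LOCK_FILE_NAME);
    }

    public static void main(final String[] args) throws IOException {
        final File stateDir = Files.createTempDirectory("state-dir").toFile();
        final String taskId = "0_0"; // assumed string form of a task id
        final File lockFile = siblingLockFile(stateDir, taskId);

        // Opening the lock file creates only the file itself.
        try (FileChannel channel = FileChannel.open(
                lockFile.toPath(),
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE)) {
            System.out.println("lock file name: " + lockFile.getName());
            System.out.println("task directory exists: " + new File(stateDir, taskId).exists());
        }
    }
}
```

With this layout, taking the lock during cleanup would not recreate the directory that is about to be deleted, which appears to be the intent of the PR title.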
@@ -88,27 +88,6 @@ public void shouldCreateTaskStateDirectory() {
assertTrue(taskDirectory.isDirectory());
}

@Test
public void shouldLockTaskStateDirectory() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.directoryForTask(taskId);

directory.lock(taskId);

try (
final FileChannel channel = FileChannel.open(
new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
Member

Why do we remove this test? It seems we should instead update the FileChannel here to use the new lock file name.

StandardOpenOption.CREATE, StandardOpenOption.WRITE)
) {
channel.tryLock();
fail("shouldn't be able to lock already locked directory");
} catch (final OverlappingFileLockException e) {
// pass
} finally {
directory.unlock(taskId);
}
}
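
The removed test relies on a standard `java.nio` behavior: a second `tryLock()` on the same file from within the same JVM throws `OverlappingFileLockException`. The following is a standalone sketch of that mechanism, independent of `StateDirectory`; the class and method names are hypothetical. A version of the test that keeps this check would presumably only need to point the second channel at the new lock file path.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class OverlappingLockSketch {

    // Returns true if a second tryLock() on the same file, from the same JVM,
    // is rejected with OverlappingFileLockException while the first lock is held.
    static boolean secondLockRejected(final Path lockPath) throws IOException {
        try (FileChannel holder = FileChannel.open(
                 lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock held = holder.tryLock()) {
            try (FileChannel contender = FileChannel.open(
                    lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                contender.tryLock();
                return false; // the lock was not rejected
            } catch (final OverlappingFileLockException e) {
                return true;  // expected: this JVM already holds the lock
            }
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path lockPath = Files.createTempDirectory("lock-sketch").resolve("0_0.lock");
        System.out.println("second lock rejected: " + secondLockRejected(lockPath));
    }
}
```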

@Test
public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
final TaskId taskId = new TaskId(0, 0);
@@ -140,17 +119,18 @@ public void shouldNotLockDeletedDirectory() throws IOException {
@Test
public void shouldLockMulitpleTaskDirectories() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File task1Dir = directory.directoryForTask(taskId);
final File task1Dir = directory.stateDir();
final TaskId taskId2 = new TaskId(1, 0);
final File task2Dir = directory.directoryForTask(taskId2);
final File task2Dir = directory.stateDir();


try (
final FileChannel channel1 = FileChannel.open(
new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
new File(task1Dir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE);
final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
final FileChannel channel2 = FileChannel.open(
new File(task2Dir, taskId2 + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)
) {
@@ -196,15 +176,14 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws
directory.directoryForTask(new TaskId(2, 0));

List<File> files = Arrays.asList(appDir.listFiles());
assertEquals(3, files.size());
assertEquals(1, files.size());
Member

Why this? Shouldn't directory.lock(task0); and directory.lock(task1); each have created a lock file?

Contributor Author

Will dig into these tests some more once Guozhang confirms the plan.

Contributor Author

This was due to passing StandardOpenOption.DELETE_ON_CLOSE to the FileChannel.open call.
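
To illustrate the effect described here, the following standalone sketch opens a file with `DELETE_ON_CLOSE` and checks whether it is still present after the channel is closed. The class and method names are hypothetical. Note that whether the file is visible in the directory while the channel is still open depends on the platform and JDK implementation, so the sketch only checks the state after close.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DeleteOnCloseSketch {

    // Opens (and creates) the file with DELETE_ON_CLOSE, closes the channel,
    // and reports whether the file is still present afterwards.
    static boolean existsAfterClose(final Path lockPath) throws IOException {
        try (FileChannel channel = FileChannel.open(
                lockPath,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.DELETE_ON_CLOSE)) {
            // The channel is usable here; nothing else is needed for the demonstration.
        }
        return Files.exists(lockPath);
    }

    public static void main(final String[] args) throws IOException {
        final Path lockPath = Files.createTempDirectory("delete-on-close").resolve("0_0.lock");
        System.out.println("exists after close: " + existsAfterClose(lockPath));
    }
}
```

Because the file is removed on a best-effort basis when the channel closes, a test that counts directory entries can see fewer files than the number of locks taken.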

Contributor Author

@mjsax
I would like your opinion on whether StandardOpenOption.DELETE_ON_CLOSE should be kept in the PR.
This affects how the test is modified.

Member

Sorry for the late reply. The last few weeks were a little crazy and I did not find time earlier.

I cannot remember why we want to add the DELETE_ON_CLOSE option. Can you refresh my memory?

Also, I am not sure why this option reduces the file count. I understand that the task directories are no longer created; however, we moved both lock files up the hierarchy, so the count should not change?

Also, did you see this older comment: #5650 (comment)? For a clean upgrade path, addressing this issue is important.


time.sleep(1000);
directory.cleanRemovedTasks(0);

files = Arrays.asList(appDir.listFiles());
assertEquals(2, files.size());
assertTrue(files.contains(new File(appDir, task0.toString())));
assertTrue(files.contains(new File(appDir, task1.toString())));
files = Arrays.asList(directory.stateDir().listFiles());
// lock files have been cleaned
assertEquals(0, files.size());
} finally {
directory.unlock(task0);
directory.unlock(task1);
@@ -358,4 +337,4 @@ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
files = Arrays.asList(appDir.listFiles());
assertEquals(0, files.size());
}
}
}