Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean #4702

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ File directoryForTask(final TaskId taskId) {
return taskDir;
}

File stateDir() {
return stateDir;
}

/**
* Get or create the directory for the global stores.
* @return directory for the global stores
Expand Down Expand Up @@ -138,7 +142,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
}

try {
lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
lockFile = new File(stateDir, taskId + LOCK_FILE_NAME);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove the directoryForTask method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The location of the lock file is lifted one level up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion on #4713 I think this idea should actually work. Nit: Can we rename the lock to LOCK_FILE_NAME + " -" + taskId though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rename the lock if people think we can continue with this solution.

} catch (ProcessorStateException e) {
// directoryForTask could be throwing an exception if another thread
// has concurrently deleted the directory
Expand Down Expand Up @@ -329,7 +333,8 @@ public boolean accept(final File pathname) {
private FileChannel getOrCreateFileChannel(final TaskId taskId,
final Path lockPath) throws IOException {
if (!channels.containsKey(taskId)) {
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE,
StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE));
}
return channels.get(taskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
Expand Down Expand Up @@ -88,27 +89,6 @@ public void shouldCreateTaskStateDirectory() {
assertTrue(taskDirectory.isDirectory());
}

@Test
public void shouldLockTaskStateDirectory() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

directory.lock(taskId) is at different level than channel.tryLock() , rendering the test useless.

final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.directoryForTask(taskId);

directory.lock(taskId);

try (
final FileChannel channel = FileChannel.open(
new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE)
) {
channel.tryLock();
fail("shouldn't be able to lock already locked directory");
} catch (final OverlappingFileLockException e) {
// pass
} finally {
directory.unlock(taskId);
}
}

@Test
public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
final TaskId taskId = new TaskId(0, 0);
Expand Down Expand Up @@ -140,17 +120,18 @@ public void shouldNotLockDeletedDirectory() throws IOException {
@Test
public void shouldLockMulitpleTaskDirectories() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File task1Dir = directory.directoryForTask(taskId);
final File task1Dir = directory.stateDir();
final TaskId taskId2 = new TaskId(1, 0);
final File task2Dir = directory.directoryForTask(taskId2);
final File task2Dir = directory.stateDir();


try (
final FileChannel channel1 = FileChannel.open(
new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
new File(task1Dir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE);
final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
final FileChannel channel2 = FileChannel.open(
new File(task2Dir, taskId2 + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)
) {
Expand Down Expand Up @@ -196,15 +177,14 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws
directory.directoryForTask(new TaskId(2, 0));

List<File> files = Arrays.asList(appDir.listFiles());
assertEquals(3, files.size());
assertEquals(1, files.size());

time.sleep(1000);
directory.cleanRemovedTasks(0);

files = Arrays.asList(appDir.listFiles());
assertEquals(2, files.size());
assertTrue(files.contains(new File(appDir, task0.toString())));
assertTrue(files.contains(new File(appDir, task1.toString())));
files = Arrays.asList(directory.stateDir().listFiles());
// lock files have been cleaned
assertEquals(0, files.size());
} finally {
directory.unlock(task0);
directory.unlock(task1);
Expand Down