
KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean #4702

Closed
wants to merge 5 commits

Conversation

@tedyu (Contributor) commented Mar 13, 2018

Specify StandardOpenOption#DELETE_ON_CLOSE when creating the FileChannel.
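
For reference, a minimal sketch of what the description above refers to; the path and class name are illustrative, not the actual PR code:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LockFileSketch {
    public static void main(final String[] args) throws IOException {
        // Illustrative location; the real lock file lives inside the Streams state directory.
        final Path lockPath = Paths.get("/tmp/kafka-streams-example", ".lock");
        Files.createDirectories(lockPath.getParent());

        // DELETE_ON_CLOSE asks the JVM to remove the file when the channel is closed,
        // so a clean shutdown should not leave a stale .lock file behind.
        try (final FileChannel channel = FileChannel.open(lockPath,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.DELETE_ON_CLOSE)) {
            // ... channel.tryLock(), do work ...
        } // the file is deleted (best effort) when the channel closes
    }
}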

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@gbloggs commented Mar 13, 2018

Whilst this may be a nice-to-have, I remain unconvinced that it will fix the issue.
Looking at clean() in StateDirectory, it catches any IOException thrown from Utils.delete(). Utils.delete's postVisitDirectory will throw java.nio.file.DirectoryNotEmptyException, so a new StreamsException(e) will be thrown:

public synchronized void clean() {
        try {
            cleanRemovedTasks(0, true);
        } catch (final Exception e) {
            // this is already logged within cleanRemovedTasks
            throw new StreamsException(e);
        }
        try {
            Utils.delete(globalStateDir().getAbsoluteFile());
        } catch (final IOException e) {
            log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
            throw new StreamsException(e);
        }
    }
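
A minimal, self-contained sketch (paths are illustrative) of the failure mode described above: deleting a directory that still contains a leftover file throws the DirectoryNotEmptyException that clean() then wraps in a StreamsException.

import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DirectoryNotEmptySketch {
    public static void main(final String[] args) throws IOException {
        final Path dir = Files.createTempDirectory("state-dir");
        Files.createFile(dir.resolve(".lock")); // simulate a leftover lock file

        try {
            Files.delete(dir); // the directory is not empty
        } catch (final DirectoryNotEmptyException e) {
            // Utils.delete's postVisitDirectory hits the same exception during its recursive walk.
            System.out.println("Cannot delete non-empty directory: " + e.getFile());
        }
    }
}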

@mjsax added the streams label Mar 14, 2018
@mjsax requested review from dguy, mjsax and guozhangwang March 14, 2018 05:40
@mjsax (Member) commented Mar 14, 2018

\cc @bbejeck @vvcephei

@@ -138,7 +138,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
        }

        try {
-            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+            lockFile = new File(stateDir, taskId + LOCK_FILE_NAME);
Member:
why remove the directoryForTask method?

Contributor Author:
The location of the lock file is lifted one level up.

Member:
After discussion on #4713 I think this idea should actually work. Nit: Can we rename the lock to LOCK_FILE_NAME + " -" + taskId though.

Contributor Author:
I can rename the lock if people think we can continue with this solution.

@bbejeck (Member) commented Mar 14, 2018

Left one minor comment. Plus do we need to add a new test?

@tedyu (Contributor Author) commented Mar 14, 2018

@guozhangwang (Contributor) left a comment
I left a comment on the JIRA ticket itself: https://issues.apache.org/jira/browse/KAFKA-6647

@@ -88,27 +89,6 @@ public void shouldCreateTaskStateDirectory() {
        assertTrue(taskDirectory.isDirectory());
    }

-    @Test
-    public void shouldLockTaskStateDirectory() throws IOException {
Member:
Why did you remove this test?

Contributor Author:
directory.lock(taskId) is at a different level than channel.tryLock(), rendering the test useless.

@tedyu (Contributor Author) commented Mar 19, 2018

Guozhang has a different opinion on the approach.
Waiting for the discussion to conclude before taking further action.

@gartho commented Jun 18, 2018

Any update on this issue?

@guozhangwang (Contributor):
@gartho We are still pondering what the best fix for this issue is across different file systems. What's your use case, and is this a blocker issue for your app?

@gartho commented Jun 19, 2018

I’m coming across this issue during integration-level testing. I’ve decided to work around it for now by clearing out the state store folder before I run my tests, so it’s not blocking me at present.

@philippkamps:
Any updates?

@guozhangwang (Contributor):
@gartho @philippkamps Sorry for being late on this PR. @tedyu I'd suggest following option 2) that I proposed in the JIRA ticket, i.e.:

Just another note: for locking purposes, when the apps shut down cleanly, it is OK to have the lock file left in the state directory, since threads exclude each other not via ownership of the file but via locking the file handle. So for your case, if you indeed want to clean up the whole directory upon shutting down, then I think there is still a valid point in closing all the file channels on shutdown. For that we can consider:

1) either use StandardOpenOption.DELETE_ON_CLOSE as you did in the PR;
2) or add a new function in the state directory class that closes all the managed file channels upon KafkaStreams.close(), which may be safer than 1) since StandardOpenOption.DELETE_ON_CLOSE is best-effort.

WDYT?
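
For illustration only, option 2) could look roughly like the following; the class shape, field, and method names are hypothetical sketches, not the actual StateDirectory API:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the state directory remembers the channels it opened
// and closes them all explicitly when the application shuts down.
class StateDirectoryChannelsSketch {
    private final List<FileChannel> managedChannels = new ArrayList<>();

    synchronized void register(final FileChannel channel) {
        managedChannels.add(channel);
    }

    // In this sketch, KafkaStreams.close() would call this before deleting state.
    synchronized void closeAllChannels() {
        for (final FileChannel channel : managedChannels) {
            try {
                channel.close(); // releases the lock so the file can actually be deleted
            } catch (final IOException e) {
                // log and keep closing the remaining channels
            }
        }
        managedChannels.clear();
    }
}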

@pgrabarsky:
+1

@mjsax (Member) commented Sep 7, 2018

@tedyu What is the status of this PR?

@tedyu (Contributor Author) commented Sep 7, 2018

From @guozhangwang's comment on the JIRA, I thought he acknowledged that lifting the lock up one directory would solve the problem for the Windows environment.

@mjsax (Member) commented Sep 7, 2018

I just lost the overview :) The last comment from @guozhangwang (#4702 (comment)) seems to indicate that he wants some updates to the PR. Or is that already contained, and the PR is ready for review?

Maybe you can rebase the PR to resolve the merge conflict first? Thanks a lot!

@tedyu (Contributor Author) commented Sep 7, 2018

Can we discuss how to achieve compatibility with the existing locking structure first?

Thanks

@mjsax (Member) commented Sep 7, 2018

Sure. However, I am not exactly sure what you mean? Can you elaborate?

@tedyu (Contributor Author) commented Sep 7, 2018

I meant that, after the move of the lock file, is there any compatibility issue we need to consider?

@tedyu (Contributor Author) commented Sep 9, 2018

I didn't find the local repo for the original PR.

I'll need to open a new one if the approach in the current PR is confirmed.

@mjsax (Member) commented Sep 13, 2018

@tedyu Sorry for the late reply.

> I meant that, after the move of the lock file, is there any compatibility issue we need to consider?

I don't think so. If an app is shut down, it should free all locks. On startup, it can just use the new locking strategy. Or do I miss anything? It would only be an issue if multiple threads (or different versions) would operate in parallel -- but this case should be be possible on a single machine and thus we should be fine?

@tedyu (Contributor Author) commented Sep 13, 2018

bq. but this case should be be possible on a single machine

I guess you meant 'should not be'

@tedyu closed this Sep 14, 2018
@tedyu (Contributor Author) commented Sep 14, 2018

Created #5650

@guozhangwang (Contributor):
@tedyu @mjsax My concern about compatibility is this: if we have multiple instances of Streams running on the same machine, say A and B, and A shuts down and upgrades to a new version, it will then try to create the lock file in a different location (one layer up) while B is still holding locks in the old location. Is it possible that both instances think they now own the task's directory and move on to modify the state (e.g. one still owning the task while the other moves in and "cleans up" the state)?

@mjsax (Member) commented Sep 14, 2018

If you run two KafkaStreams instances on one machine, you need to configure them with two different state.dir values -- thus, both should be isolated and the case you describe cannot happen. Or am I wrong with the assumption about different state.dirs? IIRC, we consider using the same state.dir a configuration error.
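
For example, a minimal sketch (paths and app id are illustrative) of configuring two instances on one machine with separate state directories:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfigSketch {
    public static void main(final String[] args) {
        final Properties instanceA = new Properties();
        instanceA.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        instanceA.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        instanceA.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-a");

        final Properties instanceB = new Properties();
        instanceB.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        instanceB.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        instanceB.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-b");
        // With distinct state.dir values the two instances never touch each other's lock files.
    }
}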

@bbejeck (Member) commented Sep 14, 2018

I just started up a small example app that merely created the state.dir. I ran two instances of the application.

In one case I used the same app-id, and both instances started up fine. It makes sense as the different instances will end up writing to different task-id directories.

In the other case, I changed the app-id of the second instance, and a new directory state.dir/app-id/<task-directories> was created.

So I think we need more testing to cover the scenario @guozhangwang outlined above. It wouldn't hurt to follow @mjsax's suggestion to configure different state.dir entries, but it would be easy to forget to do so.

@guozhangwang (Contributor):
Today we do not actually enforce different state.dir configs for different instances: imagine Streams deployed via K8s, where the config values are wired in and are the same, and K8s may allocate containers on the same machine.

Thoughts @vvcephei @bbejeck @tedyu @dguy ?

@tedyu (Contributor Author) commented Sep 15, 2018

I think for K8s deployment, the state directory should be customized for each streaming app.

@bhavesh3184:
@tedyu What is the status of this PR?

@tedyu (Contributor Author) commented Sep 27, 2018

See the above comment from Guozhang.
It seems we should consider the case where state.dir is shared between streaming apps.

@mjsax (Member) commented Oct 3, 2018

Can we move the discussion to the new PR? Thanks.
