
KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean #5650

Closed
wants to merge 3 commits

Conversation

tedyu
Contributor

@tedyu tedyu commented Sep 14, 2018

Specify StandardOpenOption#DELETE_ON_CLOSE when creating the FileChannel.

Move lock file up one level.
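A minimal, self-contained sketch of the first part of the change (not the actual Kafka code — `LockFileDemo` and its methods are hypothetical; only `StandardOpenOption.DELETE_ON_CLOSE` and the `.lock` file name come from the PR): opening the lock file with `DELETE_ON_CLOSE` means closing the channel removes the file, so `cleanUp` no longer leaves a fresh `.lock` behind in the directory it just cleaned.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class LockFileDemo {
    static final String LOCK_FILE_NAME = ".lock"; // same name StateDirectory uses

    // Lock and unlock a task directory, then report whether the lock file
    // was left behind. With DELETE_ON_CLOSE it is removed when the channel closes.
    public static boolean lockFileLeftBehind(File taskDirectory) {
        File lockFile = new File(taskDirectory, LOCK_FILE_NAME);
        try (FileChannel channel = FileChannel.open(
                lockFile.toPath(),
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.DELETE_ON_CLOSE)) { // deleted when channel closes
            FileLock lock = channel.tryLock();
            if (lock != null) {
                lock.release();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lockFile.exists();
    }

    // Helper for demos/tests: a throwaway directory standing in for a task dir.
    static File tempTaskDir() {
        try {
            return Files.createTempDirectory("task-0_0").toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lockFileLeftBehind(tempTaskDir())); // prints false
    }
}
```

This also explains the test-count changes discussed below: directories no longer contain a lingering `.lock` file after the channel is closed.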

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax
Member

mjsax commented Oct 3, 2018

@guozhangwang I was thinking about your compatibility concerns. Could we fix it with the following approach: we encode which lock structure to use in the rebalance protocol (we can simply bump the version) -- if at least one instance is on the old version, we still use the old locks -- after all instances are on the new version, we switch from the old lock files to the new lock files (for this, the code must hold the old lock, get the new lock, then release the old lock).

Thoughts?
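The handoff described above can be sketched as follows (a toy illustration under assumptions -- `LockHandoff`, `switchLocks`, and the file names are all hypothetical, not Kafka code): the new lock is acquired while the old one is still held, and only then is the old lock released, so the state directory is never unprotected during the switch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockHandoff {
    // Switch from the old lock file to the new one without a window in which
    // neither lock is held. Returns false if either lock cannot be acquired.
    public static boolean switchLocks(Path oldLockFile, Path newLockFile) {
        try (FileChannel oldCh = FileChannel.open(oldLockFile,
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileChannel newCh = FileChannel.open(newLockFile,
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock oldLock = oldCh.tryLock();
            if (oldLock == null) {
                return false;                 // someone else holds the old lock
            }
            FileLock newLock = newCh.tryLock(); // grab new lock while old is held
            if (newLock == null) {
                oldLock.release();
                return false;
            }
            oldLock.release();                // only now give up the old lock
            newLock.release();                // a real impl would keep holding it
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-dir");
        System.out.println(switchLocks(dir.resolve("task.lock"), dir.resolve(".lock")));
    }
}
```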


try (
    final FileChannel channel = FileChannel.open(
        new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
Member


Why do we remove this test? It seems we should update the FileChannel here to use the new lock file name?

@@ -196,15 +176,14 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws
directory.directoryForTask(new TaskId(2, 0));

List<File> files = Arrays.asList(appDir.listFiles());
-        assertEquals(3, files.size());
+        assertEquals(1, files.size());
Member


Why this? Shouldn't directory.lock(task0); and directory.lock(task1); have created a lock file each?

Contributor Author


Will dig some more on these tests once Guozhang confirms the plan.

Contributor Author


This was due to specifying StandardOpenOption.DELETE_ON_CLOSE in the FileChannel.open call.

Contributor Author


@mjsax
I want to get your opinion on whether StandardOpenOption.DELETE_ON_CLOSE should be kept in the PR.
This would affect how the test is modified.

Member


Sorry for the late reply. The last few weeks were a little crazy and I did not find time earlier.

I cannot remember why we want to add the DELETE_ON_CLOSE option. Can you refresh my memory?

Also, I am not sure why this option reduced the file count. I understand that the task directories are not created any longer; however, we moved both lock files up the hierarchy, so shouldn't the count stay the same?

Also, did you see this older comment: #5650 (comment)? For a clean upgrade path, addressing this issue is important.

@guozhangwang
Contributor

@guozhangwang I was thinking about your compatibility concerns. Could we fix it with the following approach: we encode which lock structure to use in the rebalance protocol (we can simply bump the version) -- if at least one instance is on the old version, we still use the old locks -- after all instances are on the new version, we switch from the old lock files to the new lock files (for this, the code must hold the old lock, get the new lock, then release the old lock).

Just to clarify: this decision is made on the leader side when assigning partitions, right? If so, that sounds good to me.

@mjsax
Member

mjsax commented Oct 5, 2018

Yes, the consumer group leader collects all consumer versions, and downgrades via version probing if necessary.

@tedyu
Contributor Author

tedyu commented Nov 5, 2018

I ran the tests that failed in the Java 11 build locally, and they passed.

@tedyu
Contributor Author

tedyu commented Nov 5, 2018

Regarding the consumer group leader collecting consumer versions, a few more pointers would be appreciated.

Thanks

@tedyu
Contributor Author

tedyu commented Nov 5, 2018

The failed tests in the JDK 8 run were not related to this PR.

@mjsax
Member

mjsax commented Nov 6, 2018

The rebalance version thing is basically based on KIP-268 https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade

We can exploit this by bumping the version number to indicate a change (i.e., we need to bump the number in SubscriptionInfo and AssignmentInfo and update StreamsPartitionAssignor#assign() accordingly so it handles the new version). The actual metadata does not need to change for this. With version probing, the AssignmentInfo will contain the used metadata version, and as long as we receive a downgraded metadata version number, we use the old locking scheme. Only if we receive the new metadata version do we switch to the new locking scheme. Thus, StreamsPartitionAssignor#onAssignment() must evaluate the metadata version and provide this information to the StateDirectory class (I guess a boolean flag, "useOldLocking", should be sufficient).

Does this make sense? I can provide more details if necessary. Hope it's a good starting point for you to dig into it.
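The idea above can be modeled in a few lines (a toy sketch, not Kafka code -- `LockingSchemeProbe`, `NEW_LOCKING_VERSION`, and the integer-version API are all assumptions for illustration; only the class names SubscriptionInfo, AssignmentInfo, and StreamsPartitionAssignor come from the discussion): the leader downgrades the metadata version if any instance is old, and the onAssignment handler derives the `useOldLocking` flag from the version it receives.

```java
public class LockingSchemeProbe {
    static final int NEW_LOCKING_VERSION = 5; // hypothetical bumped version

    private Boolean useOldLocking = null;     // unset until the first assignment

    // Stand-in for StreamsPartitionAssignor#onAssignment(): evaluate the
    // metadata version shipped in AssignmentInfo and record the lock scheme.
    public void onAssignment(int receivedMetadataVersion) {
        useOldLocking = receivedMetadataVersion < NEW_LOCKING_VERSION;
    }

    // The flag that would be forwarded to StateDirectory.
    public boolean useOldLocking() {
        if (useOldLocking == null) {
            throw new IllegalStateException("locking scheme not set yet");
        }
        return useOldLocking;
    }
}
```

For example, receiving version 4 (a downgraded number) would keep the old locking scheme, while version 5 would switch to the new one.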

@tedyu
Contributor Author

tedyu commented Nov 17, 2018

Thanks for the hint.
Looking at StreamsPartitionAssignor#onAssignment(), I am trying to find out how the locking scheme can be passed to the StateDirectory class.

@tedyu
Contributor Author

tedyu commented Nov 17, 2018

Currently, taskManager#taskCreator is not accessible to the onAssignment method.
Should a getter for stateDirectory be added to TaskManager for passing the lock scheme (stateDirectory is a field of AbstractTaskCreator)?

@tedyu
Contributor Author

tedyu commented Nov 17, 2018

StateDirectory is constructed and passed to StreamThread.
If the lock method is called before assignment, only the old lock scheme can be used, right?

@mjsax
Member

mjsax commented Nov 20, 2018

If the lock method is called before assignment, only the old lock scheme can be used, right?

I think this should never happen, because we only lock a task directory after the task was assigned. Maybe we can add a guard in StateDirectory to avoid bugs: initialize a variable that tracks the locking schema to "UNKNOWN" and set it to the concrete locking schema in StreamsPartitionAssignor#onAssignment() -- if lock is called while the schema is UNKNOWN, we throw an IllegalStateException.

For the first question, it seems OK to me to add a method to TaskManager to set the locking schema and let the TaskManager "forward" it -- this way we would not pass the StateDirectory into StreamsPartitionAssignor#onAssignment().

Does this help?
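The guard suggested above can be sketched like this (a hypothetical stand-in, not the real StateDirectory class -- `GuardedStateDirectory`, `LockingSchema`, and `setLockingSchema` are all illustrative names): locking is refused with an IllegalStateException until the schema has been set from onAssignment.

```java
public class GuardedStateDirectory {
    public enum LockingSchema { UNKNOWN, OLD, NEW }

    private volatile LockingSchema schema = LockingSchema.UNKNOWN;

    // Would be called (forwarded via TaskManager) from
    // StreamsPartitionAssignor#onAssignment().
    public void setLockingSchema(LockingSchema newSchema) {
        schema = newSchema;
    }

    public boolean lock(String taskId) {
        if (schema == LockingSchema.UNKNOWN) {
            throw new IllegalStateException(
                "lock(" + taskId + ") called before a locking schema was assigned");
        }
        // ... real file locking would happen here, using the chosen schema
        return true;
    }
}
```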

@jukkakarvanen
Contributor

@mjsax , @bbejeck

I have been looking into KAFKA-6647 with the two-locking-policies approach, and I have a draft version available:
trunk...jukkakarvanen:KAFKA-6647_MultipleLockings

Some questions:
There seems to be one shared StateDirectory per KafkaStreams instance, so multiple threads and tasks share it.

Can this useOldLocking field be one per StateDirectory, or should it be set per TaskId or StreamThread?

Where is this locking of StateDirectory used?
I found it in AbstractTask#registerStateStores, which locks the task; in closeStateManager, which unlocks it; and in task removal in StateDirectory. Based on Matthias's comment above, onAssignment should always happen before locking, i.e., before registerStateStores or removal. In my current version, I made useOldLocking a map keyed by taskId, with logic that throws an error if the locking policy is not set for a task.
This revealed at least one failing scenario, in ResetIntegrationTest, where KafkaStreams is created twice and the cleanup of the second KafkaStreams tries to remove -- and thereby lock -- tasks based on the directory structure, without their locking policy having been set.

If a rebalance is happening and a new onAssignment arrives, is closeStateManager called before it, or can the task already be locked?
This relates to whether we need to migrate new locks to old locks, which locks need to be migrated at the same time, and whether old locks need to be migrated to new locks.
What are the scenarios where these migrations need to happen?

  • New lock to old lock: an application using the old Streams API connects
  • Old lock to new lock: all applications using the old Streams API have disconnected

Regarding AssignmentInfo and SubscriptionInfo:
Is it possible that some assignments are handled only by new apps, and therefore get a newer version and different locking than assignments shared between old and new applications?

@mjsax
Member

mjsax commented Dec 29, 2022

Closing this stale PR. The corresponding Jira ticket is marked as "resolved" already.

@mjsax mjsax closed this Dec 29, 2022