-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10199: Close pending active tasks to init on partitions lost #16545
Conversation
With enabled state updater tasks that are created but not initialized are stored in a set. In each poll iteration the stream thread drains that set, intializes the tasks, and adds them to the state updater. On partition lost, all active tasks are closed. This commit ensures that active tasks pending initialization in the set mentioned above are closed cleanly on partition lost.
if (stateUpdater != null) { | ||
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>(); | ||
final Map<TaskId, RuntimeException> failedTasksDuringCleanClose = new HashMap<>(); | ||
final Set<Task> tasksToCloseClean = new HashSet<>(); | ||
final Set<Task> tasksToCloseClean = new HashSet<>(tasks.drainPendingActiveTasksToInit()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the actual fix.
@@ -107,6 +108,20 @@ public Set<Task> drainPendingTasksToInit() { | |||
return result; | |||
} | |||
|
|||
@Override | |||
public Set<Task> drainPendingActiveTasksToInit() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Helper method for the fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Definitely a bug, and I can imagine how this will cause lots of lock exceptions.
The only failure we got in the last run was unrelated. |
…pache#16545) With enabled state updater tasks that are created but not initialized are stored in a set. In each poll iteration the stream thread drains that set, intializes the tasks, and adds them to the state updater. On partition lost, all active tasks are closed. This commit ensures that active tasks pending initialization in the set mentioned above are closed cleanly on partition lost.
…16545) (#16550) With enabled state updater tasks that are created but not initialized are stored in a set. In each poll iteration the stream thread drains that set, intializes the tasks, and adds them to the state updater. On partition lost, all active tasks are closed. This commit ensures that active tasks pending initialization in the set mentioned above are closed cleanly on partition lost. Reviewer: Lucas Brutschy <[email protected]>
With enabled state updater tasks that are created but not initialized are stored in a set. In each poll iteration the stream thread drains that set, intializes the tasks, and adds them to the state updater.
On partition lost, all active tasks are closed.
This commit ensures that active tasks pending initialization in the set mentioned above are closed cleanly on partition lost.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)