Feature delete consumergroups #2040
@@ -341,6 +343,30 @@ def _find_coordinator_id(self, group_id):
        response = future.value
        return self._find_coordinator_id_process_response(response)

    def _find_many_coordinator_ids(self, group_ids):
Does Java have this method? When I wrote a bunch of this code two years ago I don't recall seeing anything that handled multiple group_ids at once, but I haven't been spelunking in their code for a while...
I don't know; this one is mostly copy-pasted from the method above. I went through https://kafka.apache.org/protocol, but it doesn't look like there is an API that can provide the coordinator id for more than one group. Not even the DescribeGroups API returns the coordinators.
Looking at this more, I certainly think it'd be convenient to have this method... we could also use it within the describe_consumer_groups() method to emit all the group coordinator requests in parallel: #2124
👍
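To illustrate the idea discussed in this thread (one coordinator lookup per group, issued in parallel, then grouped by broker), here is a minimal sketch. It is not the code from this PR: `find_coordinator_id` is a hypothetical stand-in that fakes a lookup locally, and a thread pool stands in for the client's own async futures.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def find_coordinator_id(group_id):
    """Hypothetical stand-in for one coordinator lookup round trip.

    A real client would ask a broker; here the group id is mapped onto one
    of four pretend broker ids so the sketch runs without a cluster.
    """
    return sum(group_id.encode("utf-8")) % 4


def find_many_coordinator_ids(group_ids, lookup=find_coordinator_id):
    """Run one lookup per group concurrently; return {group_id: node_id}."""
    group_ids = list(group_ids)
    if not group_ids:
        return {}
    with ThreadPoolExecutor(max_workers=min(8, len(group_ids))) as pool:
        # pool.map preserves input order, so zip pairs each id with its result.
        return dict(zip(group_ids, pool.map(lookup, group_ids)))


def group_by_coordinator(coordinators):
    """Invert {group_id: node_id} into {node_id: [group_id, ...]}."""
    by_node = defaultdict(list)
    for group_id, node_id in coordinators.items():
        by_node[node_id].append(group_id)
    return dict(by_node)
```

Grouping by node afterwards means one delete or describe request can be sent per coordinator rather than one per group.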
@jeffwidman I ran this on a cluster with 4 nodes and it deleted 3k groups without an issue... I suppose coordinator discovery works well.
ping @dpkp @jeffwidman
test/test_admin_integration.py
Outdated
    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert "test2-group1" not in consumergroups
    assert "test2-group2" in consumergroups
    assert "test2-group3" not in consumergroups
missing newline at end
Fixed. Btw. an editorconfig and/or pre-commit config could help with this 😉
Good call.. PR welcome: #2125
            results.extend(self._convert_delete_groups_response(f.value))
        return results

    def _convert_delete_groups_response(self, response):
This is outside the scope of this PR, but this client code is currently a little inconsistent between _convert_X(response) and _X_process_response(response)... it'd be nice to make the naming more consistent, but I took a quick look and IMO it's not worth fixing, as how they are used varies slightly: some things are converting things into requests, some are parsing responses, and some are doing pre/post conversion of one type into another...
Yeah, I also thought about this one a while... In the end my reasoning was something like this: the convert methods seem to have little to no logic and really do only conversions. The process methods involve more computation. So I took the former naming.
I am fine with leaving as-is... agreed that doing multiple brokers is a lot of extra work. It's probably better done as a |
That's really tough... in Here I'm hesitant to raise since everything is sent async so some groups may complete and others may fail... it is idempotent to simply re-run, but if some will always fail with errors like "group unknown/ doesn't exist" etc, then you want to know what happened to the others in the list... did they delete successfully? So I'd probably lean (slightly) toward just returning the error codes and letting the caller inspect to see which completed and which failed. |
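A minimal sketch of the "return the errors and let the caller inspect them" approach described above. The result shape is an assumption made for illustration: a list of `(group_id, error)` pairs where `error` is `None` on success. `GroupIdNotFoundError` and `split_delete_results` are hypothetical names, not part of the client.

```python
class GroupIdNotFoundError(Exception):
    """Hypothetical stand-in for a per-group error reported by the broker."""


def split_delete_results(results):
    """Partition (group_id, error) pairs into deleted ids and failures.

    Assumes error is None when the group was deleted successfully.
    """
    deleted, failed = [], {}
    for group_id, error in results:
        if error is None:
            deleted.append(group_id)
        else:
            failed[group_id] = error
    return deleted, failed


# Example result as a caller might receive it (made-up data).
example_results = [
    ("group-a", None),
    ("group-b", GroupIdNotFoundError),
    ("group-c", None),
]
deleted, failed = split_delete_results(example_results)
for group_id, error in failed.items():
    print("could not delete %s: %s" % (group_id, error.__name__))
```

Because nothing is raised, the caller still learns that the other groups in the list were deleted, and can choose to retry or ignore only the failed ones.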
@swenzel can you rebase to fix conflicts? Take a look at my feedback above... the comment and newline are nits, but if you're already rebasing it'd be nice to fix these.
Update fork 6d78133 to 2f678ae
Thanks @swenzel!
* Add consumergroup related errors
* Add DeleteGroups to protocol.admin
* Implement delete_groups feature on KafkaAdminClient
I didn't find any issue requesting this, but I could use it so I just went ahead and implemented it. It's kind of a work in progress, just wanted to get it out for early feedback.
Tests are running fine and it looks good but what's still missing is testing if group coordinator discovery works properly when there are different brokers.
Also I'm not quite sure how to handle errors. Right now they are ignored and it's left for the caller to inspect the result but we could also just raise any errors.
WDYT?