Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add group coordinator lookup #1641

Merged
merged 1 commit into from
Nov 18, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import logging
import socket
from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.version import __version__

Expand Down Expand Up @@ -243,6 +245,44 @@ def _refresh_controller_id(self):
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(version))

def _find_group_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator of the given group.

Sends a FindCoordinatorRequest message to the cluster. Will block until
the FindCoordinatorResponse is received. Any errors are immediately
raised.

:param group_id: The consumer group ID. This is typically the group
name as a string.
:return: The node_id of the broker that is the coordinator.
"""
# Note: Java may change how this is implemented in KAFKA-6791.
#
# TODO add support for dynamically picking version of
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
# When I experimented with this, GroupCoordinatorResponse_v1 didn't
# match GroupCoordinatorResponse_v0 and I couldn't figure out why.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that v1 adds the idea of a group type. All groups I have seen are consumers, but I think Kafka Streams may use group coordination for producers or even other abstract groups. In any event, we'd need to add a parameter for group type I believe.

Copy link
Collaborator Author

@jeffwidman jeffwidman Nov 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thought.

I'm going to merge this for now and will add that as part of switching it to a FindCoordinatorRequest (which is a bigger change of renaming things across the client/consumer code). Added it to #1630 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emm, TransactionCoordinator is looked up by that too)

gc_request = GroupCoordinatorRequest[0](group_id)
gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
# use the extra error checking in add_group_coordinator() rather than
# immediately returning the group coordinator.
success = self._client.cluster.add_group_coordinator(group_id, gc_response)
if not success:
error_type = Errors.for_code(gc_response.error_code)
assert error_type is not Errors.NoError
# Note: When error_type.retriable, Java will retry... see
# KafkaAdminClient's handleFindCoordinatorError method
raise error_type(
"Could not identify group coordinator for group_id '{}' from response '{}'."
.format(group_id, gc_response))
group_coordinator = self._client.cluster.coordinator_for_group(group_id)
# will be None if the coordinator was never populated, which should never happen here
assert group_coordinator is not None
# will be -1 if add_group_coordinator() failed... but by this point the
# error should have been raised.
assert group_coordinator != -1
return group_coordinator

def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.

Expand Down