Skip to content

Commit 06f6989

Browse files
committed
Add list_consumer_group_offsets()
Support fetching the offsets of a consumer group.
1 parent 2ab6cbf commit 06f6989

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

kafka/admin/kafka.py

+76-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
from __future__ import absolute_import
22

3+
from collections import defaultdict
34
import copy
45
import logging
56
import socket
7+
8+
from kafka.vendor import six
9+
610
from kafka.client_async import KafkaClient, selectors
711
import kafka.errors as Errors
812
from kafka.errors import (
@@ -12,8 +16,9 @@
1216
from kafka.protocol.admin import (
1317
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
1418
ListGroupsRequest, DescribeGroupsRequest)
15-
from kafka.protocol.commit import GroupCoordinatorRequest
19+
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
1620
from kafka.protocol.metadata import MetadataRequest
21+
from kafka.structs import TopicPartition, OffsetAndMetadata
1722
from kafka.version import __version__
1823

1924
log = logging.getLogger(__name__)
@@ -595,5 +600,75 @@ def list_consumer_groups(self):
595600
# TODO this is completely broken, as it needs to send to the group coordinator
596601
# return self._send(request)
597602

603+
def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
604+
partitions=None):
605+
"""Fetch Consumer Group Offsets.
606+
607+
Note:
608+
This does not verify that the group_id or partitions actually exist
609+
in the cluster.
610+
611+
As soon as any error is encountered, it is immediately raised.
612+
613+
:param group_id: The consumer group id name for which to fetch offsets.
614+
:param group_coordinator_id: The node_id of the group's coordinator
615+
broker. If set to None, will query the cluster to find the group
616+
coordinator. Explicitly specifying this can be useful to prevent
617+
that extra network round trip if you already know the group
618+
coordinator. Default: None.
619+
:param partitions: A list of TopicPartitions for which to fetch
620+
offsets. On brokers >= 0.10.2, this can be set to None to fetch all
621+
known offsets for the consumer group. Default: None.
622+
:return dictionary: A dictionary with TopicPartition keys and
623+
OffsetAndMetada values. Partitions that are not specified and for
624+
which the group_id does not have a recorded offset are omitted. An
625+
offset value of `-1` indicates the group_id has no offset for that
626+
TopicPartition. A `-1` can only happen for partitions that are
627+
explicitly specified.
628+
"""
629+
group_offsets_listing = {}
630+
if group_coordinator_id is None:
631+
group_coordinator_id = self._find_group_coordinator_id(group_id)
632+
version = self._matching_api_version(OffsetFetchRequest)
633+
if version <= 3:
634+
if partitions is None:
635+
if version <= 1:
636+
raise ValueError(
637+
"""OffsetFetchRequest_v{} requires specifying the
638+
partitions for which to fetch offsets. Omitting the
639+
partitions is only supported on brokers >= 0.10.2.
640+
For details, see KIP-88.""".format(version))
641+
topics_partitions = None
642+
else:
643+
# transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
644+
topics_partitions_dict = defaultdict(set)
645+
for topic, partition in partitions:
646+
topics_partitions_dict[topic].add(partition)
647+
topics_partitions = list(six.iteritems(topics_partitions_dict))
648+
request = OffsetFetchRequest[version](group_id, topics_partitions)
649+
response = self._send_request_to_node(group_coordinator_id, request)
650+
if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
651+
error_type = Errors.for_code(response.error_code)
652+
if error_type is not Errors.NoError:
653+
# optionally we could retry if error_type.retriable
654+
raise error_type(
655+
"Request '{}' failed with response '{}'."
656+
.format(request, response))
657+
# transform response into a dictionary with TopicPartition keys and
658+
# OffsetAndMetada values--this is what the Java AdminClient returns
659+
for topic, partitions in response.topics:
660+
for partition, offset, metadata, error_code in partitions:
661+
error_type = Errors.for_code(error_code)
662+
if error_type is not Errors.NoError:
663+
raise error_type(
664+
"Unable to fetch offsets for group_id {}, topic {}, partition {}"
665+
.format(group_id, topic, partition))
666+
group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
667+
else:
668+
raise NotImplementedError(
669+
"Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
670+
.format(version))
671+
return group_offsets_listing
672+
598673
# delete groups protocol not yet implemented
599674
# Note: send the request to the group's coordinator.

kafka/structs.py

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
["topic", "partition", "leader", "replicas", "isr", "error"])
7373

7474
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
75+
# TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
7576
["offset", "metadata"])
7677

7778
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",

0 commit comments

Comments
 (0)