Skip to content

Commit

Permalink
[KIP-848 EA] Admin API for listing consumer groups now has (#1830)
Browse files Browse the repository at this point in the history
an optional filter to return only groups of given types


Co-authored-by: mahajanadhitya <[email protected]>
  • Loading branch information
emasab and mahajanadhitya authored Oct 10, 2024
1 parent 979343a commit 645b0d5
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 16 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Confluent's Python client for Apache Kafka

## v2.6.0

v2.6.0 is a feature release with the following features, fixes and enhancements:

- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#1830).

confluent-kafka-python is based on librdkafka v2.6.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.5.3

v2.5.3 is a maintenance release with the following fixes and enhancements:
Expand Down
46 changes: 39 additions & 7 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
# Example use of AdminClient operations.

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
TopicPartition, ConsumerGroupState, TopicCollection,
IsolationLevel, ElectionType)
TopicPartition, ConsumerGroupState,
TopicCollection, IsolationLevel,
ConsumerGroupType, ElectionType)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
Expand All @@ -30,6 +31,7 @@
import sys
import threading
import logging
import argparse

logging.basicConfig()

Expand Down Expand Up @@ -471,18 +473,47 @@ def example_list(a, args):
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))


def parse_list_consumer_groups_args(args, states, types):
parser = argparse.ArgumentParser(prog='list_consumer_groups')
parser.add_argument('-states')
parser.add_argument('-types')
parsed_args = parser.parse_args(args)

def usage(message):
print(message)
parser.print_usage()
sys.exit(1)

if parsed_args.states:
for arg in parsed_args.states.split(","):
try:
states.add(ConsumerGroupState[arg])
except KeyError:
usage(f"Invalid state: {arg}")
if parsed_args.types:
for arg in parsed_args.types.split(","):
try:
types.add(ConsumerGroupType[arg])
except KeyError:
usage(f"Invalid type: {arg}")


def example_list_consumer_groups(a, args):
"""
List Consumer Groups
"""
states = {ConsumerGroupState[state] for state in args}
future = a.list_consumer_groups(request_timeout=10, states=states)

states = set()
types = set()
parse_list_consumer_groups_args(args, states, types)

future = a.list_consumer_groups(request_timeout=10, states=states, types=types)
try:
list_consumer_groups_result = future.result()
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
for valid in list_consumer_groups_result.valid:
print(" id: {} is_simple: {} state: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state))
print(" id: {} is_simple: {} state: {} type: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type))
print("{} errors".format(len(list_consumer_groups_result.errors)))
for error in list_consumer_groups_result.errors:
print(" error: {}".format(error))
Expand Down Expand Up @@ -937,7 +968,8 @@ def example_elect_leaders(a, args):
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
sys.stderr.write(' list_consumer_groups [-states <state1>,<state2>,..] ' +
'[-types <type1>,<type2>,..]\n')
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')
Expand Down
4 changes: 3 additions & 1 deletion src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ._model import (Node, # noqa: F401
ConsumerGroupTopicPartitions,
ConsumerGroupState,
ConsumerGroupType,
TopicCollection,
TopicPartitionInfo,
IsolationLevel,
Expand Down Expand Up @@ -49,7 +50,8 @@
'Producer', 'DeserializingConsumer',
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
'ConsumerGroupType', 'Uuid',
'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']

__version__ = version()[0]
Expand Down
20 changes: 20 additions & 0 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ def __lt__(self, other):
return self.value < other.value


class ConsumerGroupType(Enum):
"""
Enumerates the different types of Consumer Group Type.
Values:
-------
"""
#: Type is not known or not set
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
#: Consumer Type
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value


class TopicCollection:
"""
Represents collection of topics in the form of different identifiers
Expand Down
13 changes: 13 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ._records import DeletedRecords # noqa: F401

from .._model import (TopicCollection as _TopicCollection,
ConsumerGroupType as _ConsumerGroupType,
ElectionType as _ElectionType)

from ..cimpl import (KafkaException, # noqa: F401
Expand Down Expand Up @@ -898,6 +899,8 @@ def list_consumer_groups(self, **kwargs):
on broker, and response. Default: `socket.timeout.ms/1000.0`
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
these states.
:param set(ConsumerGroupType) types: only list consumer groups of
these types.
:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.
Expand All @@ -917,6 +920,16 @@ def list_consumer_groups(self, **kwargs):
raise TypeError("All elements of states must be of type ConsumerGroupState")
kwargs["states_int"] = [state.value for state in states]
kwargs.pop("states")
if "types" in kwargs:
types = kwargs["types"]
if types is not None:
if not isinstance(types, set):
raise TypeError("'types' must be a set")
for type in types:
if not isinstance(type, _ConsumerGroupType):
raise TypeError("All elements of types must be of type ConsumerGroupType")
kwargs["types_int"] = [type.value for type in types]
kwargs.pop("types")

f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)

Expand Down
8 changes: 6 additions & 2 deletions src/confluent_kafka/admin/_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


from .._util import ConversionUtil
from .._model import ConsumerGroupState
from .._model import ConsumerGroupState, ConsumerGroupType
from ._acl import AclOperation


Expand All @@ -31,13 +31,17 @@ class ConsumerGroupListing:
Whether a consumer group is simple or not.
state : ConsumerGroupState
Current state of the consumer group.
type : ConsumerGroupType
Type of the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, state=None):
def __init__(self, group_id, is_simple_consumer_group, state=None, type=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if type is not None:
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)


class ListConsumerGroupsResult:
Expand Down
59 changes: 54 additions & 5 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ struct Admin_options {
rd_kafka_IsolationLevel_t isolation_level;
rd_kafka_consumer_group_state_t* states;
int states_cnt;
rd_kafka_consumer_group_type_t* types;
int types_cnt;
};

/**@brief "unset" value initializers for Admin_options
Expand All @@ -96,6 +98,8 @@ struct Admin_options {
Admin_options_def_int, \
Admin_options_def_ptr, \
Admin_options_def_cnt, \
Admin_options_def_ptr, \
Admin_options_def_cnt, \
}

#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
Expand Down Expand Up @@ -185,6 +189,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
goto err;
}

if (Admin_options_is_set_ptr(options->types) &&
(err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
c_options, options->types, options->types_cnt))) {
snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
goto err;
}

return c_options;

err:
Expand Down Expand Up @@ -1698,24 +1709,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
* @brief List consumer groups
*/
PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *future, *states_int = NULL;
PyObject *future, *states_int = NULL, *types_int = NULL;
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
CallState cs;
rd_kafka_queue_t *rkqu;
rd_kafka_consumer_group_state_t *c_states = NULL;
rd_kafka_consumer_group_type_t *c_types = NULL;
int states_cnt = 0;
int types_cnt = 0;
int i = 0;

static char *kws[] = {"future",
/* options */
"states_int",
"types_int",
"request_timeout",
NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
&future,
&states_int,
&types_int,
&options.request_timeout)) {
goto err;
}
Expand All @@ -1736,7 +1751,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
PyObject *state = PyList_GET_ITEM(states_int, i);
if(!cfl_PyInt_Check(state)) {
PyErr_SetString(PyExc_ValueError,
"Element of states must be a valid state");
"Element of states must be valid states");
goto err;
}
c_states[i] = (rd_kafka_consumer_group_state_t) cfl_PyInt_AsInt(state);
Expand All @@ -1746,6 +1761,33 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
}
}

if(types_int != NULL && types_int != Py_None) {
if(!PyList_Check(types_int)) {
PyErr_SetString(PyExc_ValueError,
"types must of type list");
goto err;
}

types_cnt = (int)PyList_Size(types_int);

if(types_cnt > 0) {
c_types = (rd_kafka_consumer_group_type_t *)
malloc(types_cnt *
sizeof(rd_kafka_consumer_group_type_t));
for(i = 0 ; i < types_cnt ; i++) {
PyObject *type = PyList_GET_ITEM(types_int, i);
if(!cfl_PyInt_Check(type)) {
PyErr_SetString(PyExc_ValueError,
"Element of types must be valid group types");
goto err;
}
c_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(type);
}
options.types = c_types;
options.types_cnt = types_cnt;
}
}

c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
&options, future);
if (!c_options) {
Expand Down Expand Up @@ -1774,22 +1816,27 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
if(c_states) {
free(c_states);
}
if(c_types) {
free(c_types);
}
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
rd_kafka_AdminOptions_destroy(c_options);

Py_RETURN_NONE;
err:
if(c_states) {
free(c_states);
}
if(c_types) {
free(c_types);
}
if (c_options) {
rd_kafka_AdminOptions_destroy(c_options);
Py_DECREF(future);
}
return NULL;
}
const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
".. py:function:: list_consumer_groups(future, [states_int], [types_int], [request_timeout])\n"
"\n"
" List all the consumer groups.\n"
"\n"
Expand Down Expand Up @@ -3711,6 +3758,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(

cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));

cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));

args = PyTuple_New(0);

valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);
Expand Down
9 changes: 8 additions & 1 deletion src/confluent_kafka/src/AdminTypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) {
PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY);
}

static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) {
/* rd_kafka_consumer_group_type_t */
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN);
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER);
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC);
}

static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) {
/* rd_kafka_consumer_group_state_t */
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET);
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE);
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND);
Expand Down Expand Up @@ -620,6 +626,7 @@ void AdminTypes_AddObjects (PyObject *m) {
AdminTypes_AddObjectsAclOperation(m);
AdminTypes_AddObjectsAclPermissionType(m);
AdminTypes_AddObjectsConsumerGroupStates(m);
AdminTypes_AddObjectsConsumerGroupTypes(m);
AdminTypes_AddObjectsAlterConfigOpType(m);
AdminTypes_AddObjectsScramMechanismType(m);
AdminTypes_AddObjectsIsolationLevel(m);
Expand Down

0 comments on commit 645b0d5

Please sign in to comment.