diff --git a/examples/.github/workflows/base.yml b/examples/.github/workflows/base.yml new file mode 100644 index 000000000..f405167e2 --- /dev/null +++ b/examples/.github/workflows/base.yml @@ -0,0 +1,17 @@ +name: check +on: [push, pull_request] +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + ref: v1.9.2 + - uses: actions/setup-go@v3 + with: + go-version: '=1.11' + - run: | + cd $PWD/kafka + go build ./... + + diff --git a/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go b/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go index 6f8b0b28a..c5950055e 100644 --- a/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go +++ b/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go @@ -18,18 +18,20 @@ package main import ( + "context" "fmt" "os" + "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { - if len(os.Args) < 2 { + if len(os.Args) < 3 { fmt.Fprintf( os.Stderr, - "Usage: %s [group1 group2 ....]\nIf no groups are specified, all groups are listed\n", + "Usage: %s <bootstrap-servers> <group1> [group2 ...]\n", os.Args[0]) os.Exit(1) } @@ -46,18 +48,28 @@ func main() { fmt.Printf("Failed to create Admin client: %s\n", err) os.Exit(1) } + defer a.Close() - groupInfos, err := a.DescribeConsumerGroups(groups) + // Call DescribeConsumerGroups. 
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupInfos, err := a.DescribeConsumerGroups(ctx, groups) if err != nil { fmt.Printf("Failed to describe groups: %s\n", err) os.Exit(1) } // Print results - fmt.Printf("A total of %d consumer groups described:\n", len(groupInfos)) - for _, groupInfo := range groupInfos { - fmt.Println(groupInfo) + fmt.Printf("A total of %d consumer group(s) described:\n\n", len(groupInfos)) + for _, g := range groupInfos { + fmt.Printf("GroupId: %s\n"+ + "Error: %s\n"+ + "IsSimpleConsumerGroup: %v\n"+ + "PartitionAssignor: %s\n"+ + "State: %s\n"+ + "Coordinator: %+v\n"+ + "Members: %+v\n\n", + g.GroupId, g.Error, g.IsSimpleConsumerGroup, g.PartitionAssignor, + g.State, g.Coordinator, g.Members) } - - a.Close() } diff --git a/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go b/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go index f12ab3bb4..2666b8df6 100644 --- a/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go +++ b/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go @@ -74,7 +74,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - res, err := ac.ListConsumerGroupOffsets(ctx, gps, kafka.SetAdminRequireStable(requireStable)) + res, err := ac.ListConsumerGroupOffsets(ctx, gps, kafka.SetAdminRequireStableOffsets(requireStable)) if err != nil { panic(err) } diff --git a/examples/admin_list_consumer_groups/admin_list_consumer_groups.go b/examples/admin_list_consumer_groups/admin_list_consumer_groups.go index 3e0b9b354..631d72edb 100644 --- a/examples/admin_list_consumer_groups/admin_list_consumer_groups.go +++ b/examples/admin_list_consumer_groups/admin_list_consumer_groups.go @@ -18,9 +18,9 @@ package main import ( + "context" "fmt" "os" - "strconv" "time" "github.com/confluentinc/confluent-kafka-go/kafka" @@ -29,18 +29,23 
@@ import ( func main() { if len(os.Args) < 2 { - fmt.Fprintf(os.Stderr, "Usage: %s [ = infinite]\n", os.Args[0]) + fmt.Fprintf(os.Stderr, + "Usage: %s <bootstrap-servers> [state1 state2 ...]\n", os.Args[0]) os.Exit(1) } bootstrapServers := os.Args[1] - - timeout_parsed := -1 - var err error + var states []kafka.ConsumerGroupState if len(os.Args) > 2 { - timeout_parsed, err = strconv.Atoi(os.Args[2]) - if err != nil { - fmt.Printf("Error parsing the timeout %s\n: %s", os.Args[2], err) + statesStr := os.Args[2:] + for _, stateStr := range statesStr { + state, err := kafka.ConsumerGroupStateFromString(stateStr) + if err != nil { + fmt.Fprintf(os.Stderr, + "Given state %s is not a valid state\n", stateStr) + os.Exit(1) + } + states = append(states, state) } } @@ -50,25 +55,26 @@ func main() { fmt.Printf("Failed to create Admin client: %s\n", err) os.Exit(1) } + defer a.Close() - var groupInfos []kafka.GroupInfo - if timeout_parsed == -1 { - groupInfos, err = a.ListConsumerGroups() - } else { - timeout := time.Duration(timeout_parsed) * time.Second - groupInfos, err = a.ListConsumerGroups(kafka.SetListConsumerGroupsOptionRequestTimeout(timeout)) - } + // Call ListConsumerGroups. 
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupRes, err := a.ListConsumerGroups( + ctx, kafka.SetAdminConsumerGroupStates(states)) - if err != nil { - fmt.Printf("Failed to list groups: %s\n", err) + if err != nil || len(listGroupRes.Errors) > 0 { + fmt.Printf("Failed to list groups: %s %v\n", err, listGroupRes.Errors) os.Exit(1) } // Print results - fmt.Printf("A total of %d consumer groups listed:\n", len(groupInfos)) - for _, groupInfo := range groupInfos { - fmt.Println(groupInfo) + groups := listGroupRes.ConsumerGroupListings + fmt.Printf("A total of %d consumer group(s) listed:\n", len(groups)) + for _, group := range groups { + fmt.Printf("GroupId: %s\n", group.GroupId) + fmt.Printf("State: %s\n", group.State) + fmt.Printf("IsSimpleConsumerGroup: %v\n", group.IsSimpleConsumerGroup) + fmt.Println() } - - a.Close() } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 608903329..8e4c50d7f 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -77,18 +77,25 @@ AclBinding_by_idx (const rd_kafka_AclBinding_t **acl_bindings, size_t cnt, size_ return acl_bindings[idx]; } -static const struct rd_kafka_group_info * -group_list_group_info_by_idx(struct rd_kafka_group_list *group_list, size_t cnt, size_t idx) { +static const rd_kafka_ConsumerGroupListing_t * +ConsumerGroupListing_by_idx(const rd_kafka_ConsumerGroupListing_t **result_groups, size_t cnt, size_t idx) { if (idx >= cnt) return NULL; - return &group_list->groups[idx]; + return result_groups[idx]; } -static const struct rd_kafka_group_member_info * -group_member_info_by_idx(struct rd_kafka_group_member_info *member_infos, size_t cnt, size_t idx) { +static const rd_kafka_ConsumerGroupDescription_t * +ConsumerGroupDescription_by_idx(const rd_kafka_ConsumerGroupDescription_t **result_groups, size_t cnt, size_t idx) { if (idx >= cnt) return NULL; - return &member_infos[idx]; + return result_groups[idx]; +} + +static const rd_kafka_error_t * 
+error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return errors[idx]; } */ import "C" @@ -114,6 +121,14 @@ type TopicResult struct { Error Error } +// String returns a human-readable representation of a TopicResult. +func (t TopicResult) String() string { + if t.Error.code == 0 { + return t.Topic + } + return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str) +} + // GroupResult provides per-group operation result (error) information. type GroupResult struct { // Group name @@ -122,16 +137,12 @@ type GroupResult struct { Error Error } -// GroupMemberInfo provides information about a member of a consumer group. -type GroupMemberInfo struct { - // Member ID generated by the broker (consumer id) - MemberID string - // Client's client.id - ClientID string - // Client's hostname - ClientHost string - // Topic partitions which are assigned to this member - MemberAssignment []TopicPartition +// String returns a human-readable representation of a GroupResult. 
+func (g GroupResult) String() string { + if g.Error.code == ErrNoError { + return g.Group + } + return fmt.Sprintf("%s (%s)", g.Group, g.Error.str) } // ConsumerGroupState represents a consumer group state @@ -139,22 +150,23 @@ type ConsumerGroupState int const ( // ConsumerGroupStateUnknown - Unknown ConsumerGroupState - ConsumerGroupStateUnknown = ConsumerGroupState(C.RD_KAFKA_CGRP_STATE_UNKNOWN) + ConsumerGroupStateUnknown = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN) // ConsumerGroupStatePreparingReblance - preparing rebalance - ConsumerGroupStatePreparingReblance = ConsumerGroupState(C.RD_KAFKA_CGRP_STATE_PREPARING_REBALANCE) + ConsumerGroupStatePreparingReblance = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE) // ConsumerGroupStateCompletingRebalance - completing rebalance - ConsumerGroupStateCompletingRebalance = ConsumerGroupState(C.RD_KAFKA_CGRP_STATE_COMPLETING_REBALANCE) + ConsumerGroupStateCompletingRebalance = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE) // ConsumerGroupStateStable - stable - ConsumerGroupStateStable = ConsumerGroupState(C.RD_KAFKA_CGRP_STATE_STABLE) + ConsumerGroupStateStable = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_STABLE) // ConsumerGroupStateDead - dead group - ConsumerGroupStateDead = ConsumerGroupState(C.RD_KAFKA_CGRP_STATE_DEAD) + ConsumerGroupStateDead = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_DEAD) // ConsumerGroupStateEmpty - empty group - ConsumerGroupStateEmpty = ConsumerGroupState(C.RD_KAFKA_CGRP_STATE_EMPTY) + ConsumerGroupStateEmpty = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY) ) // String returns the human-readable representation of a consumer_group_state func (t ConsumerGroupState) String() string { - return C.GoString(C.rd_kafka_consumer_group_state_name(C.rd_kafka_consumer_group_state_t(t))) + return C.GoString(C.rd_kafka_consumer_group_state_name( + C.rd_kafka_consumer_group_state_t(t))) } // 
ConsumerGroupStateFromString translates a consumer group state name/string to @@ -171,65 +183,70 @@ func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error return state, nil } -// GroupInfo provides information about a consumer group. -type GroupInfo struct { - // Group name - Group string - // Error, if any, of result. Check with `Error.Code() != ErrNoError`. - Error Error - // Group state - State ConsumerGroupState - // Protocol type: "consumer" or "" for simple consumer groups. - ProtocolType string - // Is a simple consumer group +// ConsumerGroupListing represents the result of ListConsumerGroups for a single +// group. +type ConsumerGroupListing struct { + // Group id. + GroupId string + // Is a simple consumer group. IsSimpleConsumerGroup bool - - // The following fields are not populated in case of listing a group. - // They are populated only while describing a group: - - // Protocol (partition assignor) - Protocol string - // Broker information (coordinator) - Broker BrokerMetadata - // Members' info - Members []GroupMemberInfo + // Group state. + State ConsumerGroupState } -// String returns a human-readable representation of a TopicResult. -func (t TopicResult) String() string { - if t.Error.code == 0 { - return t.Topic - } - return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str) +// ListConsumerGroupsResult represents ListConsumerGroups results and errors. +type ListConsumerGroupsResult struct { + // List of valid ConsumerGroupListing. + ConsumerGroupListings []ConsumerGroupListing + // List of errors. + Errors []error } -// String returns a human-readable representation of a GroupResult. -func (g GroupResult) String() string { - if g.Error.code == ErrNoError { - return g.Group - } - return fmt.Sprintf("%s (%s)", g.Group, g.Error.str) +// Node represents a Kafka broker. +type Node struct { + // Node id. + Id int + // Node host. + Host string + // Node port. 
+ Port int } -func (m GroupMemberInfo) String() string { - return fmt.Sprintf("%s %s %s %v", m.MemberID, m.ClientID, m.ClientHost, m.MemberAssignment) +// MemberAssignment represents the assignment of a consumer group member. +type MemberAssignment struct { + // Partitions assigned to current member. + TopicPartitions []TopicPartition } -func (g GroupInfo) String() string { - if g.Error.code != ErrNoError { - return fmt.Sprintf("%s (%s)", g.Group, g.Error.str) - } - - // At this point I am not sure how we tell what sort of a request it was... Describe or List. - // This is a best-effort guess - if Broker.Host is nil, then we are most likely just - // listing the group, and need only the group name. - // TODO(Milind): is there a better way for this? - if g.Broker.Host == "" { - return g.Group - } +// MemberDescription represents the description of a consumer group member. +type MemberDescription struct { + // Client id. + CientId string + // Consumer id. + ConsumerId string + // Group member host. + Host string + // Member assignment. + MemberAssignment MemberAssignment +} - return fmt.Sprintf("%s {State=%s, IsSimpleConsumerGroup=%v, ProtocolType=%s, Protocol=%s, Broker=%s, Members=%v}", - g.Group, g.State, g.IsSimpleConsumerGroup, g.ProtocolType, g.Protocol, g.Broker, g.Members) +// ConsumerGroupDescription represents the result of DescribeConsumerGroups for +// a single group. +type ConsumerGroupDescription struct { + // Group id. + GroupId string + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error + // Is a simple consumer group + IsSimpleConsumerGroup bool + // Partition assignor identifier. + PartitionAssignor string + // Consumer group state. + State ConsumerGroupState + // Consumer group coordinator (broker). + Coordinator Node + // Members list. + Members []MemberDescription } // TopicSpecification holds parameters for creating a new topic. 
@@ -761,7 +778,6 @@ func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t // cToGroupResults converts a C group_result_t array to Go GroupResult list. func (a *AdminClient) cToGroupResults(cGroupRes **C.rd_kafka_group_result_t, cCnt C.size_t) (result []GroupResult, err error) { - result = make([]GroupResult, int(cCnt)) for i := 0; i < int(cCnt); i++ { @@ -789,69 +805,110 @@ func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCn return result, nil } -// cToGroupMemberInfos converts a C rd_kafka_group_member_info array to a Go GroupMemberInfo slice. -func (a *AdminClient) cToGroupMemberInfos(cGroupMemberInfoList *C.struct_rd_kafka_group_member_info, cCnt C.size_t) (result []GroupMemberInfo, err error) { - if cGroupMemberInfoList == nil || cCnt == 0 { - return nil, nil - } - - result = make([]GroupMemberInfo, int(cCnt)) +// cToConsumerGroupDescriptions converts a C rd_kafka_ConsumerGroupDescription_t +// array to a Go ConsumerGroupDescription slice. 
+func (a *AdminClient) cToConsumerGroupDescriptions( + cGroups **C.rd_kafka_ConsumerGroupDescription_t, + cGroupCount C.size_t) (result []ConsumerGroupDescription) { + result = make([]ConsumerGroupDescription, cGroupCount) + for idx := 0; idx < int(cGroupCount); idx++ { + cGroup := C.ConsumerGroupDescription_by_idx( + cGroups, cGroupCount, C.size_t(idx)) + + groupId := C.GoString( + C.rd_kafka_ConsumerGroupDescription_group_id(cGroup)) + err := newErrorFromCError( + C.rd_kafka_ConsumerGroupDescription_error(cGroup)) + isSimple := cint2bool( + C.rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(cGroup)) + paritionAssignor := C.GoString( + C.rd_kafka_ConsumerGroupDescription_partition_assignor(cGroup)) + state := ConsumerGroupState( + C.rd_kafka_ConsumerGroupDescription_state(cGroup)) + + cNode := C.rd_kafka_ConsumerGroupDescription_coordinator(cGroup) + coordinator := Node{ + Id: int(C.rd_kafka_Node_id(cNode)), + Host: C.GoString(C.rd_kafka_Node_host(cNode)), + Port: int(C.rd_kafka_Node_port(cNode)), + } - for i := 0; i < int(cCnt); i++ { - cGroupMemberInfo := C.group_member_info_by_idx(cGroupMemberInfoList, cCnt, C.size_t(i)) + membersCount := int( + C.rd_kafka_ConsumerGroupDescription_member_count(cGroup)) + members := make([]MemberDescription, membersCount) + + for midx := 0; midx < membersCount; midx++ { + cMember := + C.rd_kafka_ConsumerGroupDescription_member(cGroup, C.int(midx)) + cMemberAssignment := + C.rd_kafka_MemberDescription_assignment(cMember) + cToppars := + C.rd_kafka_MemberAssignment_topic_partitions(cMemberAssignment) + memberAssignment := MemberAssignment{} + if cToppars != nil { + memberAssignment.TopicPartitions = newTopicPartitionsFromCparts(cToppars) + } + members[midx] = MemberDescription{ + CientId: C.GoString( + C.rd_kafka_MemberDescription_client_id(cMember)), + ConsumerId: C.GoString( + C.rd_kafka_MemberDescription_consumer_id(cMember)), + Host: C.GoString( + C.rd_kafka_MemberDescription_host(cMember)), + MemberAssignment: 
memberAssignment, + } - // Member assignment may be nil in case group is rebalancing. - var memberAssignment []TopicPartition - if cGroupMemberInfo.member_assignment_toppars != nil { - memberAssignment = newTopicPartitionsFromCparts(cGroupMemberInfo.member_assignment_toppars) } - result[i] = GroupMemberInfo{ - MemberID: C.GoString(cGroupMemberInfo.member_id), - ClientID: C.GoString(cGroupMemberInfo.client_id), - ClientHost: C.GoString(cGroupMemberInfo.client_host), - MemberAssignment: memberAssignment, + result[idx] = ConsumerGroupDescription{ + GroupId: groupId, + Error: err, + IsSimpleConsumerGroup: isSimple, + PartitionAssignor: paritionAssignor, + State: state, + Coordinator: coordinator, + Members: members, } } - return result, nil + return result } -// cToGroupInfos converts a C rd_kafka_group_list to a Go GroupInfo slice. -// Elements of rd_kafka_group_list contain only the group name and error, or the entire group information -// depending on how it was created. The argument `groupNamesOnly` should be set accordingly. -func (a *AdminClient) cToGroupInfos(cGroupInfoList *C.struct_rd_kafka_group_list, groupNamesOnly bool) (result []GroupInfo, err error) { - if cGroupInfoList == nil { - return nil, nil - } +// cToConsumerGroupListings converts a C rd_kafka_ConsumerGroupListing_t array +// to a Go ConsumerGroupListing slice. 
+func (a *AdminClient) cToConsumerGroupListings( + cGroups **C.rd_kafka_ConsumerGroupListing_t, + cGroupCount C.size_t) (result []ConsumerGroupListing) { + result = make([]ConsumerGroupListing, cGroupCount) - count := int(cGroupInfoList.group_cnt) - result = make([]GroupInfo, count) + for idx := 0; idx < int(cGroupCount); idx++ { + cGroup := + C.ConsumerGroupListing_by_idx(cGroups, cGroupCount, C.size_t(idx)) + state := ConsumerGroupState( + C.rd_kafka_ConsumerGroupListing_state(cGroup)) - for i := 0; i < count; i++ { - cGroupInfo := C.group_list_group_info_by_idx(cGroupInfoList, C.size_t(count), C.size_t(i)) - result[i].Group = C.GoString(cGroupInfo.group) - result[i].Error = newCErrorFromString(cGroupInfo.err, "error while listing group") - result[i].State = ConsumerGroupState(cGroupInfo.state_code) - result[i].ProtocolType = C.GoString(cGroupInfo.protocol_type) - result[i].IsSimpleConsumerGroup = cGroupInfo.is_simple_consumer_group != 0 - if groupNamesOnly { - continue + result[idx] = ConsumerGroupListing{ + GroupId: C.GoString( + C.rd_kafka_ConsumerGroupListing_group_id(cGroup)), + State: state, + IsSimpleConsumerGroup: cint2bool( + C.rd_kafka_ConsumerGroupListing_is_simple_consumer_group(cGroup)), } + } - result[i].Protocol = C.GoString(cGroupInfo.protocol) - result[i].Broker = BrokerMetadata{ - ID: int32(cGroupInfo.broker.id), - Host: C.GoString(cGroupInfo.broker.host), - Port: int(cGroupInfo.broker.port), - } + return result +} - memberCount := C.size_t(cGroupInfo.member_cnt) - if result[i].Members, err = a.cToGroupMemberInfos(cGroupInfo.members, memberCount); err != nil { - return nil, err - } +// cToErrorList converts a C rd_kafka_error_t array to a Go errors slice. 
+func (a *AdminClient) cToErrorList( + cErrs **C.rd_kafka_error_t, cErrCount C.size_t) (errs []error) { + errs = make([]error, cErrCount) + + for idx := 0; idx < int(cErrCount); idx++ { + cErr := C.error_by_idx(cErrs, cErrCount, C.size_t(idx)) + errs[idx] = newErrorFromCError(cErr) } - return result, nil + return errs } // cConfigResourceToResult converts a C ConfigResource result array to Go ConfigResourceResult @@ -1632,7 +1689,6 @@ func (a *AdminClient) cToGroupTopicPartitions( result = make([]GroupTopicPartitions, uint(cGroupCount)) for i := uint(0); i < uint(cGroupCount); i++ { - cGroupResult := C.group_result_by_idx(cGroupResults, cGroupCount, C.size_t(i)) cGroupPartitions := C.rd_kafka_group_result_partitions(cGroupResult) result[i] = GroupTopicPartitions{ @@ -1970,10 +2026,6 @@ func (a *AdminClient) AlterConsumerGroupOffsets( cGroupsPartitions := make([]*C.rd_kafka_AlterConsumerGroupOffsets_t, len(groupsPartitions)) - cErrstrSize := C.size_t(512) - cErrstr := (*C.char)(C.malloc(cErrstrSize)) - defer C.free(unsafe.Pointer(cErrstr)) - // Convert Go GroupTopicPartitions to C AlterConsumerGroupOffsets. for i, groupPartitions := range groupsPartitions { // We don't need to destroy this list because rd_kafka_AlterConsumerGroupOffsets_destroy @@ -2024,60 +2076,22 @@ func (a *AdminClient) AlterConsumerGroupOffsets( return a.cToGroupTopicPartitions(cGroups, cGroupCount), nil } -// ListConsumerGroups lists client groups in cluster. +// DescribeConsumerGroups describes groups from cluster as specified by the +// groups list. // // Parameters: -// * `options` - is the struct of options to be passed while listing -// the groups. Can be set to nil. -// * `options` - options for the list consumer group operation. +// * `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// * `groups` - Slice of groups to describe. This should not be nil/empty. +// * `options` - DescribeConsumerGroupsOption options. 
// -// Returns a slice of GroupInfo, each element of which corresponds to a group, plus an -// error that is not `nil` for client level errors. -// Each GroupInfo returned will always have the `Group` field populated, and the `Error` -// field populated in case of an error. The other fields will NOT be populated. -func (a *AdminClient) ListConsumerGroups(options ...ListConsumerGroupsOption) (result []GroupInfo, err error) { - // Convert Go []ListConsumerGroupsOption to C list_consumer_groups_options. - cListConsumerGroupOptions := listConsumerGroupsOptionsSetup(options) - if cListConsumerGroupOptions != nil { - defer C.rd_kafka_list_consumer_groups_options_destroy(cListConsumerGroupOptions) - } - - // Call librdkafka's implementation of the method. - var cGroupInfoList *C.struct_rd_kafka_group_list - cErr := C.rd_kafka_list_consumer_groups( - a.handle.rk, - &cGroupInfoList, - cListConsumerGroupOptions) - err = newError(cErr) - if err.(Error).Code() == ErrNoError { - err = nil - } - if err != nil && err.(Error).Code() != ErrPartial { - return nil, err - } - defer C.rd_kafka_group_list_destroy(cGroupInfoList) - - // Parse GroupResult from the resultant array . - // Use a different error variable to retain the error in case of ErrPartial. - result, err_parsing := a.cToGroupInfos(cGroupInfoList, true /* groupNamesOnly */) - if err_parsing != nil { - return nil, err_parsing - } +// Returns a slice of ConsumerGroupDescriptions corresponding to the input +// groups, plus an error that is not `nil` for client level errors. Individual +// ConsumerGroupDescriptions inside the slice should also be checked for errors. +func (a *AdminClient) DescribeConsumerGroups( + ctx context.Context, groups []string, + options ...DescribeConsumerGroupsOption) (result []ConsumerGroupDescription, err error) { - return result, err -} - -// DescribeConsumerGroups describes client groups in cluster. -// -// Parameters: -// * `groups` - is the slice of groups that we need to describe. 
This is optional, and nil -// means that all groups will be described. -// * `options` - options for the describe consumer group operation. -// -// Returns a slice of GroupInfo, each element of which corresponds to a group, plus an -// error that is not `nil` for client level errors. -// Each GroupInfo returned will haved the `Error` field populated in case of any error. -func (a *AdminClient) DescribeConsumerGroups(groups []string, options ...DescribeConsumerGroupsOption) (result []GroupInfo, err error) { // Convert group names into char** required by the implementation. cGroupNameList := make([]*C.char, len(groups)) cGroupNameCount := C.size_t(len(groups)) @@ -2092,37 +2106,108 @@ func (a *AdminClient) DescribeConsumerGroups(groups []string, options ...Describ cGroupNameListPtr = ((**C.char)(&cGroupNameList[0])) } - // Convert Go []DescribeConsumerGroupsOption] to C describe_consumer_groups_options. - cDescribeConsumerGroupOptions := describeConsumerGroupsOptionsSetup(options) - if cDescribeConsumerGroupOptions != nil { - defer C.rd_kafka_describe_consumer_groups_options_destroy(cDescribeConsumerGroupOptions) + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS, genericOptions) + if err != nil { + return nil, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) - // Call librdkafka's implementation of the method. - var cGroupInfoList *C.struct_rd_kafka_group_list - cErr := C.rd_kafka_describe_consumer_groups( + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_DescribeConsumerGroups (asynchronous). 
+ C.rd_kafka_DescribeConsumerGroups( a.handle.rk, cGroupNameListPtr, cGroupNameCount, - &cGroupInfoList, - cDescribeConsumerGroupOptions) - err = newError(cErr) - if err.(Error).Code() == ErrNoError { - err = nil - } - if err != nil && err.(Error).Code() != ErrPartial { + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) + if err != nil { return nil, err } - defer C.rd_kafka_group_list_destroy(cGroupInfoList) + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_DescribeConsumerGroups_result(rkev) + + // Convert result from C to Go. + var cGroupCount C.size_t + cGroups := C.rd_kafka_DescribeConsumerGroups_result_groups(cRes, &cGroupCount) + result = a.cToConsumerGroupDescriptions(cGroups, cGroupCount) + + return result, nil +} + +// ListConsumerGroups lists the consumer groups available in the cluster. +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// * `options` - ListConsumerGroupsOption options. +// Returns a ListConsumerGroupsResult, which contains a slice corresponding to +// each group in the cluster and a slice of errors encountered while listing. +// Additionally, an error that is not nil for client-level errors is returned. +// Both the returned error, and the errors slice should be checked. +func (a *AdminClient) ListConsumerGroups( + ctx context.Context, + options ...ListConsumerGroupsOption) (result ListConsumerGroupsResult, err error) { + result = ListConsumerGroupsResult{} + + // Convert Go AdminOptions (if any) to C AdminOptions. 
+ genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup(a.handle, + C.RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, genericOptions) + if err != nil { + return result, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_ListConsumerGroups (asynchronous). + C.rd_kafka_ListConsumerGroups( + a.handle.rk, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) + if err != nil { + return result, err + } + defer C.rd_kafka_event_destroy(rkev) - // Parse GroupResult from the resultant array. - // Use a different error variable to retain the error in case of ErrPartial. - result, err_parsing := a.cToGroupInfos(cGroupInfoList, false /* groupNamesOnly */) - if err_parsing != nil { - return nil, err_parsing + cRes := C.rd_kafka_event_ListConsumerGroups_result(rkev) + + // Convert result and broker errors from C to Go. 
+ var cGroupCount C.size_t + cGroups := C.rd_kafka_ListConsumerGroups_result_valid(cRes, &cGroupCount) + result.ConsumerGroupListings = a.cToConsumerGroupListings(cGroups, cGroupCount) + + var cErrsCount C.size_t + cErrs := C.rd_kafka_ListConsumerGroups_result_errors(cRes, &cErrsCount) + if cErrsCount == 0 { + return result, nil } - return result, err + result.Errors = a.cToErrorList(cErrs, cErrsCount) + return result, nil } // NewAdminClient creats a new AdminClient instance with a new underlying client instance diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index 31c96bb3b..fc7f0fea4 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -662,7 +662,7 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) { }, }, }, - SetAdminRequireStable(false)) + SetAdminRequireStableOffsets(false)) if lres != nil || err == nil { t.Fatalf("Expected ListConsumerGroupOffsets to fail, but got result: %v, err: %v", lres, err) } @@ -707,22 +707,39 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) { if err != nil || state != ConsumerGroupStateStable { t.Fatalf("Expected ConsumerGroupStateFromString to work for Stable state") } + + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() listres, err := a.ListConsumerGroups( - SetListConsumerGroupsOptionConsumerGroupState([]ConsumerGroupState{state}), - SetListConsumerGroupsOptionRequestTimeout(time.Second)) - if listres != nil || err == nil { + ctx, SetAdminRequestTimeout(time.Second), + SetAdminConsumerGroupStates([]ConsumerGroupState{ConsumerGroupStateStable})) + if err == nil { t.Fatalf("Expected ListConsumerGroups to fail, but got result: %v, err: %v", listres, err) } - if err.(Error).Code() != ErrTimedOut { - t.Fatalf("Expected ErrTimedOut, got %v", err) + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) } - descres, err := a.DescribeConsumerGroups(nil, 
SetDescribeConsumerGroupsOptionRequestTimeout(time.Second)) + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() + descres, err := a.DescribeConsumerGroups( + ctx, nil, SetAdminRequestTimeout(time.Second)) if descres != nil || err == nil { t.Fatalf("Expected DescribeConsumerGroups to fail, but got result: %v, err: %v", descres, err) } - if err.(Error).Code() != ErrTimedOut { - t.Fatalf("Expected ErrTimedOut, got %v", err) + if err.(Error).Code() != ErrInvalidArg { + t.Fatalf("Expected ErrInvalidArg with empty groups list, but got %s", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() + descres, err = a.DescribeConsumerGroups( + ctx, []string{"test"}, SetAdminRequestTimeout(time.Second)) + if descres != nil || err == nil { + t.Fatalf("Expected DescribeConsumerGroups to fail, but got result: %v, err: %v", descres, err) + } + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %s %v", err.(Error).Code(), ctx.Err()) } testAdminAPIsCreateACLs(what, a, t) diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index e7c6bb196..ddb7ff9a2 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -119,6 +119,10 @@ func (ao AdminOptionRequestTimeout) supportsListConsumerGroupOffsets() { } func (ao AdminOptionRequestTimeout) supportsAlterConsumerGroupOffsets() { } +func (ao AdminOptionRequestTimeout) supportsListConsumerGroups() { +} +func (ao AdminOptionRequestTimeout) supportsDescribeConsumerGroups() { +} func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { @@ -215,21 +219,21 @@ func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) { return ao } -// AdminOptionRequireStable decides if the broker should return stable +// AdminOptionRequireStableOffsets decides if the broker should return stable // offsets (transaction-committed). 
// // Default: false // // Valid for ListConsumerGroupOffsets. -type AdminOptionRequireStable struct { +type AdminOptionRequireStableOffsets struct { isSet bool val bool } -func (ao AdminOptionRequireStable) supportsListConsumerGroupOffsets() { +func (ao AdminOptionRequireStableOffsets) supportsListConsumerGroupOffsets() { } -func (ao AdminOptionRequireStable) apply(cOptions *C.rd_kafka_AdminOptions_t) error { +func (ao AdminOptionRequireStableOffsets) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { return nil } @@ -238,25 +242,73 @@ func (ao AdminOptionRequireStable) apply(cOptions *C.rd_kafka_AdminOptions_t) er cErrstr := (*C.char)(C.malloc(cErrstrSize)) defer C.free(unsafe.Pointer(cErrstr)) - cErr := C.rd_kafka_AdminOptions_set_require_stable( - cOptions, bool2cint(ao.val), - cErrstr, cErrstrSize) - if cErr != 0 { + cError := C.rd_kafka_AdminOptions_set_require_stable_offsets( + cOptions, bool2cint(ao.val)) + if cError != nil { C.rd_kafka_AdminOptions_destroy(cOptions) - return newCErrorFromString(cErr, - fmt.Sprintf("%s", C.GoString(cErrstr))) + return newErrorFromCErrorDestroy(cError) } return nil } -// SetAdminRequireStable decides if the broker should return stable +// SetAdminRequireStableOffsets decides if the broker should return stable +// offsets (transaction-committed). +// +// Default: false +// +// Valid for ListConsumerGroupOffsets. +func SetAdminRequireStableOffsets(val bool) (ao AdminOptionRequireStableOffsets) { + ao.isSet = true + ao.val = val + return ao +} + +// AdminOptionRequireStableOffsets decides if the broker should return stable // offsets (transaction-committed). // // Default: false // // Valid for ListConsumerGroupOffsets. 
-func SetAdminRequireStable(val bool) (ao AdminOptionRequireStable) { +type AdminOptionConsumerGroupStates struct { + isSet bool + val []ConsumerGroupState +} + +func (ao AdminOptionConsumerGroupStates) supportsListConsumerGroups() { +} + +func (ao AdminOptionConsumerGroupStates) apply(cOptions *C.rd_kafka_AdminOptions_t) error { + if !ao.isSet || ao.val == nil { + return nil + } + + // Convert states from Go slice to C pointer. + cStates := make([]C.rd_kafka_consumer_group_state_t, len(ao.val)) + cStatesCount := C.size_t(len(ao.val)) + + for idx, state := range ao.val { + cStates[idx] = C.rd_kafka_consumer_group_state_t(state) + } + + cStatesPtr := ((*C.rd_kafka_consumer_group_state_t)(&cStates[0])) + cError := C.rd_kafka_AdminOptions_set_consumer_group_states( + cOptions, cStatesPtr, cStatesCount) + if cError != nil { + C.rd_kafka_AdminOptions_destroy(cOptions) + return newErrorFromCErrorDestroy(cError) + } + + return nil +} + +// SetAdminConsumerGroupStates decides what states to query for while listing +// groups. +// +// Default: nil (all states) +// +// Valid for ListConsumerGroups. +func SetAdminConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionConsumerGroupStates) { ao.isSet = true ao.val = val return ao @@ -326,6 +378,14 @@ type DescribeACLsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// DescribeConsumerGroupsOption - see setter. +// +// See SetAdminRequestTimeout. +type DescribeConsumerGroupsOption interface { + supportsDescribeConsumerGroups() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + // DeleteACLsAdminOption - see setter. // // See SetAdminRequestTimeout @@ -334,6 +394,14 @@ type DeleteACLsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// ListConsumerGroupsOption - see setter. +// +// See SetAdminRequestTimeout. 
+type ListConsumerGroupsOption interface { + supportsListConsumerGroups() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + // ListConsumerGroupOffsetsAdminOption - see setter. // // See SetAdminRequestTimeout, SetAdminRequireStable. @@ -342,7 +410,7 @@ type ListConsumerGroupOffsetsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } -// ListConsumerGroupOffsetsAdminOption - see setter. +// AlterConsumerGroupOffsetsAdminOption - see setter. // // See SetAdminRequestTimeout. type AlterConsumerGroupOffsetsAdminOption interface { @@ -372,147 +440,3 @@ func adminOptionsSetup(h *handle, opType C.rd_kafka_admin_op_t, options []AdminO return cOptions, nil } - -// ListConsumerGroupOption is to rd_kafka_list_consumer_groups_options_t, -// as AdminOption is to rd_kafka_admin_op_t. -// Because rd_kafka_list_consumer_groups_options_t does not expose any setters -// similar to admin options, we can't have a generic `apply` method, so we just -// have a marker-method, `isListConsumerGroupsOption`. -// Further, since only ListConsumerGroups operation will ever use these options, -// there is no further interface specializing this interface, and structs which -// represent various options directly implement this interface. -type ListConsumerGroupsOption interface { - isListConsumerGroupsOption() -} - -// ListConsumerGroupsOptionRequestTimeout is the (approximate) maximum time to wait for response -// from brokers. -// -// Default: -1 (infinite) -// See also: SetListConsumerGroupsOptionRequestTimeout -type ListConsumerGroupsOptionRequestTimeout struct { - isSet bool - val time.Duration -} - -func SetListConsumerGroupsOptionRequestTimeout(timeout time.Duration) (lcgo ListConsumerGroupsOption) { - return ListConsumerGroupsOptionRequestTimeout{ - isSet: true, - val: timeout, - } -} - -// isListConsumerGroupsOption is the marker-method implementation for ListConsumerGroupsOptionRequestTimeout. 
-func (ListConsumerGroupsOptionRequestTimeout) isListConsumerGroupsOption() { -} - -// ListConsumerGroupsOptionConsumerGroupState sets a slice with the states to query, nil -// to query for all the consumer group states. -// -// Default: nil (all slices) -// See also: SetListConsumerGroupsOptionConsumerGroupState -type ListConsumerGroupsOptionConsumerGroupState struct { - isSet bool - val []ConsumerGroupState -} - -func SetListConsumerGroupsOptionConsumerGroupState(states []ConsumerGroupState) (lcgo ListConsumerGroupsOption) { - return ListConsumerGroupsOptionConsumerGroupState{ - isSet: true, - val: states, - } -} - -// isListConsumerGroupsOption is the marker-method implementation for ListConsumerGroupsOptionConsumerGroupState. -func (ListConsumerGroupsOptionConsumerGroupState) isListConsumerGroupsOption() { -} - -// listConsumerGroupsOptionsSetup creates a rd_kafka_list_consumer_groups_options_t based on the `options`. -func listConsumerGroupsOptionsSetup(options []ListConsumerGroupsOption) *C.rd_kafka_list_consumer_groups_options_t { - if len(options) == 0 { - return nil - } - - timeoutMs := -1 - var cStates []C.rd_kafka_consumer_group_state_t = nil - - for _, option := range options { - switch typedOption := option.(type) { - case ListConsumerGroupsOptionRequestTimeout: - if !typedOption.isSet { - break - } - timeoutMs = durationToMilliseconds(typedOption.val) - case ListConsumerGroupsOptionConsumerGroupState: - if !typedOption.isSet || len(typedOption.val) == 0 { - break - } - cStates = make([]C.rd_kafka_consumer_group_state_t, len(typedOption.val)) - for idx, state := range typedOption.val { - cStates[idx] = C.rd_kafka_consumer_group_state_t(state) - } - } - } - - var cStateListPtr *C.rd_kafka_consumer_group_state_t = nil - if len(cStates) > 0 { - cStateListPtr = (*C.rd_kafka_consumer_group_state_t)(&cStates[0]) - } - - cListConsumerGroupOptions := - C.rd_kafka_list_consumer_groups_options_new( - C.int(timeoutMs), - cStateListPtr, - C.size_t(len(cStates))) - 
return cListConsumerGroupOptions -} - -// DescribeConsumerGroupsOption is to rd_kafka_describe_consumer_groups_options_t, -// as AdminOption is to rd_kafka_admin_op_t. -// See the comment for ListConsumerGroupsOption for more details. -type DescribeConsumerGroupsOption interface { - isDescribeConsumerGroupsOption() -} - -// DescribeConsumerGroupsOptionRequestTimeout is the (approximate) maximum time to wait for response -// from brokers. -// -// Default: -1 (infinite) -// See also: SetDescribeConsumerGroupsOptionRequestTimeout -type DescribeConsumerGroupsOptionRequestTimeout struct { - isSet bool - val time.Duration -} - -func SetDescribeConsumerGroupsOptionRequestTimeout(timeout time.Duration) (lcgo DescribeConsumerGroupsOption) { - return DescribeConsumerGroupsOptionRequestTimeout{ - isSet: true, - val: timeout, - } -} - -// isDescribeConsumerGroupsOption is the marker-method implementation for DescribeConsumerGroupsOptionRequestTimeout. -func (DescribeConsumerGroupsOptionRequestTimeout) isDescribeConsumerGroupsOption() { -} - -// describeConsumerGroupsOptionsSetup creates a rd_kafka_describe_consumer_groups_options_t based on the `options`. 
-func describeConsumerGroupsOptionsSetup(options []DescribeConsumerGroupsOption) *C.rd_kafka_describe_consumer_groups_options_t { - if len(options) == 0 { - return nil - } - - timeoutMs := -1 - for _, option := range options { - switch typedOption := option.(type) { - case DescribeConsumerGroupsOptionRequestTimeout: - if !typedOption.isSet { - break - } - timeoutMs = durationToMilliseconds(typedOption.val) - } - } - - cDescribeConsumerGroupOptions := - C.rd_kafka_describe_consumer_groups_options_new(C.int(timeoutMs)) - return cDescribeConsumerGroupOptions -} diff --git a/kafka/integration_test.go b/kafka/integration_test.go index 7c6f9de13..0536b281a 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -72,37 +72,47 @@ func msgtrackerStart(t *testing.T, expectedCnt int) (mt msgtracker) { return mt } -// findGroup returns the GroupInfo for a group with name `group` -// from a slice of GroupInfos, and nil otherwise. -func findGroup(groups []GroupInfo, group string) *GroupInfo { +// findConsumerGroupListings returns the ConsumerGroupListing for a group with name `group` +// from a slice of ConsumerGroupListings, and nil otherwise. +func findConsumerGroupListing(groups []ConsumerGroupListing, group string) *ConsumerGroupListing { for _, groupInfo := range groups { - if groupInfo.Group == group { + if groupInfo.GroupId == group { return &groupInfo } } return nil } -// checkGroupInfo is a helper function to check the validity of a GroupInfo. We can't -// directly use DeepEqual because some fields/slice orders change with every run. 
-func checkGroupInfo( - groupInfo *GroupInfo, state ConsumerGroupState, group string, protocol string, clientIdToPartitions map[string][]TopicPartition) bool { - if groupInfo.Group != group || - groupInfo.State != state || - groupInfo.Error.Code() != ErrNoError || - groupInfo.Protocol != protocol || +// findConsumerGroupListings returns the ConsumerGroupDescription for a group with name `group` +// from a slice of ConsumerGroupDescription, and nil otherwise. +func findConsumerGroupDescription(groups []ConsumerGroupDescription, group string) *ConsumerGroupDescription { + for _, groupInfo := range groups { + if groupInfo.GroupId == group { + return &groupInfo + } + } + return nil +} + +// checkGroupDesc is a helper function to check the validity of a ConsumerGroupDescription. +// We can't directly use DeepEqual because some fields/slice orders change with every run. +func checkGroupDesc( + groupDesc *ConsumerGroupDescription, state ConsumerGroupState, group string, protocol string, clientIdToPartitions map[string][]TopicPartition) bool { + if groupDesc.GroupId != group || + groupDesc.State != state || + groupDesc.Error.Code() != ErrNoError || + groupDesc.PartitionAssignor != protocol || // We can't check exactly the Broker information, but we add a check for the zero-value of the Host. - groupInfo.Broker.Host == "" || + groupDesc.Coordinator.Host == "" || // We will run all our tests on non-simple consumer groups only. 
- groupInfo.IsSimpleConsumerGroup || - groupInfo.ProtocolType != "consumer" || - len(groupInfo.Members) != len(clientIdToPartitions) { + groupDesc.IsSimpleConsumerGroup || + len(groupDesc.Members) != len(clientIdToPartitions) { return false } - for _, member := range groupInfo.Members { - if partitions, ok := clientIdToPartitions[member.ClientID]; !ok || - !reflect.DeepEqual(partitions, member.MemberAssignment) { + for _, member := range groupDesc.Members { + if partitions, ok := clientIdToPartitions[member.CientId]; !ok || + !reflect.DeepEqual(partitions, member.MemberAssignment.TopicPartitions) { return false } } @@ -1286,18 +1296,20 @@ func TestAdminClient_DeleteGroups(t *testing.T) { defer ac.Close() // Check that our group is not present initially. - groups, err := ac.ListConsumerGroups(SetListConsumerGroupsOptionRequestTimeout(30 * time.Second)) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err := ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) if err != nil { t.Errorf("Error listing consumer groups %s\n", err) return } - if findGroup(groups, groupID) != nil { + if findConsumerGroupListing(listGroupResult.ConsumerGroupListings, groupID) != nil { t.Errorf("Consumer group present before consumer created: %s\n", groupID) return } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() // Create consumer @@ -1333,13 +1345,15 @@ func TestAdminClient_DeleteGroups(t *testing.T) { } // Check that the group exists. 
- groups, err = ac.ListConsumerGroups(SetListConsumerGroupsOptionRequestTimeout(30 * time.Second)) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) if err != nil { t.Errorf("Error listing consumer groups %s\n", err) return } - if findGroup(groups, groupID) == nil { + if findConsumerGroupListing(listGroupResult.ConsumerGroupListings, groupID) == nil { t.Errorf("Consumer group %s should be present\n", groupID) return } @@ -1389,13 +1403,15 @@ func TestAdminClient_DeleteGroups(t *testing.T) { } // Check for the absence of the consumer group after deletion. - groups, err = ac.ListConsumerGroups(SetListConsumerGroupsOptionRequestTimeout(30 * time.Second)) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) if err != nil { t.Errorf("Error listing consumer groups %s\n", err) return } - if findGroup(groups, groupID) != nil { + if findConsumerGroupListing(listGroupResult.ConsumerGroupListings, groupID) != nil { t.Errorf("Consumer group %s should not be present\n", groupID) return } @@ -1448,13 +1464,16 @@ func TestAdminClient_ListAndDescribeGroups(t *testing.T) { }() // Check the non-existence of consumer groups initially. 
- groups, err := ac.ListConsumerGroups(SetListConsumerGroupsOptionRequestTimeout(30 * time.Second)) - if err != nil { - t.Errorf("Error listing consumer groups %s\n", err) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err := ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) return } - if findGroup(groups, groupID) != nil || findGroup(groups, nonExistentGroupID) != nil { + groups := listGroupResult.ConsumerGroupListings + if findConsumerGroupListing(groups, groupID) != nil || findConsumerGroupListing(groups, nonExistentGroupID) != nil { t.Errorf("Consumer groups %s and %s should not be present\n", groupID, nonExistentGroupID) return } @@ -1487,40 +1506,44 @@ func TestAdminClient_ListAndDescribeGroups(t *testing.T) { consumer1.Poll(10 * 1000) // Check the existence of the group. - groups, err = ac.ListConsumerGroups(SetListConsumerGroupsOptionRequestTimeout(30 * time.Second)) - if err != nil { - t.Errorf("Error listing consumer groups %s\n", err) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) return } + groups = listGroupResult.ConsumerGroupListings - if findGroup(groups, groupID) == nil || findGroup(groups, nonExistentGroupID) != nil { + if findConsumerGroupListing(groups, groupID) == nil || findConsumerGroupListing(groups, nonExistentGroupID) != nil { t.Errorf("Consumer groups %s should be present and %s should not be\n", groupID, nonExistentGroupID) return } // Test the description of the consumer group. 
- groups, err = ac.DescribeConsumerGroups( - []string{groupID}, - SetDescribeConsumerGroupsOptionRequestTimeout(30*time.Second)) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupDescs, err := ac.DescribeConsumerGroups( + ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second)) if err != nil { t.Errorf("Error describing consumer groups %s\n", err) return } - if len(groups) != 1 { + if len(groupDescs) != 1 { t.Errorf("Describing one group should give exactly one result %s\n", err) return } - groupInfo := &groups[0] + groupDesc := &groupDescs[0] clientIdToPartitions := make(map[string][]TopicPartition) clientIdToPartitions[clientID1] = []TopicPartition{ {Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}, } - if !checkGroupInfo(groupInfo, ConsumerGroupStateStable, groupID, "range", clientIdToPartitions) { - t.Errorf("Expected description for consumer group %s is not same as actual: %v", groupID, groupInfo) + if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIdToPartitions) { + t.Errorf("Expected description for consumer group %s is not same as actual: %v", groupID, groupDesc) return } @@ -1549,27 +1572,26 @@ func TestAdminClient_ListAndDescribeGroups(t *testing.T) { consumer2.Subscribe(topic, nil) // Call Poll to start triggering the rebalance. Give it enough time to run - // that it becomes stable and keep polling in the meanwhile. An arbitrary limit - // of around 10*10 = 100s is set so we don't get stuck here. - // TODO(milind): do we need this or not, not sure, but I was having some trouble - // when I just did consumer2.Poll(...). - for polled := 0; polled < 10; polled++ { - consumer2.Poll(5 * 1000) - consumer1.Poll(5 * 1000) - groups, err = ac.DescribeConsumerGroups(nil, SetDescribeConsumerGroupsOptionRequestTimeout(30*time.Second)) + // that it becomes stable. 
+ // We need to make sure that the consumer group stabilizes since we will + // check for the state later on. + consumer2.Poll(5 * 1000) + consumer1.Poll(5 * 1000) + isGroupStable := false + for !isGroupStable { + groupDescs, err = ac.DescribeConsumerGroups(context.Background(), []string{groupID}, SetAdminRequestTimeout(30*time.Second)) if err != nil { t.Errorf("Error describing consumer groups %s\n", err) return } - groupInfo = findGroup(groups, groupID) - if groupInfo == nil { + groupDesc = findConsumerGroupDescription(groupDescs, groupID) + if groupDesc == nil { t.Errorf("Consumer group %s should be present\n", groupID) return } - if groupInfo.State == ConsumerGroupStateStable { - break - } + isGroupStable = groupDesc.State == ConsumerGroupStateStable + time.Sleep(time.Second) } clientIdToPartitions[clientID1] = []TopicPartition{ @@ -1578,8 +1600,8 @@ func TestAdminClient_ListAndDescribeGroups(t *testing.T) { clientIdToPartitions[clientID2] = []TopicPartition{ {Topic: &topic, Partition: 1, Offset: OffsetInvalid}, } - if !checkGroupInfo(groupInfo, ConsumerGroupStateStable, groupID, "range", clientIdToPartitions) { - t.Errorf("Expected description for consumer group %s is not same as actual %v", groupInfo, groupInfo) + if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIdToPartitions) { + t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc) return } @@ -1598,46 +1620,56 @@ func TestAdminClient_ListAndDescribeGroups(t *testing.T) { consumer2Closed = true // Try describing an empty group. 
- groups, err = ac.DescribeConsumerGroups(nil, SetDescribeConsumerGroupsOptionRequestTimeout(30*time.Second)) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupDescs, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second)) if err != nil { t.Errorf("Error describing consumer groups %s\n", err) return } - groupInfo = findGroup(groups, groupID) - if groupInfo == nil { + groupDesc = findConsumerGroupDescription(groupDescs, groupID) + if groupDesc == nil { t.Errorf("Consumer group %s should be present\n", groupID) return } clientIdToPartitions = make(map[string][]TopicPartition) - if !checkGroupInfo(groupInfo, ConsumerGroupStateEmpty, groupID, "", clientIdToPartitions) { - t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupInfo) + if !checkGroupDesc(groupDesc, ConsumerGroupStateEmpty, groupID, "", clientIdToPartitions) { + t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc) } // Try listing the Empty consumer group, and make sure that the States option // works while listing. 
- groups, err = ac.ListConsumerGroups(SetListConsumerGroupsOptionRequestTimeout(30 * time.Second)) - if err != nil { - t.Errorf("Error listing consumer groups %s\n", err) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups( + ctx, SetAdminRequestTimeout(30*time.Second), + SetAdminConsumerGroupStates([]ConsumerGroupState{ConsumerGroupStateEmpty})) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) return } + groups = listGroupResult.ConsumerGroupListings - groupInfo = findGroup(groups, groupID) + groupInfo := findConsumerGroupListing(listGroupResult.ConsumerGroupListings, groupID) if groupInfo == nil { t.Errorf("Consumer group %s should be present\n", groupID) return } - groups, err = ac.ListConsumerGroups( - SetListConsumerGroupsOptionConsumerGroupState([]ConsumerGroupState{ConsumerGroupStateStable}), - SetListConsumerGroupsOptionRequestTimeout(30*time.Second)) - if err != nil { - t.Errorf("Error listing consumer groups %s\n", err) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups( + ctx, SetAdminRequestTimeout(30*time.Second), + SetAdminConsumerGroupStates([]ConsumerGroupState{ConsumerGroupStateStable})) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) return } + groups = listGroupResult.ConsumerGroupListings - groupInfo = findGroup(groups, groupID) + groupInfo = findConsumerGroupListing(groups, groupID) if groupInfo != nil { t.Errorf("Consumer group %s should not be present\n", groupID) return diff --git a/kafka/librdkafka_vendor/rdkafka.h b/kafka/librdkafka_vendor/rdkafka.h index b424b2186..833d8fe62 100644 --- a/kafka/librdkafka_vendor/rdkafka.h +++ b/kafka/librdkafka_vendor/rdkafka.h @@ -165,7 +165,7 @@ typedef SSIZE_T 
ssize_t; * @remark This value should only be used during compile time, * for runtime checks of version use rd_kafka_version() */ -#define RD_KAFKA_VERSION 0x010902ff +#define RD_KAFKA_VERSION 0x020000ff /** * @brief Returns the librdkafka version as integer. @@ -1695,7 +1695,7 @@ const rd_kafka_conf_t *rd_kafka_conf(rd_kafka_t *rk); * Topic-level configuration properties may be set using this interface * in which case they are applied on the \c default_topic_conf. * If no \c default_topic_conf has been set one will be created. - * Any sub-sequent rd_kafka_conf_set_default_topic_conf() calls will + * Any subsequent rd_kafka_conf_set_default_topic_conf() calls will * replace the current default topic configuration. * * @returns \c rd_kafka_conf_res_t to indicate success or failure. @@ -2244,6 +2244,35 @@ void rd_kafka_conf_set_open_cb( int (*open_cb)(const char *pathname, int flags, mode_t mode, void *opaque)); #endif +/** Forward declaration to avoid netdb.h or winsock includes */ +struct addrinfo; + +/** + * @brief Set address resolution callback. + * + * The callback is responsible for resolving the hostname \p node and the + * service \p service into a list of socket addresses as \c getaddrinfo(3) + * would. The \p hints and \p res parameters function as they do for + * \c getaddrinfo(3). The callback's \p opaque argument is the opaque set with + * rd_kafka_conf_set_opaque(). + * + * If the callback is invoked with a NULL \p node, \p service, and \p hints, the + * callback should instead free the addrinfo struct specified in \p res. In this + * case the callback must succeed; the return value will not be checked by the + * caller. + * + * The callback's return value is interpreted as the return value of \p + * \c getaddrinfo(3). + * + * @remark The callback will be called from an internal librdkafka thread. 
+ */ +RD_EXPORT void +rd_kafka_conf_set_resolve_cb(rd_kafka_conf_t *conf, + int (*resolve_cb)(const char *node, + const char *service, + const struct addrinfo *hints, + struct addrinfo **res, + void *opaque)); /** * @brief Sets the verification callback of the broker certificate @@ -2363,6 +2392,14 @@ typedef enum rd_kafka_cert_enc_t { * * @remark CA certificate in PEM format may also be set with the * `ssl.ca.pem` configuration property. + * + * @remark When librdkafka is linked to OpenSSL 3.0 and the certificate is + * encoded using an obsolete cipher, it might be necessary to set up + * an OpenSSL configuration file to load the "legacy" provider and + * set the OPENSSL_CONF environment variable. + * See + * https://github.com/openssl/openssl/blob/master/README-PROVIDERS.md for more + * information. */ RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert(rd_kafka_conf_t *conf, @@ -2527,9 +2564,8 @@ void rd_kafka_conf_properties_show(FILE *fp); /** * @name Topic configuration - * @{ - * * @brief Topic configuration property interface + * @{ * */ @@ -2845,7 +2881,7 @@ int32_t rd_kafka_msg_partitioner_fnv1a_random(const rd_kafka_topic_t *rkt, * \p conf is an optional struct created with `rd_kafka_conf_new()` that will * be used instead of the default configuration. * The \p conf object is freed by this function on success and must not be used - * or destroyed by the application sub-sequently. + * or destroyed by the application subsequently. * See `rd_kafka_conf_set()` et.al for more information. * * \p errstr must be a pointer to memory of at least size \p errstr_size where @@ -2991,7 +3027,7 @@ int32_t rd_kafka_controllerid(rd_kafka_t *rk, int timeout_ms); * `rd_kafka_topic_conf_new()` that will be used instead of the default * topic configuration. * The \p conf object is freed by this function and must not be used or - * destroyed by the application sub-sequently. + * destroyed by the application subsequently. 
* See `rd_kafka_topic_conf_set()` et.al for more information. * * Topic handles are refcounted internally and calling rd_kafka_topic_new() @@ -3051,22 +3087,22 @@ void *rd_kafka_topic_opaque(const rd_kafka_topic_t *rkt); /** * @brief Polls the provided kafka handle for events. * - * Events will cause application provided callbacks to be called. + * Events will cause application-provided callbacks to be called. * * The \p timeout_ms argument specifies the maximum amount of time * (in milliseconds) that the call will block waiting for events. * For non-blocking calls, provide 0 as \p timeout_ms. - * To wait indefinately for an event, provide -1. + * To wait indefinitely for an event, provide -1. * * @remark An application should make sure to call poll() at regular * intervals to serve any queued callbacks waiting to be called. * @remark If your producer doesn't have any callback set (in particular * via rd_kafka_conf_set_dr_msg_cb or rd_kafka_conf_set_error_cb) - * you might chose not to call poll(), though this is not + * you might choose not to call poll(), though this is not * recommended. * * Events: - * - delivery report callbacks (if dr_cb/dr_msg_cb is configured) [producer] + * - delivery report callbacks (if dr_cb/dr_msg_cb is configured) [producer] * - error callbacks (rd_kafka_conf_set_error_cb()) [all] * - stats callbacks (rd_kafka_conf_set_stats_cb()) [all] * - throttle callbacks (rd_kafka_conf_set_throttle_cb()) [all] @@ -3323,6 +3359,25 @@ RD_EXPORT rd_kafka_error_t *rd_kafka_sasl_background_callbacks_enable(rd_kafka_t *rk); +/** + * @brief Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by + * this Kafka client. + * + * This function sets or resets the SASL username and password credentials + * used by this Kafka client. The new credentials will be used the next time + * this client needs to authenticate to a broker. This function + * will not disconnect existing connections that might have been made using + * the old credentials. 
+ * + * @remark This function only applies to the SASL PLAIN and SCRAM mechanisms. + * + * @returns NULL on success or an error object on error. + */ +RD_EXPORT +rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk, + const char *username, + const char *password); + /** * @returns a reference to the librdkafka consumer queue. * This is the queue served by rd_kafka_consumer_poll(). @@ -3764,6 +3819,8 @@ int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, void *commit_opaque); +/**@}*/ + /** * @name Simple Consumer API (legacy): Queue consumers * @{ @@ -3889,8 +3946,8 @@ rd_kafka_offsets_store(rd_kafka_t *rk, /** * @name KafkaConsumer (C) - * @{ * @brief High-level KafkaConsumer C API + * @{ * * * @@ -4419,13 +4476,13 @@ RD_EXPORT rd_kafka_error_t *rd_kafka_consumer_group_metadata_read( #define RD_KAFKA_MSG_F_BLOCK \ 0x4 /**< Block produce*() on message queue full. \ * WARNING: If a delivery report callback \ - * is used the application MUST \ + * is used, the application MUST \ * call rd_kafka_poll() (or equiv.) \ * to make sure delivered messages \ * are drained from the internal \ * delivery report queue. \ * Failure to do so will result \ - * in indefinately blocking on \ + * in indefinitely blocking on \ * the produce() call when the \ * message queue is full. */ #define RD_KAFKA_MSG_F_PARTITION \ @@ -4440,10 +4497,10 @@ RD_EXPORT rd_kafka_error_t *rd_kafka_consumer_group_metadata_read( * \p rkt is the target topic which must have been previously created with * `rd_kafka_topic_new()`. * - * `rd_kafka_produce()` is an asynch non-blocking API. + * `rd_kafka_produce()` is an asynchronous non-blocking API. * See `rd_kafka_conf_set_dr_msg_cb` on how to setup a callback to be called * once the delivery status (success or failure) is known. The delivery report - * is trigged by the application calling `rd_kafka_poll()` (at regular + * is triggered by the application calling `rd_kafka_poll()` (at regular * intervals) or `rd_kafka_flush()` (at termination). 
* * Since producing is asynchronous, you should call `rd_kafka_flush()` before @@ -4660,7 +4717,7 @@ rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms); * RD_KAFKA_RESP_ERR__PURGE_INFLIGHT. * * @warning Purging messages that are in-flight to or from the broker - * will ignore any sub-sequent acknowledgement for these messages + * will ignore any subsequent acknowledgement for these messages * received from the broker, effectively making it impossible * for the application to know if the messages were successfully * produced or not. This may result in duplicate messages if the @@ -4763,6 +4820,11 @@ typedef struct rd_kafka_metadata { } rd_kafka_metadata_t; +/** + * @brief Node (broker) information. + */ +typedef struct rd_kafka_Node_s rd_kafka_Node_t; + /** * @brief Request Metadata from broker. * @@ -4798,6 +4860,36 @@ RD_EXPORT void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata); +/** + * @brief Get the id of \p node. + * + * @param node The Node instance. + * @return The node id. + */ +RD_EXPORT +int rd_kafka_Node_id(const rd_kafka_Node_t *node); + +/** + * @brief Get the host of \p node. + * + * @param node The Node instance. + * @return The node host. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p node object. + */ +RD_EXPORT +const char *rd_kafka_Node_host(const rd_kafka_Node_t *node); + +/** + * @brief Get the port of \p node. + * + * @param node The Node instance. + * @return The node port. + */ +RD_EXPORT +int rd_kafka_Node_port(const rd_kafka_Node_t *node); + /**@}*/ @@ -4829,6 +4921,21 @@ struct rd_kafka_group_member_info { int member_assignment_size; /**< Member assignment size in bytes */ }; +/** + * @enum rd_kafka_consumer_group_state_t + * + * @brief Consumer group state. 
+ */ +typedef enum { + RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN = 0, + RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE = 1, + RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE = 2, + RD_KAFKA_CONSUMER_GROUP_STATE_STABLE = 3, + RD_KAFKA_CONSUMER_GROUP_STATE_DEAD = 4, + RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY = 5, + RD_KAFKA_CONSUMER_GROUP_STATE__CNT +} rd_kafka_consumer_group_state_t; + /** * @brief Group information */ @@ -4857,7 +4964,7 @@ struct rd_kafka_group_list { /** * @brief List and describe client groups in cluster. * - * \p group is an optional group name to describe, otherwise (\p NULL) all + * \p group is an optional group name to describe, otherwise (\c NULL) all * groups are returned. * * \p timeout_ms is the (approximate) maximum time to wait for response @@ -4880,6 +4987,9 @@ struct rd_kafka_group_list { * group list. * * @sa Use rd_kafka_group_list_destroy() to release list memory. + * + * @deprecated Use rd_kafka_ListConsumerGroups() and + * rd_kafka_DescribeConsumerGroups() instead. */ RD_EXPORT rd_kafka_resp_err_t @@ -4888,6 +4998,24 @@ rd_kafka_list_groups(rd_kafka_t *rk, const struct rd_kafka_group_list **grplistp, int timeout_ms); +/** + * @brief Returns a name for a state code. + * + * \p state The state code. + */ +RD_EXPORT +const char * +rd_kafka_consumer_group_state_name(rd_kafka_consumer_group_state_t state); + +/** + * @brief Returns a code for a state name. + * + * \p state The state name. 
+ */ +RD_EXPORT +rd_kafka_consumer_group_state_t +rd_kafka_consumer_group_state_code(const char *name); + /** * @brief Release list memory */ @@ -5141,6 +5269,15 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_CREATEACLS_RESULT 0x400 /**< CreateAcls_result_t */ #define RD_KAFKA_EVENT_DESCRIBEACLS_RESULT 0x800 /**< DescribeAcls_result_t */ #define RD_KAFKA_EVENT_DELETEACLS_RESULT 0x1000 /**< DeleteAcls_result_t */ +/** AlterConsumerGroupOffsets_result_t */ +#define RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT 0x1200 +/** ListConsumerGroupOffsets_result_t */ +#define RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT 0x1400 +/** ListConsumerGroupsResult_t */ +#define RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT 0x1600 +/** DescribeConsumerGroups_result_t */ +#define RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT 0x1800 + /** * @returns the event type for the given event. @@ -5288,8 +5425,12 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev); * - RD_KAFKA_EVENT_DELETEACLS_RESULT * - RD_KAFKA_EVENT_ALTERCONFIGS_RESULT * - RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT + * - RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT + * - RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT * - RD_KAFKA_EVENT_DELETEGROUPS_RESULT * - RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT + * - RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT + * - RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT * - RD_KAFKA_EVENT_DELETERECORDS_RESULT */ RD_EXPORT @@ -5390,10 +5531,18 @@ typedef rd_kafka_event_t rd_kafka_AlterConfigs_result_t; typedef rd_kafka_event_t rd_kafka_DescribeConfigs_result_t; /*! DeleteRecords result type */ typedef rd_kafka_event_t rd_kafka_DeleteRecords_result_t; +/*! ListConsumerGroups result type */ +typedef rd_kafka_event_t rd_kafka_ListConsumerGroups_result_t; +/*! DescribeConsumerGroups result type */ +typedef rd_kafka_event_t rd_kafka_DescribeConsumerGroups_result_t; /*! DeleteGroups result type */ typedef rd_kafka_event_t rd_kafka_DeleteGroups_result_t; /*! 
DeleteConsumerGroupOffsets result type */ typedef rd_kafka_event_t rd_kafka_DeleteConsumerGroupOffsets_result_t; +/*! AlterConsumerGroupOffsets result type */ +typedef rd_kafka_event_t rd_kafka_AlterConsumerGroupOffsets_result_t; +/*! ListConsumerGroupOffsets result type */ +typedef rd_kafka_event_t rd_kafka_ListConsumerGroupOffsets_result_t; /** * @brief Get CreateTopics result. @@ -5465,6 +5614,36 @@ rd_kafka_event_DescribeConfigs_result(rd_kafka_event_t *rkev); RD_EXPORT const rd_kafka_DeleteRecords_result_t * rd_kafka_event_DeleteRecords_result(rd_kafka_event_t *rkev); +/** + * @brief Get ListConsumerGroups result. + * + * @returns the result of a ListConsumerGroups request, or NULL if event is of + * different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p rkev object. + * + * Event types: + * RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT + */ +RD_EXPORT const rd_kafka_ListConsumerGroups_result_t * +rd_kafka_event_ListConsumerGroups_result(rd_kafka_event_t *rkev); + +/** + * @brief Get DescribeConsumerGroups result. + * + * @returns the result of a DescribeConsumerGroups request, or NULL if event is + * of different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p rkev object. + * + * Event types: + * RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT + */ +RD_EXPORT const rd_kafka_DescribeConsumerGroups_result_t * +rd_kafka_event_DescribeConsumerGroups_result(rd_kafka_event_t *rkev); + /** * @brief Get DeleteGroups result. * @@ -5519,6 +5698,36 @@ rd_kafka_event_DescribeAcls_result(rd_kafka_event_t *rkev); RD_EXPORT const rd_kafka_DeleteAcls_result_t * rd_kafka_event_DeleteAcls_result(rd_kafka_event_t *rkev); +/** + * @brief Get AlterConsumerGroupOffsets result. + * + * @returns the result of a AlterConsumerGroupOffsets request, or NULL if + * event is of different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p rkev object. 
+ * + * Event types: + * RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT + */ +RD_EXPORT const rd_kafka_AlterConsumerGroupOffsets_result_t * +rd_kafka_event_AlterConsumerGroupOffsets_result(rd_kafka_event_t *rkev); + +/** + * @brief Get ListConsumerGroupOffsets result. + * + * @returns the result of a ListConsumerGroupOffsets request, or NULL if + * event is of different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p rkev object. + * + * Event types: + * RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT + */ +RD_EXPORT const rd_kafka_ListConsumerGroupOffsets_result_t * +rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev); + /** * @brief Poll a queue for an event for max \p timeout_ms. * @@ -5567,6 +5776,7 @@ int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms); * and not statically. Failure to do so will lead to missing symbols * or finding symbols in another librdkafka library than the * application was linked with. + * @{ */ @@ -5985,6 +6195,28 @@ typedef rd_kafka_resp_err_t(rd_kafka_interceptor_f_on_thread_exit_t)( void *ic_opaque); +/** + * @brief on_broker_state_change() is called just after a broker + * has been created or its state has been changed. + * + * @param rk The client instance. + * @param broker_id The broker id (-1 is used for bootstrap brokers). + * @param secproto The security protocol. + * @param name The original name of the broker. + * @param port The port of the broker. + * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). + * + * @returns an error code on failure, the error is logged but otherwise ignored. + */ +typedef rd_kafka_resp_err_t(rd_kafka_interceptor_f_on_broker_state_change_t)( + rd_kafka_t *rk, + int32_t broker_id, + const char *secproto, + const char *name, + int port, + const char *state, + void *ic_opaque); + /** * @brief Append an on_conf_set() interceptor. 
@@ -5995,7 +6227,7 @@ typedef rd_kafka_resp_err_t(rd_kafka_interceptor_f_on_thread_exit_t)( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_set( @@ -6014,7 +6246,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_set( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_dup( @@ -6061,7 +6293,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_destroy( * has not already been added. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t @@ -6081,7 +6313,7 @@ rd_kafka_conf_interceptor_add_on_new(rd_kafka_conf_t *conf, * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. 
*/ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_destroy( @@ -6118,7 +6350,7 @@ rd_kafka_interceptor_add_on_send(rd_kafka_t *rk, * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_acknowledgement( @@ -6137,7 +6369,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_acknowledgement( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_consume( @@ -6156,7 +6388,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_consume( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_commit( @@ -6175,7 +6407,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_commit( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. 
*/ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_request_sent( @@ -6194,7 +6426,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_request_sent( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_response_received( @@ -6213,7 +6445,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_response_received( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_thread_start( @@ -6232,7 +6464,7 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_thread_start( * @param ic_opaque Opaque value that will be passed to the function. * * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT - * if an existing intercepted with the same \p ic_name and function + * if an existing interceptor with the same \p ic_name and function * has already been added to \p conf. */ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_thread_exit( @@ -6242,6 +6474,26 @@ RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_thread_exit( void *ic_opaque); +/** + * @brief Append an on_broker_state_change() interceptor. + * + * @param rk Client instance. + * @param ic_name Interceptor name, used in logging. + * @param on_broker_state_change() Function pointer. + * @param ic_opaque Opaque value that will be passed to the function. 
+ * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT + * if an existing interceptor with the same \p ic_name and function + * has already been added to \p conf. + */ +RD_EXPORT +rd_kafka_resp_err_t rd_kafka_interceptor_add_on_broker_state_change( + rd_kafka_t *rk, + const char *ic_name, + rd_kafka_interceptor_f_on_broker_state_change_t *on_broker_state_change, + void *ic_opaque); + + /**@}*/ @@ -6359,20 +6611,26 @@ rd_kafka_group_result_partitions(const rd_kafka_group_result_t *groupres); * @sa rd_kafka_AdminOptions_new() */ typedef enum rd_kafka_admin_op_t { - RD_KAFKA_ADMIN_OP_ANY = 0, /**< Default value */ - RD_KAFKA_ADMIN_OP_CREATETOPICS, /**< CreateTopics */ - RD_KAFKA_ADMIN_OP_DELETETOPICS, /**< DeleteTopics */ - RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, /**< CreatePartitions */ - RD_KAFKA_ADMIN_OP_ALTERCONFIGS, /**< AlterConfigs */ - RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, /**< DescribeConfigs */ - RD_KAFKA_ADMIN_OP_DELETERECORDS, /**< DeleteRecords */ - RD_KAFKA_ADMIN_OP_DELETEGROUPS, /**< DeleteGroups */ + RD_KAFKA_ADMIN_OP_ANY = 0, /**< Default value */ + RD_KAFKA_ADMIN_OP_CREATETOPICS, /**< CreateTopics */ + RD_KAFKA_ADMIN_OP_DELETETOPICS, /**< DeleteTopics */ + RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, /**< CreatePartitions */ + RD_KAFKA_ADMIN_OP_ALTERCONFIGS, /**< AlterConfigs */ + RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, /**< DescribeConfigs */ + RD_KAFKA_ADMIN_OP_DELETERECORDS, /**< DeleteRecords */ + RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, /**< ListConsumerGroups */ + RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS, /**< DescribeConsumerGroups */ + RD_KAFKA_ADMIN_OP_DELETEGROUPS, /**< DeleteGroups */ /** DeleteConsumerGroupOffsets */ RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS, RD_KAFKA_ADMIN_OP_CREATEACLS, /**< CreateAcls */ RD_KAFKA_ADMIN_OP_DESCRIBEACLS, /**< DescribeAcls */ RD_KAFKA_ADMIN_OP_DELETEACLS, /**< DeleteAcls */ - RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ + /** AlterConsumerGroupOffsets */ + 
RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, + /** ListConsumerGroupOffsets */ + RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, + RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ } rd_kafka_admin_op_t; /** @@ -6533,6 +6791,37 @@ rd_kafka_AdminOptions_set_broker(rd_kafka_AdminOptions_t *options, size_t errstr_size); +/** + * @brief Whether broker should return stable offsets + * (transaction-committed). + * + * @param options Admin options. + * @param true_or_false Defaults to false. + * + * @return NULL on success, a new error instance that must be + * released with rd_kafka_error_destroy() in case of error. + * + * @remark This option is valid for ListConsumerGroupOffsets. + */ +RD_EXPORT +const rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets( + rd_kafka_AdminOptions_t *options, + int true_or_false); + +/** + * @brief Set consumer groups states to query for. + * + * @param options Admin options. + * @param consumer_group_states Array of consumer group states. + * @param consumer_group_states_cnt Size of the \p consumer_group_states array. + * @return NULL on success, a new error instance that must be + * released with rd_kafka_error_destroy() in case of error. + */ +RD_EXPORT +const rd_kafka_error_t *rd_kafka_AdminOptions_set_consumer_group_states( + rd_kafka_AdminOptions_t *options, + rd_kafka_consumer_group_state_t *consumer_group_states, + size_t consumer_group_states_cnt); /** * @brief Set application opaque value that can be extracted from the @@ -6542,10 +6831,12 @@ RD_EXPORT void rd_kafka_AdminOptions_set_opaque(rd_kafka_AdminOptions_t *options, void *ev_opaque); +/**@}*/ - -/* - * CreateTopics - create topics in cluster. +/** + * @name Admin API - Topics + * @brief Topic related operations. + * @{ * */ @@ -6759,9 +7050,12 @@ RD_EXPORT const rd_kafka_topic_result_t **rd_kafka_DeleteTopics_result_topics( size_t *cntp); +/**@}*/ -/* - * CreatePartitions - add partitions to topic. 
+/** + * @name Admin API - Partitions + * @brief Partition related operations. + * @{ * */ @@ -6880,10 +7174,12 @@ rd_kafka_CreatePartitions_result_topics( const rd_kafka_CreatePartitions_result_t *result, size_t *cntp); +/**@}*/ - -/* - * Cluster, broker, topic configuration entries, sources, etc. +/** + * @name Admin API - Configuration + * @brief Cluster, broker, topic configuration entries, sources, etc. + * @{ * */ @@ -7248,9 +7544,12 @@ rd_kafka_DescribeConfigs_result_resources( size_t *cntp); -/* - * DeleteRecords - delete records (messages) from partitions - * +/**@}*/ + +/** + * @name Admin API - DeleteRecords + * @brief Delete records (messages) from partitions. + * @{ * */ @@ -7337,8 +7636,348 @@ RD_EXPORT const rd_kafka_topic_partition_list_t * rd_kafka_DeleteRecords_result_offsets( const rd_kafka_DeleteRecords_result_t *result); -/* - * DeleteGroups - delete groups from cluster +/**@}*/ + +/** + * @name Admin API - ListConsumerGroups + * @{ + */ + + +/** + * @brief ListConsumerGroups result for a single group + */ + +/*! ListConsumerGroups result for a single group */ +typedef struct rd_kafka_ConsumerGroupListing_s rd_kafka_ConsumerGroupListing_t; + +/*! ListConsumerGroups results and errors */ +typedef struct rd_kafka_ListConsumerGroupsResult_s + rd_kafka_ListConsumerGroupsResult_t; + +/** + * @brief List the consumer groups available in the cluster. + * + * @param rk Client instance. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + * + * @remark The result event type emitted on the supplied queue is of type + * \c RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT + */ +RD_EXPORT +void rd_kafka_ListConsumerGroups(rd_kafka_t *rk, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/** + * @brief Gets the group id for the \p grplist group. + * + * @param grplist The group listing. + * @return The group id. 
+ * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p grplist object. + */ +RD_EXPORT +const char *rd_kafka_ConsumerGroupListing_group_id( + const rd_kafka_ConsumerGroupListing_t *grplist); + +/** + * @brief Is the \p grplist group a simple consumer group. + * + * @param grplist The group listing. + * @return 1 if the group is a simple consumer group, + * else 0 (also if \p grplist is NULL). + */ +RD_EXPORT +int rd_kafka_ConsumerGroupListing_is_simple_consumer_group( + const rd_kafka_ConsumerGroupListing_t *grplist); + +/** + * @brief Gets state for the \p grplist group. + * + * @param grplist The group listing. + * @return A group state, or RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN if \p grplist + * is NULL. + */ +RD_EXPORT +rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupListing_state( + const rd_kafka_ConsumerGroupListing_t *grplist); + +/** + * @brief Get an array of valid list groups from a ListConsumerGroups result. + * + * The returned groups life-time is the same as the \p result object. + * + * @param result Result to get group results from. + * @param cntp is updated to the number of elements in the array. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p result object. + */ +RD_EXPORT +const rd_kafka_ConsumerGroupListing_t ** +rd_kafka_ListConsumerGroups_result_valid( + const rd_kafka_ListConsumerGroups_result_t *result, + size_t *cntp); + +/** + * @brief Get an array of errors from a ListConsumerGroups call result. + * + * The returned errors life-time is the same as the \p result object. + * + * @param result ListConsumerGroups result. + * @param cntp Is updated to the number of elements in the array. + * @return Array of errors in \p result. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p result object. 
+ */ +RD_EXPORT +const rd_kafka_error_t **rd_kafka_ListConsumerGroups_result_errors( + const rd_kafka_ListConsumerGroups_result_t *result, + size_t *cntp); + +/**@}*/ + +/** + * @name Admin API - DescribeConsumerGroups + * @{ + */ + +/** + * @brief DescribeConsumerGroups result type. + * + */ +typedef struct rd_kafka_ConsumerGroupDescription_s + rd_kafka_ConsumerGroupDescription_t; + +/** + * @brief Member description included in ConsumerGroupDescription. + * + */ +typedef struct rd_kafka_MemberDescription_s rd_kafka_MemberDescription_t; + +/** + * @brief Member assignment included in MemberDescription. + * + */ +typedef struct rd_kafka_MemberAssignment_s rd_kafka_MemberAssignment_t; + +/** + * @brief Describe groups from cluster as specified by the \p groups + * array of size \p groups_cnt elements. + * + * @param rk Client instance. + * @param groups Array of groups to describe. + * @param groups_cnt Number of elements in \p groups array. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + * + * @remark The result event type emitted on the supplied queue is of type + * \c RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT + */ +RD_EXPORT +void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, + const char **groups, + size_t groups_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/** + * @brief Get an array of group results from a DescribeConsumerGroups result. + * + * The returned groups life-time is the same as the \p result object. + * + * @param result Result to get group results from. + * @param cntp is updated to the number of elements in the array. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p result object. 
+ */ +RD_EXPORT +const rd_kafka_ConsumerGroupDescription_t ** +rd_kafka_DescribeConsumerGroups_result_groups( + const rd_kafka_DescribeConsumerGroups_result_t *result, + size_t *cntp); + + +/** + * @brief Gets the group id for the \p grpdesc group. + * + * @param grpdesc The group description. + * @return The group id. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p grpdesc object. + */ +RD_EXPORT +const char *rd_kafka_ConsumerGroupDescription_group_id( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + +/** + * @brief Gets the error for the \p grpdesc group. + * + * @param grpdesc The group description. + * @return The group description error. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p grpdesc object. + */ +RD_EXPORT +const rd_kafka_error_t *rd_kafka_ConsumerGroupDescription_error( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + +/** + * @brief Is the \p grpdesc group a simple consumer group. + * + * @param grpdesc The group description. + * @return 1 if the group is a simple consumer group, + * else 0 (also if \p grpdesc is NULL). + */ +RD_EXPORT +int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + + +/** + * @brief Gets the partition assignor for the \p grpdesc group. + * + * @param grpdesc The group description. + * @return The partition assignor. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p grpdesc object. + */ +RD_EXPORT +const char *rd_kafka_ConsumerGroupDescription_partition_assignor( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + + +/** + * @brief Gets state for the \p grpdesc group. + * + * @param grpdesc The group description. + * @return A group state, or RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN if + * \p grpdesc is + * NULL. 
+ */ +RD_EXPORT +rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupDescription_state( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + +/** + * @brief Gets the coordinator for the \p grpdesc group. + * + * @param grpdesc The group description. + * @return The group coordinator. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p grpdesc object. + */ +RD_EXPORT +const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + +/** + * @brief Gets the members count of \p grpdesc group. + * + * @param grpdesc The group description. + * @return The member count, or 0 if \p grpdesc is NULL. + */ +RD_EXPORT +int rd_kafka_ConsumerGroupDescription_member_count( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + +/** + * @brief Gets a member of \p grpdesc group. + * + * @param grpdesc The group description. + * @param idx The member idx. + * @return A member at index \p idx, or NULL if + * \p idx is out of range. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p grpdesc object. + */ +RD_EXPORT +const rd_kafka_MemberDescription_t *rd_kafka_ConsumerGroupDescription_member( + const rd_kafka_ConsumerGroupDescription_t *grpdesc, + int idx); + +/** + * @brief Gets client id of a \p member. + * + * @param member The group member. + * @return The client id. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p member object. + */ +RD_EXPORT +const char *rd_kafka_MemberDescription_client_id( + const rd_kafka_MemberDescription_t *member); + +/** + * @brief Gets consumer id of a \p member. + * + * @param member The group member. + * @return The consumer id. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p member object. 
+ */ +RD_EXPORT +const char *rd_kafka_MemberDescription_consumer_id( + const rd_kafka_MemberDescription_t *member); + +/** + * @brief Gets host of a \p member. + * + * @param member The group member. + * @return The host. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p member object. + */ +RD_EXPORT +const char * +rd_kafka_MemberDescription_host(const rd_kafka_MemberDescription_t *member); + +/** + * @brief Gets assignment of a \p member. + * + * @param member The group member. + * @return The member assignment. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p member object. + */ +RD_EXPORT +const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_assignment( + const rd_kafka_MemberDescription_t *member); + +/** + * @brief Gets assigned partitions of a member \p assignment. + * + * @param assignment The group member assignment. + * @return The assigned partitions. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p assignment object. + */ +RD_EXPORT +const rd_kafka_topic_partition_list_t * +rd_kafka_MemberAssignment_topic_partitions( + const rd_kafka_MemberAssignment_t *assignment); + +/**@}*/ + +/** + * @name Admin API - DeleteGroups + * @brief Delete groups from cluster + * @{ * * */ @@ -7355,13 +7994,15 @@ typedef struct rd_kafka_DeleteGroup_s rd_kafka_DeleteGroup_t; * @returns a new allocated DeleteGroup object. * Use rd_kafka_DeleteGroup_destroy() to free object when done. 
*/ -RD_EXPORT rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new(const char *group); +RD_EXPORT +rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new(const char *group); /** * @brief Destroy and free a DeleteGroup object previously created with * rd_kafka_DeleteGroup_new() */ -RD_EXPORT void rd_kafka_DeleteGroup_destroy(rd_kafka_DeleteGroup_t *del_group); +RD_EXPORT +void rd_kafka_DeleteGroup_destroy(rd_kafka_DeleteGroup_t *del_group); /** * @brief Helper function to destroy all DeleteGroup objects in @@ -7384,6 +8025,8 @@ rd_kafka_DeleteGroup_destroy_array(rd_kafka_DeleteGroup_t **del_groups, * * @remark The result event type emitted on the supplied queue is of type * \c RD_KAFKA_EVENT_DELETEGROUPS_RESULT + * + * @remark This function is called deleteConsumerGroups in the Java client. */ RD_EXPORT void rd_kafka_DeleteGroups(rd_kafka_t *rk, @@ -7410,9 +8053,202 @@ RD_EXPORT const rd_kafka_group_result_t **rd_kafka_DeleteGroups_result_groups( const rd_kafka_DeleteGroups_result_t *result, size_t *cntp); +/**@}*/ + +/** + * @name Admin API - ListConsumerGroupOffsets + * @{ + * + * + */ + +/*! Represents consumer group committed offsets to be listed. */ +typedef struct rd_kafka_ListConsumerGroupOffsets_s + rd_kafka_ListConsumerGroupOffsets_t; + +/** + * @brief Create a new ListConsumerGroupOffsets object. + * This object is later passed to rd_kafka_ListConsumerGroupOffsets(). + * + * @param group_id Consumer group id. + * @param partitions Partitions to list committed offsets for. + * Only the topic and partition fields are used. + * + * @returns a new allocated ListConsumerGroupOffsets object. + * Use rd_kafka_ListConsumerGroupOffsets_destroy() to free + * object when done. 
+ */ +RD_EXPORT const rd_kafka_ListConsumerGroupOffsets_t * +rd_kafka_ListConsumerGroupOffsets_new( + const char *group_id, + const rd_kafka_topic_partition_list_t *partitions); + +/** + * @brief Destroy and free a ListConsumerGroupOffsets object previously + * created with rd_kafka_ListConsumerGroupOffsets_new() + */ +RD_EXPORT void rd_kafka_ListConsumerGroupOffsets_destroy( + const rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets); + +/** + * @brief Helper function to destroy all ListConsumerGroupOffsets objects in + * the \p list_grpoffsets array (of \p list_grpoffsets_cnt elements). + * The array itself is not freed. + */ +RD_EXPORT void rd_kafka_ListConsumerGroupOffsets_destroy_array( + const rd_kafka_ListConsumerGroupOffsets_t **list_grpoffsets, + size_t list_grpoffset_cnt); + +/** + * @brief List committed offsets for a set of partitions in a consumer + * group. + * + * @param rk Client instance. + * @param list_grpoffsets Array of group committed offsets to list. + * MUST only be one single element. + * @param list_grpoffsets_cnt Number of elements in \p list_grpoffsets array. + * MUST always be 1. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + * + * @remark The result event type emitted on the supplied queue is of type + * \c RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT + * + * @remark The current implementation only supports one group per invocation. + */ +RD_EXPORT +void rd_kafka_ListConsumerGroupOffsets( + rd_kafka_t *rk, + const rd_kafka_ListConsumerGroupOffsets_t **list_grpoffsets, + size_t list_grpoffsets_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + + + +/* + * ListConsumerGroupOffsets result type and methods + */ + +/** + * @brief Get an array of results from a ListConsumerGroupOffsets result. + * + * The returned groups life-time is the same as the \p result object. + * + * @param result Result to get group results from. 
+ * @param cntp is updated to the number of elements in the array. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p result object. + */ +RD_EXPORT const rd_kafka_group_result_t ** +rd_kafka_ListConsumerGroupOffsets_result_groups( + const rd_kafka_ListConsumerGroupOffsets_result_t *result, + size_t *cntp); + + + +/**@}*/ + +/** + * @name Admin API - AlterConsumerGroupOffsets + * @{ + * + * + */ + +/*! Represents consumer group committed offsets to be altered. */ +typedef struct rd_kafka_AlterConsumerGroupOffsets_s + rd_kafka_AlterConsumerGroupOffsets_t; + +/** + * @brief Create a new AlterConsumerGroupOffsets object. + * This object is later passed to rd_kafka_AlterConsumerGroupOffsets(). + * + * @param group_id Consumer group id. + * @param partitions Partitions to alter committed offsets for. + * Only the topic and partition fields are used. + * + * @returns a new allocated AlterConsumerGroupOffsets object. + * Use rd_kafka_AlterConsumerGroupOffsets_destroy() to free + * object when done. + */ +RD_EXPORT const rd_kafka_AlterConsumerGroupOffsets_t * +rd_kafka_AlterConsumerGroupOffsets_new( + const char *group_id, + const rd_kafka_topic_partition_list_t *partitions); + +/** + * @brief Destroy and free a AlterConsumerGroupOffsets object previously + * created with rd_kafka_AlterConsumerGroupOffsets_new() + */ +RD_EXPORT void rd_kafka_AlterConsumerGroupOffsets_destroy( + const rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets); + +/** + * @brief Helper function to destroy all AlterConsumerGroupOffsets objects in + * the \p alter_grpoffsets array (of \p alter_grpoffsets_cnt elements). + * The array itself is not freed. + */ +RD_EXPORT void rd_kafka_AlterConsumerGroupOffsets_destroy_array( + const rd_kafka_AlterConsumerGroupOffsets_t **alter_grpoffsets, + size_t alter_grpoffset_cnt); + +/** + * @brief Alter committed offsets for a set of partitions in a consumer + * group. 
This will succeed at the partition level only if the group + * is not actively subscribed to the corresponding topic. + * + * @param rk Client instance. + * @param alter_grpoffsets Array of group committed offsets to alter. + * MUST only be one single element. + * @param alter_grpoffsets_cnt Number of elements in \p alter_grpoffsets array. + * MUST always be 1. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + * + * @remark The result event type emitted on the supplied queue is of type + * \c RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT + * + * @remark The current implementation only supports one group per invocation. + */ +RD_EXPORT +void rd_kafka_AlterConsumerGroupOffsets( + rd_kafka_t *rk, + const rd_kafka_AlterConsumerGroupOffsets_t **alter_grpoffsets, + size_t alter_grpoffsets_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + + /* - * DeleteConsumerGroupOffsets - delete groups from cluster + * AlterConsumerGroupOffsets result type and methods + */ + +/** + * @brief Get an array of results from a AlterConsumerGroupOffsets result. + * + * The returned groups life-time is the same as the \p result object. + * + * @param result Result to get group results from. + * @param cntp is updated to the number of elements in the array. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of \p result object. + */ +RD_EXPORT const rd_kafka_group_result_t ** +rd_kafka_AlterConsumerGroupOffsets_result_groups( + const rd_kafka_AlterConsumerGroupOffsets_result_t *result, + size_t *cntp); + + + +/**@}*/ + +/** + * @name Admin API - DeleteConsumerGroupOffsets + * @{ * * */ @@ -7455,7 +8291,7 @@ RD_EXPORT void rd_kafka_DeleteConsumerGroupOffsets_destroy_array( size_t del_grpoffset_cnt); /** - * @brief Delete committed offsets for a set of partitions in a conusmer + * @brief Delete committed offsets for a set of partitions in a consumer * group. 
This will succeed at the partition level only if the group * is not actively subscribed to the corresponding topic. * @@ -7499,6 +8335,13 @@ rd_kafka_DeleteConsumerGroupOffsets_result_groups( const rd_kafka_DeleteConsumerGroupOffsets_result_t *result, size_t *cntp); +/**@}*/ + +/** + * @name Admin API - ACL operations + * @{ + */ + /** * @brief ACL Binding is used to create access control lists. * @@ -7519,11 +8362,6 @@ RD_EXPORT const rd_kafka_error_t * rd_kafka_acl_result_error(const rd_kafka_acl_result_t *aclres); -/** - * @name AclOperation - * @{ - */ - /** * @enum rd_kafka_AclOperation_t * @brief Apache Kafka ACL operation types. @@ -7556,13 +8394,6 @@ typedef enum rd_kafka_AclOperation_t { RD_EXPORT const char * rd_kafka_AclOperation_name(rd_kafka_AclOperation_t acl_operation); -/**@}*/ - -/** - * @name AclPermissionType - * @{ - */ - /** * @enum rd_kafka_AclPermissionType_t * @brief Apache Kafka ACL permission types. @@ -7582,8 +8413,6 @@ typedef enum rd_kafka_AclPermissionType_t { RD_EXPORT const char *rd_kafka_AclPermissionType_name( rd_kafka_AclPermissionType_t acl_permission_type); -/**@}*/ - /** * @brief Create a new AclBinding object. This object is later passed to * rd_kafka_CreateAcls(). @@ -7754,7 +8583,7 @@ RD_EXPORT void rd_kafka_CreateAcls(rd_kafka_t *rk, rd_kafka_queue_t *rkqu); /** - * @section DescribeAcls - describe access control lists. + * DescribeAcls - describe access control lists. * * */ @@ -7790,7 +8619,7 @@ RD_EXPORT void rd_kafka_DescribeAcls(rd_kafka_t *rk, rd_kafka_queue_t *rkqu); /** - * @section DeleteAcls - delete access control lists. + * DeleteAcls - delete access control lists. * * */ @@ -8025,7 +8854,7 @@ rd_kafka_resp_err_t rd_kafka_oauthbearer_set_token_failure(rd_kafka_t *rk, * the global rd_kafka_fatal_error() code. 
* Fatal errors are raised by triggering the \c error_cb (see the * Fatal error chapter in INTRODUCTION.md for more information), and any - * sub-sequent transactional API calls will return RD_KAFKA_RESP_ERR__FATAL + * subsequent transactional API calls will return RD_KAFKA_RESP_ERR__FATAL * or have the fatal flag set (see rd_kafka_error_is_fatal()). * The originating fatal error code can be retrieved by calling * rd_kafka_fatal_error(). @@ -8085,9 +8914,15 @@ rd_kafka_resp_err_t rd_kafka_oauthbearer_set_token_failure(rd_kafka_t *rk, * @param timeout_ms The maximum time to block. On timeout the operation * may continue in the background, depending on state, * and it is okay to call init_transactions() again. + * If an infinite timeout (-1) is passed, the timeout will + * be adjusted to 2 * \c transaction.timeout.ms. * * @remark This function may block up to \p timeout_ms milliseconds. * + * @remark This call is resumable when a retriable timeout error is returned. + * Calling the function again will resume the operation that is + * progressing in the background. + * * @returns NULL on success or an error object on failure. * Check whether the returned error object permits retrying * by calling rd_kafka_error_is_retriable(), or whether a fatal @@ -8203,8 +9038,17 @@ rd_kafka_error_t *rd_kafka_begin_transaction(rd_kafka_t *rk); * * @remark Logical and invalid offsets (such as RD_KAFKA_OFFSET_INVALID) in * \p offsets will be ignored, if there are no valid offsets in - * \p offsets the function will return RD_KAFKA_RESP_ERR_NO_ERROR - * and no action will be taken. + * \p offsets the function will return NULL and no action will be taken. + * + * @remark This call is retriable but not resumable, which means a new request + * with a new set of provided offsets and group metadata will be + * sent to the transaction coordinator if the call is retried. 
+ * + * @remark It is highly recommended to retry the call (upon retriable error) + * with identical \p offsets and \p cgmetadata parameters. + * Failure to do so risks inconsistent state between what is actually + * included in the transaction and what the application thinks is + * included in the transaction. * * @returns NULL on success or an error object on failure. * Check whether the returned error object permits retrying @@ -8225,9 +9069,7 @@ rd_kafka_error_t *rd_kafka_begin_transaction(rd_kafka_t *rk); * RD_KAFKA_RESP_ERR__NOT_CONFIGURED if transactions have not been * configured for the producer instance, * RD_KAFKA_RESP_ERR__INVALID_ARG if \p rk is not a producer instance, - * or if the \p consumer_group_id or \p offsets are empty, - * RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if a previous - * rd_kafka_send_offsets_to_transaction() call is still in progress. + * or if the \p consumer_group_id or \p offsets are empty. * Other error codes not listed here may be returned, depending on * broker version. * @@ -8280,6 +9122,10 @@ rd_kafka_error_t *rd_kafka_send_offsets_to_transaction( * serve the event queue in a separate thread since rd_kafka_flush() * will not serve delivery reports in this mode. * + * @remark This call is resumable when a retriable timeout error is returned. + * Calling the function again will resume the operation that is + * progressing in the background. + * * @returns NULL on success or an error object on failure. * Check whether the returned error object permits retrying * by calling rd_kafka_error_is_retriable(), or whether an abortable @@ -8339,7 +9185,10 @@ rd_kafka_error_t *rd_kafka_commit_transaction(rd_kafka_t *rk, int timeout_ms); * If the application has enabled RD_KAFKA_EVENT_DR it must * serve the event queue in a separate thread since rd_kafka_flush() * will not serve delivery reports in this mode. - + * + * @remark This call is resumable when a retriable timeout error is returned. 
+ * Calling the function again will resume the operation that is + * progressing in the background. * * @returns NULL on success or an error object on failure. * Check whether the returned error object permits retrying