From 2a41eb98e43f477c3cdf08f82d3dafeb0fbccc5d Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 17 Jan 2023 09:10:01 +0100 Subject: [PATCH] [admin] Add consumer group bindings for KIP 222, 518, 396 (partial) and DeleteConsumerGroups (#923) Implement ListConsumerGroups, DescribeConsumerGroups, ListConsumerGroupOffsets (KIP-222), DeleteConsumerGroups, AlterConsumerGroupOffsets (KIP-396), Allow listing consumer groups per state (KIP-518). Co-authored-by: Milind L Co-authored-by: Santwana Verma --- CHANGELOG.md | 19 + examples/.gitignore | 5 + .../admin_alter_consumer_group_offsets.go | 86 +++ .../admin_delete_consumer_groups.go | 68 ++ .../admin_describe_consumer_groups.go | 72 ++ .../admin_list_consumer_group_offsets.go | 89 +++ .../admin_list_consumer_groups.go | 90 +++ kafka/adminapi.go | 674 +++++++++++++++++ kafka/adminapi_test.go | 126 ++++ kafka/adminoptions.go | 160 +++- kafka/integration_test.go | 693 ++++++++++++++++++ kafka/kafka.go | 60 +- 12 files changed, 2132 insertions(+), 10 deletions(-) create mode 100644 examples/admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets.go create mode 100644 examples/admin_delete_consumer_groups/admin_delete_consumer_groups.go create mode 100644 examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go create mode 100644 examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go create mode 100644 examples/admin_list_consumer_groups/admin_list_consumer_groups.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a2a8edce9..0d558287b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,25 @@ This is a maintenance release: used, the method will block until the fetcher state is updated (typically within microseconds). * The minimum version of Go supported has been changed from 1.11 to 1.14. + * [KIP-222](https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API) + Add Consumer Group operations to Admin API. + * [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state) + Allow listing consumer groups per state. + * [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484) + Partially implemented: support for AlterConsumerGroupOffsets. + * As result of the above KIPs, added (#923) + - `ListConsumerGroups` Admin operation. Supports listing by state. + - `DescribeConsumerGroups` Admin operation. Supports multiple groups. + - `DeleteConsumerGroups` Admin operation. Supports multiple groups (@vsantwana). + - `ListConsumerGroupOffsets` Admin operation. Currently, only supports + 1 group with multiple partitions. Supports the `requireStable` option. + - `AlterConsumerGroupOffsets` Admin operation. Currently, only supports + 1 group with multiple offsets. + + +confluent-kafka-go is based on librdkafka v2.0.0, see the +[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v2.0.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. ## v1.9.2 diff --git a/examples/.gitignore b/examples/.gitignore index 7123c3e79..d37ba008f 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -1,9 +1,14 @@ +admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets admin_create_acls/admin_create_acls admin_create_topic/admin_create_topic admin_delete_acls/admin_delete_acls +admin_delete_consumer_groups/admin_delete_consumer_groups admin_delete_topics/admin_delete_topics admin_describe_acls/admin_describe_acls admin_describe_config/admin_describe_config +admin_describe_consumer_groups/admin_describe_consumer_groups +admin_list_consumer_groups/admin_list_consumer_groups +admin_list_consumer_group_offsets/admin_list_consumer_group_offsets avro_generic_consumer_example/avro_generic_consumer_example avro_generic_producer_example/avro_generic_producer_example avro_specific_consumer_example/avro_specific_consumer_example diff --git a/examples/admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets.go b/examples/admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets.go new file mode 100644 index 000000000..2a18dbcc6 --- /dev/null +++ b/examples/admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets.go @@ -0,0 +1,86 @@ +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Alter consumer group offsets +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +func main() { + args := os.Args + + if len(args) < 6 { + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + " [ ...]\n", + args[0]) + os.Exit(1) + } + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": args[1], + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + var partitions []kafka.TopicPartition + for i := 3; i+2 < len(args); i += 3 { + partition, err := strconv.ParseInt(args[i+1], 10, 32) + if err != nil { + panic(err) + } + offset, err := strconv.ParseUint(args[i+2], 10, 64) + if err != nil { + panic(err) + } + + partitions = append(partitions, kafka.TopicPartition{ + Topic: &args[i], + Partition: int32(partition), + Offset: kafka.Offset(offset), + }) + } + + gps := []kafka.ConsumerGroupTopicPartitions{ + { + Group: args[2], + Partitions: partitions, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + res, err := ac.AlterConsumerGroupOffsets(ctx, gps) + if err != nil { + fmt.Printf("Failed to alter consumer group offsets: %s\n", err) + os.Exit(1) + } + + fmt.Printf("AlterConsumerGroupOffsets result: %+v\n", res) +} diff --git a/examples/admin_delete_consumer_groups/admin_delete_consumer_groups.go b/examples/admin_delete_consumer_groups/admin_delete_consumer_groups.go new file mode 100644 index 000000000..0dcc2dd8e --- /dev/null +++ b/examples/admin_delete_consumer_groups/admin_delete_consumer_groups.go @@ -0,0 +1,68 @@ +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Delete consumer group +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +func main() { + args := os.Args + + if len(args) < 4 { + fmt.Fprintf(os.Stderr, + "Usage: %s [ ...]\n", args[0]) + os.Exit(1) + } + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": args[1], + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + timeoutSec, err := strconv.Atoi(args[2]) + if err != nil { + fmt.Printf("Failed to parse timeout: %s\n", err) + os.Exit(1) + } + + groups := args[3:] + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + res, err := ac.DeleteConsumerGroups(ctx, groups, + kafka.SetAdminRequestTimeout(time.Duration(timeoutSec)*time.Second)) + if err != nil { + fmt.Printf("Failed to delete groups: %s\n", err) + os.Exit(1) + } + + fmt.Printf("DeleteConsumerGroups result: %+v\n", res) +} diff --git a/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go b/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go new file mode 100644 index 000000000..abee0c4df --- /dev/null +++ b/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go @@ -0,0 +1,72 @@ +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Describe consumer groups +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +func main() { + if len(os.Args) < 3 { + fmt.Fprintf( + os.Stderr, + "Usage: %s [ ...]\n", + os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + groups := os.Args[2:] + + // Create a new AdminClient. + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer a.Close() + + // Call DescribeConsumerGroups. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + describeGroupsResult, err := a.DescribeConsumerGroups(ctx, groups) + if err != nil { + fmt.Printf("Failed to describe groups: %s\n", err) + os.Exit(1) + } + + // Print results + fmt.Printf("A total of %d consumer group(s) described:\n\n", + len(describeGroupsResult.ConsumerGroupDescriptions)) + for _, g := range describeGroupsResult.ConsumerGroupDescriptions { + fmt.Printf("GroupId: %s\n"+ + "Error: %s\n"+ + "IsSimpleConsumerGroup: %v\n"+ + "PartitionAssignor: %s\n"+ + "State: %s\n"+ + "Coordinator: %+v\n"+ + "Members: %+v\n\n", + g.GroupID, g.Error, g.IsSimpleConsumerGroup, g.PartitionAssignor, + g.State, g.Coordinator, g.Members) + } +} diff --git a/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go b/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go new file mode 100644 index 000000000..71657eade --- /dev/null +++ b/examples/admin_list_consumer_group_offsets/admin_list_consumer_group_offsets.go @@ -0,0 +1,89 @@ +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// List consumer group offsets +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +func main() { + args := os.Args + + if len(args) < 6 { + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + " [ .... ]\n", args[0]) + os.Exit(1) + } + + requireStable, err := strconv.ParseBool(args[3]) + if err != nil { + fmt.Printf( + "Failed to parse value of require_stable %s: %s\n", args[3], err) + os.Exit(1) + } + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": args[1], + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + var partitions []kafka.TopicPartition + for i := 4; i+1 < len(args); i += 2 { + partition, err := strconv.ParseInt(args[i+1], 10, 32) + if err != nil { + fmt.Printf("Failed to parse partition %s: %s\n", args[i+1], err) + os.Exit(1) + } + + partitions = append(partitions, kafka.TopicPartition{ + Topic: &args[i], + Partition: int32(partition), + }) + } + + gps := []kafka.ConsumerGroupTopicPartitions{ + { + Group: args[2], + Partitions: partitions, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + res, err := ac.ListConsumerGroupOffsets( + ctx, gps, kafka.SetAdminRequireStableOffsets(requireStable)) + if err != nil { + fmt.Printf("Failed to list consumer group offsets %s\n", err) + os.Exit(1) + } + + fmt.Printf("ListConsumerGroupOffset result: %+v\n", res) +} diff --git a/examples/admin_list_consumer_groups/admin_list_consumer_groups.go b/examples/admin_list_consumer_groups/admin_list_consumer_groups.go new file mode 100644 index 000000000..bd606b199 --- /dev/null +++ b/examples/admin_list_consumer_groups/admin_list_consumer_groups.go @@ -0,0 +1,90 @@ +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// List consumer groups +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +func main() { + + if len(os.Args) < 2 { + fmt.Fprintf(os.Stderr, + "Usage: %s [ ...]\n", os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + var states []kafka.ConsumerGroupState + if len(os.Args) > 2 { + statesStr := os.Args[2:] + for _, stateStr := range statesStr { + state, err := kafka.ConsumerGroupStateFromString(stateStr) + if err != nil { + fmt.Fprintf(os.Stderr, + "Given state %s is not a valid state\n", stateStr) + os.Exit(1) + } + states = append(states, state) + } + } + + // Create a new AdminClient. + a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer a.Close() + + // Call ListConsumerGroups. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupRes, err := a.ListConsumerGroups( + ctx, kafka.SetAdminMatchConsumerGroupStates(states)) + + if err != nil { + fmt.Printf("Failed to list groups with client-level error %s\n", err) + os.Exit(1) + } + + // Print results + groups := listGroupRes.Valid + fmt.Printf("A total of %d consumer group(s) listed:\n", len(groups)) + for _, group := range groups { + fmt.Printf("GroupId: %s\n", group.GroupID) + fmt.Printf("State: %s\n", group.State) + fmt.Printf("IsSimpleConsumerGroup: %v\n", group.IsSimpleConsumerGroup) + fmt.Println() + } + + errs := listGroupRes.Errors + if len(errs) == 0 { + return + } + + fmt.Printf("A total of %d error(s) while listing:\n", len(errs)) + for _, err := range errs { + fmt.Println(err) + } +} diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 95f1db280..8da90e05d 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -28,6 +28,13 @@ import ( #include "select_rdkafka.h" #include +static const rd_kafka_group_result_t * +group_result_by_idx (const rd_kafka_group_result_t **groups, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return groups[idx]; +} + static const rd_kafka_topic_result_t * topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) { if (idx >= cnt) @@ -69,6 +76,27 @@ AclBinding_by_idx (const rd_kafka_AclBinding_t **acl_bindings, size_t cnt, size_ return NULL; return acl_bindings[idx]; } + +static const rd_kafka_ConsumerGroupListing_t * +ConsumerGroupListing_by_idx(const rd_kafka_ConsumerGroupListing_t **result_groups, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return result_groups[idx]; +} + +static const rd_kafka_ConsumerGroupDescription_t * +ConsumerGroupDescription_by_idx(const rd_kafka_ConsumerGroupDescription_t **result_groups, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return result_groups[idx]; +} + +static const rd_kafka_error_t * +error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return errors[idx]; +} */ import "C" @@ -101,6 +129,143 @@ func (t TopicResult) String() string { return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str) } +// GroupResult provides per-group operation result (error) information. +type GroupResult struct { + // Group name + Group string + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} + +// String returns a human-readable representation of a GroupResult. +func (g GroupResult) String() string { + if g.Error.code == ErrNoError { + return g.Group + } + return fmt.Sprintf("%s (%s)", g.Group, g.Error.str) +} + +// ConsumerGroupState represents a consumer group state +type ConsumerGroupState int + +const ( + // ConsumerGroupStateUnknown - Unknown ConsumerGroupState + ConsumerGroupStateUnknown = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN) + // ConsumerGroupStatePreparingRebalance - preparing rebalance + ConsumerGroupStatePreparingRebalance = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE) + // ConsumerGroupStateCompletingRebalance - completing rebalance + ConsumerGroupStateCompletingRebalance = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE) + // ConsumerGroupStateStable - stable + ConsumerGroupStateStable = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_STABLE) + // ConsumerGroupStateDead - dead group + ConsumerGroupStateDead = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_DEAD) + // ConsumerGroupStateEmpty - empty group + ConsumerGroupStateEmpty = ConsumerGroupState(C.RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY) +) + +// String returns the human-readable representation of a consumer_group_state +func (t ConsumerGroupState) String() string { + return C.GoString(C.rd_kafka_consumer_group_state_name( + C.rd_kafka_consumer_group_state_t(t))) +} + +// ConsumerGroupStateFromString translates a consumer group state name/string to +// a ConsumerGroupStateFromString value. +func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error) { + cStr := C.CString(stateString) + defer C.free(unsafe.Pointer(cStr)) + state := ConsumerGroupState(C.rd_kafka_consumer_group_state_code(cStr)) + return state, nil +} + +// ConsumerGroupListing represents the result of ListConsumerGroups for a single +// group. +type ConsumerGroupListing struct { + // Group id. + GroupID string + // Is a simple consumer group. + IsSimpleConsumerGroup bool + // Group state. + State ConsumerGroupState +} + +// ListConsumerGroupsResult represents ListConsumerGroups results and errors. +type ListConsumerGroupsResult struct { + // List of valid ConsumerGroupListings. + Valid []ConsumerGroupListing + // List of errors. + Errors []error +} + +// MemberAssignment represents the assignment of a consumer group member. +type MemberAssignment struct { + // Partitions assigned to current member. + TopicPartitions []TopicPartition +} + +// MemberDescription represents the description of a consumer group member. +type MemberDescription struct { + // Client id. + ClientID string + // Group instance id. + GroupInstanceID string + // Consumer id. + ConsumerID string + // Group member host. + Host string + // Member assignment. + Assignment MemberAssignment +} + +// ConsumerGroupDescription represents the result of DescribeConsumerGroups for +// a single group. +type ConsumerGroupDescription struct { + // Group id. + GroupID string + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error + // Is a simple consumer group. + IsSimpleConsumerGroup bool + // Partition assignor identifier. + PartitionAssignor string + // Consumer group state. + State ConsumerGroupState + // Consumer group coordinator (broker). + Coordinator Node + // Members list. + Members []MemberDescription +} + +// DescribeConsumerGroupsResult represents the result of a +// DescribeConsumerGroups call. +type DescribeConsumerGroupsResult struct { + // Slice of ConsumerGroupDescription. + ConsumerGroupDescriptions []ConsumerGroupDescription +} + +// DeleteConsumerGroupResult represents the result of a DeleteConsumerGroups +// call. +type DeleteConsumerGroupResult struct { + // Slice of GroupResult. + GroupResults []GroupResult +} + +// ListConsumerGroupOffsetsResult represents the result of a +// ListConsumerGroupOffsets operation. +type ListConsumerGroupOffsetsResult struct { + // A slice of ConsumerGroupTopicPartitions, each element represents a group's + // TopicPartitions and Offsets. + ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions +} + +// AlterConsumerGroupOffsetsResult represents the result of a +// AlterConsumerGroupOffsets operation. +type AlterConsumerGroupOffsetsResult struct { + // A slice of ConsumerGroupTopicPartitions, each element represents a group's + // TopicPartitions and Offsets. + ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions +} + // TopicSpecification holds parameters for creating a new topic. // TopicSpecification is analogous to NewTopic in the Java Topic Admin API. type TopicSpecification struct { @@ -620,6 +785,20 @@ func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t } } +// cToGroupResults converts a C group_result_t array to Go GroupResult list. +func (a *AdminClient) cToGroupResults( + cGroupRes **C.rd_kafka_group_result_t, cCnt C.size_t) (result []GroupResult, err error) { + result = make([]GroupResult, int(cCnt)) + + for idx := 0; idx < int(cCnt); idx++ { + cGroup := C.group_result_by_idx(cGroupRes, cCnt, C.size_t(idx)) + result[idx].Group = C.GoString(C.rd_kafka_group_result_name(cGroup)) + result[idx].Error = newErrorFromCError(C.rd_kafka_group_result_error(cGroup)) + } + + return result, nil +} + // cToTopicResults converts a C topic_result_t array to Go TopicResult list. func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) { @@ -636,6 +815,113 @@ func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCn return result, nil } +// cToConsumerGroupDescriptions converts a C rd_kafka_ConsumerGroupDescription_t +// array to a Go ConsumerGroupDescription slice. +func (a *AdminClient) cToConsumerGroupDescriptions( + cGroups **C.rd_kafka_ConsumerGroupDescription_t, + cGroupCount C.size_t) (result []ConsumerGroupDescription) { + result = make([]ConsumerGroupDescription, cGroupCount) + for idx := 0; idx < int(cGroupCount); idx++ { + cGroup := C.ConsumerGroupDescription_by_idx( + cGroups, cGroupCount, C.size_t(idx)) + + groupID := C.GoString( + C.rd_kafka_ConsumerGroupDescription_group_id(cGroup)) + err := newErrorFromCError( + C.rd_kafka_ConsumerGroupDescription_error(cGroup)) + isSimple := cint2bool( + C.rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(cGroup)) + paritionAssignor := C.GoString( + C.rd_kafka_ConsumerGroupDescription_partition_assignor(cGroup)) + state := ConsumerGroupState( + C.rd_kafka_ConsumerGroupDescription_state(cGroup)) + + cNode := C.rd_kafka_ConsumerGroupDescription_coordinator(cGroup) + coordinator := Node{ + ID: int(C.rd_kafka_Node_id(cNode)), + Host: C.GoString(C.rd_kafka_Node_host(cNode)), + Port: int(C.rd_kafka_Node_port(cNode)), + } + + membersCount := int( + C.rd_kafka_ConsumerGroupDescription_member_count(cGroup)) + members := make([]MemberDescription, membersCount) + + for midx := 0; midx < membersCount; midx++ { + cMember := + C.rd_kafka_ConsumerGroupDescription_member(cGroup, C.size_t(midx)) + cMemberAssignment := + C.rd_kafka_MemberDescription_assignment(cMember) + cToppars := + C.rd_kafka_MemberAssignment_partitions(cMemberAssignment) + memberAssignment := MemberAssignment{} + if cToppars != nil { + memberAssignment.TopicPartitions = newTopicPartitionsFromCparts(cToppars) + } + members[midx] = MemberDescription{ + ClientID: C.GoString( + C.rd_kafka_MemberDescription_client_id(cMember)), + GroupInstanceID: C.GoString( + C.rd_kafka_MemberDescription_group_instance_id(cMember)), + ConsumerID: C.GoString( + C.rd_kafka_MemberDescription_consumer_id(cMember)), + Host: C.GoString( + C.rd_kafka_MemberDescription_host(cMember)), + Assignment: memberAssignment, + } + } + + result[idx] = ConsumerGroupDescription{ + GroupID: groupID, + Error: err, + IsSimpleConsumerGroup: isSimple, + PartitionAssignor: paritionAssignor, + State: state, + Coordinator: coordinator, + Members: members, + } + } + return result +} + +// ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array +// to a Go ConsumerGroupListing slice. +func (a *AdminClient) cToConsumerGroupListings( + cGroups **C.rd_kafka_ConsumerGroupListing_t, + cGroupCount C.size_t) (result []ConsumerGroupListing) { + result = make([]ConsumerGroupListing, cGroupCount) + + for idx := 0; idx < int(cGroupCount); idx++ { + cGroup := + C.ConsumerGroupListing_by_idx(cGroups, cGroupCount, C.size_t(idx)) + state := ConsumerGroupState( + C.rd_kafka_ConsumerGroupListing_state(cGroup)) + + result[idx] = ConsumerGroupListing{ + GroupID: C.GoString( + C.rd_kafka_ConsumerGroupListing_group_id(cGroup)), + State: state, + IsSimpleConsumerGroup: cint2bool( + C.rd_kafka_ConsumerGroupListing_is_simple_consumer_group(cGroup)), + } + } + + return result +} + +// cToErrorList converts a C rd_kafka_error_t array to a Go errors slice. +func (a *AdminClient) cToErrorList( + cErrs **C.rd_kafka_error_t, cErrCount C.size_t) (errs []error) { + errs = make([]error, cErrCount) + + for idx := 0; idx < int(cErrCount); idx++ { + cErr := C.error_by_idx(cErrs, cErrCount, C.size_t(idx)) + errs[idx] = newErrorFromCError(cErr) + } + + return errs +} + // cConfigResourceToResult converts a C ConfigResource result array to Go ConfigResourceResult func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) { @@ -1566,6 +1852,394 @@ func (a *AdminClient) Close() { C.rd_kafka_destroy(a.handle.rk) } +// ListConsumerGroups lists the consumer groups available in the cluster. +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// * `options` - ListConsumerGroupsAdminOption options. +// Returns a ListConsumerGroupsResult, which contains a slice corresponding to +// each group in the cluster and a slice of errors encountered while listing. +// Additionally, an error that is not nil for client-level errors is returned. +// Both the returned error, and the errors slice should be checked. +func (a *AdminClient) ListConsumerGroups( + ctx context.Context, + options ...ListConsumerGroupsAdminOption) (result ListConsumerGroupsResult, err error) { + result = ListConsumerGroupsResult{} + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup(a.handle, + C.RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, genericOptions) + if err != nil { + return result, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_ListConsumerGroups (asynchronous). + C.rd_kafka_ListConsumerGroups( + a.handle.rk, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) + if err != nil { + return result, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_ListConsumerGroups_result(rkev) + + // Convert result and broker errors from C to Go. + var cGroupCount C.size_t + cGroups := C.rd_kafka_ListConsumerGroups_result_valid(cRes, &cGroupCount) + result.Valid = a.cToConsumerGroupListings(cGroups, cGroupCount) + + var cErrsCount C.size_t + cErrs := C.rd_kafka_ListConsumerGroups_result_errors(cRes, &cErrsCount) + if cErrsCount == 0 { + return result, nil + } + + result.Errors = a.cToErrorList(cErrs, cErrsCount) + return result, nil +} + +// DescribeConsumerGroups describes groups from cluster as specified by the +// groups list. +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// * `groups` - Slice of groups to describe. This should not be nil/empty. +// * `options` - DescribeConsumerGroupsAdminOption options. +// +// Returns DescribeConsumerGroupsResult, which contains a slice of +// ConsumerGroupDescriptions corresponding to the input groups, plus an error +// that is not `nil` for client level errors. Individual +// ConsumerGroupDescriptions inside the slice should also be checked for +// errors. +func (a *AdminClient) DescribeConsumerGroups( + ctx context.Context, groups []string, + options ...DescribeConsumerGroupsAdminOption) (result DescribeConsumerGroupsResult, err error) { + + // Convert group names into char** required by the implementation. + cGroupNameList := make([]*C.char, len(groups)) + cGroupNameCount := C.size_t(len(groups)) + describeResult := DescribeConsumerGroupsResult{} + + for idx, group := range groups { + cGroupNameList[idx] = C.CString(group) + defer C.free(unsafe.Pointer(cGroupNameList[idx])) + } + + var cGroupNameListPtr **C.char + if cGroupNameCount > 0 { + cGroupNameListPtr = ((**C.char)(&cGroupNameList[0])) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS, genericOptions) + if err != nil { + return describeResult, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_DescribeConsumerGroups (asynchronous). + C.rd_kafka_DescribeConsumerGroups( + a.handle.rk, + cGroupNameListPtr, + cGroupNameCount, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) + if err != nil { + return describeResult, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_DescribeConsumerGroups_result(rkev) + + // Convert result from C to Go. + var cGroupCount C.size_t + cGroups := C.rd_kafka_DescribeConsumerGroups_result_groups(cRes, &cGroupCount) + describeResult.ConsumerGroupDescriptions = a.cToConsumerGroupDescriptions(cGroups, cGroupCount) + + return describeResult, nil +} + +// DeleteConsumerGroups deletes a batch of consumer groups. +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// * `groups` - A slice of groupIDs to delete. +// * `options` - DeleteConsumerGroupsAdminOption options. +// +// Returns a DeleteConsumerGroupResult containing a slice of GroupResults, with +// group-level errors, (if any) contained inside; and an error that is not nil +// for client level errors. +func (a *AdminClient) DeleteConsumerGroups( + ctx context.Context, + groups []string, options ...DeleteConsumerGroupsAdminOption) (result DeleteConsumerGroupResult, err error) { + cGroups := make([]*C.rd_kafka_DeleteGroup_t, len(groups)) + deleteResult := DeleteConsumerGroupResult{} + + // Convert Go DeleteGroups to C DeleteGroups + for i, group := range groups { + cGroupID := C.CString(group) + defer C.free(unsafe.Pointer(cGroupID)) + + cGroups[i] = C.rd_kafka_DeleteGroup_new(cGroupID) + if cGroups[i] == nil { + return deleteResult, newErrorFromString(ErrInvalidArg, + fmt.Sprintf("Invalid arguments for group %s", group)) + } + + defer C.rd_kafka_DeleteGroup_destroy(cGroups[i]) + } + + // Convert Go AdminOptions (if any) to C AdminOptions + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_DELETEGROUPS, genericOptions) + if err != nil { + return deleteResult, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Asynchronous call + C.rd_kafka_DeleteGroups( + a.handle.rk, + (**C.rd_kafka_DeleteGroup_t)(&cGroups[0]), + C.size_t(len(cGroups)), + cOptions, + cQueue) + + // Wait for result, error or context timeout + rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETEGROUPS_RESULT) + if err != nil { + return deleteResult, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_DeleteGroups_result(rkev) + + // Convert result from C to Go + var cCnt C.size_t + cGroupRes := C.rd_kafka_DeleteGroups_result_groups(cRes, &cCnt) + + deleteResult.GroupResults, err = a.cToGroupResults(cGroupRes, cCnt) + return deleteResult, err +} + +// ListConsumerGroupOffsets fetches the offsets for topic partition(s) for +// consumer group(s). +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. +// * `groupsPartitions` - a slice of ConsumerGroupTopicPartitions, each element of which +// has the id of a consumer group, and a slice of the TopicPartitions we +// need to fetch the offsets for. +// Currently, the size of `groupsPartitions` has to be exactly one. +// * `options` - ListConsumerGroupOffsetsAdminOption options. +// +// Returns a ListConsumerGroupOffsetsResult, containing a slice of +// ConsumerGroupTopicPartitions corresponding to the input slice, plus an error that is +// not `nil` for client level errors. Individual TopicPartitions inside each of +// the ConsumerGroupTopicPartitions should also be checked for errors. +func (a *AdminClient) ListConsumerGroupOffsets( + ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, + options ...ListConsumerGroupOffsetsAdminOption) (lcgor ListConsumerGroupOffsetsResult, err error) { + lcgor.ConsumerGroupsTopicPartitions = nil + + // For now, we only support one group at a time given as a single element of + // groupsPartitions. + // Code has been written so that only this if-guard needs to be removed when + // we add support for multiple ConsumerGroupTopicPartitions. + if len(groupsPartitions) != 1 { + return lcgor, fmt.Errorf( + "expected length of groupsPartitions is 1, got %d", len(groupsPartitions)) + } + + cGroupsPartitions := make([]*C.rd_kafka_ListConsumerGroupOffsets_t, + len(groupsPartitions)) + + // Convert Go ConsumerGroupTopicPartitions to C ListConsumerGroupOffsets. + for i, groupPartitions := range groupsPartitions { + // We need to destroy this list because rd_kafka_ListConsumerGroupOffsets_new + // creates a copy of it. + cPartitions := newCPartsFromTopicPartitions(groupPartitions.Partitions) + defer C.rd_kafka_topic_partition_list_destroy(cPartitions) + + cGroupID := C.CString(groupPartitions.Group) + defer C.free(unsafe.Pointer(cGroupID)) + + cGroupsPartitions[i] = + C.rd_kafka_ListConsumerGroupOffsets_new(cGroupID, cPartitions) + defer C.rd_kafka_ListConsumerGroupOffsets_destroy(cGroupsPartitions[i]) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, genericOptions) + if err != nil { + return lcgor, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_ListConsumerGroupOffsets (asynchronous). + C.rd_kafka_ListConsumerGroupOffsets( + a.handle.rk, + (**C.rd_kafka_ListConsumerGroupOffsets_t)(&cGroupsPartitions[0]), + C.size_t(len(cGroupsPartitions)), + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT) + if err != nil { + return lcgor, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_ListConsumerGroupOffsets_result(rkev) + + // Convert result from C to Go. + var cGroupCount C.size_t + cGroups := C.rd_kafka_ListConsumerGroupOffsets_result_groups(cRes, &cGroupCount) + lcgor.ConsumerGroupsTopicPartitions = a.cToConsumerGroupTopicPartitions(cGroups, cGroupCount) + + return lcgor, nil +} + +// AlterConsumerGroupOffsets alters the offsets for topic partition(s) for +// consumer group(s). +// +// Parameters: +// * `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// * `groupsPartitions` - a slice of ConsumerGroupTopicPartitions, each element of +// which has the id of a consumer group, and a slice of the TopicPartitions +// we need to alter the offsets for. Currently, the size of +// `groupsPartitions` has to be exactly one. +// * `options` - AlterConsumerGroupOffsetsAdminOption options. +// +// Returns a AlterConsumerGroupOffsetsResult, containing a slice of +// ConsumerGroupTopicPartitions corresponding to the input slice, plus an error +// that is not `nil` for client level errors. Individual TopicPartitions inside +// each of the ConsumerGroupTopicPartitions should also be checked for errors. +// This will succeed at the partition level only if the group is not actively +// subscribed to the corresponding topic(s). +func (a *AdminClient) AlterConsumerGroupOffsets( + ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, + options ...AlterConsumerGroupOffsetsAdminOption) (acgor AlterConsumerGroupOffsetsResult, err error) { + acgor.ConsumerGroupsTopicPartitions = nil + + // For now, we only support one group at a time given as a single element of groupsPartitions. + // Code has been written so that only this if-guard needs to be removed when we add support for + // multiple ConsumerGroupTopicPartitions. + if len(groupsPartitions) != 1 { + return acgor, fmt.Errorf( + "expected length of groupsPartitions is 1, got %d", + len(groupsPartitions)) + } + + cGroupsPartitions := make( + []*C.rd_kafka_AlterConsumerGroupOffsets_t, len(groupsPartitions)) + + // Convert Go ConsumerGroupTopicPartitions to C AlterConsumerGroupOffsets. + for idx, groupPartitions := range groupsPartitions { + // We need to destroy this list because rd_kafka_AlterConsumerGroupOffsets_new + // creates a copy of it. + cPartitions := newCPartsFromTopicPartitions(groupPartitions.Partitions) + + cGroupID := C.CString(groupPartitions.Group) + defer C.free(unsafe.Pointer(cGroupID)) + + cGroupsPartitions[idx] = + C.rd_kafka_AlterConsumerGroupOffsets_new(cGroupID, cPartitions) + defer C.rd_kafka_AlterConsumerGroupOffsets_destroy(cGroupsPartitions[idx]) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, genericOptions) + if err != nil { + return acgor, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_AlterConsumerGroupOffsets (asynchronous). + C.rd_kafka_AlterConsumerGroupOffsets( + a.handle.rk, + (**C.rd_kafka_AlterConsumerGroupOffsets_t)(&cGroupsPartitions[0]), + C.size_t(len(cGroupsPartitions)), + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT) + if err != nil { + return acgor, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_AlterConsumerGroupOffsets_result(rkev) + + // Convert result from C to Go. + var cGroupCount C.size_t + cGroups := C.rd_kafka_AlterConsumerGroupOffsets_result_groups(cRes, &cGroupCount) + acgor.ConsumerGroupsTopicPartitions = a.cToConsumerGroupTopicPartitions(cGroups, cGroupCount) + + return acgor, nil +} + // NewAdminClient creats a new AdminClient instance with a new underlying client instance func NewAdminClient(conf *ConfigMap) (*AdminClient, error) { diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index 110618573..59fec3981 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -420,6 +420,125 @@ func testAdminAPIsDeleteACLs(what string, a *AdminClient, t *testing.T) { } } +func testAdminAPIsListConsumerGroups( + what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + state, err := ConsumerGroupStateFromString("Stable") + if err != nil || state != ConsumerGroupStateStable { + t.Fatalf("Expected ConsumerGroupStateFromString to work for Stable state") + } + + ctx, cancel := context.WithTimeout(context.Background(), expDuration) + defer cancel() + listres, err := a.ListConsumerGroups( + ctx, SetAdminRequestTimeout(time.Second), + SetAdminMatchConsumerGroupStates([]ConsumerGroupState{state})) + if err == nil { + t.Fatalf("Expected ListConsumerGroups to fail, but got result: %v, err: %v", + listres, err) + } + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) + } +} + +func testAdminAPIsDescribeConsumerGroups( + what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), expDuration) + defer cancel() + descres, err := a.DescribeConsumerGroups( + ctx, nil, SetAdminRequestTimeout(time.Second)) + if descres.ConsumerGroupDescriptions != nil || err == nil { + t.Fatalf("Expected DescribeConsumerGroups to fail, but got result: %v, err: %v", + descres, err) + } + if err.(Error).Code() != ErrInvalidArg { + t.Fatalf("Expected ErrInvalidArg with empty groups list, but got %s", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), expDuration) + defer cancel() + descres, err = a.DescribeConsumerGroups( + ctx, []string{"test"}, SetAdminRequestTimeout(time.Second)) + if descres.ConsumerGroupDescriptions != nil || err == nil { + t.Fatalf("Expected DescribeConsumerGroups to fail, but got result: %v, err: %v", + descres, err) + } + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %s %v", err.(Error).Code(), ctx.Err()) + } +} + +func testAdminAPIsDeleteConsumerGroups( + what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), expDuration) + defer cancel() + dgres, err := a.DeleteConsumerGroups(ctx, []string{"group1"}, + SetAdminRequestTimeout(time.Second)) + if dgres.GroupResults != nil || err == nil { + t.Fatalf("Expected DeleteGroups to fail, but got result: %v, err: %v", + dgres, err) + } + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) + } +} + +func testAdminAPIsListConsumerGroupOffsets( + what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + topic := "topic" + ctx, cancel := context.WithTimeout(context.Background(), expDuration) + defer cancel() + lres, err := a.ListConsumerGroupOffsets( + ctx, + []ConsumerGroupTopicPartitions{ + { + "test", + []TopicPartition{ + { + Topic: &topic, + Partition: 0, + }, + }, + }, + }, + SetAdminRequireStableOffsets(false)) + if lres.ConsumerGroupsTopicPartitions != nil || err == nil { + t.Fatalf("Expected ListConsumerGroupOffsets to fail, but got result: %v, err: %v", + lres, err) + } + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) + } +} + +func testAdminAPIsAlterConsumerGroupOffsets( + what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + topic := "topic" + ctx, cancel := context.WithTimeout(context.Background(), expDuration) + defer cancel() + ares, err := a.AlterConsumerGroupOffsets( + ctx, + []ConsumerGroupTopicPartitions{ + { + "test", + []TopicPartition{ + { + Topic: &topic, + Partition: 0, + Offset: 10, + }, + }, + }, + }) + if ares.ConsumerGroupsTopicPartitions != nil || err == nil { + t.Fatalf("Expected AlterConsumerGroupOffsets to fail, but got result: %v, err: %v", + ares, err) + } + if ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) + } +} + func testAdminAPIs(what string, a *AdminClient, t *testing.T) { t.Logf("AdminClient API testing on %s: %s", a, what) @@ -649,6 +768,13 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) { testAdminAPIsCreateACLs(what, a, t) testAdminAPIsDescribeACLs(what, a, t) testAdminAPIsDeleteACLs(what, a, t) + + testAdminAPIsListConsumerGroups(what, a, expDuration, t) + testAdminAPIsDescribeConsumerGroups(what, a, expDuration, t) + testAdminAPIsDeleteConsumerGroups(what, a, expDuration, t) + + testAdminAPIsListConsumerGroupOffsets(what, a, expDuration, t) + testAdminAPIsAlterConsumerGroupOffsets(what, a, expDuration, t) } // TestAdminAPIs dry-tests most Admin APIs, no broker is needed. diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index db55d2dc2..c0b9eb690 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -114,6 +114,26 @@ func (ao AdminOptionRequestTimeout) supportsAlterConfigs() { func (ao AdminOptionRequestTimeout) supportsDescribeConfigs() { } +func (ao AdminOptionRequestTimeout) supportsCreateACLs() { +} + +func (ao AdminOptionRequestTimeout) supportsDescribeACLs() { +} + +func (ao AdminOptionRequestTimeout) supportsDeleteACLs() { +} + +func (ao AdminOptionRequestTimeout) supportsListConsumerGroups() { +} +func (ao AdminOptionRequestTimeout) supportsDescribeConsumerGroups() { +} +func (ao AdminOptionRequestTimeout) supportsDeleteConsumerGroups() { +} +func (ao AdminOptionRequestTimeout) supportsListConsumerGroupOffsets() { +} +func (ao AdminOptionRequestTimeout) supportsAlterConsumerGroupOffsets() { +} + func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { return nil @@ -166,15 +186,6 @@ func (ao AdminOptionValidateOnly) supportsCreatePartitions() { func (ao AdminOptionValidateOnly) supportsAlterConfigs() { } -func (ao AdminOptionRequestTimeout) supportsCreateACLs() { -} - -func (ao AdminOptionRequestTimeout) supportsDescribeACLs() { -} - -func (ao AdminOptionRequestTimeout) supportsDeleteACLs() { -} - func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { return nil @@ -209,6 +220,97 @@ func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) { return ao } +// AdminOptionRequireStableOffsets decides if the broker should return stable +// offsets (transaction-committed). +// +// Default: false +// +// Valid for ListConsumerGroupOffsets. +type AdminOptionRequireStableOffsets struct { + isSet bool + val bool +} + +func (ao AdminOptionRequireStableOffsets) supportsListConsumerGroupOffsets() { +} + +func (ao AdminOptionRequireStableOffsets) apply(cOptions *C.rd_kafka_AdminOptions_t) error { + if !ao.isSet { + return nil + } + + cError := C.rd_kafka_AdminOptions_set_require_stable_offsets( + cOptions, bool2cint(ao.val)) + if cError != nil { + C.rd_kafka_AdminOptions_destroy(cOptions) + return newErrorFromCErrorDestroy(cError) + } + + return nil +} + +// SetAdminRequireStableOffsets decides if the broker should return stable +// offsets (transaction-committed). +// +// Default: false +// +// Valid for ListConsumerGroupOffsets. +func SetAdminRequireStableOffsets(val bool) (ao AdminOptionRequireStableOffsets) { + ao.isSet = true + ao.val = val + return ao +} + +// AdminOptionMatchConsumerGroupStates decides groups in which state(s) should be +// listed. +// +// Default: nil (lists groups in all states). +// +// Valid for ListConsumerGroups. +type AdminOptionMatchConsumerGroupStates struct { + isSet bool + val []ConsumerGroupState +} + +func (ao AdminOptionMatchConsumerGroupStates) supportsListConsumerGroups() { +} + +func (ao AdminOptionMatchConsumerGroupStates) apply(cOptions *C.rd_kafka_AdminOptions_t) error { + if !ao.isSet || ao.val == nil { + return nil + } + + // Convert states from Go slice to C pointer. + cStates := make([]C.rd_kafka_consumer_group_state_t, len(ao.val)) + cStatesCount := C.size_t(len(ao.val)) + + for idx, state := range ao.val { + cStates[idx] = C.rd_kafka_consumer_group_state_t(state) + } + + cStatesPtr := ((*C.rd_kafka_consumer_group_state_t)(&cStates[0])) + cError := C.rd_kafka_AdminOptions_set_match_consumer_group_states( + cOptions, cStatesPtr, cStatesCount) + if cError != nil { + C.rd_kafka_AdminOptions_destroy(cOptions) + return newErrorFromCErrorDestroy(cError) + } + + return nil +} + +// SetAdminMatchConsumerGroupStates decides groups in which state(s) should be +// listed. +// +// Default: nil (lists groups in all states). +// +// Valid for ListConsumerGroups. +func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates) { + ao.isSet = true + ao.val = val + return ao +} + // CreateTopicsAdminOption - see setters. // // See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly. @@ -273,6 +375,46 @@ type DeleteACLsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// ListConsumerGroupsAdminOption - see setter. +// +// See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates. +type ListConsumerGroupsAdminOption interface { + supportsListConsumerGroups() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// DescribeConsumerGroupsAdminOption - see setter. +// +// See SetAdminRequestTimeout. +type DescribeConsumerGroupsAdminOption interface { + supportsDescribeConsumerGroups() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// DeleteConsumerGroupsAdminOption - see setters. +// +// See SetAdminRequestTimeout. +type DeleteConsumerGroupsAdminOption interface { + supportsDeleteConsumerGroups() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// ListConsumerGroupOffsetsAdminOption - see setter. +// +// See SetAdminRequestTimeout, SetAdminRequireStableOffsets. +type ListConsumerGroupOffsetsAdminOption interface { + supportsListConsumerGroupOffsets() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// AlterConsumerGroupOffsetsAdminOption - see setter. +// +// See SetAdminRequestTimeout. +type AlterConsumerGroupOffsetsAdminOption interface { + supportsAlterConsumerGroupOffsets() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + // AdminOption is a generic type not to be used directly. // // See CreateTopicsAdminOption et.al. diff --git a/kafka/integration_test.go b/kafka/integration_test.go index cd330375c..4545bd970 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -72,6 +72,55 @@ func msgtrackerStart(t *testing.T, expectedCnt int) (mt msgtracker) { return mt } +// findConsumerGroupListings returns the ConsumerGroupListing for a group with name `group` +// from a slice of ConsumerGroupListings, and nil otherwise. +func findConsumerGroupListing(groups []ConsumerGroupListing, group string) *ConsumerGroupListing { + for _, groupInfo := range groups { + if groupInfo.GroupID == group { + return &groupInfo + } + } + return nil +} + +// findConsumerGroupListings returns the ConsumerGroupDescription for a group with name `group` +// from a slice of ConsumerGroupDescription, and nil otherwise. +func findConsumerGroupDescription(groups []ConsumerGroupDescription, group string) *ConsumerGroupDescription { + for _, groupInfo := range groups { + if groupInfo.GroupID == group { + return &groupInfo + } + } + return nil +} + +// checkGroupDesc is a helper function to check the validity of a ConsumerGroupDescription. +// We can't directly use DeepEqual because some fields/slice orders change with every run. +func checkGroupDesc( + groupDesc *ConsumerGroupDescription, state ConsumerGroupState, group string, + protocol string, clientIDToPartitions map[string][]TopicPartition) bool { + if groupDesc.GroupID != group || + groupDesc.State != state || + groupDesc.Error.Code() != ErrNoError || + groupDesc.PartitionAssignor != protocol || + // We can't check exactly the Broker information, but we add a check for the zero-value of the Host. + groupDesc.Coordinator.Host == "" || + // We will run all our tests on non-simple consumer groups only. + groupDesc.IsSimpleConsumerGroup || + len(groupDesc.Members) != len(clientIDToPartitions) { + return false + } + + for _, member := range groupDesc.Members { + if partitions, ok := clientIDToPartitions[member.ClientID]; !ok || + !reflect.DeepEqual(partitions, member.Assignment.TopicPartitions) { + return false + } + } + + return true +} + var testMsgsInit = false var p0TestMsgs []*testmsgType // partition 0 test messages // pAllTestMsgs holds messages for various partitions including PartitionAny and invalid partitions @@ -1231,6 +1280,411 @@ func validateTopicResult(t *testing.T, result []TopicResult, expError map[string } } +// TestAdminClient_DeleteConsumerGroups verifies the working of the +// DeleteConsumerGroups API in the admin client. +// It does so by listing consumer groups before/after deletion. +func TestAdminClient_DeleteConsumerGroups(t *testing.T) { + if !testconfRead() { + t.Skipf("Missing testconf.json") + } + + rand.Seed(time.Now().Unix()) + + // Generating new groupID to ensure a fresh group is created. + groupID := fmt.Sprintf("%s-%d", testconf.GroupID, rand.Int()) + + ac := createAdminClient(t) + defer ac.Close() + + // Check that our group is not present initially. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err := ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Errorf("Error listing consumer groups %s\n", err) + return + } + + if findConsumerGroupListing(listGroupResult.Valid, groupID) != nil { + t.Errorf("Consumer group present before consumer created: %s\n", groupID) + return + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + // Create consumer + config := &ConfigMap{ + "bootstrap.servers": testconf.Brokers, + "group.id": groupID, + "auto.offset.reset": "earliest", + "enable.auto.offset.store": false, + } + config.updateFromTestconf() + consumer, err := NewConsumer(config) + if err != nil { + t.Errorf("Failed to create consumer: %s\n", err) + return + } + consumerClosed := false + defer func() { + if !consumerClosed { + consumer.Close() + } + }() + + if err := consumer.Subscribe(testconf.Topic, nil); err != nil { + t.Errorf("Failed to subscribe to %s: %s\n", testconf.Topic, err) + return + } + + // This ReadMessage gives some time for the rebalance to happen. + _, err = consumer.ReadMessage(5 * time.Second) + if err != nil && err.(Error).Code() != ErrTimedOut { + t.Errorf("Failed while reading message: %s\n", err) + return + } + + // Check that the group exists. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Errorf("Error listing consumer groups %s\n", err) + return + } + + if findConsumerGroupListing(listGroupResult.Valid, groupID) == nil { + t.Errorf("Consumer group %s should be present\n", groupID) + return + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + // Try deleting the group while consumer is active. It should fail. + result, err := ac.DeleteConsumerGroups(ctx, []string{groupID}) + if err != nil { + t.Errorf("DeleteConsumerGroups() failed: %s", err) + return + } + resultGroups := result.GroupResults + + if len(resultGroups) != 1 || resultGroups[0].Group != groupID { + t.Errorf("Wrong group affected/no group affected") + return + } + + if resultGroups[0].Error.code != ErrNonEmptyGroup { + t.Errorf("Encountered the wrong error after calling DeleteConsumerGroups %s", resultGroups[0].Error) + return + } + + // Close the consumer. + if err = consumer.Close(); err != nil { + t.Errorf("Could not close consumer %s", err) + return + } + consumerClosed = true + + // Delete the consumer group now. + result, err = ac.DeleteConsumerGroups(ctx, []string{groupID}) + if err != nil { + t.Errorf("DeleteConsumerGroups() failed: %s", err) + return + } + resultGroups = result.GroupResults + + if len(resultGroups) != 1 || resultGroups[0].Group != groupID { + t.Errorf("Wrong group affected/no group affected") + return + } + + if resultGroups[0].Error.code != ErrNoError { + t.Errorf("Encountered an error after calling DeleteConsumerGroups %s", resultGroups[0].Error) + return + } + + // Check for the absence of the consumer group after deletion. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Errorf("Error listing consumer groups %s\n", err) + return + } + + if findConsumerGroupListing(listGroupResult.Valid, groupID) != nil { + t.Errorf("Consumer group %s should not be present\n", groupID) + return + } +} + +// TestAdminClient_ListAndDescribeConsumerGroups validates the working of the +// list consumer groups and describe consumer group APIs of the admin client. +// We test the following situations: +// 1. One consumer group with one client. +// 2. One consumer group with two clients. +// 3. Empty consumer group. +func TestAdminClient_ListAndDescribeConsumerGroups(t *testing.T) { + if !testconfRead() { + t.Skipf("Missing testconf.json") + } + + // Generating a new topic/groupID to ensure a fresh group/topic is created. + rand.Seed(time.Now().Unix()) + groupID := fmt.Sprintf("%s-%d", testconf.GroupID, rand.Int()) + topic := fmt.Sprintf("%s-%d", testconf.Topic, rand.Int()) + nonExistentGroupID := fmt.Sprintf("%s-nonexistent-%d", testconf.GroupID, rand.Int()) + + clientID1 := "test.client.1" + clientID2 := "test.client.2" + + ac := createAdminClient(t) + defer ac.Close() + + // Create a topic - we need to create here because we need 2 partitions. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err := ac.CreateTopics(ctx, []TopicSpecification{ + { + Topic: topic, + NumPartitions: 2, + }, + }) + if err != nil { + t.Errorf("Topic creation failed with error %v", err) + return + } + + // Delete the topic after the test is done. + defer func() { + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = ac.DeleteTopics(ctx, []string{topic}) + if err != nil { + t.Errorf("Topic deletion failed with error %v", err) + } + }() + + // Check the non-existence of consumer groups initially. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err := ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) + return + } + + groups := listGroupResult.Valid + if findConsumerGroupListing(groups, groupID) != nil || findConsumerGroupListing(groups, nonExistentGroupID) != nil { + t.Errorf("Consumer groups %s and %s should not be present\n", groupID, nonExistentGroupID) + return + } + + // 1. One consumer group with one client. + // Create the first consumer. + config := &ConfigMap{ + "bootstrap.servers": testconf.Brokers, + "group.id": groupID, + "auto.offset.reset": "earliest", + "enable.auto.offset.store": false, + "client.id": clientID1, + "partition.assignment.strategy": "range", + } + config.updateFromTestconf() + consumer1, err := NewConsumer(config) + if err != nil { + t.Errorf("Failed to create consumer: %s\n", err) + return + } + consumer1Closed := false + defer func() { + if !consumer1Closed { + consumer1.Close() + } + }() + consumer1.Subscribe(topic, nil) + + // Call Poll to trigger a rebalance and give it enough time to finish. + consumer1.Poll(10 * 1000) + + // Check the existence of the group. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups(ctx, SetAdminRequestTimeout(30*time.Second)) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) + return + } + groups = listGroupResult.Valid + + if findConsumerGroupListing(groups, groupID) == nil || findConsumerGroupListing(groups, nonExistentGroupID) != nil { + t.Errorf("Consumer groups %s should be present and %s should not be\n", groupID, nonExistentGroupID) + return + } + + // Test the description of the consumer group. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupDescResult, err := ac.DescribeConsumerGroups( + ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Errorf("Error describing consumer groups %s\n", err) + return + } + + groupDescs := groupDescResult.ConsumerGroupDescriptions + if len(groupDescs) != 1 { + t.Errorf("Describing one group should give exactly one result %s\n", err) + return + } + + groupDesc := &groupDescs[0] + + clientIDToPartitions := make(map[string][]TopicPartition) + clientIDToPartitions[clientID1] = []TopicPartition{ + {Topic: &topic, Partition: 0, Offset: OffsetInvalid}, + {Topic: &topic, Partition: 1, Offset: OffsetInvalid}, + } + if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIDToPartitions) { + t.Errorf("Expected description for consumer group %s is not same as actual: %v", groupID, groupDesc) + return + } + + // 2. One consumer group with two clients. + // Add another consumer to the same group. + config = &ConfigMap{ + "bootstrap.servers": testconf.Brokers, + "group.id": groupID, + "auto.offset.reset": "earliest", + "enable.auto.offset.store": false, + "client.id": clientID2, + "partition.assignment.strategy": "range", + } + config.updateFromTestconf() + consumer2, err := NewConsumer(config) + if err != nil { + t.Errorf("Failed to create consumer: %s\n", err) + return + } + consumer2Closed := false + defer func() { + if !consumer2Closed { + consumer2.Close() + } + }() + consumer2.Subscribe(topic, nil) + + // Call Poll to start triggering the rebalance. Give it enough time to run + // that it becomes stable. + // We need to make sure that the consumer group stabilizes since we will + // check for the state later on. + consumer2.Poll(5 * 1000) + consumer1.Poll(5 * 1000) + isGroupStable := false + for !isGroupStable { + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupDescResult, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Errorf("Error describing consumer groups %s\n", err) + return + } + groupDescs = groupDescResult.ConsumerGroupDescriptions + groupDesc = findConsumerGroupDescription(groupDescs, groupID) + if groupDesc == nil { + t.Errorf("Consumer group %s should be present\n", groupID) + return + } + isGroupStable = groupDesc.State == ConsumerGroupStateStable + time.Sleep(time.Second) + } + + clientIDToPartitions[clientID1] = []TopicPartition{ + {Topic: &topic, Partition: 0, Offset: OffsetInvalid}, + } + clientIDToPartitions[clientID2] = []TopicPartition{ + {Topic: &topic, Partition: 1, Offset: OffsetInvalid}, + } + if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIDToPartitions) { + t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc) + return + } + + // 3. Empty consumer group. + // Close the existing consumers. + if consumer1.Close() != nil { + t.Errorf("Error closing the first consumer\n") + return + } + consumer1Closed = true + + if consumer2.Close() != nil { + t.Errorf("Error closing the second consumer\n") + return + } + consumer2Closed = true + + // Try describing an empty group. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupDescResult, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second)) + groupDescs = groupDescResult.ConsumerGroupDescriptions + + if err != nil { + t.Errorf("Error describing consumer groups %s\n", err) + return + } + + groupDesc = findConsumerGroupDescription(groupDescs, groupID) + if groupDesc == nil { + t.Errorf("Consumer group %s should be present\n", groupID) + return + } + + clientIDToPartitions = make(map[string][]TopicPartition) + if !checkGroupDesc(groupDesc, ConsumerGroupStateEmpty, groupID, "", clientIDToPartitions) { + t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc) + } + + // Try listing the Empty consumer group, and make sure that the States option + // works while listing. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups( + ctx, SetAdminRequestTimeout(30*time.Second), + SetAdminMatchConsumerGroupStates([]ConsumerGroupState{ConsumerGroupStateEmpty})) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) + return + } + groups = listGroupResult.Valid + + groupInfo := findConsumerGroupListing(listGroupResult.Valid, groupID) + if groupInfo == nil { + t.Errorf("Consumer group %s should be present\n", groupID) + return + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + listGroupResult, err = ac.ListConsumerGroups( + ctx, SetAdminRequestTimeout(30*time.Second), + SetAdminMatchConsumerGroupStates([]ConsumerGroupState{ConsumerGroupStateStable})) + if err != nil || len(listGroupResult.Errors) > 0 { + t.Errorf("Error listing consumer groups %s %v\n", err, listGroupResult.Errors) + return + } + groups = listGroupResult.Valid + + groupInfo = findConsumerGroupListing(groups, groupID) + if groupInfo != nil { + t.Errorf("Consumer group %s should not be present\n", groupID) + return + } +} + func TestAdminTopics(t *testing.T) { rand.Seed(time.Now().Unix()) @@ -1852,3 +2306,242 @@ func TestAdminACLs(t *testing.T) { } checkExpectedResult(expectedDescribeACLs, *resultDescribeACLs) } + +// TestAdminClient_AlterListConsumerGroupOffsets tests the APIs +// ListConsumerGroupOffsets and AlterConsumerGroupOffsets. +// They are checked by producing to a topic, and consuming it, and then listing, +// modifying, and again listing the offset for that topic partition. +func TestAdminClient_AlterListConsumerGroupOffsets(t *testing.T) { + if !testconfRead() { + t.Skipf("Missing testconf.json") + } + + numMsgs := 5 // Needs to be > 1 to check alter. + + ac := createAdminClient(t) + defer ac.Close() + + // Create a topic. + topic := fmt.Sprintf("%s-%d", testconf.Topic, rand.Int()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err := ac.CreateTopics(ctx, []TopicSpecification{ + { + Topic: topic, + NumPartitions: 1, + }, + }) + if err != nil { + t.Errorf("Topic creation failed with error %v", err) + return + } + + // Delete the topic after the test is done. + defer func() { + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = ac.DeleteTopics(ctx, []string{topic}) + if err != nil { + t.Errorf("Topic deletion failed with error %v", err) + } + }() + + // Produce to the topic. + producer, err := NewProducer(&ConfigMap{ + "bootstrap.servers": testconf.Brokers, + }) + if err != nil { + t.Errorf("Producer could not be created with error %v", err) + return + } + defer producer.Close() + + for i := 0; i < numMsgs; i++ { + if err = producer.Produce(&Message{ + TopicPartition: TopicPartition{Topic: &topic, Partition: 0}, + Value: []byte("Value"), + }, nil); err != nil { + t.Errorf("Produce failed with error %v", err) + return + } + } + + producer.Flush(-1) + + // Consume from the topic. + consumer, err := NewConsumer(&ConfigMap{ + "bootstrap.servers": testconf.Brokers, + "group.id": testconf.GroupID, + "auto.offset.reset": "earliest", + "enable.auto.offset.store": false, + }) + if err != nil { + t.Errorf("Consumer could not be created with error %v", err) + return + } + consumerClosed := false + defer func() { + if !consumerClosed { + consumer.Close() + } + }() + + if err = consumer.Subscribe(topic, nil); err != nil { + t.Errorf("Consumer could not subscribe to the topic with an error %v", err) + return + } + + for i := 0; i < numMsgs; i++ { + msg, err := consumer.ReadMessage(-1) + if err != nil { + t.Errorf("Consumer failed to read a message with error %v", err) + return + } + + if _, err = consumer.StoreMessage(msg); err != nil { + t.Errorf("Consumer failed to store the message with error %v", err) + return + } + } + + if _, err = consumer.Commit(); err != nil { + t.Errorf("Consumer failed to commit with error %v", err) + return + } + + // Try altering offsets without closing the consumer - this should give an error. + // The error should be on a TopicPartition level, and not on the `err` level. + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + aresult, err := ac.AlterConsumerGroupOffsets(ctx, []ConsumerGroupTopicPartitions{ + { + Group: testconf.GroupID, + Partitions: []TopicPartition{ + { + Topic: &topic, + Partition: 0, + Offset: Offset(numMsgs - 1), + }, + }, + }, + }) + if err != nil { + t.Errorf("Unexpected error while altering offset %v", err) + return + } + + if len(aresult.ConsumerGroupsTopicPartitions) != 1 || + len(aresult.ConsumerGroupsTopicPartitions[0].Partitions) != 1 || + aresult.ConsumerGroupsTopicPartitions[0].Partitions[0].Error == nil { + t.Errorf("Unexpected result while altering offset, expected non-nil error in topic partition, got %v", aresult) + return + } + + // Close consumer so we can safely alter offsets. + if err = consumer.Close(); err != nil { + t.Errorf("Consumer failed to close with error %v", err) + return + } + consumerClosed = true + + // List offsets for our group/partition. + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + lresult, err := ac.ListConsumerGroupOffsets(ctx, []ConsumerGroupTopicPartitions{ + { + Group: testconf.GroupID, + Partitions: []TopicPartition{{Topic: &topic, Partition: 0}}, + }, + }) + if err != nil { + t.Errorf("Failed to list offset with error %v", err) + return + } + + if lresult.ConsumerGroupsTopicPartitions == nil || + len(lresult.ConsumerGroupsTopicPartitions) != 1 { + t.Errorf("Result length %d doesn't match expected length of 1", + len(lresult.ConsumerGroupsTopicPartitions)) + return + } + + groupTopicParitions := lresult.ConsumerGroupsTopicPartitions[0] + expectedResult := ConsumerGroupTopicPartitions{ + Group: testconf.GroupID, + Partitions: []TopicPartition{{Topic: &topic, Partition: 0, Offset: Offset(numMsgs)}}, + } + if !reflect.DeepEqual(groupTopicParitions, expectedResult) { + t.Errorf("Result[0] doesn't have expected structure %v, instead it is %v", + expectedResult, groupTopicParitions) + return + } + + // Alter offsets for our group/partitions. + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + aresult, err = ac.AlterConsumerGroupOffsets(ctx, []ConsumerGroupTopicPartitions{ + { + Group: testconf.GroupID, + Partitions: []TopicPartition{ + { + Topic: &topic, + Partition: 0, + Offset: Offset(numMsgs - 1), + }, + }, + }, + }) + if err != nil { + t.Errorf("Failed to alter offset with error %v", err) + return + } + + if aresult.ConsumerGroupsTopicPartitions == nil || + len(aresult.ConsumerGroupsTopicPartitions) != 1 { + t.Errorf("Result length %d doesn't match expected length of 1", + len(aresult.ConsumerGroupsTopicPartitions)) + return + } + + groupTopicParitions = aresult.ConsumerGroupsTopicPartitions[0] + expectedResult = ConsumerGroupTopicPartitions{ + Group: testconf.GroupID, + Partitions: []TopicPartition{{Topic: &topic, Partition: 0, Offset: Offset(numMsgs - 1)}}, + } + if !reflect.DeepEqual(groupTopicParitions, expectedResult) { + t.Errorf("Result[0] doesn't have expected structure %v, instead it is %v", + expectedResult, groupTopicParitions) + return + } + + // Check altered offsets using ListConsumerGroupOffsets. + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + lresult, err = ac.ListConsumerGroupOffsets(ctx, []ConsumerGroupTopicPartitions{ + { + Group: testconf.GroupID, + Partitions: []TopicPartition{{Topic: &topic, Partition: 0}}, + }, + }) + if err != nil { + t.Errorf("Failed to list offset with error %v", err) + return + } + + if lresult.ConsumerGroupsTopicPartitions == nil || + len(lresult.ConsumerGroupsTopicPartitions) != 1 { + t.Errorf("Result length %d doesn't match expected length of 1", + len(lresult.ConsumerGroupsTopicPartitions)) + return + } + + groupTopicParitions = lresult.ConsumerGroupsTopicPartitions[0] + expectedResult = ConsumerGroupTopicPartitions{ + Group: testconf.GroupID, + Partitions: []TopicPartition{{Topic: &topic, Partition: 0, Offset: Offset(numMsgs - 1)}}, + } + if !reflect.DeepEqual(groupTopicParitions, expectedResult) { + t.Errorf("Result[0] doesn't have expected structure %v, instead it is %v", + expectedResult, groupTopicParitions) + return + } + +} diff --git a/kafka/kafka.go b/kafka/kafka.go index 5a40ab25e..221657299 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -253,9 +253,10 @@ package kafka import ( "fmt" + "unsafe" + // Make sure librdkafka_vendor/ sub-directory is included in vendor pulls. _ "github.com/confluentinc/confluent-kafka-go/kafka/librdkafka_vendor" - "unsafe" ) /* @@ -266,6 +267,13 @@ import ( static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) { return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL; } + +static const rd_kafka_group_result_t * +group_result_by_idx (const rd_kafka_group_result_t **groups, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return groups[idx]; +} */ import "C" @@ -316,6 +324,38 @@ func (tps TopicPartitions) Swap(i, j int) { tps[i], tps[j] = tps[j], tps[i] } +// Node represents a Kafka broker. +type Node struct { + // Node id. + ID int + // Node host. + Host string + // Node port. + Port int +} + +func (n Node) String() string { + return fmt.Sprintf("[%s:%d]/%d", n.Host, n.Port, n.ID) +} + +// ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions. +type ConsumerGroupTopicPartitions struct { + // Group name + Group string + // Partitions list + Partitions []TopicPartition +} + +func (gtp ConsumerGroupTopicPartitions) String() string { + res := gtp.Group + res += "[ " + for _, tp := range gtp.Partitions { + res += tp.String() + " " + } + res += "]" + return res +} + // new_cparts_from_TopicPartitions creates a new C rd_kafka_topic_partition_list_t // from a TopicPartition array. func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kafka_topic_partition_list_t) { @@ -366,6 +406,24 @@ func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (pa return partitions } +// cToConsumerGroupTopicPartitions converts a C rd_kafka_group_result_t array to a +// ConsumerGroupTopicPartitions slice. +func (a *AdminClient) cToConsumerGroupTopicPartitions( + cGroupResults **C.rd_kafka_group_result_t, + cGroupCount C.size_t) (result []ConsumerGroupTopicPartitions) { + result = make([]ConsumerGroupTopicPartitions, uint(cGroupCount)) + + for i := uint(0); i < uint(cGroupCount); i++ { + cGroupResult := C.group_result_by_idx(cGroupResults, cGroupCount, C.size_t(i)) + cGroupPartitions := C.rd_kafka_group_result_partitions(cGroupResult) + result[i] = ConsumerGroupTopicPartitions{ + Group: C.GoString(C.rd_kafka_group_result_name(cGroupResult)), + Partitions: newTopicPartitionsFromCparts(cGroupPartitions), + } + } + return +} + // LibraryVersion returns the underlying librdkafka library version as a // (version_int, version_str) tuple. func LibraryVersion() (int, string) {