Skip to content

Commit

Permalink
[admin] Add consumer group bindings for KIP 222, 518, 396 (partial) a…
Browse files Browse the repository at this point in the history
…nd DeleteConsumerGroups (#923)

Implement ListConsumerGroups, DescribeConsumerGroups,
ListConsumerGroupOffsets (KIP-222), DeleteConsumerGroups,
AlterConsumerGroupOffsets (KIP-396),
Allow listing consumer groups per state (KIP-518).

Co-authored-by: Milind L <[email protected]>
Co-authored-by: Santwana Verma <[email protected]>
  • Loading branch information
3 people authored and PrasanthV454 committed Mar 17, 2023
1 parent 68031b2 commit 2a41eb9
Show file tree
Hide file tree
Showing 12 changed files with 2,132 additions and 10 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@ This is a maintenance release:
used, the method will block until the fetcher state is updated (typically
within microseconds).
* The minimum version of Go supported has been changed from 1.11 to 1.14.
* [KIP-222](https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API)
Add Consumer Group operations to Admin API.
* [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state)
Allow listing consumer groups per state.
* [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484)
Partially implemented: support for AlterConsumerGroupOffsets.
* As result of the above KIPs, added (#923)
- `ListConsumerGroups` Admin operation. Supports listing by state.
- `DescribeConsumerGroups` Admin operation. Supports multiple groups.
- `DeleteConsumerGroups` Admin operation. Supports multiple groups (@vsantwana).
- `ListConsumerGroupOffsets` Admin operation. Currently, only supports
1 group with multiple partitions. Supports the `requireStable` option.
- `AlterConsumerGroupOffsets` Admin operation. Currently, only supports
1 group with multiple offsets.


confluent-kafka-go is based on librdkafka v2.0.0, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v2.0.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v1.9.2
Expand Down
5 changes: 5 additions & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets
admin_create_acls/admin_create_acls
admin_create_topic/admin_create_topic
admin_delete_acls/admin_delete_acls
admin_delete_consumer_groups/admin_delete_consumer_groups
admin_delete_topics/admin_delete_topics
admin_describe_acls/admin_describe_acls
admin_describe_config/admin_describe_config
admin_describe_consumer_groups/admin_describe_consumer_groups
admin_list_consumer_groups/admin_list_consumer_groups
admin_list_consumer_group_offsets/admin_list_consumer_group_offsets
avro_generic_consumer_example/avro_generic_consumer_example
avro_generic_producer_example/avro_generic_producer_example
avro_specific_consumer_example/avro_specific_consumer_example
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Alter consumer group offsets
package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
args := os.Args

if len(args) < 6 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap_servers> <group_id> "+
"<topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ...]\n",
args[0])
os.Exit(1)
}

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": args[1],
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

var partitions []kafka.TopicPartition
for i := 3; i+2 < len(args); i += 3 {
partition, err := strconv.ParseInt(args[i+1], 10, 32)
if err != nil {
panic(err)
}
offset, err := strconv.ParseUint(args[i+2], 10, 64)
if err != nil {
panic(err)
}

partitions = append(partitions, kafka.TopicPartition{
Topic: &args[i],
Partition: int32(partition),
Offset: kafka.Offset(offset),
})
}

gps := []kafka.ConsumerGroupTopicPartitions{
{
Group: args[2],
Partitions: partitions,
},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

res, err := ac.AlterConsumerGroupOffsets(ctx, gps)
if err != nil {
fmt.Printf("Failed to alter consumer group offsets: %s\n", err)
os.Exit(1)
}

fmt.Printf("AlterConsumerGroupOffsets result: %+v\n", res)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Delete consumer group
package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
args := os.Args

if len(args) < 4 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap_servers> <request_timeout_sec> <group1> [<group2> ...]\n", args[0])
os.Exit(1)
}

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": args[1],
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

timeoutSec, err := strconv.Atoi(args[2])
if err != nil {
fmt.Printf("Failed to parse timeout: %s\n", err)
os.Exit(1)
}

groups := args[3:]

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

res, err := ac.DeleteConsumerGroups(ctx, groups,
kafka.SetAdminRequestTimeout(time.Duration(timeoutSec)*time.Second))
if err != nil {
fmt.Printf("Failed to delete groups: %s\n", err)
os.Exit(1)
}

fmt.Printf("DeleteConsumerGroups result: %+v\n", res)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Describe consumer groups
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
if len(os.Args) < 3 {
fmt.Fprintf(
os.Stderr,
"Usage: %s <bootstrap-servers> <group1> [<group2> ...]\n",
os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
groups := os.Args[2:]

// Create a new AdminClient.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

// Call DescribeConsumerGroups.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
describeGroupsResult, err := a.DescribeConsumerGroups(ctx, groups)
if err != nil {
fmt.Printf("Failed to describe groups: %s\n", err)
os.Exit(1)
}

// Print results
fmt.Printf("A total of %d consumer group(s) described:\n\n",
len(describeGroupsResult.ConsumerGroupDescriptions))
for _, g := range describeGroupsResult.ConsumerGroupDescriptions {
fmt.Printf("GroupId: %s\n"+
"Error: %s\n"+
"IsSimpleConsumerGroup: %v\n"+
"PartitionAssignor: %s\n"+
"State: %s\n"+
"Coordinator: %+v\n"+
"Members: %+v\n\n",
g.GroupID, g.Error, g.IsSimpleConsumerGroup, g.PartitionAssignor,
g.State, g.Coordinator, g.Members)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright 2022 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// List consumer group offsets
package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
args := os.Args

if len(args) < 6 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap_servers> <group_id> <require_stable> "+
"<topic1> <partition1> [<topic2> <partition2> .... ]\n", args[0])
os.Exit(1)
}

requireStable, err := strconv.ParseBool(args[3])
if err != nil {
fmt.Printf(
"Failed to parse value of require_stable %s: %s\n", args[3], err)
os.Exit(1)
}

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": args[1],
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

var partitions []kafka.TopicPartition
for i := 4; i+1 < len(args); i += 2 {
partition, err := strconv.ParseInt(args[i+1], 10, 32)
if err != nil {
fmt.Printf("Failed to parse partition %s: %s\n", args[i+1], err)
os.Exit(1)
}

partitions = append(partitions, kafka.TopicPartition{
Topic: &args[i],
Partition: int32(partition),
})
}

gps := []kafka.ConsumerGroupTopicPartitions{
{
Group: args[2],
Partitions: partitions,
},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

res, err := ac.ListConsumerGroupOffsets(
ctx, gps, kafka.SetAdminRequireStableOffsets(requireStable))
if err != nil {
fmt.Printf("Failed to list consumer group offsets %s\n", err)
os.Exit(1)
}

fmt.Printf("ListConsumerGroupOffset result: %+v\n", res)
}
Loading

0 comments on commit 2a41eb9

Please sign in to comment.