Skip to content

Commit

Permalink
Rewrite List/DescribeConsumerGroups to match librdkafka
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Dec 5, 2022
1 parent 6ba9479 commit cd3057d
Show file tree
Hide file tree
Showing 9 changed files with 1,484 additions and 542 deletions.
17 changes: 17 additions & 0 deletions examples/.github/workflows/base.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: check
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
branch: v1.9.2
- uses: actions/setup-go@v3
with:
go-version: '=1.11'
- run: |
cd $PWD/kafka
go build ./...
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

if len(os.Args) < 2 {
if len(os.Args) < 3 {
fmt.Fprintf(
os.Stderr,
"Usage: %s <bootstrap-servers> [group1 group2 ....]\nIf no groups are specified, all groups are listed\n",
"Usage: %s <bootstrap-servers> <group1> [<group2> ...]\n",
os.Args[0])
os.Exit(1)
}
Expand All @@ -46,18 +48,28 @@ func main() {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

groupInfos, err := a.DescribeConsumerGroups(groups)
// Call DescribeConsumerGroups.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupInfos, err := a.DescribeConsumerGroups(ctx, groups)
if err != nil {
fmt.Printf("Failed to describe groups: %s\n", err)
os.Exit(1)
}

// Print results
fmt.Printf("A total of %d consumer groups described:\n", len(groupInfos))
for _, groupInfo := range groupInfos {
fmt.Println(groupInfo)
fmt.Printf("A total of %d consumer group(s) described:\n\n", len(groupInfos))
for _, g := range groupInfos {
fmt.Printf("GroupId: %s\n"+
"Error: %s\n"+
"IsSimpleConsumerGroup: %v\n"+
"PartitionAssignor: %s\n"+
"State: %s\n"+
"Coordinator: %+v\n"+
"Members: %+v\n\n",
g.GroupId, g.Error, g.IsSimpleConsumerGroup, g.PartitionAssignor,
g.State, g.Coordinator, g.Members)
}

a.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

res, err := ac.ListConsumerGroupOffsets(ctx, gps, kafka.SetAdminRequireStable(requireStable))
res, err := ac.ListConsumerGroupOffsets(ctx, gps, kafka.SetAdminRequireStableOffsets(requireStable))
if err != nil {
panic(err)
}
Expand Down
50 changes: 28 additions & 22 deletions examples/admin_list_consumer_groups/admin_list_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
Expand All @@ -29,18 +29,23 @@ import (
func main() {

if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> [<timeout_seconds> = infinite]\n", os.Args[0])
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> [<state1> <state2> ...]\n", os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]

timeout_parsed := -1
var err error
var states []kafka.ConsumerGroupState
if len(os.Args) > 2 {
timeout_parsed, err = strconv.Atoi(os.Args[2])
if err != nil {
fmt.Printf("Error parsing the timeout %s\n: %s", os.Args[2], err)
statesStr := os.Args[2:]
for _, stateStr := range statesStr {
state, err := kafka.ConsumerGroupStateFromString(stateStr)
if err != nil {
fmt.Fprintf(os.Stderr,
"Given state %s is not a valid state\n", stateStr)
os.Exit(1)
}
states = append(states, state)
}
}

Expand All @@ -50,25 +55,26 @@ func main() {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

var groupInfos []kafka.GroupInfo
if timeout_parsed == -1 {
groupInfos, err = a.ListConsumerGroups()
} else {
timeout := time.Duration(timeout_parsed) * time.Second
groupInfos, err = a.ListConsumerGroups(kafka.SetListConsumerGroupsOptionRequestTimeout(timeout))
}
// Call ListConsumerGroups.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
listGroupRes, err := a.ListConsumerGroups(
ctx, kafka.SetAdminConsumerGroupStates(states))

if err != nil {
fmt.Printf("Failed to list groups: %s\n", err)
if err != nil || len(listGroupRes.Errors) > 0 {
fmt.Printf("Failed to list groups: %s %v\n", err, listGroupRes.Errors)
os.Exit(1)
}

// Print results
fmt.Printf("A total of %d consumer groups listed:\n", len(groupInfos))
for _, groupInfo := range groupInfos {
fmt.Println(groupInfo)
groups := listGroupRes.ConsumerGroupListings
fmt.Printf("A total of %d consumer group(s) listed:\n", len(groups))
for _, group := range groups {
fmt.Printf("GroupId: %s\n", group.GroupId)
fmt.Printf("State: %s\n", group.State)
fmt.Printf("IsSimpleConsumerGroup: %v\n", group.IsSimpleConsumerGroup)
fmt.Println()
}

a.Close()
}
Loading

0 comments on commit cd3057d

Please sign in to comment.