Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,10 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
_ = b.Open(conf) // Ensure that broker is opened

request := &ListGroupsRequest{}
if ca.conf.Version.IsAtLeast(V2_6_0_0) {
if ca.conf.Version.IsAtLeast(V3_8_0_0) {
// Version 5 adds the TypesFilter field (KIP-848).
request.Version = 5
} else if ca.conf.Version.IsAtLeast(V2_6_0_0) {
// Version 4 adds the StatesFilter field (KIP-518).
request.Version = 4
} else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
Expand Down
45 changes: 45 additions & 0 deletions functional_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ package sarama

import (
"context"
"maps"
"slices"
"testing"

"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
)

func TestFuncAdminQuotas(t *testing.T) {
Expand Down Expand Up @@ -188,6 +191,48 @@ func TestFuncAdminDescribeGroups(t *testing.T) {
m2.AssertCleanShutdown()
}

func TestFuncAdminListConsumerGroups(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

group1 := testFuncConsumerGroupID(t)
group2 := testFuncConsumerGroupID(t)

config := NewFunctionalTestConfig()
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, adminClient)

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Consumer.Offsets.Initial = OffsetNewest
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4")
defer m2.Close()

m1.WaitForState(2)
m2.WaitForState(2)

res, err := adminClient.ListConsumerGroups()
if err != nil {
t.Fatal(err)
}
assert.GreaterOrEqual(t, len(res), 2)
assert.Contains(t, slices.Collect(maps.Keys(res)), group1)
assert.Contains(t, slices.Collect(maps.Keys(res)), group2)

m1.AssertCleanShutdown()
m2.AssertCleanShutdown()
}

func TestFuncAdminListConsumerGroupOffsets(t *testing.T) {
checkKafkaVersion(t, "0.8.2.0")
setupFunctionalTest(t)
Expand Down
28 changes: 27 additions & 1 deletion list_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
type ListGroupsRequest struct {
Version int16
StatesFilter []string // version 4 or later
TypesFilter []string // version 5 or later
}

func (r *ListGroupsRequest) setVersion(v int16) {
Expand All @@ -19,6 +20,15 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error {
}
}
}
if r.Version >= 5 {
pe.putCompactArrayLength(len(r.TypesFilter))
for _, filter := range r.TypesFilter {
err := pe.putCompactString(filter)
if err != nil {
return err
}
}
}
if r.Version >= 3 {
pe.putEmptyTaggedFieldArray()
}
Expand All @@ -41,6 +51,20 @@ func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error)
}
}
}
if r.Version >= 5 {
filterLen, err := pd.getCompactArrayLength()
if err != nil {
return err
}
if filterLen > 0 {
r.TypesFilter = make([]string, filterLen)
for i := 0; i < filterLen; i++ {
if r.TypesFilter[i], err = pd.getCompactString(); err != nil {
return err
}
}
}
}
if r.Version >= 3 {
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
Expand All @@ -65,11 +89,13 @@ func (r *ListGroupsRequest) headerVersion() int16 {
}

func (r *ListGroupsRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 4
return r.Version >= 0 && r.Version <= 5
}

func (r *ListGroupsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 5:
return V3_8_0_0
case 4:
return V2_6_0_0
case 3:
Expand Down
12 changes: 12 additions & 0 deletions list_groups_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,16 @@ func TestListGroupsRequest(t *testing.T) {
6, 'E', 'm', 'p', 't', 'y', // compact string
0, // empty tag buffer
})

testRequest(t, "ListGroupsRequest", &ListGroupsRequest{
Version: 5,
StatesFilter: []string{"Empty"},
TypesFilter: []string{"Classic"},
}, []byte{
2, // compact array length (1)
6, 'E', 'm', 'p', 't', 'y', // compact string
2, // compact array length (1)
8, 'C', 'l', 'a', 's', 's', 'i', 'c', // compact string
0, // empty tag buffer
})
}
23 changes: 20 additions & 3 deletions list_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func (r *ListGroupsResponse) setVersion(v int16) {

type GroupData struct {
GroupState string // version 4 or later
GroupType string // version 5 or later
}

func (r *ListGroupsResponse) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -51,6 +52,13 @@ func (r *ListGroupsResponse) encode(pe packetEncoder) error {
return err
}
}

if r.Version >= 5 {
groupData := r.GroupsData[groupId]
if err := pe.putCompactString(groupData.GroupType); err != nil {
return err
}
}
}
}

Expand Down Expand Up @@ -115,13 +123,20 @@ func (r *ListGroupsResponse) decode(pd packetDecoder, version int16) error {
r.Groups[groupId] = protocolType

if r.Version >= 4 {
var groupData GroupData
groupState, err := pd.getCompactString()
if err != nil {
return err
}
r.GroupsData[groupId] = GroupData{
GroupState: groupState,
groupData.GroupState = groupState
if r.Version >= 5 {
groupType, err := pd.getCompactString()
if err != nil {
return err
}
groupData.GroupType = groupType
}
r.GroupsData[groupId] = groupData
}

if r.Version >= 3 {
Expand Down Expand Up @@ -156,11 +171,13 @@ func (r *ListGroupsResponse) headerVersion() int16 {
}

func (r *ListGroupsResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 4
return r.Version >= 0 && r.Version <= 5
}

func (r *ListGroupsResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 5:
return V3_8_0_0
case 4:
return V2_6_0_0
case 3:
Expand Down
32 changes: 31 additions & 1 deletion list_groups_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ var (
0, // Empty tag buffer
0, // Empty tag buffer
}

listGroupResponseV5 = []byte{
0, 0, 0, 0, // no throttle time
0, 0, // no error
2, // compact array length (1)
4, 'f', 'o', 'o', // group name (compact string)
9, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // protocol type (compact string)
6, 'E', 'm', 'p', 't', 'y', // state (compact string)
8, 'C', 'l', 'a', 's', 's', 'i', 'c', // type (compact string)
0, // Empty tag buffer
0, // Empty tag buffer
}
)

func TestListGroupsResponse(t *testing.T) {
Expand Down Expand Up @@ -82,6 +94,24 @@ func TestListGroupsResponse(t *testing.T) {
t.Error("Expected foo group to use consumer protocol")
}
if response.GroupsData["foo"].GroupState != "Empty" {
t.Error("Expected foo grup to have empty state")
t.Error("Expected foo group to have empty state")
}

response = new(ListGroupsResponse)
testVersionDecodable(t, "no error", response, listGroupResponseV5, 5)
if !errors.Is(response.Err, ErrNoError) {
t.Error("Expected no gerror, found:", response.Err)
}
if len(response.Groups) != 1 {
t.Error("Expected one group")
}
if response.Groups["foo"] != "consumer" {
t.Error("Expected foo group to use consumer protocol")
}
if response.GroupsData["foo"].GroupState != "Empty" {
t.Error("Expected foo group to have empty state")
}
if response.GroupsData["foo"].GroupType != "Classic" {
t.Error("Expected foo group to have type 'Classic', found: ", response.GroupsData["foo"].GroupType)
}
}
Loading