diff --git a/admin.go b/admin.go index 9e52887de..658904640 100644 --- a/admin.go +++ b/admin.go @@ -1008,7 +1008,10 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e _ = b.Open(conf) // Ensure that broker is opened request := &ListGroupsRequest{} - if ca.conf.Version.IsAtLeast(V2_6_0_0) { + if ca.conf.Version.IsAtLeast(V3_8_0_0) { + // Version 5 adds the TypesFilter field (KIP-848). + request.Version = 5 + } else if ca.conf.Version.IsAtLeast(V2_6_0_0) { // Version 4 adds the StatesFilter field (KIP-518). request.Version = 4 } else if ca.conf.Version.IsAtLeast(V2_4_0_0) { diff --git a/functional_admin_test.go b/functional_admin_test.go index 16d4efd7b..bf7052bc8 100644 --- a/functional_admin_test.go +++ b/functional_admin_test.go @@ -4,9 +4,12 @@ package sarama import ( "context" + "maps" + "slices" "testing" "github.com/davecgh/go-spew/spew" + "github.com/stretchr/testify/assert" ) func TestFuncAdminQuotas(t *testing.T) { @@ -188,6 +191,48 @@ func TestFuncAdminDescribeGroups(t *testing.T) { m2.AssertCleanShutdown() } +func TestFuncAdminListConsumerGroups(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + group1 := testFuncConsumerGroupID(t) + group2 := testFuncConsumerGroupID(t) + + config := NewFunctionalTestConfig() + adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, adminClient) + + config1 := NewFunctionalTestConfig() + config1.ClientID = "M1" + config1.Consumer.Offsets.Initial = OffsetNewest + m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4") + defer m1.Close() + + config2 := NewFunctionalTestConfig() + config2.ClientID = "M2" + config2.Consumer.Offsets.Initial = OffsetNewest + config2.Consumer.Group.InstanceId = "Instance2" + m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4") + defer m2.Close() + + m1.WaitForState(2) + m2.WaitForState(2) + + res, err := adminClient.ListConsumerGroups() + if err != nil { + t.Fatal(err) + } + assert.GreaterOrEqual(t, len(res), 2) + assert.Contains(t, slices.Collect(maps.Keys(res)), group1) + assert.Contains(t, slices.Collect(maps.Keys(res)), group2) + + m1.AssertCleanShutdown() + m2.AssertCleanShutdown() +} + func TestFuncAdminListConsumerGroupOffsets(t *testing.T) { checkKafkaVersion(t, "0.8.2.0") setupFunctionalTest(t) diff --git a/list_groups_request.go b/list_groups_request.go index 4a6dbc086..075fe211b 100644 --- a/list_groups_request.go +++ b/list_groups_request.go @@ -3,6 +3,7 @@ package sarama type ListGroupsRequest struct { Version int16 StatesFilter []string // version 4 or later + TypesFilter []string // version 5 or later } func (r *ListGroupsRequest) setVersion(v int16) { @@ -19,6 +20,15 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error { } } } + if r.Version >= 5 { + pe.putCompactArrayLength(len(r.TypesFilter)) + for _, filter := range r.TypesFilter { + err := pe.putCompactString(filter) + if err != nil { + return err + } + } + } if r.Version >= 3 { pe.putEmptyTaggedFieldArray() } @@ -41,6 +51,20 @@ func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error) } } } + if r.Version >= 5 { + filterLen, err := pd.getCompactArrayLength() + if err != nil { + return err + } + if filterLen > 0 { + r.TypesFilter = make([]string, filterLen) + for i := 0; i < filterLen; i++ { + if r.TypesFilter[i], err = pd.getCompactString(); err != nil { + return err + } + } + } + } if r.Version >= 3 { if _, err = pd.getEmptyTaggedFieldArray(); err != nil { return err @@ -65,11 +89,13 @@ func (r *ListGroupsRequest) headerVersion() int16 { } func (r *ListGroupsRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 4 + return r.Version >= 0 && r.Version <= 5 } func (r *ListGroupsRequest) requiredVersion() KafkaVersion { switch r.Version { + case 5: + return V3_8_0_0 case 4: return V2_6_0_0 case 3: diff --git a/list_groups_request_test.go b/list_groups_request_test.go index 7eb784b1e..c95baf5b2 100644 --- a/list_groups_request_test.go +++ b/list_groups_request_test.go @@ -36,4 +36,16 @@ func TestListGroupsRequest(t *testing.T) { 6, 'E', 'm', 'p', 't', 'y', // compact string 0, // empty tag buffer }) + + testRequest(t, "ListGroupsRequest", &ListGroupsRequest{ + Version: 5, + StatesFilter: []string{"Empty"}, + TypesFilter: []string{"Classic"}, + }, []byte{ + 2, // compact array length (1) + 6, 'E', 'm', 'p', 't', 'y', // compact string + 2, // compact array length (1) + 8, 'C', 'l', 'a', 's', 's', 'i', 'c', // compact string + 0, // empty tag buffer + }) } diff --git a/list_groups_response.go b/list_groups_response.go index db31527a3..60607bd7e 100644 --- a/list_groups_response.go +++ b/list_groups_response.go @@ -14,6 +14,7 @@ func (r *ListGroupsResponse) setVersion(v int16) { type GroupData struct { GroupState string // version 4 or later + GroupType string // version 5 or later } func (r *ListGroupsResponse) encode(pe packetEncoder) error { @@ -51,6 +52,13 @@ func (r *ListGroupsResponse) encode(pe packetEncoder) error { return err } } + + if r.Version >= 5 { + groupData := r.GroupsData[groupId] + if err := pe.putCompactString(groupData.GroupType); err != nil { + return err + } + } } } @@ -115,13 +123,20 @@ func (r *ListGroupsResponse) decode(pd packetDecoder, version int16) error { r.Groups[groupId] = protocolType if r.Version >= 4 { + var groupData GroupData groupState, err := pd.getCompactString() if err != nil { return err } - r.GroupsData[groupId] = GroupData{ - GroupState: groupState, + groupData.GroupState = groupState + if r.Version >= 5 { + groupType, err := pd.getCompactString() + if err != nil { + return err + } + groupData.GroupType = groupType } + r.GroupsData[groupId] = groupData } if r.Version >= 3 { @@ -156,11 +171,13 @@ func (r *ListGroupsResponse) headerVersion() int16 { } func (r *ListGroupsResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 4 + return r.Version >= 0 && r.Version <= 5 } func (r *ListGroupsResponse) requiredVersion() KafkaVersion { switch r.Version { + case 5: + return V3_8_0_0 case 4: return V2_6_0_0 case 3: diff --git a/list_groups_response_test.go b/list_groups_response_test.go index ad75c5faf..116cc1327 100644 --- a/list_groups_response_test.go +++ b/list_groups_response_test.go @@ -35,6 +35,18 @@ var ( 0, // Empty tag buffer 0, // Empty tag buffer } + + listGroupResponseV5 = []byte{ + 0, 0, 0, 0, // no throttle time + 0, 0, // no error + 2, // compact array length (1) + 4, 'f', 'o', 'o', // group name (compact string) + 9, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // protocol type (compact string) + 6, 'E', 'm', 'p', 't', 'y', // state (compact string) + 8, 'C', 'l', 'a', 's', 's', 'i', 'c', // type (compact string) + 0, // Empty tag buffer + 0, // Empty tag buffer + } ) func TestListGroupsResponse(t *testing.T) { @@ -82,6 +94,24 @@ func TestListGroupsResponse(t *testing.T) { t.Error("Expected foo group to use consumer protocol") } if response.GroupsData["foo"].GroupState != "Empty" { - t.Error("Expected foo grup to have empty state") + t.Error("Expected foo group to have empty state") + } + + response = new(ListGroupsResponse) + testVersionDecodable(t, "no error", response, listGroupResponseV5, 5) + if !errors.Is(response.Err, ErrNoError) { + t.Error("Expected no gerror, found:", response.Err) + } + if len(response.Groups) != 1 { + t.Error("Expected one group") + } + if response.Groups["foo"] != "consumer" { + t.Error("Expected foo group to use consumer protocol") + } + if response.GroupsData["foo"].GroupState != "Empty" { + t.Error("Expected foo group to have empty state") + } + if response.GroupsData["foo"].GroupType != "Classic" { + t.Error("Expected foo group to have type 'Classic', found: ", response.GroupsData["foo"].GroupType) } }