Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,8 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group

if ca.conf.Version.IsAtLeast(V2_4_0_0) {
// Starting in version 4, the response will include group.instance.id info for members.
describeReq.Version = 4
// Starting in version 5, the response uses flexible encoding
describeReq.Version = 5
} else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
// Starting in version 3, authorized operations can be requested.
describeReq.Version = 3
Expand Down
19 changes: 17 additions & 2 deletions describe_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
if r.Version >= 3 {
pe.putBool(r.IncludeAuthorizedOperations)
}
pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -31,7 +32,8 @@ func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err err
return err
}
}
return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (r *DescribeGroupsRequest) key() int16 {
Expand All @@ -43,15 +45,28 @@ func (r *DescribeGroupsRequest) version() int16 {
}

func (r *DescribeGroupsRequest) headerVersion() int16 {
if r.Version >= 5 {
return 2
}
return 1
}

func (r *DescribeGroupsRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 4
return r.Version >= 0 && r.Version <= 5
}

func (r *DescribeGroupsRequest) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}

func (r *DescribeGroupsRequest) isFlexibleVersion(version int16) bool {
return version >= 5
}

func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 5:
return V2_4_0_0
case 4:
return V2_4_0_0
case 3:
Expand Down
44 changes: 42 additions & 2 deletions describe_groups_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
doubleDescribeGroupsRequestV0 = []byte{
0, 0, 0, 2, // 2 groups
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
0, 3, 'b', 'a', 'r', // group name: bar
}
)

Expand Down Expand Up @@ -47,7 +47,7 @@ var (
doubleDescribeGroupsRequestV3 = []byte{
0, 0, 0, 2, // 2 groups
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
0, 3, 'b', 'a', 'r', // group name: bar
1,
}
)
Expand All @@ -71,3 +71,43 @@ func TestDescribeGroupsRequestV3(t *testing.T) {
request.IncludeAuthorizedOperations = true
testRequest(t, "two groups", request, doubleDescribeGroupsRequestV3)
}

var (
emptyDescribeGroupsRequestV5 = []byte{
1, // 1+0 no groups
0, // do not include authorized operations
0, // empty tagged fields
}

singleDescribeGroupsRequestV5 = []byte{
2, // 1+1 group
4, 'f', 'o', 'o', // group name: foo
0, // do not include authorized operations
0, // empty tagged fields
}

doubleDescribeGroupsRequestV5 = []byte{
3, // 1+2 groups
4, 'f', 'o', 'o', // group name: foo
4, 'b', 'a', 'r', // group name: bar
1, // do include authorized operations
0, // empty tagged fields
}
)

func TestDescribeGroupsRequestV5(t *testing.T) {
var request *DescribeGroupsRequest

request = &DescribeGroupsRequest{Version: 5}
testRequest(t, "no groups", request, emptyDescribeGroupsRequestV5)

request = &DescribeGroupsRequest{Version: 5}
request.AddGroup("foo")
testRequest(t, "one group", request, singleDescribeGroupsRequestV5)

request = &DescribeGroupsRequest{Version: 5}
request.AddGroup("foo")
request.AddGroup("bar")
request.IncludeAuthorizedOperations = true
testRequest(t, "two groups", request, doubleDescribeGroupsRequestV5)
}
27 changes: 23 additions & 4 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) {
}
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -54,7 +55,8 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er
}
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (r *DescribeGroupsResponse) key() int16 {
Expand All @@ -66,15 +68,28 @@ func (r *DescribeGroupsResponse) version() int16 {
}

func (r *DescribeGroupsResponse) headerVersion() int16 {
if r.Version >= 5 {
return 1
}
return 0
}

func (r *DescribeGroupsResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 4
return r.Version >= 0 && r.Version <= 5
}

func (r *DescribeGroupsResponse) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}

func (r *DescribeGroupsResponse) isFlexibleVersion(version int16) bool {
return version >= 5
}

func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 5:
return V2_4_0_0
case 4:
return V2_4_0_0
case 3:
Expand Down Expand Up @@ -148,6 +163,7 @@ func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error)
pe.putInt32(gd.AuthorizedOperations)
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand Down Expand Up @@ -191,7 +207,8 @@ func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error)
}
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

// GroupMemberDescription contains the group members.
Expand Down Expand Up @@ -239,6 +256,7 @@ func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err
return err
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -265,7 +283,8 @@ func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err
return err
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
Expand Down
85 changes: 84 additions & 1 deletion describe_groups_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package sarama

import (
"errors"
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -160,6 +161,44 @@ var (
0, 0, 0, 0, // authorizedOperations 0

}

describeGroupsResponseEmptyV5 = []byte{
0, 0, 0, 0, // throttle time 0
1, // no groups
0, // empty tagged fields
}

describeGroupsResponsePopulatedV5 = []byte{
0, 0, 0, 0, // throttle time 0
3, // 1+2 groups

0, 0, // no error
4, 'f', 'o', 'o', // Group ID
4, 'b', 'a', 'r', // State
9, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type
4, 'b', 'a', 'z', // Protocol name
2, // 1 member
3, 'i', 'd', // Member ID
4, 'g', 'i', 'd', // Group Instance ID
7, 's', 'a', 'r', 'a', 'm', 'a', // Client ID
10, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
4, 0x01, 0x02, 0x03, // MemberMetadata
4, 0x04, 0x05, 0x06, // MemberAssignment
0, // empty tagged fields
0, 0, 0, 0, // authorizedOperations 0
0, // empty tagged fields

0, 30, // ErrGroupAuthorizationFailed
1, // empty Group ID
1, // empty State
1, // empty Type
1, // empty Data
1, // no members
0, 0, 0, 0, // authorizedOperations 0
0, // empty tagged fields
0, // empty tagged fields

}
)

func TestDescribeGroupsResponseV1plus(t *testing.T) {
Expand Down Expand Up @@ -255,10 +294,54 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) {
},
},
},

{
"empty",
5,
describeGroupsResponseEmptyV5,
&DescribeGroupsResponse{
Version: 5,
},
},
{
"populated",
5,
describeGroupsResponsePopulatedV5,
&DescribeGroupsResponse{
Version: 5,
ThrottleTimeMs: int32(0),
Groups: []*GroupDescription{
{
Version: 5,
Err: KError(0),
GroupId: "foo",
State: "bar",
ProtocolType: "consumer",
Protocol: "baz",
Members: map[string]*GroupMemberDescription{
"id": {
Version: 5,
MemberId: "id",
GroupInstanceId: &groupInstanceId,
ClientId: "sarama",
ClientHost: "localhost",
MemberMetadata: []byte{1, 2, 3},
MemberAssignment: []byte{4, 5, 6},
},
},
},
{
Version: 5,
Err: KError(30),
ErrorCode: 30,
},
},
},
},
}

for _, c := range tests {
t.Run(c.Name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s-v%d", c.Name, c.Version), func(t *testing.T) {
response := new(DescribeGroupsResponse)
testVersionDecodable(t, c.Name, response, c.MessageBytes, c.Version)
if !assert.Equal(t, c.Message, response) {
Expand Down
3 changes: 1 addition & 2 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) {
// apiKeyLeaveGroup: 4, // up from 2
// TODO: SyncGroupRequest v4 is not supported, but expected for KafkaVersion 2.4.0
// apiKeySyncGroup: 4, // up from 3
// TODO: DescribeGroupsRequest v5 is not supported, but expected for KafkaVersion 2.4.0
// apiKeyDescribeGroups: 5, // up from 3
apiKeyDescribeGroups: 5, // up from 3
apiKeyListGroups: 3, // up from 2
apiKeyApiVersions: 3, // up from 2
apiKeyCreateTopics: 5, // up from 3
Expand Down
Loading