diff --git a/admin.go b/admin.go index fdb111dcd..08352b734 100644 --- a/admin.go +++ b/admin.go @@ -985,7 +985,8 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group if ca.conf.Version.IsAtLeast(V2_4_0_0) { // Starting in version 4, the response will include group.instance.id info for members. - describeReq.Version = 4 + // Starting in version 5, the response uses flexible encoding + describeReq.Version = 5 } else if ca.conf.Version.IsAtLeast(V2_3_0_0) { // Starting in version 3, authorized operations can be requested. describeReq.Version = 3 diff --git a/describe_groups_request.go b/describe_groups_request.go index 425f59c88..b2fb07e49 100644 --- a/describe_groups_request.go +++ b/describe_groups_request.go @@ -17,6 +17,7 @@ func (r *DescribeGroupsRequest) encode(pe packetEncoder) error { if r.Version >= 3 { pe.putBool(r.IncludeAuthorizedOperations) } + pe.putEmptyTaggedFieldArray() return nil } @@ -31,7 +32,8 @@ func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err err return err } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *DescribeGroupsRequest) key() int16 { @@ -43,15 +45,28 @@ func (r *DescribeGroupsRequest) version() int16 { } func (r *DescribeGroupsRequest) headerVersion() int16 { + if r.Version >= 5 { + return 2 + } return 1 } func (r *DescribeGroupsRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 4 + return r.Version >= 0 && r.Version <= 5 +} + +func (r *DescribeGroupsRequest) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *DescribeGroupsRequest) isFlexibleVersion(version int16) bool { + return version >= 5 } func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion { switch r.Version { + case 5: + return V2_4_0_0 case 4: return V2_4_0_0 case 3: diff --git a/describe_groups_request_test.go b/describe_groups_request_test.go index ff15637c3..8ffc0907b 100644 --- a/describe_groups_request_test.go +++ b/describe_groups_request_test.go @@ -15,7 +15,7 @@ var ( doubleDescribeGroupsRequestV0 = []byte{ 0, 0, 0, 2, // 2 groups 0, 3, 'f', 'o', 'o', // group name: foo - 0, 3, 'b', 'a', 'r', // group name: foo + 0, 3, 'b', 'a', 'r', // group name: bar } ) @@ -47,7 +47,7 @@ var ( doubleDescribeGroupsRequestV3 = []byte{ 0, 0, 0, 2, // 2 groups 0, 3, 'f', 'o', 'o', // group name: foo - 0, 3, 'b', 'a', 'r', // group name: foo + 0, 3, 'b', 'a', 'r', // group name: bar 1, } ) @@ -71,3 +71,43 @@ func TestDescribeGroupsRequestV3(t *testing.T) { request.IncludeAuthorizedOperations = true testRequest(t, "two groups", request, doubleDescribeGroupsRequestV3) } + +var ( + emptyDescribeGroupsRequestV5 = []byte{ + 1, // 1+0 no groups + 0, // do not include authorized operations + 0, // empty tagged fields + } + + singleDescribeGroupsRequestV5 = []byte{ + 2, // 1+1 group + 4, 'f', 'o', 'o', // group name: foo + 0, // do not include authorized operations + 0, // empty tagged fields + } + + doubleDescribeGroupsRequestV5 = []byte{ + 3, // 1+2 groups + 4, 'f', 'o', 'o', // group name: foo + 4, 'b', 'a', 'r', // group name: bar + 1, // do include authorized operations + 0, // empty tagged fields + } +) + +func TestDescribeGroupsRequestV5(t *testing.T) { + var request *DescribeGroupsRequest + + request = &DescribeGroupsRequest{Version: 5} + testRequest(t, "no groups", request, emptyDescribeGroupsRequestV5) + + request = &DescribeGroupsRequest{Version: 5} + request.AddGroup("foo") + testRequest(t, "one group", request, singleDescribeGroupsRequestV5) + + request = &DescribeGroupsRequest{Version: 5} + request.AddGroup("foo") + request.AddGroup("bar") + request.IncludeAuthorizedOperations = true + testRequest(t, "two groups", request, doubleDescribeGroupsRequestV5) +} diff --git a/describe_groups_response.go b/describe_groups_response.go index a599665f8..dcc274dc5 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -31,6 +31,7 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -54,7 +55,8 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err er } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *DescribeGroupsResponse) key() int16 { @@ -66,15 +68,28 @@ func (r *DescribeGroupsResponse) version() int16 { } func (r *DescribeGroupsResponse) headerVersion() int16 { + if r.Version >= 5 { + return 1 + } return 0 } func (r *DescribeGroupsResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 4 + return r.Version >= 0 && r.Version <= 5 +} + +func (r *DescribeGroupsResponse) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *DescribeGroupsResponse) isFlexibleVersion(version int16) bool { + return version >= 5 } func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { switch r.Version { + case 5: + return V2_4_0_0 case 4: return V2_4_0_0 case 3: @@ -148,6 +163,7 @@ func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) pe.putInt32(gd.AuthorizedOperations) } + pe.putEmptyTaggedFieldArray() return nil } @@ -191,7 +207,8 @@ func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } // GroupMemberDescription contains the group members. @@ -239,6 +256,7 @@ func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err return err } + pe.putEmptyTaggedFieldArray() return nil } @@ -265,7 +283,8 @@ func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err return err } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) { diff --git a/describe_groups_response_test.go b/describe_groups_response_test.go index 7370c8145..66abb6782 100644 --- a/describe_groups_response_test.go +++ b/describe_groups_response_test.go @@ -4,6 +4,7 @@ package sarama import ( "errors" + "fmt" "reflect" "testing" @@ -160,6 +161,44 @@ var ( 0, 0, 0, 0, // authorizedOperations 0 } + + describeGroupsResponseEmptyV5 = []byte{ + 0, 0, 0, 0, // throttle time 0 + 1, // no groups + 0, // empty tagged fields + } + + describeGroupsResponsePopulatedV5 = []byte{ + 0, 0, 0, 0, // throttle time 0 + 3, // 1+2 groups + + 0, 0, // no error + 4, 'f', 'o', 'o', // Group ID + 4, 'b', 'a', 'r', // State + 9, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type + 4, 'b', 'a', 'z', // Protocol name + 2, // 1 member + 3, 'i', 'd', // Member ID + 4, 'g', 'i', 'd', // Group Instance ID + 7, 's', 'a', 'r', 'a', 'm', 'a', // Client ID + 10, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host + 4, 0x01, 0x02, 0x03, // MemberMetadata + 4, 0x04, 0x05, 0x06, // MemberAssignment + 0, // empty tagged fields + 0, 0, 0, 0, // authorizedOperations 0 + 0, // empty tagged fields + + 0, 30, // ErrGroupAuthorizationFailed + 1, // empty Group ID + 1, // empty State + 1, // empty Type + 1, // empty Data + 1, // no members + 0, 0, 0, 0, // authorizedOperations 0 + 0, // empty tagged fields + 0, // empty tagged fields + + } ) func TestDescribeGroupsResponseV1plus(t *testing.T) { @@ -255,10 +294,54 @@ func TestDescribeGroupsResponseV1plus(t *testing.T) { }, }, }, + + { + "empty", + 5, + describeGroupsResponseEmptyV5, + &DescribeGroupsResponse{ + Version: 5, + }, + }, + { + "populated", + 5, + describeGroupsResponsePopulatedV5, + &DescribeGroupsResponse{ + Version: 5, + ThrottleTimeMs: int32(0), + Groups: []*GroupDescription{ + { + Version: 5, + Err: KError(0), + GroupId: "foo", + State: "bar", + ProtocolType: "consumer", + Protocol: "baz", + Members: map[string]*GroupMemberDescription{ + "id": { + Version: 5, + MemberId: "id", + GroupInstanceId: &groupInstanceId, + ClientId: "sarama", + ClientHost: "localhost", + MemberMetadata: []byte{1, 2, 3}, + MemberAssignment: []byte{4, 5, 6}, + }, + }, + }, + { + Version: 5, + Err: KError(30), + ErrorCode: 30, + }, + }, + }, + }, } for _, c := range tests { - t.Run(c.Name, func(t *testing.T) { + t.Run(fmt.Sprintf("%s-v%d", c.Name, c.Version), func(t *testing.T) { response := new(DescribeGroupsResponse) testVersionDecodable(t, c.Name, response, c.MessageBytes, c.Version) if !assert.Equal(t, c.Message, response) { diff --git a/request_test.go b/request_test.go index 19c3819af..31246c9a0 100644 --- a/request_test.go +++ b/request_test.go @@ -352,8 +352,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { // apiKeyLeaveGroup: 4, // up from 2 // TODO: SyncGroupRequest v4 is not supported, but expected for KafkaVersion 2.4.0 // apiKeySyncGroup: 4, // up from 3 - // TODO: DescribeGroupsRequest v5 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeyDescribeGroups: 5, // up from 3 + apiKeyDescribeGroups: 5, // up from 3 apiKeyListGroups: 3, // up from 2 apiKeyApiVersions: 3, // up from 2 apiKeyCreateTopics: 5, // up from 3