diff --git a/consumer_group.go b/consumer_group.go index 239da4619..f2180d299 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -529,6 +529,9 @@ func (c *consumerGroup) syncGroupRequest( if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 3 req.GroupInstanceId = c.groupInstanceId + if c.config.Version.IsAtLeast(V2_4_0_0) { + req.Version = 4 + } } for memberID, topics := range plan { diff --git a/request_test.go b/request_test.go index 31246c9a0..b70b03ca9 100644 --- a/request_test.go +++ b/request_test.go @@ -350,8 +350,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { // apiKeyHeartbeat: 4, // up from 3 // TODO: LeaveGroupRequest v4 is not supported, but expected for KafkaVersion 2.4.0 // apiKeyLeaveGroup: 4, // up from 2 - // TODO: SyncGroupRequest v4 is not supported, but expected for KafkaVersion 2.4.0 - // apiKeySyncGroup: 4, // up from 3 + apiKeySyncGroup: 4, // up from 3 apiKeyDescribeGroups: 5, // up from 3 apiKeyListGroups: 3, // up from 2 apiKeyApiVersions: 3, // up from 2 diff --git a/sync_group_request.go b/sync_group_request.go index 65385ce61..b109cd918 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -16,6 +16,7 @@ func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (er return err } + pe.putEmptyTaggedFieldArray() return nil } @@ -28,7 +29,8 @@ func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (er return err } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } type SyncGroupRequest struct { @@ -76,6 +78,7 @@ func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) { } } + pe.putEmptyTaggedFieldArray() return nil } @@ -112,7 +115,8 @@ func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) { } } - return nil + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *SyncGroupRequest) key() int16 { @@ -124,15 +128,28 @@ func (r *SyncGroupRequest) version() int16 { } func (r *SyncGroupRequest) headerVersion() int16 { + if r.Version >= 4 { + return 2 + } return 1 } func (r *SyncGroupRequest) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 3 + return r.Version >= 0 && r.Version <= 4 +} + +func (r *SyncGroupRequest) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *SyncGroupRequest) isFlexibleVersion(version int16) bool { + return version >= 4 } func (r *SyncGroupRequest) requiredVersion() KafkaVersion { switch r.Version { + case 4: + return V2_4_0_0 case 3: return V2_3_0_0 case 2: diff --git a/sync_group_request_test.go b/sync_group_request_test.go index 4fa301693..2761dfbb1 100644 --- a/sync_group_request_test.go +++ b/sync_group_request_test.go @@ -52,6 +52,17 @@ var ( 0, 3, 'b', 'a', 'z', // Member ID 0, 0, 0, 3, 'f', 'o', 'o', // Member assignment } + populatedSyncGroupRequestV4 = []byte{ + 4, 'f', 'o', 'o', // Group ID + 0x00, 0x01, 0x02, 0x03, // Generation ID + 4, 'b', 'a', 'z', // Member ID + 4, 'g', 'i', 'd', // GroupInstance ID + 2, // 1 + one assignment + 4, 'b', 'a', 'z', // Member ID + 4, 'f', 'o', 'o', // Member assignment + 0, // empty tagged fields + 0, // empty tagged fields + } ) func TestSyncGroupRequestV3AndPlus(t *testing.T) { @@ -80,6 +91,24 @@ func TestSyncGroupRequestV3AndPlus(t *testing.T) { }, }, }, + { + "v4", + 4, + populatedSyncGroupRequestV4, + &SyncGroupRequest{ + Version: 4, + GroupId: "foo", + GenerationId: 0x00010203, + MemberId: "baz", + GroupInstanceId: &groupInstanceId, + GroupAssignments: []SyncGroupRequestAssignment{ + { + MemberId: "baz", + Assignment: []byte("foo"), + }, + }, + }, + }, } for _, c := range tests { request := new(SyncGroupRequest) diff --git a/sync_group_response.go b/sync_group_response.go index aa855aab5..a605ced67 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -30,7 +30,12 @@ func (r *SyncGroupResponse) encode(pe packetEncoder) error { pe.putInt32(r.ThrottleTime) } pe.putKError(r.Err) - return pe.putBytes(r.MemberAssignment) + if err := pe.putBytes(r.MemberAssignment); err != nil { + return err + } + + pe.putEmptyTaggedFieldArray() + return nil } func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) { @@ -46,7 +51,12 @@ func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) } r.MemberAssignment, err = pd.getBytes() - return + if err != nil { + return err + } + + _, err = pd.getEmptyTaggedFieldArray() + return err } func (r *SyncGroupResponse) key() int16 { @@ -58,15 +68,28 @@ func (r *SyncGroupResponse) version() int16 { } func (r *SyncGroupResponse) headerVersion() int16 { + if r.Version >= 4 { + return 1 + } return 0 } func (r *SyncGroupResponse) isValidVersion() bool { - return r.Version >= 0 && r.Version <= 3 + return r.Version >= 0 && r.Version <= 4 +} + +func (r *SyncGroupResponse) isFlexible() bool { + return r.isFlexibleVersion(r.Version) +} + +func (r *SyncGroupResponse) isFlexibleVersion(version int16) bool { + return version >= 4 } func (r *SyncGroupResponse) requiredVersion() KafkaVersion { switch r.Version { + case 4: + return V2_4_0_0 case 3: return V2_3_0_0 case 2: diff --git a/sync_group_response_test.go b/sync_group_response_test.go index be891fc0d..0a33241ce 100644 --- a/sync_group_response_test.go +++ b/sync_group_response_test.go @@ -23,6 +23,13 @@ var ( 0x00, 0x00, // No error 0, 0, 0, 3, 0x01, 0x02, 0x03, // Member assignment data } + + syncGroupResponseV4NoError = []byte{ + 0, 0, 0, 100, // ThrottleTimeMs + 0x00, 0x00, // No error + 4, 0x01, 0x02, 0x03, // Member assignment data + 0, // empty tagged fields + } ) func TestSyncGroupResponse(t *testing.T) { @@ -63,6 +70,17 @@ func TestSyncGroupResponse(t *testing.T) { MemberAssignment: []byte{1, 2, 3}, }, }, + { + "v4-noErr", + 4, + syncGroupResponseV4NoError, + &SyncGroupResponse{ + ThrottleTime: 100, + Version: 4, + Err: ErrNoError, + MemberAssignment: []byte{1, 2, 3}, + }, + }, } for _, c := range tests { response := new(SyncGroupResponse)