Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ func (c *consumerGroup) syncGroupRequest(
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
req.GroupInstanceId = c.groupInstanceId
if c.config.Version.IsAtLeast(V2_4_0_0) {
req.Version = 4
}
}

for memberID, topics := range plan {
Expand Down
3 changes: 1 addition & 2 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) {
// apiKeyHeartbeat: 4, // up from 3
// TODO: LeaveGroupRequest v4 is not supported, but expected for KafkaVersion 2.4.0
// apiKeyLeaveGroup: 4, // up from 2
// TODO: SyncGroupRequest v4 is not supported, but expected for KafkaVersion 2.4.0
// apiKeySyncGroup: 4, // up from 3
apiKeySyncGroup: 4, // up from 3
apiKeyDescribeGroups: 5, // up from 3
apiKeyListGroups: 3, // up from 2
apiKeyApiVersions: 3, // up from 2
Expand Down
23 changes: 20 additions & 3 deletions sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (er
return err
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -28,7 +29,8 @@ func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (er
return err
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

type SyncGroupRequest struct {
Expand Down Expand Up @@ -76,6 +78,7 @@ func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
}
}

pe.putEmptyTaggedFieldArray()
return nil
}

Expand Down Expand Up @@ -112,7 +115,8 @@ func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
}
}

return nil
_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (r *SyncGroupRequest) key() int16 {
Expand All @@ -124,15 +128,28 @@ func (r *SyncGroupRequest) version() int16 {
}

func (r *SyncGroupRequest) headerVersion() int16 {
if r.Version >= 4 {
return 2
}
return 1
}

func (r *SyncGroupRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 3
return r.Version >= 0 && r.Version <= 4
}

func (r *SyncGroupRequest) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}

func (r *SyncGroupRequest) isFlexibleVersion(version int16) bool {
return version >= 4
}

func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 4:
return V2_4_0_0
case 3:
return V2_3_0_0
case 2:
Expand Down
29 changes: 29 additions & 0 deletions sync_group_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ var (
0, 3, 'b', 'a', 'z', // Member ID
0, 0, 0, 3, 'f', 'o', 'o', // Member assignment
}
populatedSyncGroupRequestV4 = []byte{
4, 'f', 'o', 'o', // Group ID
0x00, 0x01, 0x02, 0x03, // Generation ID
4, 'b', 'a', 'z', // Member ID
4, 'g', 'i', 'd', // GroupInstance ID
2, // 1 + one assignment
4, 'b', 'a', 'z', // Member ID
4, 'f', 'o', 'o', // Member assignment
0, // empty tagged fields
0, // empty tagged fields
}
)

func TestSyncGroupRequestV3AndPlus(t *testing.T) {
Expand Down Expand Up @@ -80,6 +91,24 @@ func TestSyncGroupRequestV3AndPlus(t *testing.T) {
},
},
},
{
"v4",
4,
populatedSyncGroupRequestV4,
&SyncGroupRequest{
Version: 4,
GroupId: "foo",
GenerationId: 0x00010203,
MemberId: "baz",
GroupInstanceId: &groupInstanceId,
GroupAssignments: []SyncGroupRequestAssignment{
{
MemberId: "baz",
Assignment: []byte("foo"),
},
},
},
},
}
for _, c := range tests {
request := new(SyncGroupRequest)
Expand Down
29 changes: 26 additions & 3 deletions sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt32(r.ThrottleTime)
}
pe.putKError(r.Err)
return pe.putBytes(r.MemberAssignment)
if err := pe.putBytes(r.MemberAssignment); err != nil {
return err
}

pe.putEmptyTaggedFieldArray()
return nil
}

func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) {
Expand All @@ -46,7 +51,12 @@ func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error)
}

r.MemberAssignment, err = pd.getBytes()
return
if err != nil {
return err
}

_, err = pd.getEmptyTaggedFieldArray()
return err
}

func (r *SyncGroupResponse) key() int16 {
Expand All @@ -58,15 +68,28 @@ func (r *SyncGroupResponse) version() int16 {
}

func (r *SyncGroupResponse) headerVersion() int16 {
if r.Version >= 4 {
return 1
}
return 0
}

func (r *SyncGroupResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 3
return r.Version >= 0 && r.Version <= 4
}

func (r *SyncGroupResponse) isFlexible() bool {
return r.isFlexibleVersion(r.Version)
}

func (r *SyncGroupResponse) isFlexibleVersion(version int16) bool {
return version >= 4
}

func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 4:
return V2_4_0_0
case 3:
return V2_3_0_0
case 2:
Expand Down
18 changes: 18 additions & 0 deletions sync_group_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ var (
0x00, 0x00, // No error
0, 0, 0, 3, 0x01, 0x02, 0x03, // Member assignment data
}

syncGroupResponseV4NoError = []byte{
0, 0, 0, 100, // ThrottleTimeMs
0x00, 0x00, // No error
4, 0x01, 0x02, 0x03, // Member assignment data
0, // empty tagged fields
}
)

func TestSyncGroupResponse(t *testing.T) {
Expand Down Expand Up @@ -63,6 +70,17 @@ func TestSyncGroupResponse(t *testing.T) {
MemberAssignment: []byte{1, 2, 3},
},
},
{
"v4-noErr",
4,
syncGroupResponseV4NoError,
&SyncGroupResponse{
ThrottleTime: 100,
Version: 4,
Err: ErrNoError,
MemberAssignment: []byte{1, 2, 3},
},
},
}
for _, c := range tests {
response := new(SyncGroupResponse)
Expand Down
Loading