Skip to content

Commit 034046e

Browse files
committed
fix: include assignment-less members in SyncGroup
- comment and rework sync_group_{request, response} - always send V3 SyncGroup request if config Version is 2.3.0.0 or newer - when sending SyncGroup as the leader, include empty assignments for any members that didn't receive a partition in the plan - update go.mod/go.sum for examples/consumergroup Fixes #2290
1 parent 8a001b4 commit 034046e

File tree

7 files changed

+578
-92
lines changed

7 files changed

+578
-92
lines changed

consumer_group.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,9 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
321321

322322
// Prepare distribution plan if we joined as the leader
323323
var plan BalanceStrategyPlan
324+
var members map[string]ConsumerGroupMemberMetadata
324325
if join.LeaderId == join.MemberId {
325-
members, err := join.GetMembers()
326+
members, err = join.GetMembers()
326327
if err != nil {
327328
return nil, err
328329
}
@@ -334,7 +335,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
334335
}
335336

336337
// Sync consumer group
337-
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
338+
groupRequest, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId)
338339
if consumerGroupSyncTotal != nil {
339340
consumerGroupSyncTotal.Inc(1)
340341
}
@@ -426,15 +427,22 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
426427
return coordinator.JoinGroup(req)
427428
}
428429

429-
func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
430+
func (c *consumerGroup) syncGroupRequest(
431+
coordinator *Broker,
432+
members map[string]ConsumerGroupMemberMetadata,
433+
plan BalanceStrategyPlan,
434+
generationID int32,
435+
) (*SyncGroupResponse, error) {
430436
req := &SyncGroupRequest{
431437
GroupId: c.groupID,
432438
MemberId: c.memberID,
433439
GenerationId: generationID,
434440
}
435441
strategy := c.config.Consumer.Group.Rebalance.Strategy
436-
if c.groupInstanceId != nil {
442+
if c.config.Version.IsAtLeast(V2_3_0_0) {
437443
req.Version = 3
444+
}
445+
if c.groupInstanceId != nil {
438446
req.GroupInstanceId = c.groupInstanceId
439447
}
440448
for memberID, topics := range plan {
@@ -447,7 +455,15 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
447455
if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
448456
return nil, err
449457
}
458+
delete(members, memberID)
450459
}
460+
// add empty assignments for any remaining members
461+
for memberID := range members {
462+
if err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{}); err != nil {
463+
return nil, err
464+
}
465+
}
466+
451467
return coordinator.SyncGroup(req)
452468
}
453469

examples/consumergroup/go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ module github.com/Shopify/sarama/examples/consumer
22

33
go 1.16
44

5-
replace github.com/Shopify/sarama => ../../
5+
require github.com/Shopify/sarama v1.34.1
66

7-
require github.com/Shopify/sarama v1.27.0
7+
replace github.com/Shopify/sarama => ../../

examples/consumergroup/go.sum

+458-28
Large diffs are not rendered by default.

request_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []
3535
}
3636

3737
func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
38+
t.Helper()
3839
if !rb.requiredVersion().IsAtLeast(MinVersion) {
3940
t.Errorf("Request %s has invalid required version", name)
4041
}
@@ -74,6 +75,7 @@ func testRequestEncode(t *testing.T, name string, rb protocolBody, expected []by
7475
}
7576

7677
func testRequestDecode(t *testing.T, name string, rb protocolBody, packet []byte) {
78+
t.Helper()
7779
decoded, n, err := decodeRequest(bytes.NewReader(packet))
7880
if err != nil {
7981
t.Error("Failed to decode request", err)

sync_group_request.go

+82-53
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,111 @@
11
package sarama
22

3+
type SyncGroupRequestAssignment struct {
4+
// MemberId contains the ID of the member to assign.
5+
MemberId string
6+
// Assignment contains the member assignment.
7+
Assignment []byte
8+
}
9+
10+
func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) {
11+
if err := pe.putString(a.MemberId); err != nil {
12+
return err
13+
}
14+
15+
if err := pe.putBytes(a.Assignment); err != nil {
16+
return err
17+
}
18+
19+
return nil
20+
}
21+
22+
func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) {
23+
if a.MemberId, err = pd.getString(); err != nil {
24+
return err
25+
}
26+
27+
if a.Assignment, err = pd.getBytes(); err != nil {
28+
return err
29+
}
30+
31+
return nil
32+
}
33+
334
type SyncGroupRequest struct {
4-
Version int16
5-
GroupId string
6-
GenerationId int32
7-
MemberId string
8-
GroupInstanceId *string
9-
GroupAssignments map[string][]byte
35+
// Version defines the protocol version to use for encode and decode
36+
Version int16
37+
// GroupId contains the unique group identifier.
38+
GroupId string
39+
// GenerationId contains the generation of the group.
40+
GenerationId int32
41+
// MemberId contains the member ID assigned by the group.
42+
MemberId string
43+
// GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
44+
GroupInstanceId *string
45+
// GroupAssignments contains each assignment.
46+
GroupAssignments []SyncGroupRequestAssignment
1047
}
1148

12-
func (r *SyncGroupRequest) encode(pe packetEncoder) error {
13-
if err := pe.putString(r.GroupId); err != nil {
49+
func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
50+
if err := pe.putString(s.GroupId); err != nil {
1451
return err
1552
}
1653

17-
pe.putInt32(r.GenerationId)
54+
pe.putInt32(s.GenerationId)
1855

19-
if err := pe.putString(r.MemberId); err != nil {
56+
if err := pe.putString(s.MemberId); err != nil {
2057
return err
2158
}
2259

23-
if r.Version >= 3 {
24-
if err := pe.putNullableString(r.GroupInstanceId); err != nil {
60+
if s.Version >= 3 {
61+
if err := pe.putNullableString(s.GroupInstanceId); err != nil {
2562
return err
2663
}
2764
}
2865

29-
if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil {
66+
if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil {
3067
return err
3168
}
32-
for memberId, memberAssignment := range r.GroupAssignments {
33-
if err := pe.putString(memberId); err != nil {
34-
return err
35-
}
36-
if err := pe.putBytes(memberAssignment); err != nil {
69+
for _, block := range s.GroupAssignments {
70+
if err := block.encode(pe, s.Version); err != nil {
3771
return err
3872
}
3973
}
4074

4175
return nil
4276
}
4377

44-
func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
45-
r.Version = version
46-
47-
if r.GroupId, err = pd.getString(); err != nil {
48-
return
49-
}
50-
if r.GenerationId, err = pd.getInt32(); err != nil {
51-
return
52-
}
53-
if r.MemberId, err = pd.getString(); err != nil {
54-
return
55-
}
56-
if r.Version >= 3 {
57-
if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
58-
return
59-
}
78+
func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
79+
s.Version = version
80+
if s.GroupId, err = pd.getString(); err != nil {
81+
return err
6082
}
6183

62-
n, err := pd.getArrayLength()
63-
if err != nil {
84+
if s.GenerationId, err = pd.getInt32(); err != nil {
6485
return err
6586
}
66-
if n == 0 {
67-
return nil
87+
88+
if s.MemberId, err = pd.getString(); err != nil {
89+
return err
6890
}
6991

70-
r.GroupAssignments = make(map[string][]byte)
71-
for i := 0; i < n; i++ {
72-
memberId, err := pd.getString()
73-
if err != nil {
74-
return err
75-
}
76-
memberAssignment, err := pd.getBytes()
77-
if err != nil {
92+
if s.Version >= 3 {
93+
if s.GroupInstanceId, err = pd.getNullableString(); err != nil {
7894
return err
7995
}
96+
}
8097

81-
r.GroupAssignments[memberId] = memberAssignment
98+
if numAssignments, err := pd.getArrayLength(); err != nil {
99+
return err
100+
} else if numAssignments > 0 {
101+
s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments)
102+
for i := 0; i < numAssignments; i++ {
103+
var block SyncGroupRequestAssignment
104+
if err := block.decode(pd, s.Version); err != nil {
105+
return err
106+
}
107+
s.GroupAssignments[i] = block
108+
}
82109
}
83110

84111
return nil
@@ -105,14 +132,16 @@ func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
105132
}
106133

107134
func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
108-
if r.GroupAssignments == nil {
109-
r.GroupAssignments = make(map[string][]byte)
110-
}
111-
112-
r.GroupAssignments[memberId] = memberAssignment
135+
r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{
136+
MemberId: memberId,
137+
Assignment: memberAssignment,
138+
})
113139
}
114140

115-
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
141+
func (r *SyncGroupRequest) AddGroupAssignmentMember(
142+
memberId string,
143+
memberAssignment *ConsumerGroupMemberAssignment,
144+
) error {
116145
bin, err := encode(memberAssignment, nil)
117146
if err != nil {
118147
return err

sync_group_request_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,11 @@ func TestSyncGroupRequestV3AndPlus(t *testing.T) {
7070
GenerationId: 0x00010203,
7171
MemberId: "baz",
7272
GroupInstanceId: &groupInstanceId,
73-
GroupAssignments: map[string][]byte{
74-
"baz": []byte("foo"),
73+
GroupAssignments: []SyncGroupRequestAssignment{
74+
{
75+
MemberId: "baz",
76+
Assignment: []byte("foo"),
77+
},
7578
},
7679
},
7780
},

sync_group_response.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package sarama
22

33
type SyncGroupResponse struct {
4-
Version int16
5-
ThrottleTime int32
6-
Err KError
4+
// Version defines the protocol version to use for encode and decode
5+
Version int16
6+
// ThrottleTimeMs contains the duration in milliseconds for which the
7+
// request was throttled due to a quota violation, or zero if the request
8+
// did not violate any quota.
9+
ThrottleTime int32
10+
// Err contains the error code, or 0 if there was no error.
11+
Err KError
12+
// MemberAssignment contains the member assignment.
713
MemberAssignment []byte
814
}
915

0 commit comments

Comments
 (0)