Skip to content

Commit

Permalink
kgo: re-fix #493, supporting other buggy clients, and add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Jul 10, 2023
1 parent 32ac27f commit dc5283e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGro
// workaround. See #493.
if bytes.HasPrefix(memberMeta, []byte{0, 1}) {
memberMeta[0] = 0
memberMeta[0] = 0
memberMeta[1] = 0
if err = meta.ReadFrom(memberMeta); err != nil {
return nil, fmt.Errorf("unable to read member metadata: %v", err)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/kgo/group_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,18 @@ func Test_stickyAdjustCooperative(t *testing.T) {
t.Errorf("got plan != exp\ngot: %#v\nexp: %#v\n", inPlan, expPlan)
}
}

func TestNewConsumerBalancerIssue493(t *testing.T) {
m := kmsg.NewConsumerMemberMetadata()
m.Version = 0
m.Topics = []string{"foo"}
protoMeta := m.AppendTo(nil)
protoMeta[1] = 1
member := kmsg.NewJoinGroupResponseMember()
member.MemberID = "test"
member.ProtocolMetadata = protoMeta
_, err := NewConsumerBalancer(nil, []kmsg.JoinGroupResponseMember{member})
if err != nil {
t.Errorf("got unexpected error: %v", err)
}
}

0 comments on commit dc5283e

Please sign in to comment.