From dc5283e20fb2f79f017580f52172646dcb787392 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 10 Jul 2023 13:53:52 -0600 Subject: [PATCH] kgo: re-fix #493, supporting other buggy clients, and add a test --- pkg/kgo/group_balancer.go | 2 +- pkg/kgo/group_balancer_test.go | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index 87a3a248..e83758cb 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -216,7 +216,7 @@ func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGro // workaround. See #493. if bytes.HasPrefix(memberMeta, []byte{0, 1}) { memberMeta[0] = 0 - memberMeta[0] = 0 + memberMeta[1] = 0 if err = meta.ReadFrom(memberMeta); err != nil { return nil, fmt.Errorf("unable to read member metadata: %v", err) } diff --git a/pkg/kgo/group_balancer_test.go b/pkg/kgo/group_balancer_test.go index 3209551f..a49f82d4 100644 --- a/pkg/kgo/group_balancer_test.go +++ b/pkg/kgo/group_balancer_test.go @@ -98,3 +98,18 @@ func Test_stickyAdjustCooperative(t *testing.T) { t.Errorf("got plan != exp\ngot: %#v\nexp: %#v\n", inPlan, expPlan) } } + +func TestNewConsumerBalancerIssue493(t *testing.T) { + m := kmsg.NewConsumerMemberMetadata() + m.Version = 0 + m.Topics = []string{"foo"} + protoMeta := m.AppendTo(nil) + protoMeta[1] = 1 + member := kmsg.NewJoinGroupResponseMember() + member.MemberID = "test" + member.ProtocolMetadata = protoMeta + _, err := NewConsumerBalancer(nil, []kmsg.JoinGroupResponseMember{member}) + if err != nil { + t.Errorf("got unexpected error: %v", err) + } +}