From 00e4e76b6a18163d55fccf8353eeecc8d5956662 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 7 Jul 2023 16:55:04 -0600 Subject: [PATCH] kgo: tolerate buggy v1 group member metadata Closes #493. --- pkg/kgo/group_balancer.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index a3f21017..87a3a248 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -204,8 +204,23 @@ func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGro for i, member := range members { meta := &b.metadatas[i] meta.Default() - if err := meta.ReadFrom(member.ProtocolMetadata); err != nil { - return nil, fmt.Errorf("unable to read member metadata: %v", err) + memberMeta := member.ProtocolMetadata + if err := meta.ReadFrom(memberMeta); err != nil { + // Some buggy clients claimed support for v1 but then + // did not add OwnedPartitions, resulting in a short + // metadata. If we fail at reading and the version is + // v1, we retry again as v0. We do not support other + // versions because hopefully other clients stop + // claiming higher and higher version support and not + // actually supporting them. Sarama has a similarish + // workaround. See #493. + if bytes.HasPrefix(memberMeta, []byte{0, 1}) { + memberMeta[0] = 0 + memberMeta[0] = 0 + if err = meta.ReadFrom(memberMeta); err != nil { + return nil, fmt.Errorf("unable to read member metadata: %v", err) + } + } } for _, topic := range meta.Topics { b.topics[topic] = struct{}{}