diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 3af816e2..9b3efb2a 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1211,30 +1211,35 @@ func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse) error { return err } - kassignment := new(kmsg.GroupMemberAssignment) - if err := kassignment.ReadFrom(resp.MemberAssignment); err != nil { + var protocol string + if resp.Protocol != nil { + protocol = *resp.Protocol + } + b, err := g.findBalancer(protocol) + if err != nil { + g.cl.cfg.logger.Log(LogLevelError, "sync assignment could not find chosen balancer", "err", err) + return err + } + + assigned, err := b.ParseSyncAssignment(resp.MemberAssignment) + if err != nil { g.cl.cfg.logger.Log(LogLevelError, "sync assignment parse failed", "err", err) return err } var sb strings.Builder - for i, topic := range kassignment.Topics { - fmt.Fprintf(&sb, "%s%v", topic.Topic, topic.Partitions) - if i < len(kassignment.Topics)-1 { - sb.WriteString(", ") - } + for topic, partitions := range assigned { + fmt.Fprintf(&sb, "%s%v", topic, partitions) + sb.WriteString(", ") } - g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", sb.String()) + g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", strings.TrimSuffix(sb.String(), ", ")) // Past this point, we will fall into the setupAssigned prerevoke code, // meaning for cooperative, we will revoke what we need to. if g.cooperative { g.lastAssigned = g.nowAssigned } - g.nowAssigned = make(map[string][]int32) - for _, topic := range kassignment.Topics { - g.nowAssigned[topic.Topic] = topic.Partitions - } + g.nowAssigned = assigned g.cl.cfg.logger.Log(LogLevelInfo, "synced successfully", "assigned", g.nowAssigned) return nil } diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index 5a330fcb..ab7956d3 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -2,7 +2,6 @@ package kgo import ( "bytes" - "errors" "fmt" "sort" "strings" @@ -26,6 +25,10 @@ type GroupBalancer interface { generation int32, ) []byte + // ParseSyncAssignment returns assigned topics and partitions from an + // encoded SyncGroupResponse's MemberAssignment. + ParseSyncAssignment(assignment []byte) (map[string][]int32, error) + // MemberBalancer returns a GroupMemberBalancer for the given group // members, as well as the topics that all the members are interested // in. If the client does not have some topics in the returned topics, @@ -126,6 +129,22 @@ type ConsumerBalancerBalance interface { Balance(*ConsumerBalancer, map[string]int32) IntoSyncAssignment } +// ParseConsumerSyncAssignment returns an assignment as specified a +// kmsg.GroupMemberAssignment, that is, the type encoded in metadata for the +// consumer protocol. +func ParseConsumerSyncAssignment(assignment []byte) (map[string][]int32, error) { + var kassignment kmsg.GroupMemberAssignment + if err := kassignment.ReadFrom(assignment); err != nil { + return nil, fmt.Errorf("sync assignment parse failed: %v", err) + } + + m := make(map[string][]int32, len(kassignment.Topics)) + for _, topic := range kassignment.Topics { + m[topic.Topic] = topic.Partitions + } + return m, nil +} + // NewConsumerBalancer parses the each member's metadata as a // kmsg.GroupMemberMetadata and returns a ConsumerBalancer to use in balancing. // @@ -236,18 +255,22 @@ func sortJoinMemberPtrs(members []*kmsg.JoinGroupResponseMember) { sort.Slice(members, func(i, j int) bool { return joinMemberLess(members[i], members[j]) }) } -// balanceGroup returns a balancePlan from a join group response. -func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember) ([]kmsg.SyncGroupRequestGroupAssignment, error) { - g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader") - - var b GroupBalancer +func (g groupConsumer) findBalancer(proto string) (GroupBalancer, error) { for _, balancer := range g.balancers { if balancer.ProtocolName() == proto { - b = balancer + return balancer, nil } } - if b == nil { - return nil, errors.New("unable to balance: none of our balances have a name equal to the balancer chosen for balancing") + return nil, fmt.Errorf("unable to balance: none of our balancers have a name equal to the balancer chosen for balancing (%s)", proto) +} + +// balanceGroup returns a balancePlan from a join group response. +func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember) ([]kmsg.SyncGroupRequestGroupAssignment, error) { + g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader") + + b, err := g.findBalancer(proto) + if err != nil { + return nil, err } sortJoinMembers(members) @@ -368,6 +391,9 @@ func (*roundRobinBalancer) IsCooperative() bool { return false } func (*roundRobinBalancer) JoinGroupMetadata(interests []string, _ map[string][]int32, _ int32) []byte { return memberMetadataV0(interests) } +func (*roundRobinBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error) { + return ParseConsumerSyncAssignment(assignment) +} func (r *roundRobinBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (GroupMemberBalancer, map[string]struct{}, error) { b, err := NewConsumerBalancer(r, members) return b, b.MemberTopics(), err @@ -447,6 +473,9 @@ func (*rangeBalancer) IsCooperative() bool { return false } func (*rangeBalancer) JoinGroupMetadata(interests []string, _ map[string][]int32, _ int32) []byte { return memberMetadataV0(interests) } +func (*rangeBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error) { + return ParseConsumerSyncAssignment(assignment) +} func (r *rangeBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (GroupMemberBalancer, map[string]struct{}, error) { b, err := NewConsumerBalancer(r, members) return b, b.MemberTopics(), err @@ -592,7 +621,9 @@ func (s *stickyBalancer) JoinGroupMetadata(interests []string, currentAssignment } meta.UserData = stickyMeta.AppendTo(nil) return meta.AppendTo(nil) - +} +func (*stickyBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error) { + return ParseConsumerSyncAssignment(assignment) } func (s *stickyBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (GroupMemberBalancer, map[string]struct{}, error) { b, err := NewConsumerBalancer(s, members)