diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 74b2f8e6..00aeb2da 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -5,6 +5,7 @@ import ( "fmt" "regexp" "sort" + "strings" "sync" "time" @@ -260,8 +261,6 @@ type groupConsumer struct { lastAssigned map[string][]int32 // only updated in join&sync loop nowAssigned map[string][]int32 // only updated in join&sync loop - groupExtraTopics map[string]struct{} // TODO TODO TODO - // leader is whether we are the leader right now. This is set to false // // - set to false at the beginning of a join group session @@ -1135,7 +1134,6 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo ) plan, err = g.balanceGroup(protocol, resp.Members) - g.cl.cfg.logger.Log(LogLevelDebug, "balanced", "plan", plan) if err != nil { return } @@ -1162,7 +1160,14 @@ func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse, plan balanc return err } - g.cl.cfg.logger.Log(LogLevelDebug, "synced", "assigned", kassignment.Topics) + var sb strings.Builder + for i, topic := range kassignment.Topics { + fmt.Fprintf(&sb, "%s%v", topic.Topic, topic.Partitions) + if i < len(kassignment.Topics)-1 { + sb.WriteString(", ") + } + } + g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", sb.String()) // Past this point, we will fall into the setupAssigned prerevoke code, // meaning for cooperative, we will revoke what we need to. 
diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go
index e6965090..5384ad47 100644
--- a/pkg/kgo/group_balancer.go
+++ b/pkg/kgo/group_balancer.go
@@ -3,7 +3,9 @@ package kgo
 import (
 	"fmt"
 	"sort"
+	"strings"
 
+	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo/internal/sticky"
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
@@ -42,12 +44,38 @@ type groupMember struct {
 	owned []kmsg.GroupMemberMetadataOwnedPartition
 }
 
+// balanceInterests renders, for logging, the member's interested topics
+// followed by each previously owned topic and its partitions.
+func (m *groupMember) balanceInterests() string {
+	var sb strings.Builder
+	sb.WriteString("interested topics: ")
+	fmt.Fprintf(&sb, "%v", m.topics)
+	sb.WriteString(", previously owned: ")
+	for i, owned := range m.owned {
+		fmt.Fprintf(&sb, "%s%v", owned.Topic, owned.Partitions)
+		// Separate entries only; the last one gets no trailing ", ".
+		if i < len(m.owned)-1 {
+			sb.WriteString(", ")
+		}
+	}
+	return sb.String()
+}
+
 type groupMemberID struct {
 	memberID    string
 	instanceID  string
 	hasInstance bool
 }
 
+// String returns the member ID, followed by "(instanceID)" when the
+// member has an instance ID.
+func (me groupMemberID) String() string {
+	if me.hasInstance {
+		return me.memberID + "(" + me.instanceID + ")"
+	}
+	return me.memberID
+}
+
 func (me groupMemberID) less(other groupMemberID) bool {
 	if me.hasInstance && other.hasInstance {
 		return me.instanceID < other.instanceID
@@ -65,6 +93,36 @@ func (me groupMemberID) less(other groupMemberID) bool {
 // member id => topic => partitions
 type balancePlan map[groupMemberID]map[string][]int32
 
+// String renders the plan for logging as comma-separated
+// "member{topic[partitions], ...}" entries. Members and topics are
+// written in map iteration order, so the output order is not stable.
+func (p balancePlan) String() string {
+	var sb strings.Builder
+
+	var membersWritten int
+	for member, topics := range p {
+		membersWritten++
+		sb.WriteString(member.String())
+		sb.WriteString("{")
+
+		var topicsWritten int
+		for topic, partitions := range topics {
+			fmt.Fprintf(&sb, "%s%v", topic, partitions)
+			topicsWritten++
+			if topicsWritten < len(topics) {
+				sb.WriteString(", ")
+			}
+		}
+
+		sb.WriteString("}")
+		if membersWritten < len(p) {
+			sb.WriteString(", ")
+		}
+	}
+
+	return sb.String()
+}
+
 func newBalancePlan(members []groupMember) balancePlan {
 	plan := make(map[groupMemberID]map[string][]int32, len(members))
 	for i := range members {
@@ -113,15 +171,52 @@ func (g *groupConsumer) balanceGroup(proto string, kmembers []kmsg.JoinGroupResp
 	sort.Slice(members, func(i, j int) bool {
 		return members[i].id.less(members[j].id) // guarantee sorted members
 	})
+
+	myTopics := g.cl.loadTopics()
+	allTopics := make(map[string]struct{})
+	var needMeta bool
+
+	g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
 	for i := range members {
-		sort.Strings(members[i].topics) // guarantee sorted topics
 		m := &members[i]
-		g.cl.cfg.logger.Log(LogLevelDebug, "member interests", "id", m.id, "topics", m.topics, "previously_owned", m.owned)
+		sort.Strings(m.topics) // guarantee sorted topics
+		g.cl.cfg.logger.Log(LogLevelInfo, "balance group member", "id", m.id.String(), "interests", m.balanceInterests())
+
+		for _, topic := range m.topics {
+			allTopics[topic] = struct{}{}
+			if _, exists := myTopics[topic]; !exists {
+				needMeta = true
+			}
+		}
+	}
+
+	shortTopics := g.cl.loadShortTopics()
+	if needMeta {
+		g.cl.cfg.logger.Log(LogLevelInfo, "group members indicated interest in topics the leader is not assigned, fetching metadata for all group topics")
+		var metaTopics []string
+		for topic := range allTopics {
+			metaTopics = append(metaTopics, topic)
+		}
+
+		_, resp, err := g.cl.fetchMetadataForTopics(g.ctx, false, metaTopics)
+		if err != nil {
+			return nil, fmt.Errorf("unable to fetch metadata for group topics: %v", err)
+		}
+		for i := range resp.Topics {
+			t := &resp.Topics[i]
+			if t.ErrorCode != 0 {
+				g.cl.cfg.logger.Log(LogLevelWarn, "metadata resp in balance for topic has error, skipping...", "topic", t.Topic, "err", kerr.ErrorForCode(t.ErrorCode))
+				continue
+			}
+			shortTopics[t.Topic] = int32(len(t.Partitions))
+		}
 	}
 
 	for _, balancer := range g.balancers {
 		if balancer.protocolName() == proto {
-			return balancer.balance(members, g.cl.loadShortTopics()), nil
+			plan := balancer.balance(members, shortTopics)
+			g.cl.cfg.logger.Log(LogLevelInfo, "balanced", "plan", plan.String())
+			return plan, nil
 		}
 	}
 	return nil, ErrInvalidResp