Skip to content

Commit

Permalink
group balancer: debug => info logging; handle join better
Browse files Browse the repository at this point in the history
Previously, if a bunch of group members joined with disparate interests,
the leader would only know how to assign topics that it was interested
in, not what the other members were interested in. Now, if members join
with topics the leader does not know of, the leader performs a metadata
fetch on all topics.

However, we still do not keep around information of these other
interests, meaning that if an admin adds partitions to a topic later,
the leader will not notice this. The risk here is pretty low, but if
this is an issue later, we can add an option to keep the metadata for
all topics. The downside of this is that all leaders eventually get
bloated topics lists, meaning metadata requests slowly grow larger over
time.

With this, we also change up the logging messages a bit to be
potentially nicer, and we move them to the info level so that we always
log. For large balances, this could be a really, really big message, but
ideally balances are pretty rare.
  • Loading branch information
twmb committed Mar 29, 2021
1 parent d74bbc3 commit a670bc7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 7 deletions.
13 changes: 9 additions & 4 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"regexp"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -260,8 +261,6 @@ type groupConsumer struct {
lastAssigned map[string][]int32 // only updated in join&sync loop
nowAssigned map[string][]int32 // only updated in join&sync loop

groupExtraTopics map[string]struct{} // TODO TODO TODO

// leader is whether we are the leader right now. This is set to false
//
// - set to false at the beginning of a join group session
Expand Down Expand Up @@ -1135,7 +1134,6 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
)

plan, err = g.balanceGroup(protocol, resp.Members)
g.cl.cfg.logger.Log(LogLevelDebug, "balanced", "plan", plan)
if err != nil {
return
}
Expand All @@ -1162,7 +1160,14 @@ func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse, plan balanc
return err
}

g.cl.cfg.logger.Log(LogLevelDebug, "synced", "assigned", kassignment.Topics)
var sb strings.Builder
for i, topic := range kassignment.Topics {
fmt.Fprintf(&sb, "%s%v", topic.Topic, topic.Partitions)
if i < len(kassignment.Topics)-1 {
sb.WriteString(", ")
}
}
g.cl.cfg.logger.Log(LogLevelInfo, "synced", "assigned", sb.String())

// Past this point, we will fall into the setupAssigned prerevoke code,
// meaning for cooperative, we will revoke what we need to.
Expand Down
93 changes: 90 additions & 3 deletions pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package kgo
import (
"fmt"
"sort"
"strings"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo/internal/sticky"
"github.com/twmb/franz-go/pkg/kmsg"
)
Expand Down Expand Up @@ -42,12 +44,33 @@ type groupMember struct {
owned []kmsg.GroupMemberMetadataOwnedPartition
}

// balanceInterests returns a human-readable summary of a member's
// topic interests and previously owned partitions, used when logging
// which members are being balanced.
func (m *groupMember) balanceInterests() string {
	var sb strings.Builder
	sb.WriteString("interested topics: ")
	fmt.Fprintf(&sb, "%v", m.topics)
	sb.WriteString(", previously owned: ")
	for i, owned := range m.owned {
		fmt.Fprintf(&sb, "%s%v", owned.Topic, owned.Partitions)
		// Separate entries with ", "; the original condition
		// (i < len(m.owned)) was always true, producing a stray
		// trailing separator after the final entry.
		if i < len(m.owned)-1 {
			sb.WriteString(", ")
		}
	}
	return sb.String()
}

// groupMemberID uniquely identifies a member within a consumer group.
// Static members (which have instance IDs) additionally carry that
// instance ID.
type groupMemberID struct {
	memberID    string
	instanceID  string
	hasInstance bool
}

// String renders the member ID for logging, appending the instance ID
// in parentheses when the member is a static member.
func (id *groupMemberID) String() string {
	if !id.hasInstance {
		return id.memberID
	}
	return id.memberID + "(" + id.instanceID + ")"
}

func (me groupMemberID) less(other groupMemberID) bool {
if me.hasInstance && other.hasInstance {
return me.instanceID < other.instanceID
Expand All @@ -65,6 +88,33 @@ func (me groupMemberID) less(other groupMemberID) bool {
// member id => topic => partitions
type balancePlan map[groupMemberID]map[string][]int32

// String renders the plan as "member{topicA[parts], topicB[parts]}, ..."
// for logging. Since this ranges over maps, the member and topic
// ordering is not deterministic between calls.
func (p balancePlan) String() string {
	var sb strings.Builder

	remainingMembers := len(p)
	for member, topics := range p {
		sb.WriteString(member.String())
		sb.WriteByte('{')

		remainingTopics := len(topics)
		for topic, partitions := range topics {
			fmt.Fprintf(&sb, "%s%v", topic, partitions)
			// Comma-separate all but the final topic.
			if remainingTopics--; remainingTopics > 0 {
				sb.WriteString(", ")
			}
		}

		sb.WriteByte('}')
		// Comma-separate all but the final member.
		if remainingMembers--; remainingMembers > 0 {
			sb.WriteString(", ")
		}
	}

	return sb.String()
}

func newBalancePlan(members []groupMember) balancePlan {
plan := make(map[groupMemberID]map[string][]int32, len(members))
for i := range members {
Expand Down Expand Up @@ -113,15 +163,52 @@ func (g *groupConsumer) balanceGroup(proto string, kmembers []kmsg.JoinGroupResp
sort.Slice(members, func(i, j int) bool {
return members[i].id.less(members[j].id) // guarantee sorted members
})

myTopics := g.cl.loadTopics()
allTopics := make(map[string]struct{})
var needMeta bool

g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
for i := range members {
sort.Strings(members[i].topics) // guarantee sorted topics
m := &members[i]
g.cl.cfg.logger.Log(LogLevelDebug, "member interests", "id", m.id, "topics", m.topics, "previously_owned", m.owned)
sort.Strings(m.topics) // guarantee sorted topics
g.cl.cfg.logger.Log(LogLevelInfo, "balance group member", "id", m.id.String(), "interests", m.balanceInterests())

for _, topic := range m.topics {
allTopics[topic] = struct{}{}
if _, exists := myTopics[topic]; !exists {
needMeta = true
}
}
}

shortTopics := g.cl.loadShortTopics()
if needMeta {
g.cl.cfg.logger.Log(LogLevelInfo, "group members indicated interest in topics the leader is not assigned, fetching metadata for all group topics")
var metaTopics []string
for topic := range allTopics {
metaTopics = append(metaTopics, topic)
}

_, resp, err := g.cl.fetchMetadataForTopics(g.ctx, false, metaTopics)
if err != nil {
return nil, fmt.Errorf("unable to fetch metadata for group topics: %v", err)
}
for i := range resp.Topics {
t := &resp.Topics[i]
if t.ErrorCode != 0 {
g.cl.cfg.logger.Log(LogLevelWarn, "metadata resp in balance for topic has error, skipping...", "topic", t.Topic, "err", kerr.ErrorForCode(t.ErrorCode))
continue
}
shortTopics[t.Topic] = int32(len(t.Partitions))
}
}

for _, balancer := range g.balancers {
if balancer.protocolName() == proto {
return balancer.balance(members, g.cl.loadShortTopics()), nil
plan := balancer.balance(members, shortTopics)
g.cl.cfg.logger.Log(LogLevelInfo, "balanced", "plan", plan.String())
return plan, nil
}
}
return nil, ErrInvalidResp
Expand Down

0 comments on commit a670bc7

Please sign in to comment.