consumer group: track topics that the leader is not interested in
Previously, the group leader did not track topics that it was not
itself interested in consuming. If member A joins interested in topic
"foo" and member B joins interested in topic "bar", and A becomes
leader, A balances foo and bar correctly but then never watches for
metadata updates on bar.

Now, if "external" topics are detected, the group member stores all
group topics. These are used in metadata requests and the external
topics are now checked for changes after metadata updates.
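
For a concrete picture of the scenario above, here is a minimal sketch using
franz-go's public client options; the broker address, group name, and topic
names are placeholders and are not taken from this commit. Two members join
the same group with disjoint topic interests, so whichever member is elected
leader must track the other member's topics:

package main

import (
    "context"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    // Member A is interested only in "foo". If A is elected leader, it
    // must still watch partition counts for "bar" on B's behalf.
    a, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),  // placeholder broker
        kgo.ConsumerGroup("example-group"), // placeholder group
        kgo.ConsumeTopics("foo"),
    )
    if err != nil {
        panic(err)
    }
    defer a.Close()

    // Member B is interested only in "bar".
    b, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),
        kgo.ConsumerGroup("example-group"),
        kgo.ConsumeTopics("bar"),
    )
    if err != nil {
        panic(err)
    }
    defer b.Close()

    // Before this commit, if A was leader and "bar" gained partitions,
    // A would not notice the change and would never trigger a rebalance.
    _ = a.PollFetches(context.Background())
    _ = b.PollFetches(context.Background())
}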
twmb committed Mar 1, 2022
1 parent e8495bb commit 0bfaf64
Showing 4 changed files with 187 additions and 29 deletions.
161 changes: 134 additions & 27 deletions pkg/kgo/consumer_group.go
@@ -8,6 +8,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
@@ -73,6 +74,12 @@ type groupConsumer struct {
// EndTransaction.
offsetsAddedToTxn bool

// If we are leader, then other members may express interest in consuming
// topics that we are not interested in consuming. We track the entire
// group's topics in external, and our fetchMetadata loop uses this.
// We store this as a pointer for address comparisons.
external atomic.Value // *groupExternal

//////////////
// mu block //
//////////////
@@ -310,6 +317,7 @@ func (g *groupConsumer) manage() {
g.fetching = nil

g.leader.set(false)
g.resetExternal()
}

if errors.Is(err, context.Canceled) { // context was canceled, quit now
@@ -813,7 +821,6 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
// revoke, we wait for it to complete regardless of any future
// error.
if didMetadone && didRevoke {
g.cfg.logger.Log(LogLevelInfo, "heartbeat loop complete", "group", g.cfg.group, "err", lastErr)
return lastErr
}

@@ -905,6 +912,15 @@ func (g *groupConsumer) rejoin(why string) {
func (g *groupConsumer) joinAndSync() error {
g.cfg.logger.Log(LogLevelInfo, "joining group", "group", g.cfg.group)
g.leader.set(false)
g.getAndResetExternalRejoin()
defer func() {
// If we are not leader, we clear any tracking of external
// topics from when we were previously leader, since tracking
// these is just a waste.
if !g.leader.get() {
g.resetExternal()
}
}()

start:
select {
@@ -1022,30 +1038,15 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
leader := resp.LeaderID == resp.MemberID
if leader {
g.leader.set(true)

if resp.SkipAssignment {
g.cfg.logger.Log(LogLevelInfo, "joined, skipping assignment even though leader (KIP-814)",
"group", g.cfg.group,
"member_id", g.memberID,
"instance_id", g.cfg.instanceID,
"generation", g.generation,
"balance_protocol", protocol,
"leader", true,
)
} else {
g.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
"group", g.cfg.group,
"member_id", g.memberID,
"instance_id", g.cfg.instanceID,
"generation", g.generation,
"balance_protocol", protocol,
"leader", true,
)
plan, err = g.balanceGroup(protocol, resp.Members)
if err != nil {
return
}
}
g.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
"group", g.cfg.group,
"member_id", g.memberID,
"instance_id", g.cfg.instanceID,
"generation", g.generation,
"balance_protocol", protocol,
"leader", true,
)
plan, err = g.balanceGroup(protocol, resp.Members, resp.SkipAssignment)
} else {
g.cfg.logger.Log(LogLevelInfo, "joined",
"group", g.cfg.group,
@@ -1058,6 +1059,106 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
return
}

// If other group members consume topics we are not interested in, we track the
// entire group's topics in this groupExternal type. On metadata update, we see
// if any partitions for any of these topics have changed, and if so, we as
// leader rejoin the group.
//
// Our external topics are cleared whenever we join and are not leader. We keep
// our previous external topics if we are leader: on the first balance as
// leader, we request metadata for all topics, then on followup balances, we
// already have that metadata and do not need to reload it when balancing.
//
// Whenever metadata updates, we detect if a rejoin is needed and always reset
// the rejoin status.
type groupExternal struct {
tps atomic.Value // map[string]int32
rejoin atomicBool
}

func (g *groupConsumer) loadExternal() *groupExternal {
e := g.external.Load()
if e != nil {
return e.(*groupExternal)
}
return nil
}

// We reset our external topics whenever the join&sync loop errors, or when
// we join and are not leader.
func (g *groupConsumer) resetExternal() {
g.external.Store((*groupExternal)(nil))
}

// If this is our first join as leader, or if a new member joined with new
// topics we were not tracking, we re-initialize external with the all-topics
// metadata refresh.
func (g *groupConsumer) initExternal(current map[string]int32) {
var e groupExternal
e.tps.Store(dupmsi32(current))
g.external.Store(&e)
}

// Reset whenever we join, & potentially used to rejoin when finding new
// assignments (i.e., end of metadata).
func (g *groupConsumer) getAndResetExternalRejoin() bool {
e := g.loadExternal()
if e == nil {
return false
}
defer e.rejoin.set(false)
return e.rejoin.get()
}

// Runs fn over a load, not copy, of our map.
func (g *groupExternal) fn(fn func(map[string]int32)) {
if g == nil {
return
}
v := g.tps.Load()
if v == nil {
return
}
tps := v.(map[string]int32)
fn(tps)
}

// Runs fn over a clone of our external map and updates the map.
func (g *groupExternal) cloned(fn func(map[string]int32)) {
g.fn(func(tps map[string]int32) {
dup := dupmsi32(tps)
fn(dup)
g.tps.Store(dup)
})
}

func (g *groupExternal) eachTopic(fn func(string)) {
g.fn(func(tps map[string]int32) {
for t := range tps {
fn(t)
}
})
}

func (g *groupExternal) updateLatest(meta map[string]*topicPartitionsData) {
g.cloned(func(tps map[string]int32) {
var rejoin bool
for t, ps := range tps {
latest := meta[t]
if latest == nil || latest.loadErr != nil {
continue
}
if psLatest := int32(len(latest.partitions)); psLatest != ps {
rejoin = true
tps[t] = psLatest
}
}
if rejoin {
g.rejoin.set(true)
}
})
}

func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResponse) error {
if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
return err
@@ -1399,7 +1500,9 @@ func (g *groupConsumer) findNewAssignments() {
}
}

if len(toChange) == 0 {
externalRejoin := g.leader.get() && g.getAndResetExternalRejoin()

if len(toChange) == 0 && !externalRejoin {
return
}

@@ -1423,7 +1526,11 @@
if numNewTopics > 0 {
g.rejoin("rejoining because there are more topics to consume, our interests have changed")
} else if g.leader.get() {
g.rejoin("rejoining because we are the leader and noticed some topics have new partitions")
if len(toChange) > 0 {
g.rejoin("rejoining because we are the leader and noticed some topics have new partitions")
} else if externalRejoin {
g.rejoin("leader detected that partitions on topics another member is consuming have changed, rejoining to trigger rebalance")
}
}
}

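The groupExternal methods above follow a clone-on-write pattern around
atomic.Value: readers load the map and never mutate it, while the metadata
loop clones it, applies partition-count updates, and publishes the clone
wholesale. Below is a standalone sketch of that pattern; the type and method
names are simplified stand-ins, not the library's actual types:

package main

import (
    "fmt"
    "sync/atomic"
)

// external holds a map that is replaced wholesale on every update, so
// concurrent readers always see a consistent snapshot without locks.
type external struct {
    tps atomic.Value // holds a map[string]int32
}

func (e *external) load() map[string]int32 {
    if v := e.tps.Load(); v != nil {
        return v.(map[string]int32)
    }
    return nil
}

// cloned copies the current map, lets fn mutate the copy, and publishes it.
func (e *external) cloned(fn func(map[string]int32)) {
    cur := e.load()
    dup := make(map[string]int32, len(cur))
    for t, ps := range cur {
        dup[t] = ps
    }
    fn(dup)
    e.tps.Store(dup)
}

func main() {
    var e external
    e.tps.Store(map[string]int32{"bar": 3})

    // Simulate a metadata update seeing that "bar" now has 4 partitions.
    rejoin := false
    e.cloned(func(tps map[string]int32) {
        if tps["bar"] != 4 {
            tps["bar"] = 4
            rejoin = true
        }
    })

    fmt.Println(e.load()["bar"], rejoin) // 4 true
}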
41 changes: 39 additions & 2 deletions pkg/kgo/group_balancer.go
@@ -291,8 +291,17 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
}

// balanceGroup returns a balancePlan from a join group response.
func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
//
// If the group has topics this leader does not want to consume, this also
// returns all topics and partitions; the leader will then periodically do its
// own metadata update to see if partition counts have changed for these random
// topics.
func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember, skipBalance bool) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
if skipBalance {
g.cl.cfg.logger.Log(LogLevelInfo, "parsing group balance as leader but not assigning (KIP-814)")
} else {
g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
}

b, err := g.findBalancer("balance group", proto)
if err != nil {
@@ -318,6 +327,23 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
topicPartitionCount[topic] = int32(len(data.load().partitions))
}

// If our consumer metadata does not contain all topics, the group is
// expressing interests in topics we are not consuming. Perhaps we have
// those topics saved in our external topics map.
if needMeta {
g.loadExternal().fn(func(m map[string]int32) {
needMeta = false
for topic := range topics {
partitions, exists := m[topic]
if !exists {
needMeta = true
continue
}
topicPartitionCount[topic] = partitions
}
})
}

if needMeta {
g.cl.cfg.logger.Log(LogLevelInfo, "group members indicated interest in topics the leader is not assigned, fetching metadata for all group topics")
var metaTopics []string
@@ -341,6 +367,8 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
}
topicPartitionCount[*t.Topic] = int32(len(t.Partitions))
}

g.initExternal(topicPartitionCount)
}

// If the returned balancer is a ConsumerBalancer (which it likely
@@ -368,6 +396,15 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
g.cl.cfg.logger.Log(LogLevelInfo, "unable to log information about group member interests: the user has defined a custom balancer (not a *ConsumerBalancer)")
}

// KIP-814: we are leader and we know what the entire group is
// consuming. Crucially, we parsed topics that we are potentially not
// interested in and are now tracking them for metadata updates. We
// have logged the current interests, we do not need to actually
// balance.
if skipBalance {
return nil, nil
}

// If the returned IntoSyncAssignment is a BalancePlan, which it likely
// is if the balancer is a ConsumerBalancer, then we can again print
// more useful debugging information.
6 changes: 6 additions & 0 deletions pkg/kgo/metadata.go
@@ -251,6 +251,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
var (
tpsProducerLoad = cl.producer.topics.load()
tpsConsumer *topicsPartitions
groupExternal *groupExternal
all = cl.cfg.regex
reqTopics []string
)
@@ -260,6 +261,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
tpsConsumer = c.d.tps
case c.g != nil:
tpsConsumer = c.g.tps
groupExternal = c.g.loadExternal()
}

if !all {
@@ -272,6 +274,9 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
reqTopicsSet[topic] = struct{}{}
}
}
groupExternal.eachTopic(func(t string) {
reqTopicsSet[t] = struct{}{}
})
reqTopics = make([]string, 0, len(reqTopicsSet))
for topic := range reqTopicsSet {
reqTopics = append(reqTopics, topic)
@@ -286,6 +291,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
)
return nil, err
}
groupExternal.updateLatest(latest)

// If we are consuming with regex and fetched all topics, the metadata
// may have returned topics the consumer is not yet tracking. We ensure
8 changes: 8 additions & 0 deletions pkg/kgo/topics_and_partitions.go
@@ -9,6 +9,14 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
)

func dupmsi32(m map[string]int32) map[string]int32 {
d := make(map[string]int32, len(m))
for t, ps := range m {
d[t] = ps
}
return d
}

type tpsFmt map[string][]int32

func (f tpsFmt) String() string {