Skip to content

Commit

Permalink
group assigning: add debug logs for balancing plans
Browse files Browse the repository at this point in the history
- The leader logs each member's interests and previously owned
  partitions
- The leader logs the resulting plan
- Each member logs what it received
  • Loading branch information
twmb committed Mar 5, 2021
1 parent e038916 commit 8178f2c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
)

plan, err = g.balanceGroup(protocol, resp.Members)
g.cl.cfg.logger.Log(LogLevelDebug, "balanced", "plan", plan)
if err != nil {
return
}
Expand Down Expand Up @@ -1089,6 +1090,8 @@ func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse, plan balanc
return err
}

g.cl.cfg.logger.Log(LogLevelDebug, "synced", "assigned", kassignment.Topics)

// Past this point, we will fall into the setupAssigned prerevoke code,
// meaning for cooperative, we will revoke what we need to.
if g.cooperative {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (g *groupConsumer) balanceGroup(proto string, kmembers []kmsg.JoinGroupResp
})
for i := range members {
sort.Strings(members[i].topics) // guarantee sorted topics
m := &members[i]
g.cl.cfg.logger.Log(LogLevelDebug, "member interests", "id", m.id, "topics", m.topics, "previously_owned", m.owned)
}

for _, balancer := range g.balancers {
Expand Down

0 comments on commit 8178f2c

Please sign in to comment.