From 8178f2cc5883304d409f653ff7e5d42163cf3e8e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 4 Mar 2021 22:27:03 -0700 Subject: [PATCH] group assigning: add debug logs for balancing plans - The leader logs each member's interests and previously owned partitions - The leader logs the resulting plan - Each member logs what it received --- pkg/kgo/consumer_group.go | 3 +++ pkg/kgo/group_balancer.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 99dd242a..7497ee43 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1062,6 +1062,7 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo ) plan, err = g.balanceGroup(protocol, resp.Members) + g.cl.cfg.logger.Log(LogLevelDebug, "balanced", "plan", plan) if err != nil { return } @@ -1089,6 +1090,8 @@ func (g *groupConsumer) handleSyncResp(resp *kmsg.SyncGroupResponse, plan balanc return err } + g.cl.cfg.logger.Log(LogLevelDebug, "synced", "assigned", kassignment.Topics) + // Past this point, we will fall into the setupAssigned prerevoke code, // meaning for cooperative, we will revoke what we need to. if g.cooperative { diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index a84a68f1..e6965090 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -115,6 +115,8 @@ func (g *groupConsumer) balanceGroup(proto string, kmembers []kmsg.JoinGroupResp }) for i := range members { sort.Strings(members[i].topics) // guarantee sorted topics + m := &members[i] + g.cl.cfg.logger.Log(LogLevelDebug, "member interests", "id", m.id, "topics", m.topics, "previously_owned", m.owned) } for _, balancer := range g.balancers {