From 101d6bd9024f7b0d6e197d8bc342e21069b94577 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 11 Nov 2021 18:56:35 -0700 Subject: [PATCH] consuming: log added/skipped when consuming by regex This will give clarity into which topics are evaluated and decided to be added for consuming or skipped because of regular expression mismatch. --- pkg/kgo/consumer_direct.go | 11 ++++++++++- pkg/kgo/consumer_group.go | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index b97ee8d3..4b6cb728 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -35,6 +35,11 @@ func (c *consumer) initDirect() { func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { topics := d.tps.load() + var rns reNews + if d.cfg.regex { + defer rns.log(d.cfg) + } + toUse := make(map[string]map[int32]Offset, 10) for topic, topicPartitions := range topics { // If we are using regex topics, we have to check all @@ -43,11 +48,15 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { if d.cfg.regex { want, seen := d.reSeen[topic] if !seen { - for _, re := range d.cfg.topics { + for rawRe, re := range d.cfg.topics { if want = re.MatchString(topic); want { + rns.add(rawRe, topic) break } } + if !want { + rns.skip(topic) + } d.reSeen[topic] = want } useTopic = want diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 53f89f59..48afe3e7 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1312,6 +1312,11 @@ func (g *groupConsumer) findNewAssignments() { delta int } + var rns reNews + if g.cfg.regex { + defer rns.log(&g.cl.cfg) + } + var numNewTopics int toChange := make(map[string]change, len(topics)) for topic, topicPartitions := range topics { @@ -1330,11 +1335,15 @@ func (g *groupConsumer) findNewAssignments() { if g.cfg.regex { want, seen := g.reSeen[topic] if !seen { - for _, re := range g.cfg.topics { + for rawRe, re := range g.cfg.topics { if want = re.MatchString(topic); want { + rns.add(rawRe, topic) break } } + if !want { + rns.skip(topic) + } g.reSeen[topic] = want } useTopic = want @@ -2261,3 +2270,30 @@ func (g *groupConsumer) commit( onDone(g.cl, req, resp, nil) }() } + +type reNews struct { + added map[string][]string + skipped []string +} + +func (r *reNews) add(re, match string) { + if r.added == nil { + r.added = make(map[string][]string) + } + r.added[re] = append(r.added[re], match) +} + +func (r *reNews) skip(topic string) { + r.skipped = append(r.skipped, topic) +} + +func (r *reNews) log(cfg *cfg) { + var addeds []string + for re, matches := range r.added { + sort.Strings(matches) + addeds = append(addeds, fmt.Sprintf("%s[%s]", re, strings.Join(matches, " "))) + } + added := strings.Join(addeds, " ") + sort.Strings(r.skipped) + cfg.logger.Log(LogLevelInfo, "consumer regular expressions evaluated on new topics", "added", added, "evaluated_and_skipped", r.skipped) +}