Skip to content

Commit

Permalink
consuming: log added/skipped when consuming by regex
Browse files Browse the repository at this point in the history
This will give clarity into which topics are evaluated and decided to be
added for consuming or skipped because of regular expression mismatch.
  • Loading branch information
twmb committed Nov 12, 2021
1 parent b6759bc commit 101d6bd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
11 changes: 10 additions & 1 deletion pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func (c *consumer) initDirect() {
func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
topics := d.tps.load()

var rns reNews
if d.cfg.regex {
defer rns.log(d.cfg)
}

toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
// If we are using regex topics, we have to check all
Expand All @@ -43,11 +48,15 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
for _, re := range d.cfg.topics {
for rawRe, re := range d.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
d.reSeen[topic] = want
}
useTopic = want
Expand Down
38 changes: 37 additions & 1 deletion pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,11 @@ func (g *groupConsumer) findNewAssignments() {
delta int
}

var rns reNews
if g.cfg.regex {
defer rns.log(&g.cl.cfg)
}

var numNewTopics int
toChange := make(map[string]change, len(topics))
for topic, topicPartitions := range topics {
Expand All @@ -1330,11 +1335,15 @@ func (g *groupConsumer) findNewAssignments() {
if g.cfg.regex {
want, seen := g.reSeen[topic]
if !seen {
for _, re := range g.cfg.topics {
for rawRe, re := range g.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
g.reSeen[topic] = want
}
useTopic = want
Expand Down Expand Up @@ -2261,3 +2270,30 @@ func (g *groupConsumer) commit(
onDone(g.cl, req, resp, nil)
}()
}

type reNews struct {
added map[string][]string
skipped []string
}

func (r *reNews) add(re, match string) {
if r.added == nil {
r.added = make(map[string][]string)
}
r.added[re] = append(r.added[re], match)
}

func (r *reNews) skip(topic string) {
r.skipped = append(r.skipped, topic)
}

func (r *reNews) log(cfg *cfg) {
var addeds []string
for re, matches := range r.added {
sort.Strings(matches)
addeds = append(addeds, fmt.Sprintf("%s[%s]", re, strings.Join(matches, " ")))
}
added := strings.Join(addeds, " ")
sort.Strings(r.skipped)
cfg.logger.Log(LogLevelInfo, "consumer regular expressions evaluated on new topics", "added", added, "evaluated_and_skipped", r.skipped)
}

0 comments on commit 101d6bd

Please sign in to comment.