From 6db1c3946c2cb90aa3944a00549549c0ff6b3582 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 30 Apr 2021 12:14:55 -0600 Subject: [PATCH] Fetches: add EachTopic helper This is analogous to EachPartition, but by necessity allocates if there is more than one fetch. --- pkg/kgo/record_and_fetch.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 265c3b27..58033aee 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -279,6 +279,37 @@ func (fs Fetches) EachPartition(fn func(FetchTopicPartition)) { } } +// EachTopic calls fn for each topic in Fetches. +// +// This is a convenience function that groups all partitions for the same topic +// from many fetches into one FetchTopic. A map is internally allocated to +// group partitions per topic before calling fn. +func (fs Fetches) EachTopic(fn func(FetchTopic)) { + switch len(fs) { + case 0: + return + case 1: + for _, topic := range fs[0].Topics { + fn(topic) + } + return + } + + topics := make(map[string][]FetchPartition) + for _, fetch := range fs { + for _, topic := range fetch.Topics { + topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...) + } + } + + for topic, partitions := range topics { + fn(FetchTopic{ + topic, + partitions, + }) + } +} + // FetchTopicPartition is similar to FetchTopic, but for an individual // partition. type FetchTopicPartition struct {