Skip to content

Commit

Permalink
Fetches: add EachTopic helper
Browse files Browse the repository at this point in the history
This is analogous to EachPartition, but by necessity allocates if there
is more than one fetch.
  • Loading branch information
twmb committed Apr 30, 2021
1 parent 5f8a4bc commit 6db1c39
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,37 @@ func (fs Fetches) EachPartition(fn func(FetchTopicPartition)) {
}
}

// EachTopic calls fn for each topic in Fetches.
//
// This is a convenience function that groups all partitions for the same topic
// from many fetches into one FetchTopic. A map is internally allocated to
// group partitions per topic before calling fn.
func (fs Fetches) EachTopic(fn func(FetchTopic)) {
switch len(fs) {
case 0:
return
case 1:
for _, topic := range fs[0].Topics {
fn(topic)
}
return
}

topics := make(map[string][]FetchPartition)
for _, fetch := range fs {
for _, topic := range fetch.Topics {
topics[topic.Topic] = append(topics[topic.Topic], topic.Partitions...)
}
}

for topic, partitions := range topics {
fn(FetchTopic{
topic,
partitions,
})
}
}

// FetchTopicPartition is similar to FetchTopic, but for an individual
// partition.
type FetchTopicPartition struct {
Expand Down

0 comments on commit 6db1c39

Please sign in to comment.