Skip to content

Commit

Permalink
kgo.Fetches: add EachPartition callback iterator
Browse files Browse the repository at this point in the history
This is a handy alternative to the FetchesRecordIter, I personally
prefer this style and @dcrodman indicated this would be useful as well.

The only con is the callback is being passed an 88 byte sized struct,
meaning it falls into DUFFCOPY and is a bit slower. Ideally the heavy
iteration will be in record iteration.
  • Loading branch information
twmb committed Mar 20, 2021
1 parent 167de6b commit 5f50891
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,36 @@ beforePartition0:
goto beforePartition0
}
}

// EachPartition calls fn for each partition in Fetches.
//
// Partitions are not visited in any specific order, and a topic may be visited
// multiple times if it is spread across fetches.
func (fs Fetches) EachPartition(fn func(FetchTopicPartition)) {
for _, fetch := range fs {
for _, topic := range fetch.Topics {
for i := range topic.Partitions {
fn(FetchTopicPartition{
Topic: topic.Topic,
Partition: topic.Partitions[i],
})
}
}
}
}

// FetchTopicPartition is similar to FetchTopic, but for an individual
// partition.
type FetchTopicPartition struct {
// Topic is the topic this is for.
Topic string
// Partition is an individual partition within this topic.
Partition FetchPartition
}

// EachRecord calls fn for each record in the topic's partition.
func (r *FetchTopicPartition) EachRecord(fn func(*Record)) {
for _, r := range r.Partition.Records {
fn(r)
}
}

0 comments on commit 5f50891

Please sign in to comment.