From 5f508917670499a3c77a46a686cadd29edefdc58 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 16 Mar 2021 22:54:06 -0600 Subject: [PATCH] kgo.Fetches: add EachPartition callback iterator This is a handy alternative to the FetchesRecordIter, I personally prefer this style and @dcrodman indicated this would be useful as well. The only con is the callback is being passed an 88 byte sized struct, meaning it falls into DUFFCOPY and is a bit slower. Ideally the heavy iteration will be in record iteration. --- pkg/kgo/record_and_fetch.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 9a8ffe11..0d5c85c7 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -256,3 +256,36 @@ beforePartition0: goto beforePartition0 } } + +// EachPartition calls fn for each partition in Fetches. +// +// Partitions are not visited in any specific order, and a topic may be visited +// multiple times if it is spread across fetches. +func (fs Fetches) EachPartition(fn func(FetchTopicPartition)) { + for _, fetch := range fs { + for _, topic := range fetch.Topics { + for i := range topic.Partitions { + fn(FetchTopicPartition{ + Topic: topic.Topic, + Partition: topic.Partitions[i], + }) + } + } + } +} + +// FetchTopicPartition is similar to FetchTopic, but for an individual +// partition. +type FetchTopicPartition struct { + // Topic is the topic this is for. + Topic string + // Partition is an individual partition within this topic. + Partition FetchPartition +} + +// EachRecord calls fn for each record in the topic's partition. +func (r *FetchTopicPartition) EachRecord(fn func(*Record)) { + for _, r := range r.Partition.Records { + fn(r) + } +}