diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index edac0bce..ab512194 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -215,6 +215,13 @@ type FetchPartition struct { Records []*Record } +// EachRecord calls fn for each record in the partition. +func (p *FetchPartition) EachRecord(fn func(*Record)) { + for _, r := range p.Records { + fn(r) + } +} + // FetchTopic is a response for a fetched topic from a broker. type FetchTopic struct { // Topic is the topic this is for. @@ -224,6 +231,13 @@ type FetchTopic struct { Partitions []FetchPartition } +// EachPartition calls fn for each partition in Fetches. +func (t *FetchTopic) EachPartition(fn func(FetchPartition)) { + for i := range t.Partitions { + fn(t.Partitions[i]) + } +} + // Fetch is an individual response from a broker. type Fetch struct { // Topics are all topics being responded to from a fetch to a broker.