From 563e0167af1a96fba283742f6cf3e5b17b4a04be Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 13 Sep 2021 22:18:32 -0600 Subject: [PATCH] FetchTopic: add EachPartition; FetchPartition: add EachRecord This adds helpers that are already (nearly) duplicates on other types. Fetches itself has EachPartition, but the callback takes FetchTopicPartition so that the user can know the topic of the partition. Because the user already has the topic for FetchTopic, its EachPartition callback uses a FetchPartition. FetchPartition's EachRecord is an exact mirror of FetchTopicPartition's EachRecord. There could be a few more callbacks but I'm not sure the use of them yet. For example, EachRecord on FetchTopic makes a little bit less sense because users are likely more interested in finer grained detail if they're using an API that returns a FetchTopic. EachRecords on Fetches makes sense because it inherently means the users are only interested in records, and it makes sense on Fetch{,Topic}Partition since it completes the Each callbacks as an alternative to for loops. --- pkg/kgo/record_and_fetch.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index edac0bce..ab512194 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -215,6 +215,13 @@ type FetchPartition struct { Records []*Record } +// EachRecord calls fn for each record in the partition. +func (p *FetchPartition) EachRecord(fn func(*Record)) { + for _, r := range p.Records { + fn(r) + } +} + // FetchTopic is a response for a fetched topic from a broker. type FetchTopic struct { // Topic is the topic this is for. @@ -224,6 +231,13 @@ type FetchTopic struct { Partitions []FetchPartition } +// EachPartition calls fn for each partition in Fetches. +func (t *FetchTopic) EachPartition(fn func(FetchPartition)) { + for i := range t.Partitions { + fn(t.Partitions[i]) + } +} + // Fetch is an individual response from a broker. type Fetch struct { // Topics are all topics being responded to from a fetch to a broker.