Skip to content

Commit

Permalink
FetchTopic: add EachPartition; FetchPartition: add EachRecord
Browse files Browse the repository at this point in the history
This adds helpers that are already (nearly) duplicates on other types.

Fetches itself has EachPartition, but the callback takes
FetchTopicPartition so that the user can know the topic of the
partition.

Because the user already has the topic for FetchTopic, its EachPartition
callback uses a FetchPartition.

FetchPartition's EachRecord is an exact mirror of FetchTopicPartition's
EachRecord.

There could be a few more callbacks but I'm not sure the use of them
yet. For example, EachRecord on FetchTopic makes a little bit less sense
because users are likely more interested in finer grained detail if
they're using an API that returns a FetchTopic. EachRecords on Fetches
makes sense because it inherently means the users are only interested in
records, and it makes sense on Fetch{,Topic}Partition since it completes
the Each callbacks as an alternative to for loops.
  • Loading branch information
twmb committed Sep 14, 2021
1 parent 8f648e7 commit 563e016
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ type FetchPartition struct {
Records []*Record
}

// EachRecord calls fn for each record in the partition.
func (p *FetchPartition) EachRecord(fn func(*Record)) {
for _, r := range p.Records {
fn(r)
}
}

// FetchTopic is a response for a fetched topic from a broker.
type FetchTopic struct {
// Topic is the topic this is for.
Expand All @@ -224,6 +231,13 @@ type FetchTopic struct {
Partitions []FetchPartition
}

// EachPartition calls fn for each partition in Fetches.
func (t *FetchTopic) EachPartition(fn func(FetchPartition)) {
for i := range t.Partitions {
fn(t.Partitions[i])
}
}

// Fetch is an individual response from a broker.
type Fetch struct {
// Topics are all topics being responded to from a fetch to a broker.
Expand Down

0 comments on commit 563e016

Please sign in to comment.