Skip to content

Commit

Permalink
consumer group: document CommitRecords / CommitUncommittedOffsets more
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Jun 10, 2021
1 parent 2092b4c commit 922f4b8
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,19 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo
// can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
// for most workloads, a small bit of potential duplicate processing is fine.
// See the documentation on DisableAutoCommit for more details.
//
// It is recommended to always commit records in order (per partition). If you
// call this function twice with record for partition 0 at offset 999
// initially, and then with record for partition 0 at offset 4, you will rewind
// your commit.
//
// A use case for this function may be to partially process a batch of records,
// commit, and then continue to process the rest of the records. It is not
// recommended to call this for every record processed in a high throughput
// scenario, because you do not want to unnecessarily increase load on Kafka.
//
// If you do not want to wait for this function to complete before continuing
// processing records, you can call this function in a goroutine.
func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
// First build the offset commit map. We favor the latest epoch, then
// offset, if any records map to the same topic / partition.
Expand Down Expand Up @@ -1689,6 +1702,13 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
// can avoid this scenario by calling CommitRecords in a custom OnRevoked, but
// for most workloads, a small bit of potential duplicate processing is fine.
// See the documentation on DisableAutoCommit for more details.
//
// The recommended pattern for using this function is to have a poll / process
// / commit loop. First PollFetches, then process every record, then call
// CommitUncommittedOffsets.
//
// If you do not want to wait for this function to complete before continuing
// processing records, you can call this function in a goroutine.
func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
// This function is just the tail end of CommitRecords just above.
var rerr error
Expand Down

0 comments on commit 922f4b8

Please sign in to comment.