From 28bba4375e34c2df09182ea8815afc94d1fcbb2b Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 24 Aug 2021 20:41:15 -0600 Subject: [PATCH] autocommitting: only commit previously polled fetches Before this commit, autocommitting could lead to message loss. If fetches were polled, and then an autocommit happened before any messages were processed, and then the process crashed, all polled messages would be lost. Now, we only allow *previously* polled fetches to be committed. The assumption is that PollFetches is only called application-side when all previously polled fetches are finalized within the application (processed / ready to be committed). This new behavior ensures at-least-once message processing at all layers, but does have a risk of duplicates during rebalances. This new behavior also showed that passing the group context to OnRevoke was insufficient: if we used the group context to commit when a group is being left (which triggers OnRevoke before actually leaving), then the commit would fail. The previous recommendation was to commit before leaving. We may as well just use the client context. This *does* mean that users will not be able to detect when a group has exited within the callback, meaning their callback can take so long that they are booted from the group, but that's essentially the previous behavior anyway: the user could do stuff, take so long they're booted, and then fail whatever they're doing *and* be booted from the group. Now they wont fail whatever they're doing, but they'll still be booted from the group. Worst case, we can add the group context back later if the need arises. We also add a few extra logs: in any OnXyz, we log what we entered the callback with, and then potentially call into user callbacks. Closes #57 --- pkg/kgo/config.go | 58 ++++++++--------- pkg/kgo/consumer_group.go | 131 ++++++++++++++++++++++++++++---------- pkg/kgo/group_test.go | 48 +++++++------- 3 files changed, 147 insertions(+), 90 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 5d3defcb..81c928fb 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -163,6 +163,7 @@ type cfg struct { setCommitCallback bool autocommitDisable bool // true if autocommit was disabled or we are transactional + autocommitGreedy bool autocommitInterval time.Duration commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) } @@ -1132,24 +1133,14 @@ func ConsumeRegex() ConsumerOpt { // ConsumerGroup sets the consumer group for the client to join and consume in. // This option is required if using any other group options. // -// Note that when group consuming, the default is to autocommit every 5s. -// Autocommitting risks losing data if your applications crashes after -// autocommitting but before you have processed polled records. To ensure -// that you lose absolutely no data, you can disable autocommitting and -// manually commit, like so: -// -// if err := cl.CommitUncommittedOffsets(context.Background()) { -// // handle err; unable to commit -// } -// -// The main downside with disabling autocommitting is that you run the risk of -// some duplicate processing of records than necessary. See the documentation -// on DisableAutoCommit for more details. -// -// If you can tolerate a little bit of data loss from crashes because you do -// not expect to ever crash, then relying on autocommitting is a fine option. -// However, if you can tolerate a little bit of duplicate processing, manually -// committing is very easy. +// Note that when group consuming, the default is to autocommit every 5s. To be +// safe, autocommitting only commits what is *previously* polled. If you poll +// once, nothing will be committed. If you poll again, the first poll is +// available to be committed. This ensures at-least-once processing, but does +// mean there is likely some duplicate processing during rebalances. When your +// client shuts down, you should issue one final synchronous commit before +// leaving the group (because you will not be polling again, and you are not +// waiting for an autocommit). func ConsumerGroup(group string) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.group = group }} } @@ -1237,8 +1228,8 @@ func RequireStableFetchOffsets() GroupOpt { // interval. It is possible for the group, immediately after finishing a // balance, to re-enter a new balancing session. // -// The OnAssigned function is passed the group's context, which is only -// canceled if the group is left or the client is closed. +// The OnAssigned function is passed the client's context, which is only +// canceled if the client is closed. func OnAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.onAssigned, cfg.setAssigned = onAssigned, true }} } @@ -1251,17 +1242,14 @@ func OnAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) G // balance, to re-enter a new balancing session. // // If autocommit is enabled, the default OnRevoked is a blocking commit all -// offsets. The reason for a blocking commit is so that no later commit cancels -// the blocking commit. If the commit in OnRevoked were canceled, then the -// rebalance would proceed immediately, the commit that canceled the blocking -// commit would fail, and duplicates could be consumed after the rebalance -// completes. -// -// The OnRevoked function is passed the group's context, which is only canceled -// if the group is left or the client is closed. Since OnRevoked is called when -// leaving a group, you likely want to commit before leaving, and to ignore -// context.Canceled / return early if your handling in OnRevoked fails due to -// the context being canceled. +// non-dirty offsets (where dirty is the most recent poll). The reason for a +// blocking commit is so that no later commit cancels the blocking commit. If +// the commit in OnRevoked were canceled, then the rebalance would proceed +// immediately, the commit that canceled the blocking commit would fail, and +// duplicates could be consumed after the rebalance completes. +// +// The OnRevoked function is passed the client's context, which is only +// canceled if the client is closed. // // OnRevoked function is called at the end of a group session even if there are // no partitions being revoked. @@ -1308,6 +1296,14 @@ func DisableAutoCommit() GroupOpt { return groupOpt{func(cfg *cfg) { cfg.autocommitDisable = true }} } +// GreedyAutoCommit opts in to committing everything that has been polled when +// autocommitting (the dirty offsets), rather than committing what has +// previously been polled. This option may result in message loss if your +// application crashes. +func GreedyAutoCommit() GroupOpt { + return groupOpt{func(cfg *cfg) { cfg.autocommitGreedy = true }} +} + // AutoCommitInterval sets how long to go between autocommits, overriding the // default 5s. func AutoCommitInterval(interval time.Duration) GroupOpt { diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index fc402b77..f275fdd4 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -167,6 +167,30 @@ func (c *consumer) initGroup() { g.cfg.autocommitDisable = true } + for _, logOn := range []struct { + name string + set *func(context.Context, *Client, map[string][]int32) + }{ + {"OnAssigned", &g.cfg.onAssigned}, + {"OnRevoked", &g.cfg.onRevoked}, + {"OnLost", &g.cfg.onLost}, + } { + user := *logOn.set + name := logOn.name + *logOn.set = func(ctx context.Context, cl *Client, m map[string][]int32) { + var ctxExpired bool + select { + case <-ctx.Done(): + ctxExpired = true + default: + } + cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m, "context_expired", ctxExpired) + if user != nil { + user(ctx, cl, m) + } + } + } + // For non-regex topics, we explicitly ensure they exist for loading // metadata. This is of no impact if we are *also* consuming via regex, // but that is no problem. @@ -231,16 +255,16 @@ func (g *groupConsumer) manage() { // onRevoked, but since we are handling this case for // the cooperative consumer we may as well just also // include the eager consumer. - g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned) + g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned) } else if g.cfg.onLost != nil { // Any other error is perceived as a fatal error, // and we go into OnLost as appropriate. - g.cfg.onLost(g.ctx, g.cl, g.nowAssigned) + g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned) hook() } else if g.cfg.onRevoked != nil { // If OnLost is not specified, we fallback to OnRevoked. - g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned) + g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned) hook() } @@ -424,7 +448,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned) } if g.cfg.onRevoked != nil { - g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned) + g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned) } g.nowAssigned = nil @@ -486,7 +510,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi g.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke", "group", g.cfg.group, "lost", lost, "stage", stage) } if g.cfg.onRevoked != nil { - g.cfg.onRevoked(g.ctx, g.cl, lost) + g.cfg.onRevoked(g.cl.ctx, g.cl, lost) } } @@ -558,7 +582,7 @@ func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][] // We always call on assigned, even if nothing new is // assigned. This allows consumers to know that // assignment is done and do setup logic. - g.cfg.onAssigned(g.ctx, g.cl, newAssigned) + g.cfg.onAssigned(g.cl.ctx, g.cl, newAssigned) } }() return s.assignDone @@ -1157,6 +1181,7 @@ start: Offset: offset.at, } topicUncommitted[partition] = uncommit{ + dirty: committed, head: committed, committed: committed, } @@ -1265,8 +1290,9 @@ func (g *groupConsumer) findNewAssignments() { // The reason head is just past the latest offset is because we want // to commit TO an offset, not BEFORE an offset. type uncommit struct { - head EpochOffset - committed EpochOffset + dirty EpochOffset // if autocommitting, what will move to head on next Poll + head EpochOffset // ready to commit + committed EpochOffset // what is committed } // EpochOffset combines a record offset with the leader epoch the broker @@ -1283,6 +1309,8 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { var b bytes.Buffer debug := g.cfg.logger.Level() >= LogLevelDebug + setHead := g.cfg.autocommitDisable || g.cfg.autocommitGreedy + g.mu.Lock() defer g.mu.Unlock() @@ -1311,19 +1339,28 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { } } - uncommit := topicOffsets[partition.Partition] - // Our new head points just past the final consumed offset, // that is, if we rejoin, this is the offset to begin at. - newOffset := final.Offset + 1 + set := EpochOffset{ + final.LeaderEpoch, // -1 if old message / unknown + final.Offset + 1, + } + prior := topicOffsets[partition.Partition] + if debug { - fmt.Fprintf(&b, "%d{%d=>%d}, ", partition.Partition, uncommit.head.Offset, newOffset) + if setHead { + fmt.Fprintf(&b, "%d{%d=>%d}, ", partition.Partition, prior.head.Offset, set.Offset) + } else { + fmt.Fprintf(&b, "%d{%d=>%d=>%d}, ", partition.Partition, prior.head.Offset, prior.dirty.Offset, set.Offset) + } } - uncommit.head = EpochOffset{ - final.LeaderEpoch, // -1 if old message / unknown - newOffset, + + prior.head = prior.dirty + prior.dirty = set + if setHead { + prior.head = set } - topicOffsets[partition.Partition] = uncommit + topicOffsets[partition.Partition] = prior } if debug { @@ -1412,10 +1449,28 @@ func (g *groupConsumer) updateCommitted( fmt.Fprintf(&b, "%d{%d=>%d}, ", reqPart.Partition, uncommit.committed.Offset, reqPart.Offset) } - uncommit.committed = EpochOffset{ + set := EpochOffset{ reqPart.LeaderEpoch, reqPart.Offset, } + uncommit.committed = set + + // We always commit either dirty offsets or head + // offsets. For sanity, we bump both dirty/head to the + // commit if they are before the commit. We only expect + // head to be before the commit, if committing manually + // through UncommittedOffsets in an OnRevoke with + // autocommitting enabled. + for _, next := range []*EpochOffset{ + &uncommit.head, + &uncommit.committed, + } { + if set.Epoch > next.Epoch || + set.Epoch == next.Epoch && + set.Offset > next.Offset { + *next = set + } + } topic[respPart.Partition] = uncommit } @@ -1473,10 +1528,14 @@ func (g *groupConsumer) loopCommit() { // after the group context is canceled (which is the first // thing that happens so as to quit the manage loop before // leaving a group). + // + // We always commit only the head. If we are autocommitting + // dirty, then updateUncommitted updates the head to dirty + // offsets. g.mu.Lock() if !g.blockAuto { g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group) - g.commit(g.ctx, g.getUncommittedLocked(true), g.cfg.commitCallback) + g.commit(g.ctx, g.getUncommittedLocked(true, false), g.cfg.commitCallback) } g.mu.Unlock() } @@ -1536,22 +1595,20 @@ func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { var topicAssigns map[int32]Offset for partition, epochOffset := range partitions { current, exists := topicUncommitted[partition] - if exists && current.head == epochOffset { - current.committed = epochOffset - topicUncommitted[partition] = current - continue + topicUncommitted[partition] = uncommit{ + dirty: epochOffset, + head: epochOffset, + committed: epochOffset, } - if topicAssigns == nil { + if exists && current.dirty == epochOffset { + continue + } else if topicAssigns == nil { topicAssigns = make(map[int32]Offset, len(partitions)) } topicAssigns[partition] = Offset{ at: epochOffset.Offset, epoch: epochOffset.Epoch, } - topicUncommitted[partition] = uncommit{ - head: epochOffset, - committed: epochOffset, - } } if len(topicAssigns) > 0 { if assigns == nil { @@ -1582,7 +1639,7 @@ func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { // may fail with REBALANCE_IN_PROGRESS. func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset { if g := cl.consumer.g; g != nil { - return g.getUncommitted() + return g.getUncommitted(true) } return nil } @@ -1599,16 +1656,16 @@ func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset { g.mu.Lock() defer g.mu.Unlock() - return g.getUncommittedLocked(false) + return g.getUncommittedLocked(false, false) } -func (g *groupConsumer) getUncommitted() map[string]map[int32]EpochOffset { +func (g *groupConsumer) getUncommitted(dirty bool) map[string]map[int32]EpochOffset { g.mu.Lock() defer g.mu.Unlock() - return g.getUncommittedLocked(true) + return g.getUncommittedLocked(true, dirty) } -func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]EpochOffset { +func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[int32]EpochOffset { if g.uncommitted == nil { return nil } @@ -1617,7 +1674,7 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo for topic, partitions := range g.uncommitted { var topicUncommitted map[int32]EpochOffset for partition, uncommit := range partitions { - if head && uncommit.head == uncommit.committed { + if head && uncommit.dirty == uncommit.committed { continue } if topicUncommitted == nil { @@ -1631,7 +1688,11 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo } } if head { - topicUncommitted[partition] = uncommit.head + if dirty { + topicUncommitted[partition] = uncommit.dirty + } else { + topicUncommitted[partition] = uncommit.head + } } else { topicUncommitted[partition] = uncommit.committed } @@ -1917,7 +1978,7 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int // We use the client's context rather than the group context, // because this could come from the group being left. The group // context will already be canceled. - g.commitOffsetsSync(g.cl.ctx, g.getUncommitted(), g.cfg.commitCallback) + g.commitOffsetsSync(g.cl.ctx, g.getUncommitted(false), g.cfg.commitCallback) } } diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index f87b5778..2c502383 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -115,33 +115,35 @@ func (c *testConsumer) goGroupETL(etlsBeforeQuit int) { func (c *testConsumer) etl(etlsBeforeQuit int) { defer c.wg.Done() + netls := 0 // for if etlsBeforeQuit is non-negative + opts := []Opt{ WithLogger(testLogger()), ConsumerGroup(c.group), ConsumeTopics(c.consumeFrom), Balancers(c.balancer), - } - if etlsBeforeQuit >= 0 { - // If we quit before consuming to the end, the behavior we are - // triggering is to poll a batch and _not_ commit. Thus, if we - // have etlsBeforeQuit, we do _not_ commit on leave, and so we - // disable autocommitting. + // Even with autocommitting, autocommitting does not commit + // *the latest* when being revoked. We always want to commit + // everything we have processed, because our loop below always + // is successful. If we do not commit on revoke, we would have + // duplicate processing. // - // However, we still want to commit on valid rebalances, so we - // set that option, BUT we do not want to commit on lost, which - // triggers when we leave. - opts = append(opts, - DisableAutoCommit(), - OnRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) { - // context.Canceled means the client left the - // group, so we ignore that. - if err := cl.CommitUncommittedOffsets(ctx); err != nil && err != context.Canceled { - c.errCh <- fmt.Errorf("unable to commit: %v", err) - } - }), - OnLost(func(context.Context, *Client, map[string][]int32) {}), - ) + // If we have etlsBeforeQuit, the behavior we want to trigger + // is to *not* commit when we leave. + // + // Lastly, we do not want to fall back from OnLost: OnLost + // should not be called due to us not erroring, but we may as + // well explicitly disable it. + OnRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) { + if etlsBeforeQuit >= 0 && netls >= etlsBeforeQuit { + return + } + if err := cl.CommitUncommittedOffsets(ctx); err != nil { + c.errCh <- fmt.Errorf("unable to commit: %v", err) + } + }), + OnLost(func(context.Context, *Client, map[string][]int32) {}), } cl, _ := NewClient(opts...) @@ -155,8 +157,6 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { } }() - netls := 0 // for if etlsBeforeQuit is non-negative - for { // If we etl a few times before quitting, then we want to @@ -178,7 +178,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { if consumed := atomic.LoadUint64(&c.consumed); consumed == testRecordLimit { return } else if consumed > testRecordLimit { - panic("invalid: consumed too much") + panic(fmt.Sprintf("invalid: consumed too much from %s (group %s)", c.consumeFrom, c.group)) } } @@ -204,7 +204,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { c.mu.Lock() // check dup if _, exists := c.partOffsets[partOffset{r.Partition, r.Offset}]; exists { - c.errCh <- fmt.Errorf("saw double offset p%do%d", r.Partition, r.Offset) + c.errCh <- fmt.Errorf("saw double offset t %s p%do%d", r.Topic, r.Partition, r.Offset) } c.partOffsets[partOffset{r.Partition, r.Offset}] = struct{}{}