diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 09f729cd..13ac511b 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -456,6 +456,55 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) { c.storePaused(paused) } +// SetOffsets sets any matching offsets in setOffsets to the given +// epoch/offset. Partitions that are not specified are not set. It is invalid +// to set topics that were not yet returned from a PollFetches: this function +// sets only partitions that were previously consumed, any extra partitions are +// skipped. +// +// If using transactions, it is advised to just use a GroupTransactSession and +// avoid this function entirely. +// +// It is strongly recommended to use this function outside of the context of a +// PollFetches loop and only when you know the group is not revoked (i.e., +// block any concurrent revoke while issuing this call). Any other usage is +// prone to odd interactions. +func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { + cl.setOffsets(setOffsets, true) +} + +func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bool) { + if len(setOffsets) == 0 { + return + } + + // We assignPartitions before returning, so we grab the consumer lock + // first to preserve consumer mu => group mu ordering, or to ensure + // no concurrent metadata assign for direct consuming. + c := &cl.consumer + c.mu.Lock() + defer c.mu.Unlock() + + var assigns map[string]map[int32]Offset + var tps *topicsPartitions + switch { + case c.d != nil: + assigns = c.d.getSetAssigns(setOffsets) + tps = c.d.tps + case c.g != nil: + assigns = c.g.getSetAssigns(setOffsets) + tps = c.g.tps + } + if len(assigns) == 0 { + return + } + if log { + c.assignPartitions(assigns, assignSetMatching, tps, "from manual SetOffsets") + } else { + c.assignPartitions(assigns, assignSetMatching, tps, "") + } +} + // assignHow controls how assignPartitions operates. type assignHow int8 @@ -708,7 +757,8 @@ func (c *consumer) doOnMetadataUpdate() { doUpdate := func() { // We forbid reassignments while we do a quick check for // new assignments--for the direct consumer particularly, - // this prevents TOCTOU. + // this prevents TOCTOU, and guards against a concurrent + // assignment from SetOffsets. c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index 4b6cb728..fbfdae47 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -30,6 +30,23 @@ func (c *consumer) initDirect() { d.tps.storeTopics(topics) // prime topics to load if non-regex (this is of no benefit if regex) } +// For SetOffsets, unlike the group consumer, we just blindly translate the +// input EpochOffsets into Offsets, and those will be set directly. +func (d *directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset) (assigns map[string]map[int32]Offset) { + assigns = make(map[string]map[int32]Offset) + for topic, partitions := range setOffsets { + set := make(map[int32]Offset) + for partition, eo := range partitions { + set[partition] = Offset{ + at: eo.Offset, + epoch: eo.Epoch, + } + } + assigns[topic] = set + } + return assigns +} + // findNewAssignments returns new partitions to consume at given offsets // based off the current topics. func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 277012cd..ecde7810 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1695,49 +1695,19 @@ func (g *groupConsumer) loopCommit() { } } -// SetOffsets, for consumer groups, sets any matching offsets in setOffsets to -// the given epoch/offset. Partitions that are not specified are not set. It is -// invalid to set topics that were not yet returned from a PollFetches. +// For SetOffsets, the gist of what follows: // -// If using transactions, it is advised to just use a GroupTransactSession and -// avoid this function entirely. -// -// It is strongly recommended to use this function outside of the context of a -// PollFetches loop and only when you know the group is not revoked (i.e., -// block any concurrent revoke while issuing this call). Any other usage is -// prone to odd interactions. -func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { - cl.setOffsets(setOffsets, true) -} - -func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bool) { - if len(setOffsets) == 0 { - return - } - - // We assignPartitions before returning, so we grab the consumer lock - // first to preserve consumer mu => group mu ordering. - c := &cl.consumer - c.mu.Lock() - defer c.mu.Unlock() - - g := c.g - if g == nil { - return - } +// We need to set uncommitted.committed; that is the guarantee of this +// function. However, if, for everything we are setting, the head equals the +// commit, then we do not need to actually invalidate our current assignments. +// This is a great optimization for transactions that are resetting their state +// on abort. +func (g *groupConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset) (assigns map[string]map[int32]Offset) { g.mu.Lock() defer g.mu.Unlock() groupTopics := g.tps.load() - // The gist of what follows: - // - // We need to set uncommitted.committed; that is the guarantee of this - // function. However, if, for everything we are setting, the head - // equals the commit, then we do not need to actually invalidate our - // current assignments. This is a great optimization for transactions - // that are resetting their state on abort. - var assigns map[string]map[int32]Offset if g.uncommitted == nil { g.uncommitted = make(uncommitted) } @@ -1776,15 +1746,7 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo } } - if len(assigns) == 0 { - return - } - - if log { - c.assignPartitions(assigns, assignSetMatching, g.tps, "from manual SetOffsets") - } else { - c.assignPartitions(assigns, assignSetMatching, g.tps, "") - } + return assigns } // UncommittedOffsets returns the latest uncommitted offsets. Uncommitted