Skip to content

Commit

Permalink
consumer: allow SetOffsets for direct partition consuming
Browse files Browse the repository at this point in the history
The part that actually sets offsets was never really specific to group
consuming; it was previously gated to group consuming only because it also
updates uncommitted. That update is now just a clean, separate codepath.
  • Loading branch information
twmb committed Jan 6, 2022
1 parent 3cb68bf commit 2eae20d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 47 deletions.
52 changes: 51 additions & 1 deletion pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,55 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
c.storePaused(paused)
}

// SetOffsets applies the epoch/offset given in setOffsets to every matching
// partition. Partitions absent from setOffsets are left untouched. Only
// partitions that were previously consumed are set: it is invalid to pass
// topics that have not yet been returned from a PollFetches, and any such
// extra partitions are skipped.
//
// When using transactions, prefer a GroupTransactSession and avoid calling
// this function entirely.
//
// Calling this outside of the context of a PollFetches loop, and only while
// you know the group is not revoked (i.e., while blocking any concurrent
// revoke for the duration of the call), is strongly recommended. Any other
// usage is prone to odd interactions.
func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
	// The exported entry point always asks setOffsets for its log variant.
	const withLog = true
	cl.setOffsets(setOffsets, withLog)
}

// setOffsets is the implementation behind SetOffsets. It asks whichever
// consumer is configured (direct or group) to translate setOffsets into
// assignments and, if any result, applies them with assignSetMatching. When
// log is true the assignment is made with the reason "from manual
// SetOffsets"; otherwise the reason is empty.
func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bool) {
	if len(setOffsets) == 0 {
		return
	}

	// assignPartitions runs before we return, so the consumer lock is taken
	// first: this preserves the consumer mu => group mu ordering and, for
	// direct consuming, ensures no concurrent metadata assign.
	cons := &cl.consumer
	cons.mu.Lock()
	defer cons.mu.Unlock()

	var (
		toAssign map[string]map[int32]Offset
		topics   *topicsPartitions
	)
	if d := cons.d; d != nil {
		toAssign = d.getSetAssigns(setOffsets)
		topics = d.tps
	} else if g := cons.g; g != nil {
		toAssign = g.getSetAssigns(setOffsets)
		topics = g.tps
	}
	// Nothing matched, or neither consumer type is set up: nothing to do.
	if len(toAssign) == 0 {
		return
	}

	var why string
	if log {
		why = "from manual SetOffsets"
	}
	cons.assignPartitions(toAssign, assignSetMatching, topics, why)
}

// assignHow controls how assignPartitions operates.
//
// NOTE(review): assignSetMatching, which setOffsets passes as the second
// argument to assignPartitions, is presumably one of this type's values --
// confirm against the constant declarations.
type assignHow int8

Expand Down Expand Up @@ -708,7 +757,8 @@ func (c *consumer) doOnMetadataUpdate() {
doUpdate := func() {
// We forbid reassignments while we do a quick check for
// new assignments--for the direct consumer particularly,
// this prevents TOCTOU.
// this prevents TOCTOU, and guards against a concurrent
// assignment from SetOffsets.
c.mu.Lock()
defer c.mu.Unlock()

Expand Down
17 changes: 17 additions & 0 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@ func (c *consumer) initDirect() {
d.tps.storeTopics(topics) // prime topics to load if non-regex (this is of no benefit if regex)
}

// getSetAssigns converts the EpochOffsets requested through SetOffsets into
// the Offsets to assign. Unlike the group consumer, the direct consumer just
// blindly translates its input: every topic and partition in setOffsets is
// carried over, with Offset.at taken from EpochOffset.Offset and Offset.epoch
// taken from EpochOffset.Epoch, and those will be set directly.
//
// The returned map is always non-nil and has one entry per input topic, even
// when that topic's partition map is empty.
func (d *directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset) (assigns map[string]map[int32]Offset) {
	// Both final sizes are known up front, so size each map once rather
	// than letting it grow while it is filled.
	assigns = make(map[string]map[int32]Offset, len(setOffsets))
	for topic, partitions := range setOffsets {
		set := make(map[int32]Offset, len(partitions))
		for partition, eo := range partitions {
			set[partition] = Offset{
				at:    eo.Offset,
				epoch: eo.Epoch,
			}
		}
		assigns[topic] = set
	}
	return assigns
}

// findNewAssignments returns new partitions to consume at given offsets
// based off the current topics.
func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
Expand Down
54 changes: 8 additions & 46 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,49 +1695,19 @@ func (g *groupConsumer) loopCommit() {
}
}

// SetOffsets, for consumer groups, sets any matching offsets in setOffsets to
// the given epoch/offset. Partitions that are not specified are not set. It is
// invalid to set topics that were not yet returned from a PollFetches.
// For SetOffsets, the gist of what follows:
//
// If using transactions, it is advised to just use a GroupTransactSession and
// avoid this function entirely.
//
// It is strongly recommended to use this function outside of the context of a
// PollFetches loop and only when you know the group is not revoked (i.e.,
// block any concurrent revoke while issuing this call). Any other usage is
// prone to odd interactions.
func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
cl.setOffsets(setOffsets, true)
}

func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bool) {
if len(setOffsets) == 0 {
return
}

// We assignPartitions before returning, so we grab the consumer lock
// first to preserve consumer mu => group mu ordering.
c := &cl.consumer
c.mu.Lock()
defer c.mu.Unlock()

g := c.g
if g == nil {
return
}
// We need to set uncommitted.committed; that is the guarantee of this
// function. However, if, for everything we are setting, the head equals the
// commit, then we do not need to actually invalidate our current assignments.
// This is a great optimization for transactions that are resetting their state
// on abort.
func (g *groupConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset) (assigns map[string]map[int32]Offset) {
g.mu.Lock()
defer g.mu.Unlock()

groupTopics := g.tps.load()

// The gist of what follows:
//
// We need to set uncommitted.committed; that is the guarantee of this
// function. However, if, for everything we are setting, the head
// equals the commit, then we do not need to actually invalidate our
// current assignments. This is a great optimization for transactions
// that are resetting their state on abort.
var assigns map[string]map[int32]Offset
if g.uncommitted == nil {
g.uncommitted = make(uncommitted)
}
Expand Down Expand Up @@ -1776,15 +1746,7 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
}
}

if len(assigns) == 0 {
return
}

if log {
c.assignPartitions(assigns, assignSetMatching, g.tps, "from manual SetOffsets")
} else {
c.assignPartitions(assigns, assignSetMatching, g.tps, "")
}
return assigns
}

// UncommittedOffsets returns the latest uncommitted offsets. Uncommitted
Expand Down

0 comments on commit 2eae20d

Please sign in to comment.