Skip to content

Commit

Permalink
consuming: fix logic race, simplify some logic
Browse files Browse the repository at this point in the history
If metadata stopped a consumer session concurrent with a consumer
session needing to be stopped for a topic assignment, then the reload
offsets could be pushed into loading with the next session.

If the second assignment was to filter some offsets (say due to group
rebalance), then it would miss filtering what was stopped in the
metadata request.

By ensuring that we have one worker when starting a new session, we
allow work to be done within that session before it can be stopped
again.

The race here was extremely unlikely: not only is it rare for these two
events to occur at once, it would be rarer still for the Go scheduler to
preempt the one goroutine that was already executing this logic rather
than simply letting it continue.

Finally, with a closure, we move the session-stopping and
partition-migration logic into the metadata code where the relevant
variables are initialized. This keeps the related code closer together so
that it is easier to keep everything in your head at once.
  • Loading branch information
twmb committed Aug 4, 2021
1 parent cab3818 commit 4fadcde
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 29 deletions.
33 changes: 30 additions & 3 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,17 +426,24 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
defer func() {
if session == nil { // if nil, we stopped the session
session = c.startNewSession(tps)
} else { // else we guarded it
c.unguardSessionChange(session)
}
loadOffsets.loadWithSession(session) // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow

// If we started a new session or if we unguarded, we have one
// worker. This one worker allowed us to safely add our load
// offsets before the session could be concurrently stopped
// again. Now that we have added the load offsets, we allow the
// session to be stopped.
session.decWorker()
}()

if how == assignWithoutInvalidating {
// Guarding a session change can actually create a new session
// if we had no session before, which is why we need to pass in
// our topicPartitions.
session = c.guardSessionChange(tps)
defer c.unguardSessionChange()

} else {
loadOffsets, _ = c.stopSession()

Expand Down Expand Up @@ -877,12 +884,18 @@ func (c *consumerSession) manageFetchConcurrency() {
}

// incWorker increments the session's worker count under workersMu.
//
// noConsumerSession is a sentinel returned by startNewSession when there
// are no topic partitions to consume; it tracks no workers, so we return
// immediately for it rather than touching its (shared) state.
func (c *consumerSession) incWorker() {
	if c == noConsumerSession { // from startNewSession
		return
	}
	c.workersMu.Lock()
	defer c.workersMu.Unlock()
	c.workers++
}

func (c *consumerSession) decWorker() {
if c == noConsumerSession { // from followup to startNewSession
return
}
c.workersMu.Lock()
defer c.workersMu.Unlock()
c.workers--
Expand Down Expand Up @@ -922,7 +935,11 @@ func (c *consumer) guardSessionChange(tps *topicsPartitions) *consumerSession {
return session
}

func (c *consumer) unguardSessionChange() {
// For the same reason below as in startNewSession, we inc a worker before
// unguarding. This allows the unguarding to execute a bit of logic if
// necessary before the session can be stopped.
//
// The caller is responsible for eventually calling decWorker on the
// session once its post-unguard work is done (see assignPartitions).
func (c *consumer) unguardSessionChange(session *consumerSession) {
	session.incWorker()
	c.sessionChangeMu.Unlock() // release the guard taken by guardSessionChange
}

Expand Down Expand Up @@ -987,10 +1004,20 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
}

// Starts a new consumer session, allowing fetches to happen.
//
// If there are no topic partitions to start with, this returns noConsumerSession.
//
// This is returned with 1 worker; decWorker must be called after return. The
// 1 worker allows for initialization work to prevent the session from being
// immediately stopped.
func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
session := c.newConsumerSession(tps)
c.session.Store(session)

// Ensure that this session is usable before being stopped immediately.
// The caller must dec workers.
session.incWorker()

// At this point, sources can start consuming.

c.sessionChangeMu.Unlock()
Expand Down
32 changes: 19 additions & 13 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,29 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
defer tpsConsumer.storeData(tpsConsumerLoad)
}

// Migrating a cursor requires stopping any consumer session. If we
// stop a session, we need to eventually re-start any offset listing or
// epoch loading that was stopped. Thus, we simply merge what we
// stopped into what we will reload.
var (
consumerSessionStopped bool
reloadOffsets listOrEpochLoads
tpsPrior *topicsPartitions
)

// Before we return, if we stopped the session, we need to restart it
// with the topic partitions we were consuming. Lastly, we need to
// trigger the consumer metadata update to allow consumers waiting to
// continue.
stopConsumerSession := func() {
if consumerSessionStopped {
return
}
consumerSessionStopped = true
loads, tps := cl.consumer.stopSession()
reloadOffsets.mergeFrom(loads)
tpsPrior = tps
}
defer func() {
if consumerSessionStopped {
reloadOffsets.loadWithSession(cl.consumer.startNewSession(tpsPrior))
session := cl.consumer.startNewSession(tpsPrior)
defer session.decWorker()
reloadOffsets.loadWithSession(session)
}
}()

Expand All @@ -301,9 +311,8 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
priorParts,
newParts,
m.isProduce,
&consumerSessionStopped,
&reloadOffsets,
&tpsPrior,
stopConsumerSession,
)
}
}
Expand Down Expand Up @@ -449,9 +458,8 @@ func (cl *Client) mergeTopicPartitions(
l *topicPartitions,
r *topicPartitionsData,
isProduce bool,
consumerSessionStopped *bool,
reloadOffsets *listOrEpochLoads,
tpsPrior **topicsPartitions,
stopConsumerSession func(),
) (needsRetry bool) {
lv := *l.load() // copy so our field writes do not collide with reads

Expand Down Expand Up @@ -565,10 +573,8 @@ func (cl *Client) mergeTopicPartitions(
} else {
oldTP.migrateCursorTo(
newTP,
&cl.consumer,
consumerSessionStopped,
reloadOffsets,
tpsPrior,
stopConsumerSession,
)
}
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,21 +302,10 @@ func (old *topicPartition) migrateProductionTo(new *topicPartition) {
// have fewer concurrency issues to worry about.
func (old *topicPartition) migrateCursorTo(
new *topicPartition,
consumer *consumer,
consumerSessionStopped *bool,
reloadOffsets *listOrEpochLoads,
tpsPrior **topicsPartitions,
stopConsumerSession func(),
) {
// Migrating a cursor requires stopping any consumer session. If we
// stop a session, we need to eventually re-start any offset listing or
// epoch loading that was stopped. Thus, we simply merge what we
// stopped into what we will reload.
if !*consumerSessionStopped {
loads, tps := consumer.stopSession()
reloadOffsets.mergeFrom(loads)
*tpsPrior = tps
*consumerSessionStopped = true
}
stopConsumerSession()

old.cursor.source.removeCursor(old.cursor)

Expand Down

0 comments on commit 4fadcde

Please sign in to comment.