From 4fadcdec5076d6639454dae592f7e5551c1edaee Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 3 Aug 2021 18:15:38 -0600
Subject: [PATCH] consuming: fix logic race, simplify some logic

If a metadata update stopped a consumer session at the same time as a
topic assignment also needed to stop the session, the offsets to reload
from the metadata stop could be pushed into loading with the next
session. If that assignment then filtered some offsets (say, due to a
group rebalance), it would miss filtering what the metadata update had
stopped. By ensuring that we have one worker when starting a new
session, we allow work to be done within that session before it can be
stopped again.

The race here was extremely unlikely: not only is it rare for these two
events to occur at once, it is rarer still for the Go scheduler not to
simply keep running the one goroutine that was already executing this
logic.

Finally, with a closure, we move the session-stopping and
prior-partitions bookkeeping into the metadata code, where the
variables are initialized. Keeping this code closer together makes it
easier to hold everything in your head at once. (A standalone sketch of
the worker-guard pattern follows the diff.)
---
 pkg/kgo/consumer.go              | 33 ++++++++++++++++++++++++++++++++++---
 pkg/kgo/metadata.go              | 32 +++++++++++++++-------------
 pkg/kgo/topics_and_partitions.go | 15 ++-------------
 3 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index 7c3bab39..15234b43 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -426,8 +426,17 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 	defer func() {
 		if session == nil { // if nil, we stopped the session
 			session = c.startNewSession(tps)
+		} else { // else we guarded it
+			c.unguardSessionChange(session)
 		}
 		loadOffsets.loadWithSession(session) // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
+
+		// If we started a new session or if we unguarded, we have one
+		// worker. This one worker allowed us to safely add our load
+		// offsets before the session could be concurrently stopped
+		// again. Now that we have added the load offsets, we allow the
+		// session to be stopped.
+		session.decWorker()
 	}()
 
 	if how == assignWithoutInvalidating {
@@ -435,8 +444,6 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 		// if we had no session before, which is why we need to pass in
 		// our topicPartitions.
 		session = c.guardSessionChange(tps)
-		defer c.unguardSessionChange()
-
 	} else {
 		loadOffsets, _ = c.stopSession()
 
@@ -877,12 +884,18 @@ func (c *consumerSession) manageFetchConcurrency() {
 }
 
 func (c *consumerSession) incWorker() {
+	if c == noConsumerSession { // from startNewSession
+		return
+	}
 	c.workersMu.Lock()
 	defer c.workersMu.Unlock()
 	c.workers++
 }
 
 func (c *consumerSession) decWorker() {
+	if c == noConsumerSession { // from followup to startNewSession
+		return
+	}
 	c.workersMu.Lock()
 	defer c.workersMu.Unlock()
 	c.workers--
@@ -922,7 +935,11 @@ func (c *consumer) guardSessionChange(tps *topicsPartitions) *consumerSession {
 	return session
 }
 
-func (c *consumer) unguardSessionChange() {
+// For the same reason below as in startNewSession, we inc a worker before
+// unguarding. This allows the unguarding to execute a bit of logic if
+// necessary before the session can be stopped.
+func (c *consumer) unguardSessionChange(session *consumerSession) {
+	session.incWorker()
 	c.sessionChangeMu.Unlock()
 }
 
@@ -987,10 +1004,20 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
 }
 
 // Starts a new consumer session, allowing fetches to happen.
+//
+// If there are no topic partitions to start with, this returns noConsumerSession.
+//
+// This is returned with 1 worker; decWorker must be called after return. The
+// 1 worker allows for initialization work to prevent the session from being
+// immediately stopped.
 func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
 	session := c.newConsumerSession(tps)
 	c.session.Store(session)
 
+	// Ensure that this session is usable before being stopped immediately.
+	// The caller must dec workers.
+	session.incWorker()
+
 	// At this point, sources can start consuming.
 	c.sessionChangeMu.Unlock()
 
diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index ae552740..3640a3cd 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -264,19 +264,29 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
 		defer tpsConsumer.storeData(tpsConsumerLoad)
 	}
 
+	// Migrating a cursor requires stopping any consumer session. If we
+	// stop a session, we need to eventually re-start any offset listing or
+	// epoch loading that was stopped. Thus, we simply merge what we
+	// stopped into what we will reload.
 	var (
 		consumerSessionStopped bool
 		reloadOffsets          listOrEpochLoads
 		tpsPrior               *topicsPartitions
 	)
-
-	// Before we return, if we stopped the session, we need to restart it
-	// with the topic partitions we were consuming. Lastly, we need to
-	// trigger the consumer metadata update to allow consumers waiting to
-	// continue.
+	stopConsumerSession := func() {
+		if consumerSessionStopped {
+			return
+		}
+		consumerSessionStopped = true
+		loads, tps := cl.consumer.stopSession()
+		reloadOffsets.mergeFrom(loads)
+		tpsPrior = tps
+	}
 	defer func() {
 		if consumerSessionStopped {
-			reloadOffsets.loadWithSession(cl.consumer.startNewSession(tpsPrior))
+			session := cl.consumer.startNewSession(tpsPrior)
+			defer session.decWorker()
+			reloadOffsets.loadWithSession(session)
 		}
 	}()
 
@@ -301,9 +311,8 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
 				priorParts,
 				newParts,
 				m.isProduce,
-				&consumerSessionStopped,
 				&reloadOffsets,
-				&tpsPrior,
+				stopConsumerSession,
 			)
 		}
 	}
@@ -449,9 +458,8 @@ func (cl *Client) mergeTopicPartitions(
 	l *topicPartitions,
 	r *topicPartitionsData,
 	isProduce bool,
-	consumerSessionStopped *bool,
 	reloadOffsets *listOrEpochLoads,
-	tpsPrior **topicsPartitions,
+	stopConsumerSession func(),
 ) (needsRetry bool) {
 	lv := *l.load() // copy so our field writes do not collide with reads
 
@@ -565,10 +573,8 @@ func (cl *Client) mergeTopicPartitions(
 		} else {
 			oldTP.migrateCursorTo(
 				newTP,
-				&cl.consumer,
-				consumerSessionStopped,
 				reloadOffsets,
-				tpsPrior,
+				stopConsumerSession,
 			)
 		}
 	}
diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go
index 12245c96..d0a1263f 100644
--- a/pkg/kgo/topics_and_partitions.go
+++ b/pkg/kgo/topics_and_partitions.go
@@ -302,21 +302,10 @@ func (old *topicPartition) migrateProductionTo(new *topicPartition) {
 // have fewer concurrency issues to worry about.
 func (old *topicPartition) migrateCursorTo(
 	new *topicPartition,
-	consumer *consumer,
-	consumerSessionStopped *bool,
 	reloadOffsets *listOrEpochLoads,
-	tpsPrior **topicsPartitions,
+	stopConsumerSession func(),
 ) {
-	// Migrating a cursor requires stopping any consumer session. If we
-	// stop a session, we need to eventually re-start any offset listing or
-	// epoch loading that was stopped. Thus, we simply merge what we
-	// stopped into what we will reload.
-	if !*consumerSessionStopped {
-		loads, tps := consumer.stopSession()
-		reloadOffsets.mergeFrom(loads)
-		*tpsPrior = tps
-		*consumerSessionStopped = true
-	}
+	stopConsumerSession()
 
 	old.cursor.source.removeCursor(old.cursor)
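
As a reading aid, the following is a minimal, self-contained Go sketch of the
worker-guard idea described in the commit message: a session starts with one
worker already counted, so a concurrent stop has to wait until whoever started
the session finishes its initialization (adding its load offsets) and releases
that worker. The session type here, its mutex/cond fields, and stop() are
hypothetical stand-ins rather than kgo's real consumerSession; only the
startNewSession/incWorker/decWorker flow mirrors the patch.

package main

import (
	"fmt"
	"sync"
)

// session is a stand-in for a consumer session: in-flight work is tracked by
// a worker count, and stopping must wait until all workers are done.
type session struct {
	mu      sync.Mutex
	cond    *sync.Cond
	workers int
	stopped bool
}

// startNewSession returns a session that already counts one worker; the
// caller must call decWorker once its post-start work (such as queueing
// offsets to load) is finished. Until then, stop blocks.
func startNewSession() *session {
	s := &session{}
	s.cond = sync.NewCond(&s.mu)
	s.incWorker()
	return s
}

func (s *session) incWorker() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.workers++
}

func (s *session) decWorker() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.workers--
	if s.workers == 0 {
		s.cond.Broadcast()
	}
}

// stop waits for every worker to finish, then marks the session torn down.
func (s *session) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.workers > 0 {
		s.cond.Wait()
	}
	s.stopped = true
}

func main() {
	s := startNewSession()

	// A concurrent "metadata update" that wants to stop the session must
	// wait for the initialization worker to be released.
	done := make(chan struct{})
	go func() {
		s.stop()
		close(done)
	}()

	// Safe: the session cannot be stopped out from under us here, so
	// anything we add now is seen by whichever stop happens next.
	fmt.Println("adding load offsets to the live session")

	s.decWorker() // initialization done; stop may now proceed
	<-done
	fmt.Println("session stopped only after initialization completed")
}

Starting with one worker is what closes the race above: the metadata goroutine
can still decide to stop the session, but the stop cannot complete until
decWorker runs, so offsets added during that window are guaranteed to be seen
and filtered by whichever stop comes next.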