Merge pull request #404 from twmb/bugfix
producer: avoid deadlock when quickly recreating a topic
twmb authored Mar 21, 2023
2 parents d984c21 + 5e0ba1c commit 1a59c2d
Showing 4 changed files with 47 additions and 28 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
@@ -97,7 +97,6 @@ linters-settings:
- evalOrder
- ifElseChain
- importShadow
- ioutilDeprecated
- ptrToRefParam
- sloppyReassign
- tooManyResultsChecker
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
v1.13.1
===

This patch release fixes a bug where a producer could enter a deadlock if a
topic is deleted and recreated very quickly while producing.

- [`769e02f`](https://github.com/twmb/franz-go/commit/769e02f) producer: avoid deadlock when quickly recreating a topic

v1.13.0
===

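To make the failure mode above concrete, here is a minimal, hypothetical Go sketch of the lock-ordering inversion this commit removes. The names `idMu`, `recBufMu`, `loadProducerID`, and `failFromSink` are illustrative stand-ins rather than franz-go's actual code; the ordering itself (one path takes the ID lock and then a record-buffer lock, the other path the reverse) is taken from the comment added in pkg/kgo/producer.go below.

```go
package main

import (
	"sync"
	"time"
)

var (
	idMu     sync.Mutex // stands in for producer.idMu
	recBufMu sync.Mutex // stands in for one record buffer's mu
)

// loadProducerID mimics the path that locks the ID mutex first and then a
// record buffer: idMu => recBufMu.
func loadProducerID() {
	idMu.Lock()
	defer idMu.Unlock()
	time.Sleep(10 * time.Millisecond) // widen the race window for the demo
	recBufMu.Lock()
	defer recBufMu.Unlock()
}

// failFromSink mimics the old failProducerID being called from sink.go while
// a record buffer is already locked: recBufMu => idMu, the opposite order.
func failFromSink() {
	recBufMu.Lock()
	defer recBufMu.Unlock()
	time.Sleep(10 * time.Millisecond)
	idMu.Lock()
	defer idMu.Unlock()
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); loadProducerID() }()
	go func() { defer wg.Done(); failFromSink() }()
	wg.Wait() // each goroutine holds the lock the other wants: a deadlock
}
```

With both orders racing, each goroutine ends up holding the lock the other one needs, which is the hang the release note describes.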
5 changes: 2 additions & 3 deletions README.md
@@ -242,9 +242,8 @@ For 100 byte messages,
- This client is 2.5x faster at producing than sarama, and 1.5x faster at
consuming.

- This client is 2.4x faster at producing than segment's kafka-go, and so
much faster at consuming that I'm not sure I wrote the consuming comparison
correctly here.
- This client is 2.4x faster at producing than segment's kafka-go, and anywhere
from 2x to 6x faster at consuming.

To check benchmarks yourself, see the [bench](./examples/bench) example. This
example lets you produce or consume to a cluster and see the byte / record
61 changes: 37 additions & 24 deletions pkg/kgo/producer.go
@@ -682,34 +682,47 @@ func (cl *Client) resetAllProducerSequences() {
func (cl *Client) failProducerID(id int64, epoch int16, err error) {
p := &cl.producer

p.idMu.Lock()
defer p.idMu.Unlock()

current := p.id.Load().(*producerID)
if current.id != id || current.epoch != epoch {
cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id request due to current id being different",
"current_id", current.id,
"current_epoch", current.epoch,
"current_err", current.err,
"fail_id", id,
"fail_epoch", epoch,
"fail_err", err,
)
return // failed an old id
}

// If this is not UnknownProducerID, then we cannot recover production.
// We do not lock the idMu when failing a producer ID, for two reasons.
//
// If this is UnknownProducerID without a txnID, then we are here from
// stopOnDataLoss in sink.go (see large comment there).
// 1) With how we store below, we do not need to. We only fail if the
// ID we are failing has not changed and if the ID we are failing has
// not failed already. Failing outside the lock is the same as failing
// within the lock.
//
// If this is UnknownProducerID with a txnID, then EndTransaction will
// recover us.
p.id.Store(&producerID{
// 2) Locking would cause a deadlock, because producerID locks
// idMu=>recBuf.Mu, whereas we are failing while locked within a recBuf in
// sink.go.
new := &producerID{
id: id,
epoch: epoch,
err: err,
})
}
for {
current := p.id.Load().(*producerID)
if current.id != id || current.epoch != epoch {
cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id request due to current id being different",
"current_id", current.id,
"current_epoch", current.epoch,
"current_err", current.err,
"fail_id", id,
"fail_epoch", epoch,
"fail_err", err,
)
return
}
if current.err != nil {
cl.cfg.logger.Log(LogLevelInfo, "ignoring a fail producer id because our producer id has already been failed",
"current_id", current.id,
"current_epoch", current.epoch,
"current_err", current.err,
"fail_err", err,
)
return
}
if p.id.CompareAndSwap(current, new) {
return
}
}
}

// doInitProducerID inits the idempotent ID and potentially the transactional
@@ -795,7 +808,7 @@ func (cl *Client) partitionsForTopicProduce(pr promisedRec) (*topicPartitions, *

p.topics.storeTopics([]string{topic})
cl.addUnknownTopicRecord(pr)
cl.triggerUpdateMetadataNow("forced load because we are producing to a new topic for the first time")
cl.triggerUpdateMetadataNow("forced load because we are producing to a topic for the first time")
return nil, nil
}
}
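The fix replaces the mutex-guarded store with an atomic compare-and-swap retry loop, so failProducerID no longer takes idMu at all. Below is a self-contained, simplified sketch of that pattern; the `producer`/`failID` names and the standalone `main` are illustrative, not the client's real API, but the loop mirrors the one added above: give up if the ID has changed or has already been failed, otherwise retry the swap until it lands.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type producerID struct {
	id    int64
	epoch int16
	err   error
}

type producer struct {
	id atomic.Value // always holds a *producerID
}

// failID marks id/epoch as failed without ever taking a mutex, so it is safe
// to call while other locks (such as a record buffer's mutex) are held.
func (p *producer) failID(id int64, epoch int16, err error) {
	failed := &producerID{id: id, epoch: epoch, err: err}
	for {
		current := p.id.Load().(*producerID)
		if current.id != id || current.epoch != epoch {
			return // a newer ID is in use; nothing to fail
		}
		if current.err != nil {
			return // someone else already failed this ID
		}
		if p.id.CompareAndSwap(current, failed) {
			return // we won the race and published the failure
		}
		// Lost the race to a concurrent store: reload and re-check.
	}
}

func main() {
	p := &producer{}
	p.id.Store(&producerID{id: 1, epoch: 0})
	p.failID(1, 0, errors.New("unknown producer id"))
	fmt.Println(p.id.Load().(*producerID).err) // unknown producer id
}
```

Because the loop only retries when another goroutine swapped the value between the Load and the CompareAndSwap, it has the same effect as the old locked store while remaining callable from sink.go with a recBuf mutex held, removing the inverted lock order entirely.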