producer: fix potential lingering recBuf issue
See large embedded comment; we no longer remove topics from the producer
map ever.
twmb committed Apr 26, 2021
1 parent 19d57dc commit ce113d5
Showing 2 changed files with 17 additions and 16 deletions.
29 changes: 16 additions & 13 deletions pkg/kgo/producer.go
@@ -517,8 +517,6 @@ func (cl *Client) waitUnknownTopic(
 	// unknown topic map is still the same.
 	p := &cl.producer
 
-	p.topicsMu.Lock()
-	defer p.topicsMu.Unlock()
 	p.unknownTopicsMu.Lock()
 	defer p.unknownTopicsMu.Unlock()
 
@@ -529,20 +527,25 @@ func (cl *Client) waitUnknownTopic(
 	cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, done retrying, failing all records", "topic", topic)
 
 	delete(p.unknownTopics, topic)
-	cl.deleteUnknownTopic(topic, unknown, err)
+	cl.failUnknownTopicRecords(topic, unknown, err)
 }
 
-// Called under both topic and unknown mu's, this clears the topic from the
-// producer and finishes all buffered promises.
+// Called under the unknown mu, this finishes promises for an unknown topic.
 //
 // We do not need to clear recBufs in sinks when deleting unknown topics
 // because unknown topics implies that partitions were never loaded, thus no
 // recBufs in sinks.
-func (cl *Client) deleteUnknownTopic(topic string, unknown *unknownTopicProduces, err error) {
-	topics := cl.producer.topics.clone()
-	delete(topics, topic)
-	cl.producer.topics.storeData(topics)
-
+//
+// We do not delete from the producer's topics due to potential concurrent
+// metadata updating issues: if the metadata has an active request loading for
+// a topic we are actively deleting now, and that request finally loads the
+// topic successfully, it will create recBuf pointers that will not be cleaned
+// up.
+//
+// We could work around this using the same blockingMetadataFn type logic that
+// we use when unsetting a consumer, but it's more finnicky for a producer
+// because we want to knife out a single topic.
+//
+// Leaving a topic buffered even if we failed it as unknown should be of no
+// consequence because clients should not really be producing to loads of
+// unknown topics.
+func (cl *Client) failUnknownTopicRecords(topic string, unknown *unknownTopicProduces, err error) {
 	go func() {
 		for _, pr := range unknown.buffered {
 			cl.finishRecordPromise(pr, err)
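
To make the race in the embedded comment concrete, here is a minimal, self-contained Go sketch. The types below (producer, recBuf, deleteTopic, metadataLoad) are simplified stand-ins, not franz-go's real structures: one goroutine models the old deleteUnknownTopic removing a topic from the shared map, while another models an in-flight metadata load that re-creates the entry with fresh recBuf pointers. If the load lands after the delete, the entry lingers with recBufs that nothing will ever flush or fail.

package main

import (
	"fmt"
	"sync"
)

type recBuf struct{ topic string } // stand-in for a per-partition record buffer

type producer struct {
	mu     sync.Mutex
	topics map[string][]*recBuf
}

// deleteTopic models the old deleteUnknownTopic: it removes the topic entry.
func (p *producer) deleteTopic(topic string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.topics, topic)
}

// metadataLoad models a concurrent metadata response that creates recBufs for
// a topic it believes is still wanted.
func (p *producer) metadataLoad(topic string, partitions int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	bufs := make([]*recBuf, partitions)
	for i := range bufs {
		bufs[i] = &recBuf{topic: topic}
	}
	p.topics[topic] = bufs // re-creates the entry if the delete already ran
}

func main() {
	p := &producer{topics: map[string][]*recBuf{"t": nil}}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); p.deleteTopic("t") }()     // old cleanup path
	go func() { defer wg.Done(); p.metadataLoad("t", 3) }() // in-flight load
	wg.Wait()

	// If the load finished second, "t" lingers with live recBufs that no
	// cleanup path will ever visit.
	fmt.Println("topics after race:", len(p.topics))
}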
4 changes: 1 addition & 3 deletions pkg/kgo/topics_and_partitions.go
@@ -99,8 +99,6 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to
 
 	p := &cl.producer
 
-	p.topicsMu.Lock()
-	defer p.topicsMu.Unlock()
 	p.unknownTopicsMu.Lock()
 	defer p.unknownTopicsMu.Unlock()
 
@@ -151,7 +149,7 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to
 	close(unknown.wait) // allow waiting goroutine to quit
 
 	if len(lv.partitions) == 0 {
-		cl.deleteUnknownTopic(topic, unknown, lv.loadErr)
+		cl.failUnknownTopicRecords(topic, unknown, lv.loadErr)
 	} else {
 		for _, pr := range unknown.buffered {
 			cl.doPartitionRecord(l, lv, pr)
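
And a companion sketch of the shape the fix settles on, under the same simplifying assumptions (promise and producer here are illustrative stand-ins, not the real client types): failing an unknown topic now only clears the unknownTopics bookkeeping and finishes the buffered promises in a goroutine, mirroring failUnknownTopicRecords. The producer's topics map is never touched, so a concurrent metadata load cannot resurrect a just-deleted entry.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type promise func(error)

type producer struct {
	mu      sync.Mutex
	topics  map[string]bool // stand-in for the real partition data
	unknown map[string][]promise
}

// failUnknownTopicRecords (simplified): finish every buffered promise with
// err, without ever deleting topic from p.topics.
func (p *producer) failUnknownTopicRecords(topic string, err error) {
	p.mu.Lock()
	buffered := p.unknown[topic]
	delete(p.unknown, topic) // the unknown-topic bookkeeping is still cleared
	p.mu.Unlock()

	go func() { // promises finish asynchronously, as in the real client
		for _, pr := range buffered {
			pr(err)
		}
	}()
}

func main() {
	failed := make(chan error, 1)
	p := &producer{
		topics: map[string]bool{"t": true},
		unknown: map[string][]promise{
			"t": {func(err error) { failed <- err }},
		},
	}

	p.failUnknownTopicRecords("t", errors.New("unknown topic"))
	fmt.Println("record failed:", <-failed)                     // promise finished with the error
	fmt.Println("topic still in producer map:", p.topics["t"]) // true: never deleted
}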
