diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 8aa5ee51..d82bead3 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -517,8 +517,6 @@ func (cl *Client) waitUnknownTopic( // unknown topic map is still the same. p := &cl.producer - p.topicsMu.Lock() - defer p.topicsMu.Unlock() p.unknownTopicsMu.Lock() defer p.unknownTopicsMu.Unlock() @@ -529,20 +527,25 @@ func (cl *Client) waitUnknownTopic( cl.cfg.logger.Log(LogLevelInfo, "unknown topic wait failed, done retrying, failing all records", "topic", topic) delete(p.unknownTopics, topic) - cl.deleteUnknownTopic(topic, unknown, err) + cl.failUnknownTopicRecords(topic, unknown, err) } -// Called under both topic and unknown mu's, this clears the topic from the -// producer and finishes all buffered promises. +// Called under the unknown mu, this finishes promises for an unknown topic. // -// We do not need to clear recBufs in sinks when deleting unknown topics -// because unknown topics implies that partitions were never loaded, thus no -// recBufs in sinks. -func (cl *Client) deleteUnknownTopic(topic string, unknown *unknownTopicProduces, err error) { - topics := cl.producer.topics.clone() - delete(topics, topic) - cl.producer.topics.storeData(topics) - +// We do not delete from the producer's topics due to potential concurrent +// metadata updating issues: if the metadata has an active request loading for +// a topic we are actively deleting now, and that request finally loads the +// topic successfully, it will create recBuf pointers that will not be cleaned +// up. +// +// We could work around this using the same blockingMetadataFn type logic that +// we use when unsetting a consumer, but it's more finnicky for a producer +// because we want to knife out a single topic. +// +// Leaving a topic buffered even if we failed it as unknown should be of no +// consequence because clients should not really be producing to loads of +// unknown topics. +func (cl *Client) failUnknownTopicRecords(topic string, unknown *unknownTopicProduces, err error) { go func() { for _, pr := range unknown.buffered { cl.finishRecordPromise(pr, err) diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index e76ca369..4543f265 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -99,8 +99,6 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to p := &cl.producer - p.topicsMu.Lock() - defer p.topicsMu.Unlock() p.unknownTopicsMu.Lock() defer p.unknownTopicsMu.Unlock() @@ -151,7 +149,7 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to close(unknown.wait) // allow waiting goroutine to quit if len(lv.partitions) == 0 { - cl.deleteUnknownTopic(topic, unknown, lv.loadErr) + cl.failUnknownTopicRecords(topic, unknown, lv.loadErr) } else { for _, pr := range unknown.buffered { cl.doPartitionRecord(l, lv, pr)