From 42e5b5713cbb20a4e701aaf942fb4f24158eceb7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Apr 2022 14:06:40 -0600 Subject: [PATCH] producer: allow a canceled context & aborting to quit unknown wait Unknown topic waiting was missing two areas to "quit" that existed _after_ a topic was loaded (and was no longer unknown). Aborting or a canceled record context would not quit waiting. We now quit on both. For aborting, we add a new channel to the unknown type for must-quit errors. Previously, all errors being sent to unknown.wait would be retriable (either retriable kerrs, or retriable client connection failures). We do not want to send our must-quit signal to the unknown.wait channel because *theoretically* it could be fully buffered and thus either we would have to (a) wait to send, which has problems if the wait loop quits, or (b) select/default, which would drop our must-quit error. We now use a size-1 channel that, if ever received from, always quits the loop. Also, for a ctx-canceled record, similar to batches, we use the first record's context. Closes #158. 
--- pkg/kgo/producer.go | 30 ++++++++++++++++++++++++------ pkg/kgo/topics_and_partitions.go | 2 +- pkg/kgo/txn.go | 12 ++++++++++++ 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index a90392a6..b6c6e23a 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -89,7 +89,8 @@ func (cl *Client) BufferedProduceRecords() int64 { type unknownTopicProduces struct { buffered []promisedRec - wait chan error + wait chan error // retriable errors + fatal chan error // must-signal quit errors; capacity 1 } func (p *producer) init(cl *Client) { @@ -787,36 +788,53 @@ func (cl *Client) addUnknownTopicRecord(pr promisedRec) { unknown = &unknownTopicProduces{ buffered: make([]promisedRec, 0, 100), wait: make(chan error, 5), + fatal: make(chan error, 1), } cl.producer.unknownTopics[pr.Topic] = unknown } unknown.buffered = append(unknown.buffered, pr) if len(unknown.buffered) == 1 { - go cl.waitUnknownTopic(pr.Topic, unknown) + go cl.waitUnknownTopic(pr.ctx, pr.Topic, unknown) } } // waitUnknownTopic waits for a notification func (cl *Client) waitUnknownTopic( + rctx context.Context, topic string, unknown *unknownTopicProduces, ) { cl.cfg.logger.Log(LogLevelInfo, "producing to a new topic for the first time, fetching metadata to learn its partitions", "topic", topic) - var after <-chan time.Time + + var ( + tries int + unknownTries int64 + err error + after <-chan time.Time + ) + if timeout := cl.cfg.recordTimeout; timeout > 0 { timer := time.NewTimer(cl.cfg.recordTimeout) defer timer.Stop() after = timer.C } - var tries int - var unknownTries int64 - var err error + + // Ordering: aborting is set first, then unknown topics are manually + // canceled in a lock. New unknown topics after that lock will see + // aborting here and immediately cancel themselves. 
+ if cl.producer.isAborting() { + err = ErrAborting + } + for err == nil { select { + case <-rctx.Done(): + err = rctx.Err() case <-cl.ctx.Done(): err = ErrClientClosed case <-after: err = ErrRecordTimeout + case err = <-unknown.fatal: case retriableErr, ok := <-unknown.wait: if !ok { cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic) diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index 5872dff3..154db3a8 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -391,7 +391,7 @@ func (cl *Client) bumpMetadataFailForTopics(requested map[string]*topicPartition defer p.unknownTopicsMu.Unlock() for topic, unknown := range p.unknownTopics { - // if nil, mode 1, else mode 2 + // if nil, mode 1 (req err), else mode 2 (missing resp) if missing != nil && !missing[topic] { continue } diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 6cf84006..1a7dda3c 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -706,6 +706,18 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error { cl.cfg.logger.Log(LogLevelInfo, "producer state set to aborting; continuing to wait via flushing") defer cl.cfg.logger.Log(LogLevelDebug, "aborted buffered records") + // We must clear unknown topics ourselves, because flush just waits + // like normal. + p := &cl.producer + p.unknownTopicsMu.Lock() + for _, unknown := range p.unknownTopics { + select { + case unknown.fatal <- ErrAborting: + default: + } + } + p.unknownTopicsMu.Unlock() + // Setting the aborting state allows records to fail before // or after produce requests; thus, now we just flush. return cl.Flush(ctx)