Skip to content

Commit

Permalink
producer: allow a canceled context & aborting to quit unknown wait
Browse files Browse the repository at this point in the history
Waiting on an unknown topic was missing two ways to "quit" that exist
_after_ a topic is loaded (and is no longer unknown): aborting, or a
canceled record context, would not quit the wait.

We now quit on both. For aborting, we add a new channel to the unknown
type for must-quit errors. Previously, all errors sent to
unknown.wait were retriable (either retriable kerr errors or retriable
client connection failures). We do not want to send our must-quit signal
to the unknown.wait channel because *theoretically* it could be fully
buffered and thus either we would have to (a) wait to send, which has
problems if the wait loop quits, or (b) select/default, which would drop
our must-quit error.

We now use a size-1 channel that, if ever received from, always quits
the loop. In addition, for a ctx-canceled record, we use the first
record's context, similar to batches.

Closes #158.
  • Loading branch information
twmb committed Apr 30, 2022
1 parent ad989e9 commit 42e5b57
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
30 changes: 24 additions & 6 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func (cl *Client) BufferedProduceRecords() int64 {

type unknownTopicProduces struct {
buffered []promisedRec
wait chan error
wait chan error // retriable errors
fatal chan error // must-signal quit errors; capacity 1
}

func (p *producer) init(cl *Client) {
Expand Down Expand Up @@ -787,36 +788,53 @@ func (cl *Client) addUnknownTopicRecord(pr promisedRec) {
unknown = &unknownTopicProduces{
buffered: make([]promisedRec, 0, 100),
wait: make(chan error, 5),
fatal: make(chan error, 1),
}
cl.producer.unknownTopics[pr.Topic] = unknown
}
unknown.buffered = append(unknown.buffered, pr)
if len(unknown.buffered) == 1 {
go cl.waitUnknownTopic(pr.Topic, unknown)
go cl.waitUnknownTopic(pr.ctx, pr.Topic, unknown)
}
}

// waitUnknownTopic waits for a notification
func (cl *Client) waitUnknownTopic(
rctx context.Context,
topic string,
unknown *unknownTopicProduces,
) {
cl.cfg.logger.Log(LogLevelInfo, "producing to a new topic for the first time, fetching metadata to learn its partitions", "topic", topic)
var after <-chan time.Time

var (
tries int
unknownTries int64
err error
after <-chan time.Time
)

if timeout := cl.cfg.recordTimeout; timeout > 0 {
timer := time.NewTimer(cl.cfg.recordTimeout)
defer timer.Stop()
after = timer.C
}
var tries int
var unknownTries int64
var err error

// Ordering: aborting is set first, then unknown topics are manually
// canceled in a lock. New unknown topics after that lock will see
// aborting here and immediately cancel themselves.
if cl.producer.isAborting() {
err = ErrAborting
}

for err == nil {
select {
case <-rctx.Done():
err = rctx.Err()
case <-cl.ctx.Done():
err = ErrClientClosed
case <-after:
err = ErrRecordTimeout
case err = <-unknown.fatal:
case retriableErr, ok := <-unknown.wait:
if !ok {
cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (cl *Client) bumpMetadataFailForTopics(requested map[string]*topicPartition
defer p.unknownTopicsMu.Unlock()

for topic, unknown := range p.unknownTopics {
// if nil, mode 1, else mode 2
// if nil, mode 1 (req err), else mode 2 (missing resp)
if missing != nil && !missing[topic] {
continue
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,18 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
cl.cfg.logger.Log(LogLevelInfo, "producer state set to aborting; continuing to wait via flushing")
defer cl.cfg.logger.Log(LogLevelDebug, "aborted buffered records")

// We must clear unknown topics ourselves, because flush just waits
// like normal.
p := &cl.producer
p.unknownTopicsMu.Lock()
for _, unknown := range p.unknownTopics {
select {
case unknown.fatal <- ErrAborting:
default:
}
}
p.unknownTopicsMu.Unlock()

// Setting the aborting state allows records to fail before
// or after produce requests; thus, now we just flush.
return cl.Flush(ctx)
Expand Down

0 comments on commit 42e5b57

Please sign in to comment.