From 66e626f8d99759a74d963f6361b831c1d71690a2 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 3 Jan 2023 07:40:30 -0700 Subject: [PATCH] producer: set Record.Topic earlier Certain applications may need to know the record topic in OnRecordBuffered, but if the application used DefaultProduceTopic and then did not bubble that default topic through the entire app, knowing the record's topic was not possible. Now, we set the record topic earlier. We also now pre-emptively fail records before hanging waiting for space if we know the record is immediately invalid, rather than waiting for buffer space and then failing. --- pkg/kgo/producer.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index ea784b38..02f3bd1a 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -370,6 +370,9 @@ func (cl *Client) produce( if promise == nil { promise = noPromise } + if r.Topic == "" { + r.Topic = cl.cfg.defaultProduceTopic + } p := &cl.producer if p.hooks != nil { @@ -378,6 +381,15 @@ func (cl *Client) produce( } } + if r.Topic == "" { + p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic) + return + } + if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 { + p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction) + return + } + if atomic.AddInt64(&p.bufferedRecords, 1) > cl.cfg.maxBufferedRecords { // If the client ctx cancels or the produce ctx cancels, we // need to un-count our buffering of this record. We also need @@ -402,20 +414,6 @@ func (cl *Client) produce( } } - // Neither of the errors below should be hit in applications. - if r.Topic == "" { - def := cl.cfg.defaultProduceTopic - if def == "" { - p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic) - return - } - r.Topic = def - } - if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 { - p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction) - return - } - cl.partitionRecord(promisedRec{ctx, promise, r}) }