diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index ea784b38..02f3bd1a 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -370,6 +370,9 @@ func (cl *Client) produce( if promise == nil { promise = noPromise } + if r.Topic == "" { + r.Topic = cl.cfg.defaultProduceTopic + } p := &cl.producer if p.hooks != nil { @@ -378,6 +381,15 @@ func (cl *Client) produce( } } + if r.Topic == "" { + p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic) + return + } + if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 { + p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction) + return + } + if atomic.AddInt64(&p.bufferedRecords, 1) > cl.cfg.maxBufferedRecords { // If the client ctx cancels or the produce ctx cancels, we // need to un-count our buffering of this record. We also need @@ -402,20 +414,6 @@ func (cl *Client) produce( } } - // Neither of the errors below should be hit in applications. - if r.Topic == "" { - def := cl.cfg.defaultProduceTopic - if def == "" { - p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic) - return - } - r.Topic = def - } - if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 { - p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction) - return - } - cl.partitionRecord(promisedRec{ctx, promise, r}) }