Skip to content

Commit

Permalink
producer: set Record.Topic earlier
Browse files Browse the repository at this point in the history
Certain applications may need to know the record topic in
OnRecordBuffered, but if the application used DefaultProduceTopic and
then did not bubble that default topic through the entire app, knowing
the record's topic was not possible.

Now, we set the record topic earlier.

We also now pre-emptively fail records before hanging waiting for space
if we know the record is immediately invalid, rather than waiting for
buffer space and then failing.
  • Loading branch information
twmb committed Jan 3, 2023
1 parent 7cc1573 commit 66e626f
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ func (cl *Client) produce(
if promise == nil {
promise = noPromise
}
if r.Topic == "" {
r.Topic = cl.cfg.defaultProduceTopic
}

p := &cl.producer
if p.hooks != nil {
Expand All @@ -378,6 +381,15 @@ func (cl *Client) produce(
}
}

if r.Topic == "" {
p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic)
return
}
if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 {
p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction)
return
}

if atomic.AddInt64(&p.bufferedRecords, 1) > cl.cfg.maxBufferedRecords {
// If the client ctx cancels or the produce ctx cancels, we
// need to un-count our buffering of this record. We also need
Expand All @@ -402,20 +414,6 @@ func (cl *Client) produce(
}
}

// Neither of the errors below should be hit in applications.
if r.Topic == "" {
def := cl.cfg.defaultProduceTopic
if def == "" {
p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic)
return
}
r.Topic = def
}
if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 {
p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction)
return
}

cl.partitionRecord(promisedRec{ctx, promise, r})
}

Expand Down

0 comments on commit 66e626f

Please sign in to comment.