diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 332007ff..4dbf5406 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1874,7 +1874,7 @@ func (cl *Client) BlockingCommitOffsets( // function is complete and the onDone has returned. // // This function itself does not wait for the commit to finish. By default, -// this function is an asyncronous commit. You can use onDone to make it sync. +// this function is an asynchronous commit. You can use onDone to make it sync. // // Note that this function ensures absolute ordering of commit requests by // canceling prior requests and ensuring they are done before executing a new @@ -1893,7 +1893,7 @@ func (cl *Client) BlockingCommitOffsets( // This is most likely to happen if a commit occurs too late in a rebalance // event. // -// If manually committing, you want to set OnRevoked to commit syncronously +// If manually committing, you want to set OnRevoked to commit synchronously // using BlockingCommitOffsets. Otherwise if committing async OnRevoked may // return and a new group session may start before the commit is issued, // leading to the commit being ignored and leading to duplicate messages. diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index d82bead3..6e9c3242 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -82,11 +82,44 @@ func (p *producer) isAborting() bool { return atomic.LoadUint32(&p.aborting) == func noPromise(*Record, error) {} +// ProduceSync is a synchronous produce. Please see the Produce documentation +// for an in depth description of how producing works. +// +// Note that it is heavily recommended to not use ProduceSync. Producing +// buffers multiple records into a single request issued to Kafka. A +// synchronous produce implies you may be producing one record per request, +// which is inefficient, slower, and puts more load on Kafka itself. 
+// +// This function should only be used when producing infrequently enough that +// waiting for a single record to be produced is what would happen anyway with +// Produce. +// +// If the produce is successful, the record's attrs / offset / etc. fields are +// updated appropriately. +func (cl *Client) ProduceSync(ctx context.Context, r *Record) error { + var ( + wg sync.WaitGroup + err error + promise = func(_ *Record, perr error) { + err = perr + wg.Done() + } + ) + wg.Add(1) + if perr := cl.Produce(ctx, r, promise); perr != nil { + return perr + } + wg.Wait() + return err +} + // Produce sends a Kafka record to the topic in the record's Topic field, -// calling promise with the record or an error when Kafka replies. +// calling promise with the record or an error when Kafka replies. For a +// synchronous produce (which is not recommended), see ProduceSync. // // The promise is optional, but not using it means you will not know if Kafka -// recorded a record properly. +// recorded a record properly. If there was no produce error, the record's +// attrs / offset / etc. fields are updated appropriately. // // If the record is too large to fit in a batch on its own in a produce // request, the promise is called immediately before this function returns with diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index abe8a2ca..85c459b7 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -29,7 +29,7 @@ type sink struct { drainState workLoop // seqRespsMu, guarded by seqRespsMu, contains responses that must - // be handled sequentially. These responses are handled asyncronously, + // be handled sequentially. These responses are handled asynchronously, // but sequentially. 
seqRespsMu sync.Mutex seqResps []*seqResp @@ -1753,9 +1753,7 @@ func (r seqRecBatch) appendTo( dst = kbin.AppendInt64(dst, r.firstTimestamp+int64(lastRecord.timestampDelta)) seq := r.seq - if !idempotent { - producerID = -1 - producerEpoch = -1 + if !idempotent { // producerID and producerEpoch are already -1 if idempotency is disabled (due to producerID() itself returning -1) seq = 0 } dst = kbin.AppendInt64(dst, producerID)