Skip to content

Commit ba2b4bc

Browse files
committed
Add producer transaction API
externalize transaction manager to it's own file add KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR to ensure transaction topic can be in ha mode
1 parent 3083a9b commit ba2b4bc

35 files changed

+5549
-101
lines changed

async_producer.go

+167-57
Original file line numberDiff line numberDiff line change
@@ -48,65 +48,27 @@ type AsyncProducer interface {
4848
// you can set Producer.Return.Errors in your config to false, which prevents
4949
// errors to be returned.
5050
Errors() <-chan *ProducerError
51-
}
52-
53-
// transactionManager keeps the state necessary to ensure idempotent production
54-
type transactionManager struct {
55-
producerID int64
56-
producerEpoch int16
57-
sequenceNumbers map[string]int32
58-
mutex sync.Mutex
59-
}
60-
61-
const (
62-
noProducerID = -1
63-
noProducerEpoch = -1
64-
)
6551

66-
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
67-
key := fmt.Sprintf("%s-%d", topic, partition)
68-
t.mutex.Lock()
69-
defer t.mutex.Unlock()
70-
sequence := t.sequenceNumbers[key]
71-
t.sequenceNumbers[key] = sequence + 1
72-
return sequence, t.producerEpoch
73-
}
52+
// IsTransactional return true when current producer is is transactional.
53+
IsTransactional() bool
7454

75-
func (t *transactionManager) bumpEpoch() {
76-
t.mutex.Lock()
77-
defer t.mutex.Unlock()
78-
t.producerEpoch++
79-
for k := range t.sequenceNumbers {
80-
t.sequenceNumbers[k] = 0
81-
}
82-
}
55+
// TxnStatus return current producer transaction status.
56+
TxnStatus() ProducerTxnStatusFlag
8357

84-
func (t *transactionManager) getProducerID() (int64, int16) {
85-
t.mutex.Lock()
86-
defer t.mutex.Unlock()
87-
return t.producerID, t.producerEpoch
88-
}
58+
// BeginTxn mark current transaction as ready.
59+
BeginTxn() error
8960

90-
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
91-
txnmgr := &transactionManager{
92-
producerID: noProducerID,
93-
producerEpoch: noProducerEpoch,
94-
}
61+
// CommitTxn commit current transaction.
62+
CommitTxn() error
9563

96-
if conf.Producer.Idempotent {
97-
initProducerIDResponse, err := client.InitProducerID()
98-
if err != nil {
99-
return nil, err
100-
}
101-
txnmgr.producerID = initProducerIDResponse.ProducerID
102-
txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
103-
txnmgr.sequenceNumbers = make(map[string]int32)
104-
txnmgr.mutex = sync.Mutex{}
64+
// AbortTxn abort current transaction.
65+
AbortTxn() error
10566

106-
Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
107-
}
67+
// AddOffsetsToTxn add associated offsets to current transaction.
68+
AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
10869

109-
return txnmgr, nil
70+
// AddMessageToTxn add message offsets to current transaction.
71+
AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
11072
}
11173

11274
type asyncProducer struct {
@@ -122,6 +84,7 @@ type asyncProducer struct {
12284
brokerLock sync.Mutex
12385

12486
txnmgr *transactionManager
87+
txLock sync.Mutex
12588
}
12689

12790
// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
@@ -175,9 +138,12 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
175138
type flagSet int8
176139

177140
const (
178-
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
179-
fin // final message from partitionProducer to brokerProducer and back
180-
shutdown // start the shutdown process
141+
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
142+
fin // final message from partitionProducer to brokerProducer and back
143+
shutdown // start the shutdown process
144+
endtxn // endtxn
145+
committxn // endtxn
146+
aborttxn // endtxn
181147
)
182148

183149
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
@@ -283,6 +249,97 @@ func (pe ProducerErrors) Error() string {
283249
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
284250
}
285251

252+
func (p *asyncProducer) IsTransactional() bool {
253+
return p.txnmgr.isTransactional()
254+
}
255+
256+
func (p *asyncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
257+
offsets := make(map[string][]*PartitionOffsetMetadata)
258+
offsets[msg.Topic] = []*PartitionOffsetMetadata{
259+
{
260+
Partition: msg.Partition,
261+
Offset: msg.Offset + 1,
262+
Metadata: metadata,
263+
},
264+
}
265+
return p.AddOffsetsToTxn(offsets, groupId)
266+
}
267+
268+
func (p *asyncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
269+
p.txLock.Lock()
270+
defer p.txLock.Unlock()
271+
272+
if !p.IsTransactional() {
273+
DebugLogger.Printf("producer/txnmgr [%s] attempt to call AddOffsetsToTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
274+
return ErrNonTransactedProducer
275+
}
276+
277+
DebugLogger.Printf("producer/txnmgr [%s] add offsets to transaction\n", p.txnmgr.transactionalID)
278+
return p.txnmgr.addOffsetsToTxn(offsets, groupId)
279+
}
280+
281+
func (p *asyncProducer) TxnStatus() ProducerTxnStatusFlag {
282+
return p.txnmgr.currentTxnStatus()
283+
}
284+
285+
func (p *asyncProducer) BeginTxn() error {
286+
p.txLock.Lock()
287+
defer p.txLock.Unlock()
288+
289+
if !p.IsTransactional() {
290+
DebugLogger.Println("producer/txnmgr attempt to call BeginTxn on a non-transactional producer")
291+
return ErrNonTransactedProducer
292+
}
293+
294+
return p.txnmgr.transitionTo(ProducerTxnFlagInTransaction, nil)
295+
}
296+
297+
func (p *asyncProducer) CommitTxn() error {
298+
p.txLock.Lock()
299+
defer p.txLock.Unlock()
300+
301+
if !p.IsTransactional() {
302+
DebugLogger.Printf("producer/txnmgr [%s] attempt to call CommitTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
303+
return ErrNonTransactedProducer
304+
}
305+
306+
DebugLogger.Printf("producer/txnmgr [%s] committing transaction\n", p.txnmgr.transactionalID)
307+
err := p.finishTransaction(true)
308+
if err != nil {
309+
return err
310+
}
311+
DebugLogger.Printf("producer/txnmgr [%s] transaction committed\n", p.txnmgr.transactionalID)
312+
return nil
313+
}
314+
315+
func (p *asyncProducer) AbortTxn() error {
316+
p.txLock.Lock()
317+
defer p.txLock.Unlock()
318+
319+
if !p.IsTransactional() {
320+
DebugLogger.Printf("producer/txnmgr [%s] attempt to call AbortTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
321+
return ErrNonTransactedProducer
322+
}
323+
DebugLogger.Printf("producer/txnmgr [%s] aborting transaction\n", p.txnmgr.transactionalID)
324+
err := p.finishTransaction(false)
325+
if err != nil {
326+
return err
327+
}
328+
DebugLogger.Printf("producer/txnmgr [%s] transaction aborted\n", p.txnmgr.transactionalID)
329+
return nil
330+
}
331+
332+
func (p *asyncProducer) finishTransaction(commit bool) error {
333+
p.inFlight.Add(1)
334+
if commit {
335+
p.input <- &ProducerMessage{flags: endtxn | committxn}
336+
} else {
337+
p.input <- &ProducerMessage{flags: endtxn | aborttxn}
338+
}
339+
p.inFlight.Wait()
340+
return p.txnmgr.finishTransaction(commit)
341+
}
342+
286343
func (p *asyncProducer) Errors() <-chan *ProducerError {
287344
return p.errors
288345
}
@@ -336,11 +393,27 @@ func (p *asyncProducer) dispatcher() {
336393
continue
337394
}
338395

396+
if msg.flags&endtxn != 0 {
397+
var err error
398+
if msg.flags&committxn != 0 {
399+
err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
400+
} else {
401+
err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
402+
}
403+
if err != nil {
404+
Logger.Printf("producer/txnmgr unable to end transaction %s", err)
405+
}
406+
p.inFlight.Done()
407+
continue
408+
}
409+
339410
if msg.flags&shutdown != 0 {
340411
shuttingDown = true
341412
p.inFlight.Done()
342413
continue
343-
} else if msg.retries == 0 {
414+
}
415+
416+
if msg.retries == 0 {
344417
if shuttingDown {
345418
// we can't just call returnError here because that decrements the wait group,
346419
// which hasn't been incremented yet for this message, and shouldn't be
@@ -353,6 +426,13 @@ func (p *asyncProducer) dispatcher() {
353426
continue
354427
}
355428
p.inFlight.Add(1)
429+
// Ignore retried msg, there are already in txn.
430+
// Can't produce new record when transaction is not started.
431+
if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
432+
Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
433+
p.returnError(msg, ErrTransactionNotReady)
434+
continue
435+
}
356436
}
357437

358438
for _, interceptor := range p.conf.Producer.Interceptors {
@@ -605,6 +685,10 @@ func (pp *partitionProducer) dispatch() {
605685
msg.hasSequence = true
606686
}
607687

688+
if pp.parent.IsTransactional() {
689+
pp.parent.txnmgr.maybeAddPartitionToCurrentTxn(pp.topic, pp.partition)
690+
}
691+
608692
pp.brokerProducer.input <- msg
609693
}
610694
}
@@ -715,6 +799,16 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
715799
}
716800
}(set)
717801

802+
if p.IsTransactional() {
803+
// Add partition to tx before sending current batch
804+
err := p.txnmgr.publishTxnPartitions()
805+
if err != nil {
806+
// Request failed to be sent
807+
sendResponse(nil, err)
808+
continue
809+
}
810+
}
811+
718812
// Use AsyncProduce vs Produce to not block waiting for the response
719813
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
720814
// https://kafka.apache.org/protocol#protocol_network
@@ -1152,10 +1246,26 @@ func (p *asyncProducer) bumpIdempotentProducerEpoch() {
11521246
}
11531247
}
11541248

1249+
func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
1250+
if errors.Is(err, ErrClusterAuthorizationFailed) ||
1251+
errors.Is(err, ErrProducerFenced) ||
1252+
errors.Is(err, ErrUnsupportedVersion) ||
1253+
errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
1254+
return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
1255+
}
1256+
if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
1257+
p.txnmgr.epochBumpRequired = true
1258+
}
1259+
return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
1260+
}
1261+
11551262
func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
1263+
if p.IsTransactional() {
1264+
_ = p.maybeTransitionToErrorState(err)
1265+
}
11561266
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
11571267
// will never see a message with this number, so we can never continue the sequence.
1158-
if msg.hasSequence {
1268+
if !p.IsTransactional() && msg.hasSequence {
11591269
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
11601270
p.bumpIdempotentProducerEpoch()
11611271
}

0 commit comments

Comments
 (0)