Skip to content

Commit f27873d

Browse files
committed
Remove extra state, move check to avoid races.
1 parent b70fdf2 commit f27873d

File tree

1 file changed

+8
-12
lines changed

1 file changed

+8
-12
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

+8-12
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,11 @@ public class TransactionManager {
194194
private volatile boolean transactionStarted = false;
195195
private volatile boolean clientSideEpochBumpRequired = false;
196196
private volatile long latestFinalizedFeaturesEpoch = -1;
197-
private volatile boolean isUpgradingToV2 = false;
198197
private volatile boolean isTransactionV2Enabled = false;
199198

200199
private enum State {
201200
UNINITIALIZED,
202201
INITIALIZING,
203-
UPGRADING,
204202
READY,
205203
IN_TRANSACTION,
206204
COMMITTING_TRANSACTION,
@@ -213,11 +211,9 @@ private boolean isTransitionValid(State source, State target) {
213211
case UNINITIALIZED:
214212
return source == READY || source == ABORTABLE_ERROR;
215213
case INITIALIZING:
216-
return source == UNINITIALIZED || source == ABORTING_TRANSACTION;
217-
case UPGRADING:
218-
return source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION;
214+
return source == UNINITIALIZED || source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION;
219215
case READY:
220-
return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION || source == UPGRADING;
216+
return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
221217
case IN_TRANSACTION:
222218
return source == READY;
223219
case COMMITTING_TRANSACTION:
@@ -351,13 +347,16 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
351347
isTransactionV2Enabled
352348
);
353349

350+
// Maybe update the transaction version here before we enqueue the EndTxn request so there are no races with
351+
// completion of the EndTxn request.
352+
maybeUpdateTransactionV2Enabled(false);
353+
354354
EndTxnHandler handler = new EndTxnHandler(builder);
355355
enqueueRequest(handler);
356356

357357
// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
358358
// If we are upgrading to TV2 transactions on the next transaction, also bump the epoch.
359-
maybeUpdateTransactionV2Enabled(false);
360-
if (clientSideEpochBumpRequired || isUpgradingToV2) {
359+
if (clientSideEpochBumpRequired) {
361360
return initializeTransactions(this.producerIdAndEpoch);
362361
}
363362

@@ -456,7 +455,7 @@ public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatializa
456455
Short transactionVersion = info.finalizedFeatures.get("transaction.version");
457456
boolean wasTransactionV2Enabled = isTransactionV2Enabled;
458457
isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2;
459-
isUpgradingToV2 = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled;
458+
clientSideEpochBumpRequired = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled;
460459
}
461460

462461
public boolean isTransactionV2Enabled() {
@@ -1254,14 +1253,11 @@ boolean canHandleAbortableError() {
12541253
private void completeTransaction() {
12551254
if (clientSideEpochBumpRequired) {
12561255
transitionTo(State.INITIALIZING);
1257-
} else if (isUpgradingToV2) {
1258-
transitionTo(State.UPGRADING);
12591256
} else {
12601257
transitionTo(State.READY);
12611258
}
12621259
lastError = null;
12631260
clientSideEpochBumpRequired = false;
1264-
isUpgradingToV2 = false;
12651261
transactionStarted = false;
12661262
newPartitionsInTransaction.clear();
12671263
pendingPartitionsInTransaction.clear();

0 commit comments

Comments
 (0)