Skip to content

Commit

Permalink
Address some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jolshan committed Dec 17, 2024
1 parent 3e740ec commit 54ad886
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ public void initTransactions() {
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
transactionManager.maybeUpdateTransactionV2Enabled();
transactionManager.maybeUpdateTransactionV2Enabled(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult

// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
// If we are upgrading to TV2 transactions on the next transaction, also bump the epoch.
maybeUpdateTransactionV2Enabled();
maybeUpdateTransactionV2Enabled(false);
if (clientSideEpochBumpRequired || isUpgradingToV2) {
return initializeTransactions(this.producerIdAndEpoch);
}
Expand Down Expand Up @@ -447,16 +447,16 @@ public boolean isTransactional() {
* Sets isUpgradingToV2 if the previous value was false and now it is true.
*/

public synchronized void maybeUpdateTransactionV2Enabled() {
public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) {
if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) {
return;
}
ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo();
latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch;
Short transactionVersion = info.finalizedFeatures.get("transaction.version");
boolean previousValue = isTransactionV2Enabled;
boolean wasTransactionV2Enabled = isTransactionV2Enabled;
isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2;
isUpgradingToV2 = currentState != State.READY && !previousValue && isTransactionV2Enabled;
isUpgradingToV2 = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled;
}

public boolean isTransactionV2Enabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4321,7 +4321,7 @@ private void doInitTransactions(long producerId, short epoch) {

prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled();
transactionManager.maybeUpdateTransactionV2Enabled(true);

result.await();
assertTrue(result.isSuccessful());
Expand Down

0 comments on commit 54ad886

Please sign in to comment.