Skip to content

Commit

Permalink
KAFKA-17898: Refine Epoch Bumping Logic (#17849)
Browse files Browse the repository at this point in the history
With KAFKA-14562, we implemented epoch bump on both the client and the server. Mentioned below are the different epoch bump scenarios we have on hand after enabled tv2

Non-Transactional Producers
• Epoch bumping is always allowed.
• Different code paths are used to handle epoch bumping.

Transactional Producers

No Epoch Bump Allowed
• coordinatorSupportsBumpingEpoch = false when initPIDVersion < 3 or initPIDVersion = null.

Client-Triggered Epoch Bump Allowed
• coordinatorSupportsBumpingEpoch = true when initPIDVersion >= 3.
• TransactionVersion2Enabled = false when endTxnVersion < 5.

Only Server-Triggered Epoch Bump Allowed
• TransactionVersion2Enabled = true and endTxnVersion >= 5.

We want to refine the code and make it more structured to correctly handle epoch bumping in the above mentioned cases.

The changes made in this patch are:

Rename epochBumpRequired to epochBumpTriggerRequired to symbolize a manual epoch bump request from the client
Modify canEpochBump method according to the above mentioned scenarios

Reviewers: Artem Livshits <[email protected]>, Calvin Liu <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
rreddy-22 authored Nov 25, 2024
1 parent 7f8a592 commit 4fc9e44
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public class TransactionManager {
private volatile RuntimeException lastError = null;
private volatile ProducerIdAndEpoch producerIdAndEpoch;
private volatile boolean transactionStarted = false;
private volatile boolean epochBumpRequired = false;
private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;

Expand Down Expand Up @@ -351,7 +351,7 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
enqueueRequest(handler);

// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
if (epochBumpRequired) {
if (clientSideEpochBumpRequired) {
return initializeTransactions(this.producerIdAndEpoch);
}

Expand Down Expand Up @@ -477,6 +477,26 @@ synchronized void transitionToFatalError(RuntimeException exception) {
}
}

/**
* Transitions to an abortable error state if the coordinator can handle an abortable error or
* to a fatal error if not.
*
* @param abortableException The exception in case of an abortable error.
* @param fatalException The exception in case of a fatal error.
*/
private void transitionToAbortableErrorOrFatalError(
RuntimeException abortableException,
RuntimeException fatalException
) {
if (canHandleAbortableError()) {
if (needToTriggerEpochBumpFromClient())
clientSideEpochBumpRequired = true;
transitionToAbortableError(abortableException);
} else {
transitionToFatalError(fatalException);
}
}

// visible for testing
synchronized boolean isPartitionAdded(TopicPartition partition) {
return partitionsInTransaction.contains(partition);
Expand Down Expand Up @@ -544,8 +564,11 @@ private void resetSequenceNumbers() {
this.partitionsWithUnresolvedSequences.clear();
}

synchronized void requestEpochBumpForPartition(TopicPartition tp) {
epochBumpRequired = true;
/**
* This method is used to trigger an epoch bump for non-transactional idempotent producers.
*/
synchronized void requestIdempotentEpochBumpForPartition(TopicPartition tp) {
clientSideEpochBumpRequired = true;
this.partitionsToRewriteSequences.add(tp);
}

Expand All @@ -564,12 +587,12 @@ private void bumpIdempotentProducerEpoch() {
}
this.partitionsToRewriteSequences.clear();

epochBumpRequired = false;
clientSideEpochBumpRequired = false;
}

synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
if (!isTransactional()) {
if (epochBumpRequired) {
if (clientSideEpochBumpRequired) {
bumpIdempotentProducerEpoch();
}
if (currentState != State.INITIALIZING && !hasProducerId()) {
Expand Down Expand Up @@ -675,8 +698,8 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
|| exception instanceof UnsupportedVersionException) {
transitionToFatalError(exception);
} else if (isTransactional()) {
if (canBumpEpoch() && !isCompleting()) {
epochBumpRequired = true;
if (needToTriggerEpochBumpFromClient() && !isCompleting()) {
clientSideEpochBumpRequired = true;
}
transitionToAbortableError(exception);
}
Expand All @@ -699,7 +722,7 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except

// If we fail with an OutOfOrderSequenceException, we have a gap in the log. Bump the epoch for this
// partition, which will reset the sequence number to 0 and allow us to continue
requestEpochBumpForPartition(batch.topicPartition);
requestIdempotentEpochBumpForPartition(batch.topicPartition);
} else if (exception instanceof UnknownProducerIdException) {
// If we get an UnknownProducerId for a partition, then the broker has no state for that producer. It will
// therefore accept a write with sequence number 0. We reset the sequence number for the partition here so
Expand All @@ -710,7 +733,7 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except
} else {
if (adjustSequenceNumbers) {
if (!isTransactional()) {
requestEpochBumpForPartition(batch.topicPartition);
requestIdempotentEpochBumpForPartition(batch.topicPartition);
} else {
txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
}
Expand Down Expand Up @@ -760,21 +783,17 @@ synchronized void maybeResolveSequences() {
// For the transactional producer, we bump the epoch if possible, otherwise we transition to a fatal error
String unackedMessagesErr = "The client hasn't received acknowledgment for some previously " +
"sent messages and can no longer retry them. ";
if (canBumpEpoch()) {
epochBumpRequired = true;
KafkaException exception = new KafkaException(unackedMessagesErr + "It is safe to abort " +
"the transaction and continue.");
transitionToAbortableError(exception);
} else {
KafkaException exception = new KafkaException(unackedMessagesErr + "It isn't safe to continue.");
transitionToFatalError(exception);
}
KafkaException abortableException = new KafkaException(unackedMessagesErr + "It is safe to abort " +
"the transaction and continue.");
KafkaException fatalException = new KafkaException(unackedMessagesErr + "It isn't safe to continue.");

transitionToAbortableErrorOrFatalError(abortableException, fatalException);
} else {
// For the idempotent producer, bump the epoch
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
"Going to bump epoch and reset sequence numbers.", topicPartition,
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
requestEpochBumpForPartition(topicPartition);
requestIdempotentEpochBumpForPartition(topicPartition);
}

iter.remove();
Expand Down Expand Up @@ -943,15 +962,15 @@ synchronized boolean canRetry(ProduceResponse.PartitionResponse response, Produc
if (isTransactional()) {
txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, this.producerIdAndEpoch);
} else {
requestEpochBumpForPartition(batch.topicPartition);
requestIdempotentEpochBumpForPartition(batch.topicPartition);
}
return true;
}

if (!isTransactional()) {
// For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
// producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
requestEpochBumpForPartition(batch.topicPartition);
requestIdempotentEpochBumpForPartition(batch.topicPartition);
return true;
}
} else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
Expand All @@ -967,7 +986,7 @@ synchronized boolean canRetry(ProduceResponse.PartitionResponse response, Produc
// and wait to see if the sequence resolves
if (!hasUnresolvedSequence(batch.topicPartition) ||
isNextSequenceForUnresolvedPartition(batch.topicPartition, batch.baseSequence())) {
requestEpochBumpForPartition(batch.topicPartition);
requestIdempotentEpochBumpForPartition(batch.topicPartition);
}
return true;
}
Expand Down Expand Up @@ -1164,23 +1183,59 @@ private TransactionalRequestResult handleCachedTransactionRequestResult(
return result;
}

/**
* Determines if an epoch bump can be triggered manually based on the api versions.
*
* <b>NOTE:</b>
* This method should only be used for transactional producers.
* For non-transactional producers epoch bumping is always allowed.
*
* <ol>
* <li><b>Client-Triggered Epoch Bump</b>:
* If the coordinator supports epoch bumping (initProducerIdVersion.maxVersion() >= 3),
* client-triggered epoch bumping is allowed, returns true.
* <code>clientSideEpochBumpTriggerRequired</code> must be set to true in this case.</li>
*
* <li><b>No Epoch Bump Allowed</b>:
* If the coordinator does not support epoch bumping, returns false.</li>
*
* <li><b>Server-Triggered Only</b>:
* When TransactionV2 is enabled, epoch bumping is handled automatically
* by the server in EndTxn, so manual epoch bumping is not required, returns false.</li>
* </ol>
*
* @return true if a client-triggered epoch bump is allowed, otherwise false.
*/
// package-private for testing
boolean canBumpEpoch() {
if (!isTransactional()) {
return true;
}
boolean needToTriggerEpochBumpFromClient() {
return coordinatorSupportsBumpingEpoch && !isTransactionV2Enabled;
}

return coordinatorSupportsBumpingEpoch;
/**
* Determines if the coordinator can handle an abortable error.
* Recovering from an abortable error requires an epoch bump which can be triggered by the client
* or automatically taken care of at the end of every transaction (Transaction V2).
* Use <code>needToTriggerEpochBumpFromClient</code> to check whether the epoch bump needs to be triggered
* manually.
*
* <b>NOTE:</b>
* This method should only be used for transactional producers.
* There is no concept of abortable errors for idempotent producers.
*
* @return true if an abortable error can be handled, otherwise false.
*/
boolean canHandleAbortableError() {
return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
}

private void completeTransaction() {
if (epochBumpRequired) {
if (clientSideEpochBumpRequired) {
transitionTo(State.INITIALIZING);
} else {
transitionTo(State.READY);
}
lastError = null;
epochBumpRequired = false;
clientSideEpochBumpRequired = false;
transactionStarted = false;
newPartitionsInTransaction.clear();
pendingPartitionsInTransaction.clear();
Expand Down Expand Up @@ -1209,9 +1264,23 @@ void abortableError(RuntimeException e) {
transitionToAbortableError(e);
}

/**
* Determines if an error should be treated as abortable or fatal, based on transaction state and configuration.
* <ol><l> NOTE: Only use this method for transactional producers </l></ol>
*
* - <b>Abortable Error</b>:
* An abortable error can be handled effectively, if epoch bumping is supported.
* 1) If transactionV2 is enabled, automatic epoch bumping happens at the end of every transaction.
* 2) If the client can trigger an epoch bump, the abortable error can be handled.
*
*- <b>Fatal Error</b>:
* If epoch bumping is not supported, the system cannot recover and the error must be treated as fatal.
* @param e the error to determine as either abortable or fatal.
*/
void abortableErrorIfPossible(RuntimeException e) {
if (canBumpEpoch()) {
epochBumpRequired = true;
if (canHandleAbortableError()) {
if (needToTriggerEpochBumpFromClient())
clientSideEpochBumpRequired = true;
abortableError(e);
} else {
fatalError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ public void testBatchCompletedAfterProducerReset(boolean transactionV2Enabled) {
assertEquals(2, transactionManager.sequenceNumber(tp0));

// The producerId might be reset due to a failure on another partition
transactionManager.requestEpochBumpForPartition(tp1);
transactionManager.requestIdempotentEpochBumpForPartition(tp1);
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
initializeIdempotentProducerId(producerId + 1, (short) 0);

Expand Down Expand Up @@ -780,6 +780,21 @@ private ProducerBatch writeIdempotentBatchWithValue(TransactionManager manager,
return batch;
}

private ProducerBatch writeTransactionalBatchWithValue(
TransactionManager manager,
TopicPartition tp,
String value
) {
manager.maybeUpdateProducerIdAndEpoch(tp);
int seq = manager.sequenceNumber(tp);
manager.incrementSequenceNumber(tp, 1);
ProducerBatch batch = batchWithValue(tp, value);
batch.setProducerState(manager.producerIdAndEpoch(), seq, true);
manager.addInFlightBatch(batch);
batch.close();
return batch;
}

private ProducerBatch batchWithValue(TopicPartition tp, String value) {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(64),
Compression.NONE, TimestampType.CREATE_TIME, 0L);
Expand Down Expand Up @@ -814,7 +829,7 @@ public void testProducerIdReset(boolean transactionV2Enabled) {
transactionManager.incrementSequenceNumber(tp1, 3);
assertEquals(transactionManager.sequenceNumber(tp1), 3);

transactionManager.requestEpochBumpForPartition(tp0);
transactionManager.requestIdempotentEpochBumpForPartition(tp0);
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
assertEquals(transactionManager.sequenceNumber(tp0), 0);
assertEquals(transactionManager.sequenceNumber(tp1), 3);
Expand Down Expand Up @@ -2948,7 +2963,7 @@ public void testNoProducerIdResetAfterLastInFlightBatchSucceeds(boolean transact

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEpochBumpAfterLastInflightBatchFails(boolean transactionV2Enabled) {
public void testEpochBumpAfterLastInFlightBatchFailsIdempotentProducer(boolean transactionV2Enabled) {
initializeTransactionManager(Optional.empty(), transactionV2Enabled);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
initializeIdempotentProducerId(producerId, epoch);
Expand Down Expand Up @@ -2980,6 +2995,39 @@ public void testEpochBumpAfterLastInflightBatchFails(boolean transactionV2Enable
assertEquals(0, transactionManager.sequenceNumber(tp0));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMaybeResolveSequencesTransactionalProducer(boolean transactionV2Enabled) throws Exception {
initializeTransactionManager(Optional.of(transactionalId), transactionV2Enabled);

// Initialize transaction with initial producer ID and epoch.
doInitTransactions(producerId, epoch);

transactionManager.beginTransaction();
transactionManager.maybeAddPartition(tp0);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
runUntil(() -> transactionManager.isPartitionAdded(tp0));

ProducerBatch b1 = writeTransactionalBatchWithValue(transactionManager, tp0, "1");
assertEquals(Integer.valueOf(1), transactionManager.sequenceNumber(tp0));

transactionManager.markSequenceUnresolved(b1);
assertTrue(transactionManager.hasUnresolvedSequences());

transactionManager.handleFailedBatch(b1, new TimeoutException(), false);
// Call maybeResolveSequences to trigger resolution logic
transactionManager.maybeResolveSequences();

// Verify the type of error state the transaction is in.
if (transactionManager.isTransactionV2Enabled() || transactionManager.needToTriggerEpochBumpFromClient()) {
// Expected to throw an abortable error when epoch bumping is allowed
assertTrue(transactionManager.hasAbortableError());
} else {
// Expected to throw a fatal error when epoch bumping is not allowed
assertTrue(transactionManager.hasFatalError());
}
}

@Test
public void testEpochUpdateAfterBumpFromEndTxnResponseInV2() throws InterruptedException {
initializeTransactionManager(Optional.of(transactionalId), true);
Expand Down Expand Up @@ -3506,13 +3554,13 @@ public void testRetryCommitTransactionAfterAbortTimeout() {
}

@Test
public void testCanBumpEpochDuringCoordinatorDisconnect() {
public void testNeedToTriggerEpochBumpFromClientDuringCoordinatorDisconnect() {
doInitTransactions(0, (short) 0);
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertTrue(transactionManager.canBumpEpoch());
assertTrue(transactionManager.needToTriggerEpochBumpFromClient());

apiVersions.remove(transactionManager.coordinator(CoordinatorType.TRANSACTION).idString());
assertTrue(transactionManager.canBumpEpoch());
assertTrue(transactionManager.needToTriggerEpochBumpFromClient());
}

@ParameterizedTest
Expand Down

0 comments on commit 4fc9e44

Please sign in to comment.