Skip to content

Conversation

@smjn
Copy link
Collaborator

@smjn smjn commented Nov 3, 2025

  • KIP-1222 introduced the notion of a RENEW ack type which allows
    renewal of existing acquisition lock timeout by sending the ack type
    RENEW for a specific record in share consumer explicit mode.
  • In this PR, we have added impl in the SharePartition which performs
    the existing lock cancellation and refresh for batches as well as per
    offset.
  • Unit tests have been added in SharePartitionTest.

Reviewers: Abhinav Dixit [email protected], Andrew Schofield
[email protected]

@smjn smjn requested review from AndrewJSchofield and apoorvmittal10 and removed request for AndrewJSchofield November 3, 2025 11:08
@github-actions github-actions bot added the triage PRs from the community label Nov 3, 2025
@smjn smjn requested a review from AndrewJSchofield November 3, 2025 11:08
@github-actions github-actions bot added core Kafka Broker KIP-932 Queues for Kafka labels Nov 3, 2025
@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels Nov 3, 2025
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Some comments from an initial review.

recordStateMap.put(batch.firstOffset() + index,
fetchRecordState(batch.acknowledgeTypes().get(index)));
byte ackType = batch.acknowledgeTypes().get(index);
// Validate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is going to throw an exception if the ackType is invalid, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is going to throw an exception if the ackType is invalid, right?

Yes, correct AcknowledgeType.forId(ackType) will throw the exception. In this case the caller is SharePartition.acknowledge which surrounds the call to this method in a try catch.

// Before reaching this point, it should be verified that it is full batch ack and
// not per offset ack as well as startOffset not moved.
if (ackType == AcknowledgeType.RENEW.id) {
if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is redundant because of the check on line 2150. We know the the batch state is ACQUIRED if we have reached this point.

(byte) 0, RecordState.ARCHIVED, // Represents gap
AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED,
AcknowledgeType.RELEASE.id, RecordState.AVAILABLE,
AcknowledgeType.REJECT.id, RecordState.ARCHIVED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether logically a fifth entry of AcknowledgeType.RENEW.id, RecordState.ACQUIRED makes sense. wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it is correct, we currently have no use for the the recordState value corresponding to to RENEW ack type. So it felt a bit more clean to leave it out.

@smjn smjn requested a review from AndrewJSchofield November 3, 2025 13:05
if (isStateTerminal(updateResult.state())) {
inFlightTerminalRecords.incrementAndGet();
// In case of 0 size ackTypeMap, we have already validated the batch.acknowledgeTypes.
byte ackType = ackTypeMap.size() > 1 ? ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just confirming, we could have used ackTypeMap.get(batch.firstOffset()) instead of batch.acknowledgeTypes().get(0) as well, right?

Copy link
Collaborator Author

@smjn smjn Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent Yes correct - just avoiding a lookup

// Only valid for ACQUIRED offsets; the check above ensures this.
long key = offsetState.getKey();
InFlightState state = offsetState.getValue();
log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.",
Copy link
Contributor

@adixitconfluent adixitconfluent Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it is better if we log something like log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.", groupId, topicIdPartition, key, inFlightBatch, memberId) and when we are logging it for batch, it should be log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.", groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId);

I think that is more explicit for debugging purpose.


assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask());
TestUtils.waitForCondition(() -> sharePartition.cachedState().get(0L).offsetState() != null, "offset state not populated");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need waitForCondition in this scenario, just assertNotNull(sharePartition.cachedState().get(0L).offsetState()) should be enough

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks fine. A few comments to address above.
Though the above approach looks fine but I feel that the number of changes can be lowered down if we enhance function fetchRecordState(byte acknowledgeType) by adding case 4 /* RENEW */ -> RecordState.ACQUIRED; and continue using function fetchRecordStateMapForAcknowledgementBatch. Why are we not taking that approach rather than creating fetchAckTypeMapForBatch

Regarding tests, I think we should also add some tests for the following scenarios -

  1. LSO movement is happening while some offsets/batches are in renewal mode.
  2. combination of per-batch and per-offsets tracked in flight state for renewal acknowledgement

@smjn
Copy link
Collaborator Author

smjn commented Nov 3, 2025

Overall looks fine. A few comments to address above. Though the above approach looks fine but I feel that the number of changes can be lowered down if we enhance function fetchRecordState(byte acknowledgeType) by adding case 4 /* RENEW */ -> RecordState.ACQUIRED; and continue using function fetchRecordStateMapForAcknowledgementBatch. Why are we not taking that approach rather than creating fetchAckTypeMapForBatch

Regarding tests, I think we should also add some tests for the following scenarios -

  1. LSO movement is happening while some offsets/batches are in renewal mode.
  2. combination of per-batch and per-offsets tracked in flight state for renewal acknowledgement

Rather than fetching global transitions from existing method, the global hashmap was considered a slightly better approach, hence the overall mechanics were changed. The state for RENEW was deliberately avoided to signify that there is no transition involved when RENEWal happens.

Will add tests.

@smjn
Copy link
Collaborator Author

smjn commented Nov 3, 2025

@adixitconfluent Thank you for the valuable feedback. Have incorporated most of the changes.

}

@Test
public void testLsoMovementWithBatchWRenewal() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extra W in the test name


assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes.
Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add more asserts to atleast one of the newly added test to check the behaviour when this acquisition lock expires. Also we should add asserts over sharePartition.timer().size() to all the newly added tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent advise

@smjn smjn requested a review from adixitconfluent November 4, 2025 06:40
);

assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
assertEquals(15, sharePartition.startOffset());
Copy link
Contributor

@adixitconfluent adixitconfluent Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals(0, sharePartition.timer().size()); // No timer jobs after expiration adding this assertion at the end of the test should be useful for the future. Its fine if you ignore this suggestion as well.

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes @smjn . Recommended some minor test enhancements in comments.

assertEquals(40, persisterReadResultGapWindow.endOffset());

// deliveryCompleteCount is incremented by the number of ACKNOWLEDGED and ARCHIVED records in readState result.
assertEquals(17, sharePartition.deliveryCompleteCount());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Incorrect comment, there is 1 persister call

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I quoted incorrect line. The correct line is 10060. Test - testRenewAcknowledgeOnExpiredBatch code - Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says "1 more persister call" - not "1 persister call"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry the line 10,006 in test testRenewAcknowledgeOnExpiredBatch

Copy link
Contributor

@adixitconfluent adixitconfluent Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel in this test testLsoMovementWithPerOffsetRenewal particularly, we should also test the acquisition lock expiry since there are some offsets that are behind the start offset and have an active acquisition lock timeout.

@smjn smjn requested a review from adixitconfluent November 4, 2025 11:38
@smjn
Copy link
Collaborator Author

smjn commented Nov 4, 2025

@adixitconfluent Thanks for the review, inc comments.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.


// Expire timer
mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire
List<RecordState> expectedStates = List.of(RecordState.ARCHIVED, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE);
Copy link
Contributor

@adixitconfluent adixitconfluent Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the test case testLsoMovementWithPerOffsetRenewal passes, this is actually wrong. The state for the offset 2 should be ARCHIVED not AVAILABLE. There is a bug in start offset movement within code for share partition which is causing this issue. I'd suggest adding a comment The state for the offset 2 should be ARCHIVEDnotAVAILABLE. KAFKA-19859 will fix the incorrect start offset movement leading to the correction of this test case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch in the existing code. I agree with fixing in a separate PR.


// Expire timer
mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire
List<RecordState> expectedStates = List.of(RecordState.ARCHIVED, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch in the existing code. I agree with fixing in a separate PR.

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes. LGTM

@AndrewJSchofield
Copy link
Member

Build failing due to a flaky test RemoteLogManagerTest > "testCopyQuota(boolean).quotaExceeded=false". We should mark it as such.

@AndrewJSchofield
Copy link
Member

https://issues.apache.org/jira/browse/KAFKA-19578 to mark it as flaky.

@AndrewJSchofield AndrewJSchofield merged commit e658d54 into apache:trunk Nov 4, 2025
20 of 22 checks passed
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for delayed review, some comments.

// mapping between bytes and record state type. All ack types have been added except for RENEW which
// has been handled above.
RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
Objects.requireNonNull(recordState);
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, though post validation of ackType we should not get null recordState though but as we have check for Objects.requireNonNull in code then it can throw null pointer exception, is it the correct way to handle the incorrect ackType?

// Only valid for ACQUIRED offsets; the check above ensures this.
long key = offsetState.getKey();
InFlightState state = offsetState.getValue();
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
log.debug("Renewing acquisition lock for {}-{} with offset {} in batch {} for member {}.",

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants