Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -2093,8 +2093,8 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
// Only valid for ACQUIRED offsets; the check above ensures this.
long key = offsetState.getKey();
InFlightState state = offsetState.getValue();
log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.",
groupId, topicIdPartition, key, key, memberId);
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
log.debug("Renewing acquisition lock for {}-{} with offset {} in batch {} for member {}.",

groupId, topicIdPartition, key, inFlightBatch, memberId);
state.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, key, key);
state.updateAcquisitionLockTimeoutTask(renewalTask);
Expand Down Expand Up @@ -2161,7 +2161,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
if (ackType == AcknowledgeType.RENEW.id) {
// Renew the acquisition lock timer for the complete batch. We have already
// checked that the batchState is ACQUIRED above.
log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.",
log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.",
groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId);
inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId,
Expand Down
159 changes: 158 additions & 1 deletion core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9974,7 +9974,7 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException {

assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask());
TestUtils.waitForCondition(() -> sharePartition.cachedState().get(0L).offsetState() != null, "offset state not populated");
assertNotNull(sharePartition.cachedState().get(0L).offsetState());

InFlightState offset0 = sharePartition.cachedState().get(0L).offsetState().get(0L);
InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L);
Expand All @@ -9987,6 +9987,163 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException {
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
}

@Test
public void testLsoMovementWithBatchWRenewal() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extra "W" in the test name (testLsoMovementWithBatchWRenewal).

Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withPersister(persister)
.build();

List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 10), 10);
assertEquals(1, records.size());
assertNotEquals(records.get(0).firstOffset(), records.get(0).lastOffset());
assertEquals(1, sharePartition.cachedState().size());
InFlightBatch batch = sharePartition.cachedState().get(0L);
AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask();

sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RENEW.id))));
sharePartition.updateCacheAndOffsets(5);

assertEquals(10, sharePartition.nextFetchOffset());
assertEquals(5, sharePartition.startOffset());
assertEquals(9, sharePartition.endOffset());
assertEquals(1, sharePartition.cachedState().size());

assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState());

assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes.
Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add more asserts to at least one of the newly added tests to check the behaviour when this acquisition lock expires. Also, we should add asserts over sharePartition.timer().size() to all the newly added tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent advice.

}

@Test
public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withPersister(persister)
.build();

List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 5), 5);
assertEquals(1, records.size());
assertEquals(records.get(0).firstOffset() + 4, records.get(0).lastOffset());
assertEquals(1, sharePartition.cachedState().size());
InFlightBatch batch = sharePartition.cachedState().get(0L);
assertEquals(RecordState.ACQUIRED, batch.batchState());
AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask();

// For ACCEPT ack call.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));

when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));

sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 4,
List.of(AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.RENEW.id))));

sharePartition.updateCacheAndOffsets(3);

assertEquals(5, sharePartition.nextFetchOffset());
assertEquals(3, sharePartition.startOffset());
assertEquals(4, sharePartition.endOffset());
assertEquals(1, sharePartition.cachedState().size());

assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask());
assertNotNull(sharePartition.cachedState().get(0L).offsetState());

InFlightState offset0 = sharePartition.cachedState().get(0L).offsetState().get(0L);
InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L);
InFlightState offset2 = sharePartition.cachedState().get(0L).offsetState().get(2L);
InFlightState offset3 = sharePartition.cachedState().get(0L).offsetState().get(3L);
InFlightState offset4 = sharePartition.cachedState().get(0L).offsetState().get(4L);

assertEquals(RecordState.ACQUIRED, offset0.state());
assertNotNull(offset0.acquisitionLockTimeoutTask());

assertEquals(RecordState.ACKNOWLEDGED, offset1.state());
assertNull(offset1.acquisitionLockTimeoutTask());

assertEquals(RecordState.ACQUIRED, offset2.state());
assertNotNull(offset2.acquisitionLockTimeoutTask());

assertEquals(RecordState.ACKNOWLEDGED, offset3.state());
assertNull(offset3.acquisitionLockTimeoutTask());

assertEquals(RecordState.ACQUIRED, offset4.state());
assertNotNull(offset4.acquisitionLockTimeoutTask());

Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
Copy link
Contributor

@adixitconfluent adixitconfluent Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that in this test (testLsoMovementWithPerOffsetRenewal) in particular, we should also test the acquisition lock expiry, since there are some offsets that are behind the start offset and still have an active acquisition lock timeout.

}

/**
 * Exercises a single acknowledge call that mixes a whole-batch RENEW with a
 * per-offset RENEW/ACCEPT acknowledgement.
 *
 * <p>Two in-flight batches are acquired: a single-record batch at offset 0 and a
 * two-record batch at offsets 1-2. The test then asserts that:
 * <ul>
 *   <li>the original acquisition lock task of each batch is cancelled and replaced
 *       for the renewed records,</li>
 *   <li>the accepted offset ends up ACKNOWLEDGED with no lock task, and</li>
 *   <li>the persister is written to exactly once.</li>
 * </ul>
 */
@Test
public void testRenewAcknowledgeWithPerOffsetAndBatchMix() {
    Persister persister = Mockito.mock(Persister.class);
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withState(SharePartitionState.ACTIVE)
        .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
        .withMaxDeliveryCount(2)
        .withPersister(persister)
        .build();

    // Batch: a single record at offset 0, acknowledged below as a complete batch.
    List<AcquiredRecords> recordsB = fetchAcquiredRecords(sharePartition, memoryRecords(0, 1), 1);
    assertEquals(1, recordsB.size());
    assertEquals(recordsB.get(0).firstOffset(), recordsB.get(0).lastOffset());
    assertEquals(1, sharePartition.cachedState().size());
    InFlightBatch batchB = sharePartition.cachedState().get(0L);
    AcquisitionLockTimerTask taskOrigB = batchB.batchAcquisitionLockTimeoutTask();

    // Per offset: two records at offsets 1-2, acknowledged below offset by offset.
    List<AcquiredRecords> recordsO = fetchAcquiredRecords(sharePartition, memoryRecords(1, 2), 2);
    assertEquals(1, recordsO.size());
    assertEquals(recordsO.get(0).firstOffset() + 1, recordsO.get(0).lastOffset());
    assertEquals(2, sharePartition.cachedState().size());
    // The second batch is keyed by its first offset (1), not 0. Reading key 0 here would
    // alias batchB and leave the per-offset lock task unverified.
    InFlightBatch batchO = sharePartition.cachedState().get(1L);
    assertEquals(RecordState.ACQUIRED, batchO.batchState());
    AcquisitionLockTimerTask taskOrigO = batchO.batchAcquisitionLockTimeoutTask();

    // For ACCEPT ack call.
    WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
    Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
        new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
            PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));

    when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));

    sharePartition.acknowledge(MEMBER_ID, List.of(
        new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id)),
        new ShareAcknowledgementBatch(1, 2, List.of(AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id))
    ));

    // Batch checks
    assertTrue(taskOrigB.isCancelled()); // Original acq lock cancelled.
    assertNotEquals(taskOrigB, batchB.batchAcquisitionLockTimeoutTask()); // Lock changes.

    // Per offset checks
    assertTrue(taskOrigO.isCancelled()); // Original acq lock cancelled.
    // Confirm the offset state exists before dereferencing it.
    assertNotNull(sharePartition.cachedState().get(1L).offsetState());
    assertNotEquals(taskOrigO, sharePartition.cachedState().get(1L).offsetState().get(1L).acquisitionLockTimeoutTask());

    InFlightState offset1 = sharePartition.cachedState().get(1L).offsetState().get(1L);
    InFlightState offset2 = sharePartition.cachedState().get(1L).offsetState().get(2L);
    // Offset 1 was renewed: still ACQUIRED and holding a lock task.
    assertEquals(RecordState.ACQUIRED, offset1.state());
    assertNotNull(offset1.acquisitionLockTimeoutTask());

    // Offset 2 was accepted: ACKNOWLEDGED and no lock task remains.
    assertEquals(RecordState.ACKNOWLEDGED, offset2.state());
    assertNull(offset2.acquisitionLockTimeoutTask());

    Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
}

/**
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
*/
Expand Down