Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -2124,7 +2124,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
if (isStateTerminal(updateResult.state())) {
inFlightTerminalRecords.incrementAndGet();
deliveryCompleteCount.incrementAndGet();
}
// Do not update the nextFetchOffset as the offset has not completed the transition yet.
}
Expand Down
79 changes: 77 additions & 2 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -9940,7 +9942,68 @@ public void testRenewAcknowledgeWithCompleteBatchAck() throws InterruptedExcepti
sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id))));
assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes.
assertEquals(1, sharePartition.timer().size()); // Timer jobs
assertEquals(RecordState.ACQUIRED, batch.batchState());
Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call.

// Expire timer
// On expiration state will transition to AVAILABLE resulting in persister write RPC
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));

mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire

assertNull(batch.batchAcquisitionLockTimeoutTask());
assertEquals(RecordState.AVAILABLE, batch.batchState()); // Verify batch record state
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // 1 persister call.
}
Copy link
Contributor

@adixitconfluent adixitconfluent Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals(0, sharePartition.timer().size()); // No timer jobs after expiration adding this assertion at the end of the test should be useful for the future. Its fine if you ignore this suggestion as well.


@Test
public void testRenewAcknowledgeOnExpiredBatch() throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withPersister(persister)
.build();

List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 1), 1);
assertEquals(1, records.size());
assertEquals(records.get(0).firstOffset(), records.get(0).lastOffset());
assertEquals(1, sharePartition.cachedState().size());
InFlightBatch batch = sharePartition.cachedState().get(0L);
AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask();

// Expire acq lock timeout.
// Persister mocking for recordState transition.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));

when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));

mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1);
TestUtils.waitForCondition(() -> batch.batchAcquisitionLockTimeoutTask() == null, "Acq lock timeout not cancelled.");
CompletableFuture<Void> future = sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id))));

assertTrue(future.isCompletedExceptionally());
try {
future.get();
fail("No exception thrown");
} catch (Exception e) {
assertNotNull(e);
assertInstanceOf(InvalidRecordStateException.class, e.getCause());
}
assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes.
assertEquals(0, sharePartition.timer().size()); // Timer jobs
assertEquals(RecordState.AVAILABLE, batch.batchState());
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // No persister call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Incorrect comment, there is 1 persister call

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I quoted incorrect line. The correct line is 10060. Test - testRenewAcknowledgeOnExpiredBatch code - Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says "1 more persister call" - not "1 persister call"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry the line 10,006 in test testRenewAcknowledgeOnExpiredBatch

}

@Test
Expand Down Expand Up @@ -9980,15 +10043,23 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException {
InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L);
assertEquals(RecordState.ACQUIRED, offset0.state());
assertNotNull(offset0.acquisitionLockTimeoutTask());
assertEquals(1, sharePartition.timer().size()); // Timer jobs

assertEquals(RecordState.ACKNOWLEDGED, offset1.state());
assertNull(offset1.acquisitionLockTimeoutTask());

Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());

// Expire timer
mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire

assertNull(offset0.acquisitionLockTimeoutTask());
assertEquals(RecordState.AVAILABLE, offset0.state()); // Verify batch record state
Mockito.verify(persister, Mockito.times(2)).writeState(Mockito.any()); // 1 more persister call.
}

@Test
public void testLsoMovementWithBatchWRenewal() {
public void testLsoMovementWithBatchRenewal() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
Expand Down Expand Up @@ -10017,11 +10088,12 @@ public void testLsoMovementWithBatchWRenewal() {

assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled.
assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes.
assertEquals(1, sharePartition.timer().size()); // Timer jobs
Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add more asserts to atleast one of the newly added test to check the behaviour when this acquisition lock expires. Also we should add asserts over sharePartition.timer().size() to all the newly added tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent advise

}

@Test
public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException {
public void testLsoMovementWithPerOffsetRenewal() {
Persister persister = Mockito.mock(Persister.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
Expand Down Expand Up @@ -10081,6 +10153,8 @@ public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException {
assertEquals(RecordState.ACQUIRED, offset4.state());
assertNotNull(offset4.acquisitionLockTimeoutTask());

assertEquals(3, sharePartition.timer().size()); // Timer jobs - 3 because the renewed offsets are non-contiguous.

Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
Copy link
Contributor

@adixitconfluent adixitconfluent Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel in this test testLsoMovementWithPerOffsetRenewal particularly, we should also test the acquisition lock expiry since there are some offsets that are behind the start offset and have an active acquisition lock timeout.

}

Expand Down Expand Up @@ -10141,6 +10215,7 @@ public void testRenewAcknowledgeWithPerOffsetAndBatchMix() {
assertEquals(RecordState.ACKNOWLEDGED, offset2.state());
assertNull(offset2.acquisitionLockTimeoutTask());

assertEquals(2, sharePartition.timer().size()); // Timer jobs one for batch and one for single renewal in per offset.
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
}

Expand Down
Loading