From dd711d44f13d13b837ff74fed28334a204649453 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 3 Nov 2025 16:32:47 +0530 Subject: [PATCH 1/9] KAFKA-19843: SharePartition RENEW ack impl. --- .../kafka/server/share/SharePartition.java | 157 ++++++++++----- .../server/share/SharePartitionTest.java | 183 +++++++++++++----- 2 files changed, 244 insertions(+), 96 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 0bf3eb10b2726..637c8a1c9dc25 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -19,6 +19,7 @@ import kafka.server.ReplicaManager; import kafka.server.share.SharePartitionManager.SharePartitionListener; +import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; @@ -138,6 +139,16 @@ enum SharePartitionState { FENCED } + /** + * Static mapping from acknowledgement type bytes to the corresponding RecordState. + */ + private static final Map ACK_TYPE_TO_RECORD_STATE = Map.of( + (byte) 0, RecordState.ARCHIVED, // Represents gap + AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED, + AcknowledgeType.RELEASE.id, RecordState.AVAILABLE, + AcknowledgeType.REJECT.id, RecordState.ARCHIVED + ); + /** * The group id of the share partition belongs to. */ @@ -916,9 +927,9 @@ public CompletableFuture acknowledge( for (ShareAcknowledgementBatch batch : acknowledgementBatches) { // Client can either send a single entry in acknowledgeTypes which represents the state // of the complete batch or can send individual offsets state. 
- Map recordStateMap; + Map ackTypeMap; try { - recordStateMap = fetchRecordStateMapForAcknowledgementBatch(batch); + ackTypeMap = fetchAckTypeMapForBatch(batch); } catch (IllegalArgumentException e) { log.debug("Invalid acknowledge type: {} for share partition: {}-{}", batch.acknowledgeTypes(), groupId, topicIdPartition); @@ -946,7 +957,7 @@ public CompletableFuture acknowledge( Optional ackThrowable = acknowledgeBatchRecords( memberId, batch, - recordStateMap, + ackTypeMap, subMap, persisterBatches ); @@ -1856,26 +1867,21 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset; } - private Map fetchRecordStateMapForAcknowledgementBatch( - ShareAcknowledgementBatch batch) { + // Visibility for test + static Map fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) { // Client can either send a single entry in acknowledgeTypes which represents the state // of the complete batch or can send individual offsets state. Construct a map with record state // for each offset in the batch, if single acknowledge type is sent, the map will have only one entry. 
- Map recordStateMap = new HashMap<>(); + Map ackTypeMap = new HashMap<>(); for (int index = 0; index < batch.acknowledgeTypes().size(); index++) { - recordStateMap.put(batch.firstOffset() + index, - fetchRecordState(batch.acknowledgeTypes().get(index))); + byte ackType = batch.acknowledgeTypes().get(index); + // Validate + if (ackType != 0) { + AcknowledgeType.forId(ackType); + } + ackTypeMap.put(batch.firstOffset() + index, ackType); } - return recordStateMap; - } - - private static RecordState fetchRecordState(byte acknowledgeType) { - return switch (acknowledgeType) { - case 1 /* ACCEPT */ -> RecordState.ACKNOWLEDGED; - case 2 /* RELEASE */ -> RecordState.AVAILABLE; - case 3, 0 /* REJECT / GAP */ -> RecordState.ARCHIVED; - default -> throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType); - }; + return ackTypeMap; } private NavigableMap fetchSubMapForAcknowledgementBatch( @@ -1930,7 +1936,7 @@ private NavigableMap fetchSubMapForAcknowledgementBatch( private Optional acknowledgeBatchRecords( String memberId, ShareAcknowledgementBatch batch, - Map recordStateMap, + Map ackTypeMap, NavigableMap subMap, List persisterBatches ) { @@ -1994,11 +2000,11 @@ private Optional acknowledgeBatchRecords( } throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch, - recordStateMap, persisterBatches); + ackTypeMap, persisterBatches); } else { // The in-flight batch is a full match hence change the state of the complete batch. 
throwable = acknowledgeCompleteBatch(batch, inFlightBatch, - recordStateMap.get(batch.firstOffset()), persisterBatches); + ackTypeMap.get(batch.firstOffset()), persisterBatches, memberId); } if (throwable.isPresent()) { @@ -2034,14 +2040,11 @@ private Optional acknowledgePerOffsetBatchRecords( String memberId, ShareAcknowledgementBatch batch, InFlightBatch inFlightBatch, - Map recordStateMap, + Map ackTypeMap, List persisterBatches ) { lock.writeLock().lock(); try { - // Fetch the first record state from the map to be used as default record state in case the - // offset record state is not provided by client. - RecordState recordStateDefault = recordStateMap.get(batch.firstOffset()); for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) { // 1. For the first batch which might have offsets prior to the request base @@ -2081,31 +2084,50 @@ private Optional acknowledgePerOffsetBatchRecords( new InvalidRecordStateException("Member is not the owner of offset")); } - // Determine the record state for the offset. If the per offset record state is not provided - // by the client, then use the batch record state. - RecordState recordState = - recordStateMap.size() > 1 ? recordStateMap.get(offsetState.getKey()) : - recordStateDefault; - InFlightState updateResult = offsetState.getValue().startStateTransition( - recordState, - DeliveryCountOps.NO_OP, - this.maxDeliveryCount, - EMPTY_MEMBER_ID - ); - if (updateResult == null) { - log.debug("Unable to acknowledge records for the offset: {} in batch: {}" - + " for the share partition: {}-{}", offsetState.getKey(), - inFlightBatch, groupId, topicIdPartition); - return Optional.of(new InvalidRecordStateException( - "Unable to acknowledge records for the batch")); - } - // Successfully updated the state of the offset and created a persister state batch for write to persister. 
- persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(), - offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount()))); - if (isStateTerminal(updateResult.state())) { - inFlightTerminalRecords.incrementAndGet(); + // Use the per-offset ack type when the client sent one per offset; otherwise the single, already validated, batch-level ack type applies to every offset. + byte ackType = ackTypeMap.size() > 1 ? ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0); + + if (ackType == AcknowledgeType.RENEW.id) { + // If RENEW, renew the acquisition lock timer for this offset and continue without changing state. + // We do not care about recordState map here. + // Only valid for ACQUIRED offsets; the check above ensures this. + long key = offsetState.getKey(); + InFlightState state = offsetState.getValue(); + log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.", + groupId, topicIdPartition, key, key, memberId); + state.cancelAndClearAcquisitionLockTimeoutTask(); + AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, key, key); + state.updateAcquisitionLockTimeoutTask(renewalTask); + } else { + // Determine the record state for the offset. If the per offset record state is not provided + // by the client, then use the batch record state. This will always be present as it is a static + // mapping between bytes and record state type. All ack types have been added except for RENEW which + // has been handled above. 
+ RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType); + Objects.requireNonNull(recordState); + + InFlightState updateResult = offsetState.getValue().startStateTransition( + recordState, + DeliveryCountOps.NO_OP, + this.maxDeliveryCount, + EMPTY_MEMBER_ID + ); + + if (updateResult == null) { + log.debug("Unable to acknowledge records for the offset: {} in batch: {}" + + " for the share partition: {}-{}", offsetState.getKey(), + inFlightBatch, groupId, topicIdPartition); + return Optional.of(new InvalidRecordStateException( + "Unable to acknowledge records for the batch")); + } + // Successfully updated the state of the offset and created a persister state batch for write to persister. + persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(), + offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount()))); + if (isStateTerminal(updateResult.state())) { + inFlightTerminalRecords.incrementAndGet(); + } + // Do not update the nextFetchOffset as the offset has not completed the transition yet. } - // Do not update the nextFetchOffset as the offset has not completed the transition yet. } } finally { lock.writeLock().unlock(); @@ -2116,8 +2138,9 @@ private Optional acknowledgePerOffsetBatchRecords( private Optional acknowledgeCompleteBatch( ShareAcknowledgementBatch batch, InFlightBatch inFlightBatch, - RecordState recordState, - List persisterBatches + byte ackType, + List persisterBatches, + String memberId ) { lock.writeLock().lock(); try { @@ -2131,10 +2154,35 @@ private Optional acknowledgeCompleteBatch( "The batch cannot be acknowledged. The batch is not in the acquired state.")); } + // If the request is a full-batch RENEW acknowledgement (ack type 4), then renew the + // acquisition lock without changing the state or persisting anything. + // Callers must have already verified that this is a full-batch ack (not a per-offset ack) + // and that the startOffset has not moved. 
+ if (ackType == AcknowledgeType.RENEW.id) { + if (inFlightBatch.batchState() != RecordState.ACQUIRED) { + log.debug("The batch with RENEW ack is not in the acquired state: {} for share partition: {}-{}", + inFlightBatch, groupId, topicIdPartition); + return Optional.of(new InvalidRecordStateException( + "The batch cannot be RENEW acknowledged. The batch is not in the acquired state.")); + } + // Renew the acquisition lock timer for the complete batch. + log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.", + groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId); + inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask(); + AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, + inFlightBatch.firstOffset(), inFlightBatch.lastOffset()); + inFlightBatch.updateAcquisitionLockTimeout(renewalTask); + // Nothing to persist. + return Optional.empty(); + } + // Change the state of complete batch since the same state exists for the entire inFlight batch. - // The member id is reset to EMPTY_MEMBER_ID irrespective of the acknowledge type as the batch is + // The member id is reset to EMPTY_MEMBER_ID irrespective of the ack type as the batch is // either released or moved to a state where member id existence is not important. The member id // is only important when the batch is acquired. 
+ RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType); + Objects.requireNonNull(recordState); + InFlightState updateResult = inFlightBatch.startBatchStateTransition( recordState, DeliveryCountOps.NO_OP, @@ -3121,4 +3169,9 @@ private record LastOffsetAndMaxRecords( long lastOffset, int maxRecords ) { } + + // Visibility for testing + static Map ackTypeToRecordStateMapping() { + return ACK_TYPE_TO_RECORD_STATE; + } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index eeb8ce3002e95..e59c0c1359174 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; +import org.apache.kafka.server.share.fetch.InFlightBatch; import org.apache.kafka.server.share.fetch.InFlightState; import org.apache.kafka.server.share.fetch.RecordState; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; @@ -99,9 +100,11 @@ import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue; import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -187,9 
+190,9 @@ public void testMaybeInitialize() throws InterruptedException { // inFlightTerminalRecords is incremented by the number of ACKNOWLEDGED and ARCHIVED records in readState result. assertEquals(5, sharePartition.inFlightTerminalRecords()); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, "In-flight batch count should be 2."); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11, "In-flight message count should be 11."); assertEquals(11, sharePartitionMetrics.inFlightBatchMessageCount().sum()); assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); @@ -360,9 +363,9 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); assertEquals(0, sharePartition.inFlightTerminalRecords()); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 0, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 0, "In-flight batch count should be 0."); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 0, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 0, "In-flight message count should be 0."); } @@ -1811,9 +1814,9 @@ public void testAcquireSingleRecord() throws InterruptedException { // inFlightTerminalRecords will not be changed because no record went to a Terminal state. 
assertEquals(0, sharePartition.inFlightTerminalRecords()); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, "In-flight batch count should be 1."); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 1, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 1, "In-flight message count should be 1."); assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @@ -1841,9 +1844,9 @@ public void testAcquireMultipleRecords() throws InterruptedException { // inFlightTerminalRecords will not be changed because no record went to a Terminal state. assertEquals(0, sharePartition.inFlightTerminalRecords()); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, "In-flight batch count should be 1."); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, "In-flight message count should be 5."); assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @@ -1934,9 +1937,9 @@ public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws Interrupte assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(10L).offsetState()); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, "In-flight batch count 
should be 1."); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20, "In-flight message count should be 20."); assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @@ -2180,9 +2183,9 @@ public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset() assertTrue(sharePartition.cachedState().containsKey(4L)); assertTrue(sharePartition.cachedState().containsKey(10L)); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, "In-flight batch count should be 2."); - TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 13, + waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 13, "In-flight message count should be 13."); assertEquals(13, sharePartitionMetrics.inFlightBatchMessageCount().sum()); assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); @@ -3938,7 +3941,7 @@ public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedExce // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && @@ -3969,7 +3972,7 @@ public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedE // Allowing acquisition lock to expire. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE @@ -4008,7 +4011,7 @@ public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for all the acquired records. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4040,7 +4043,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE, @@ -4081,7 +4084,7 @@ public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws Interru // Allowing acquisition lock to expire. This will not cause any change to cached state map since the batch is already acknowledged. // Hence, the acquisition lock timeout task would be cancelled already. - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4112,7 +4115,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws Inter // Allowing acquisition lock to expire. This will not cause any change to cached state map since the batch is already acknowledged. 
// Hence, the acquisition lock timeout task would be cancelled already. - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && @@ -4166,7 +4169,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets( // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for batch with starting offset 1. // Since, other records have been acknowledged. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 1 && sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask() == null && @@ -4196,7 +4199,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && sharePartition.cachedState().size() == 1 && @@ -4223,7 +4226,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted // Allowing acquisition lock to expire for the acquired subset batch. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4307,7 +4310,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOf // Allowing acquisition lock to expire for the offsets that have not been acknowledged yet. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap1 = new HashMap<>(); expectedOffsetStateMap1.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4371,7 +4374,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE && @@ -4391,7 +4394,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter // Allowing acquisition lock to expire to archive the records that reach max delivery count. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && // After the second delivery attempt fails to acknowledge the record correctly, the record should be archived. @@ -4419,7 +4422,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4445,7 +4448,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE // Allowing acquisition lock to expire to archive the records that reach max delivery count. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); @@ -4497,7 +4500,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4514,7 +4517,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() // Allowing acquisition lock to expire to archive the records that reach max delivery count. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && // After the second failed attempt to acknowledge the record batch successfully, the record batch is archived. // Since this is the first batch in the share partition, SPSO moves forward and the cachedState is cleared @@ -4539,7 +4542,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && @@ -4604,7 +4607,7 @@ public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedEx // Allowing acquisition lock to expire will only affect the offsets that have not been acknowledged yet. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { // Check cached state. Map expectedOffsetStateMap = new HashMap<>(); @@ -4651,7 +4654,7 @@ public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws I // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 1 && @@ -4688,7 +4691,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnBatchLastDel // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 2 && @@ -4712,7 +4715,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnBatchLastDel // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 2 && @@ -4759,7 +4762,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnOffsetLastDe // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4795,7 +4798,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnOffsetLastDe // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4845,7 +4848,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -6879,7 +6882,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap1 = new HashMap<>(); expectedOffsetStateMap1.put(5L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); @@ -6941,7 +6944,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToStartOf // Allowing acquisition lock to expire. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.cachedState().get(5L).batchMemberId().equals(EMPTY_MEMBER_ID) && sharePartition.cachedState().get(5L).batchState() == RecordState.ARCHIVED && sharePartition.cachedState().get(10L).batchMemberId().equals(EMPTY_MEMBER_ID) && @@ -6974,7 +6977,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToMiddleO // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); @@ -7179,7 +7182,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.nextFetchOffset() == 7 && sharePartition.cachedState().isEmpty() && sharePartition.startOffset() == 7 && sharePartition.endOffset() == 7, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, @@ -7234,7 +7237,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.nextFetchOffset() == 3 && sharePartition.cachedState().isEmpty() && sharePartition.startOffset() == 3 && sharePartition.endOffset() == 3, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, @@ -9745,7 +9748,7 @@ public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup // Allowing acquisition lock to expire. This will not cause any change because the record is not in ACQUIRED state. // This will remove the entry of the timer task from timer. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.cachedState().get(0L).batchState() == RecordState.ACKNOWLEDGED && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && sharePartition.timer().size() == 0, @@ -9832,7 +9835,7 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep // Allowing acquisition lock to expire. This will also ensure that acquisition lock timeout task // is run successfully post write state RPC failure. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.AVAILABLE && sharePartition.cachedState().get(7L).batchState() == RecordState.AVAILABLE && sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount() == 1 && @@ -9861,7 +9864,7 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep // Verify the timer tasks have run and the state is archived for the offsets which are not acknowledged, // but the acquisition lock timeout task should be just expired for acknowledged offsets, though // the state should not be archived. - TestUtils.waitForCondition( + waitForCondition( () -> sharePartition.cachedState().get(2L).offsetState().get(2L).state() == RecordState.ARCHIVED && sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.ACKNOWLEDGED && sharePartition.cachedState().get(2L).offsetState().get(3L).acquisitionLockTimeoutTask().hasExpired() && @@ -9893,6 +9896,98 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep assertEquals(5, sharePartition.inFlightTerminalRecords()); } + @Test + public void testAckTypeToRecordStateMapping() { + // This test will help catch bugs if the map changes. 
+ Map actualMap = SharePartition.ackTypeToRecordStateMapping(); + assertEquals(4, actualMap.size()); + + Map expected = Map.of( + (byte) 0, RecordState.ARCHIVED, + AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED, + AcknowledgeType.RELEASE.id, RecordState.AVAILABLE, + AcknowledgeType.REJECT.id, RecordState.ARCHIVED + ); + + for (byte key : expected.keySet()) { + assertEquals(expected.get(key), actualMap.get(key)); + } + } + + @Test + public void testFetchAckTypeMapForBatch() { + ShareAcknowledgementBatch batch = mock(ShareAcknowledgementBatch.class); + when(batch.acknowledgeTypes()).thenReturn(List.of((byte) -1)); + assertThrows(IllegalArgumentException.class, () -> SharePartition.fetchAckTypeMapForBatch(batch)); + } + + @Test + public void testRenewAcknowledgeWithCompleteBatchAck() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + List records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 1), 1); + assertEquals(1, records.size()); + assertEquals(records.get(0).firstOffset(), records.get(0).lastOffset()); + assertEquals(1, sharePartition.cachedState().size()); + InFlightBatch batch = sharePartition.cachedState().get(0L); + AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask(); + + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id)))); + assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. + assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes. + Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call. 
+ } + + @Test + public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + List records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 2), 2); + assertEquals(1, records.size()); + assertEquals(records.get(0).firstOffset() + 1, records.get(0).lastOffset()); + assertEquals(1, sharePartition.cachedState().size()); + InFlightBatch batch = sharePartition.cachedState().get(0L); + assertEquals(RecordState.ACQUIRED, batch.batchState()); + AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask(); + + // For ACCEPT ack call. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + + when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 1, + List.of(AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id)))); + + assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. 
+ assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); + waitForCondition(() -> sharePartition.cachedState().get(0L).offsetState() != null, "offset state not populated"); + + InFlightState offset0 = sharePartition.cachedState().get(0L).offsetState().get(0L); + InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L); + assertEquals(RecordState.ACQUIRED, offset0.state()); + assertNotNull(offset0.acquisitionLockTimeoutTask()); + + assertEquals(RecordState.ACKNOWLEDGED, offset1.state()); + assertNull(offset1.acquisitionLockTimeoutTask()); + + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); + } + /** * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT). */ From f3bf5d84cc0ef0c074a413c442ae229a4abfc8fb Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 3 Nov 2025 17:09:02 +0530 Subject: [PATCH 2/9] remove unnecessary cleanup. 
--- .../server/share/SharePartitionTest.java | 91 +++++++++---------- 1 file changed, 45 insertions(+), 46 deletions(-) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index e59c0c1359174..964dbc105beb8 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -100,7 +100,6 @@ import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue; import static org.apache.kafka.test.TestUtils.assertFutureThrows; -import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -190,9 +189,9 @@ public void testMaybeInitialize() throws InterruptedException { // inFlightTerminalRecords is incremented by the number of ACKNOWLEDGED and ARCHIVED records in readState result. 
assertEquals(5, sharePartition.inFlightTerminalRecords()); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, "In-flight batch count should be 2."); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 11, "In-flight message count should be 11."); assertEquals(11, sharePartitionMetrics.inFlightBatchMessageCount().sum()); assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); @@ -363,9 +362,9 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); assertEquals(0, sharePartition.inFlightTerminalRecords()); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 0, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 0, "In-flight batch count should be 0."); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 0, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 0, "In-flight message count should be 0."); } @@ -1814,9 +1813,9 @@ public void testAcquireSingleRecord() throws InterruptedException { // inFlightTerminalRecords will not be changed because no record went to a Terminal state. 
assertEquals(0, sharePartition.inFlightTerminalRecords()); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, "In-flight batch count should be 1."); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 1, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 1, "In-flight message count should be 1."); assertEquals(1, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @@ -1844,9 +1843,9 @@ public void testAcquireMultipleRecords() throws InterruptedException { // inFlightTerminalRecords will not be changed because no record went to a Terminal state. assertEquals(0, sharePartition.inFlightTerminalRecords()); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, "In-flight batch count should be 1."); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 5, "In-flight message count should be 5."); assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @@ -1937,9 +1936,9 @@ public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws Interrupte assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); assertNull(sharePartition.cachedState().get(10L).offsetState()); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1, "In-flight batch count 
should be 1."); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 20, "In-flight message count should be 20."); assertEquals(20, sharePartitionMetrics.inFlightBatchMessageCount().sum()); } @@ -2183,9 +2182,9 @@ public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset() assertTrue(sharePartition.cachedState().containsKey(4L)); assertTrue(sharePartition.cachedState().containsKey(10L)); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2, "In-flight batch count should be 2."); - waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 13, + TestUtils.waitForCondition(() -> yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 13, "In-flight message count should be 13."); assertEquals(13, sharePartitionMetrics.inFlightBatchMessageCount().sum()); assertEquals(2, sharePartitionMetrics.inFlightBatchMessageCount().count()); @@ -3941,7 +3940,7 @@ public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedExce // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && @@ -3972,7 +3971,7 @@ public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedE // Allowing acquisition lock to expire. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE @@ -4011,7 +4010,7 @@ public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for all the acquired records. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4043,7 +4042,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE, @@ -4084,7 +4083,7 @@ public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws Interru // Allowing acquisition lock to expire. This will not cause any change to cached state map since the batch is already acknowledged. // Hence, the acquisition lock timeout task would be cancelled already. - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4115,7 +4114,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws Inter // Allowing acquisition lock to expire. This will not cause any change to cached state map since the batch is already acknowledged. 
// Hence, the acquisition lock timeout task would be cancelled already. - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && @@ -4169,7 +4168,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets( // Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for batch with starting offset 1. // Since, other records have been acknowledged. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 1 && sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask() == null && @@ -4199,7 +4198,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 10 && sharePartition.cachedState().size() == 1 && @@ -4226,7 +4225,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted // Allowing acquisition lock to expire for the acquired subset batch. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4310,7 +4309,7 @@ public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOf // Allowing acquisition lock to expire for the offsets that have not been acknowledged yet. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap1 = new HashMap<>(); expectedOffsetStateMap1.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4374,7 +4373,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(10L).batchState() == RecordState.AVAILABLE && @@ -4394,7 +4393,7 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter // Allowing acquisition lock to expire to archive the records that reach max delivery count. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && // After the second delivery attempt fails to acknowledge the record correctly, the record should be archived. @@ -4422,7 +4421,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4448,7 +4447,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE // Allowing acquisition lock to expire to archive the records that reach max delivery count. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); @@ -4500,7 +4499,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE && @@ -4517,7 +4516,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() // Allowing acquisition lock to expire to archive the records that reach max delivery count. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && // After the second failed attempt to acknowledge the record batch successfully, the record batch is archived. // Since this is the first batch in the share partition, SPSO moves forward and the cachedState is cleared @@ -4542,7 +4541,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().get(5L).batchState() == RecordState.AVAILABLE && @@ -4607,7 +4606,7 @@ public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedEx // Allowing acquisition lock to expire will only affect the offsets that have not been acknowledged yet. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { // Check cached state. Map expectedOffsetStateMap = new HashMap<>(); @@ -4654,7 +4653,7 @@ public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws I // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 1 && @@ -4691,7 +4690,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnBatchLastDel // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 2 && @@ -4715,7 +4714,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnBatchLastDel // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 2 && @@ -4762,7 +4761,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnOffsetLastDe // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4798,7 +4797,7 @@ public void testInFlightTerminalRecordsOnLockExpiryAndWriteFailureOnOffsetLastDe // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -4848,7 +4847,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws // Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -6882,7 +6881,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap1 = new HashMap<>(); expectedOffsetStateMap1.put(5L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); @@ -6944,7 +6943,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToStartOf // Allowing acquisition lock to expire. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.cachedState().get(5L).batchMemberId().equals(EMPTY_MEMBER_ID) && sharePartition.cachedState().get(5L).batchState() == RecordState.ARCHIVED && sharePartition.cachedState().get(10L).batchMemberId().equals(EMPTY_MEMBER_ID) && @@ -6977,7 +6976,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToMiddleO // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); @@ -7182,7 +7181,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.nextFetchOffset() == 7 && sharePartition.cachedState().isEmpty() && sharePartition.startOffset() == 7 && sharePartition.endOffset() == 7, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, @@ -7237,7 +7236,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff // Allowing acquisition lock to expire. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.nextFetchOffset() == 3 && sharePartition.cachedState().isEmpty() && sharePartition.startOffset() == 3 && sharePartition.endOffset() == 3, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, @@ -9748,7 +9747,7 @@ public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup // Allowing acquisition lock to expire. This will not cause any change because the record is not in ACQUIRED state. // This will remove the entry of the timer task from timer. 
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.cachedState().get(0L).batchState() == RecordState.ACKNOWLEDGED && sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 && sharePartition.timer().size() == 0, @@ -9835,7 +9834,7 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep // Allowing acquisition lock to expire. This will also ensure that acquisition lock timeout task // is run successfully post write state RPC failure. mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.AVAILABLE && sharePartition.cachedState().get(7L).batchState() == RecordState.AVAILABLE && sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount() == 1 && @@ -9864,7 +9863,7 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep // Verify the timer tasks have run and the state is archived for the offsets which are not acknowledged, // but the acquisition lock timeout task should be just expired for acknowledged offsets, though // the state should not be archived. - waitForCondition( + TestUtils.waitForCondition( () -> sharePartition.cachedState().get(2L).offsetState().get(2L).state() == RecordState.ARCHIVED && sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.ACKNOWLEDGED && sharePartition.cachedState().get(2L).offsetState().get(3L).acquisitionLockTimeoutTask().hasExpired() && @@ -9975,7 +9974,7 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. 
assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); - waitForCondition(() -> sharePartition.cachedState().get(0L).offsetState() != null, "offset state not populated"); + TestUtils.waitForCondition(() -> sharePartition.cachedState().get(0L).offsetState() != null, "offset state not populated"); InFlightState offset0 = sharePartition.cachedState().get(0L).offsetState().get(0L); InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L); From c1cfcc19373069891821f2bfc4c0a1e19bfdfb10 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 3 Nov 2025 18:44:28 +0530 Subject: [PATCH 3/9] inc comments --- .../src/main/java/kafka/server/share/SharePartition.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 637c8a1c9dc25..7b8954ef1d2d2 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -2159,13 +2159,8 @@ private Optional acknowledgeCompleteBatch( // Before reaching this point, it should be verified that it is full batch ack and // not per offset ack as well as startOffset not moved. if (ackType == AcknowledgeType.RENEW.id) { - if (inFlightBatch.batchState() != RecordState.ACQUIRED) { - log.debug("The batch with RENEW ack is not in the acquired state: {} for share partition: {}-{}", - inFlightBatch, groupId, topicIdPartition); - return Optional.of(new InvalidRecordStateException( - "The batch cannot be RENEW acknowledged. The batch is not in the acquired state.")); - } - // Renew the acquisition lock timer for the complete batch. + // Renew the acquisition lock timer for the complete batch. We have already + // checked that the batchState is ACQUIRED above. 
log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.", groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId); inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask(); From fb2283aa089da0ba7d59560c37403a7549e76cec Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 3 Nov 2025 22:22:19 +0530 Subject: [PATCH 4/9] inc comments --- .../kafka/server/share/SharePartition.java | 6 +- .../server/share/SharePartitionTest.java | 159 +++++++++++++++++- 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 7b8954ef1d2d2..f79d404605500 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -2093,8 +2093,8 @@ private Optional acknowledgePerOffsetBatchRecords( // Only valid for ACQUIRED offsets; the check above ensures this. long key = offsetState.getKey(); InFlightState state = offsetState.getValue(); - log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.", - groupId, topicIdPartition, key, key, memberId); + log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.", + groupId, topicIdPartition, key, inFlightBatch, memberId); state.cancelAndClearAcquisitionLockTimeoutTask(); AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, key, key); state.updateAcquisitionLockTimeoutTask(renewalTask); @@ -2161,7 +2161,7 @@ private Optional acknowledgeCompleteBatch( if (ackType == AcknowledgeType.RENEW.id) { // Renew the acquisition lock timer for the complete batch. We have already // checked that the batchState is ACQUIRED above. 
- log.debug("Renewing acq lock for {}-{} with offsets {}-{} for member {}.", + log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.", groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId); inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask(); AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 964dbc105beb8..fd76fd594c171 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -9974,7 +9974,7 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); - TestUtils.waitForCondition(() -> sharePartition.cachedState().get(0L).offsetState() != null, "offset state not populated"); + assertNotNull(sharePartition.cachedState().get(0L).offsetState()); InFlightState offset0 = sharePartition.cachedState().get(0L).offsetState().get(0L); InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L); @@ -9987,6 +9987,163 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); } + @Test + public void testLsoMovementWithBatchWRenewal() { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + List records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 10), 10); + assertEquals(1, 
records.size()); + assertNotEquals(records.get(0).firstOffset(), records.get(0).lastOffset()); + assertEquals(1, sharePartition.cachedState().size()); + InFlightBatch batch = sharePartition.cachedState().get(0L); + AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask(); + + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RENEW.id)))); + sharePartition.updateCacheAndOffsets(5); + + assertEquals(10, sharePartition.nextFetchOffset()); + assertEquals(5, sharePartition.startOffset()); + assertEquals(9, sharePartition.endOffset()); + assertEquals(1, sharePartition.cachedState().size()); + + assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState()); + + assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. + assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes. + Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call. + } + + @Test + public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + List records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 5), 5); + assertEquals(1, records.size()); + assertEquals(records.get(0).firstOffset() + 4, records.get(0).lastOffset()); + assertEquals(1, sharePartition.cachedState().size()); + InFlightBatch batch = sharePartition.cachedState().get(0L); + assertEquals(RecordState.ACQUIRED, batch.batchState()); + AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask(); + + // For ACCEPT ack call. 
+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + + when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 4, + List.of(AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id, AcknowledgeType.RENEW.id)))); + + sharePartition.updateCacheAndOffsets(3); + + assertEquals(5, sharePartition.nextFetchOffset()); + assertEquals(3, sharePartition.startOffset()); + assertEquals(4, sharePartition.endOffset()); + assertEquals(1, sharePartition.cachedState().size()); + + assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. 
+ assertNotEquals(taskOrig, sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(0L).offsetState()); + + InFlightState offset0 = sharePartition.cachedState().get(0L).offsetState().get(0L); + InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L); + InFlightState offset2 = sharePartition.cachedState().get(0L).offsetState().get(2L); + InFlightState offset3 = sharePartition.cachedState().get(0L).offsetState().get(3L); + InFlightState offset4 = sharePartition.cachedState().get(0L).offsetState().get(4L); + + assertEquals(RecordState.ACQUIRED, offset0.state()); + assertNotNull(offset0.acquisitionLockTimeoutTask()); + + assertEquals(RecordState.ACKNOWLEDGED, offset1.state()); + assertNull(offset1.acquisitionLockTimeoutTask()); + + assertEquals(RecordState.ACQUIRED, offset2.state()); + assertNotNull(offset2.acquisitionLockTimeoutTask()); + + assertEquals(RecordState.ACKNOWLEDGED, offset3.state()); + assertNull(offset3.acquisitionLockTimeoutTask()); + + assertEquals(RecordState.ACQUIRED, offset4.state()); + assertNotNull(offset4.acquisitionLockTimeoutTask()); + + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); + } + + @Test + public void testRenewAcknowledgeWithPerOffsetAndBatchMix() { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + // Batch + List recordsB = fetchAcquiredRecords(sharePartition, memoryRecords(0, 1), 1); + assertEquals(1, recordsB.size()); + assertEquals(recordsB.get(0).firstOffset(), recordsB.get(0).lastOffset()); + assertEquals(1, sharePartition.cachedState().size()); + InFlightBatch batchB = sharePartition.cachedState().get(0L); + AcquisitionLockTimerTask taskOrigB = 
batchB.batchAcquisitionLockTimeoutTask(); + + // Per offset + List recordsO = fetchAcquiredRecords(sharePartition, memoryRecords(1, 2), 2); + assertEquals(1, recordsO.size()); + assertEquals(recordsO.get(0).firstOffset() + 1, recordsO.get(0).lastOffset()); + assertEquals(2, sharePartition.cachedState().size()); + InFlightBatch batchO = sharePartition.cachedState().get(1L); // Per-offset batch is cached under its first offset (1), not 0. + assertEquals(RecordState.ACQUIRED, batchO.batchState()); + AcquisitionLockTimerTask taskOrigO = batchO.batchAcquisitionLockTimeoutTask(); + + // For ACCEPT ack call. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + + when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id)), + new ShareAcknowledgementBatch(1, 2, List.of(AcknowledgeType.RENEW.id, AcknowledgeType.ACCEPT.id)) + )); + + // Batch checks + assertTrue(taskOrigB.isCancelled()); // Original acq lock cancelled. + assertNotEquals(taskOrigB, batchB.batchAcquisitionLockTimeoutTask()); // Lock changes. + + // Per offset checks + assertTrue(taskOrigO.isCancelled()); // Original acq lock cancelled.
+ assertNotEquals(taskOrigO, sharePartition.cachedState().get(1L).offsetState().get(1L).acquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(1L).offsetState()); + + InFlightState offset1 = sharePartition.cachedState().get(1L).offsetState().get(1L); + InFlightState offset2 = sharePartition.cachedState().get(1L).offsetState().get(2L); + assertEquals(RecordState.ACQUIRED, offset1.state()); + assertNotNull(offset1.acquisitionLockTimeoutTask()); + + assertEquals(RecordState.ACKNOWLEDGED, offset2.state()); + assertNull(offset2.acquisitionLockTimeoutTask()); + + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); + } + /** * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT). */ From 5dcfd340c92dc170c7b2c59c461e9a5ab1e4883d Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 4 Nov 2025 12:03:53 +0530 Subject: [PATCH 5/9] enriched tests --- .../server/share/SharePartitionTest.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index fd76fd594c171..97ad02fd28996 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -9940,7 +9940,23 @@ public void testRenewAcknowledgeWithCompleteBatchAck() throws InterruptedExcepti sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id)))); assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes. + assertEquals(1, sharePartition.timer().size()); // Timer jobs + assertEquals(RecordState.ACQUIRED, batch.batchState()); Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call. 
+ + // Expire timer + // On expiration state will transition to AVAILABLE resulting in persister write RPC + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire + + assertNull(batch.batchAcquisitionLockTimeoutTask()); + assertEquals(RecordState.AVAILABLE, batch.batchState()); // Verify batch record state + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // 1 persister call. } @Test @@ -9980,15 +9996,23 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { InFlightState offset1 = sharePartition.cachedState().get(0L).offsetState().get(1L); assertEquals(RecordState.ACQUIRED, offset0.state()); assertNotNull(offset0.acquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); // Timer jobs assertEquals(RecordState.ACKNOWLEDGED, offset1.state()); assertNull(offset1.acquisitionLockTimeoutTask()); Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); + + // Expire timer + mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire + + assertNull(offset0.acquisitionLockTimeoutTask()); + assertEquals(RecordState.AVAILABLE, offset0.state()); // Verify batch record state + Mockito.verify(persister, Mockito.times(2)).writeState(Mockito.any()); // 1 more persister call. 
} @Test - public void testLsoMovementWithBatchWRenewal() { + public void testLsoMovementWithBatchRenewal() { Persister persister = Mockito.mock(Persister.class); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) @@ -10017,11 +10041,12 @@ public void testLsoMovementWithBatchWRenewal() { assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes. + assertEquals(1, sharePartition.timer().size()); // Timer jobs Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any()); // No persister call. } @Test - public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException { + public void testLsoMovementWithPerOffsetRenewal() { Persister persister = Mockito.mock(Persister.class); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) @@ -10081,6 +10106,8 @@ public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException { assertEquals(RecordState.ACQUIRED, offset4.state()); assertNotNull(offset4.acquisitionLockTimeoutTask()); + assertEquals(3, sharePartition.timer().size()); // Timer jobs - 3 because the renewed offsets are non-contiguous. + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); } @@ -10141,6 +10168,7 @@ public void testRenewAcknowledgeWithPerOffsetAndBatchMix() { assertEquals(RecordState.ACKNOWLEDGED, offset2.state()); assertNull(offset2.acquisitionLockTimeoutTask()); + assertEquals(2, sharePartition.timer().size()); // Timer jobs one for batch and one for single renewal in per offset. 
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); } From 91a87a9093a330ad0919fa14dfa3ecb85e8b5eda Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 4 Nov 2025 12:40:28 +0530 Subject: [PATCH 6/9] add test to renew expired batch --- .../server/share/SharePartitionTest.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index a411f06c7506c..c5f5b5b04ca68 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -103,11 +103,13 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -9959,6 +9961,51 @@ public void testRenewAcknowledgeWithCompleteBatchAck() throws InterruptedExcepti Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // 1 persister call. 
} + @Test + public void testRenewAcknowledgeOnExpiredBatch() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) + .withPersister(persister) + .build(); + + List records = fetchAcquiredRecords(sharePartition, memoryRecords(0, 1), 1); + assertEquals(1, records.size()); + assertEquals(records.get(0).firstOffset(), records.get(0).lastOffset()); + assertEquals(1, sharePartition.cachedState().size()); + InFlightBatch batch = sharePartition.cachedState().get(0L); + AcquisitionLockTimerTask taskOrig = batch.batchAcquisitionLockTimeoutTask(); + + // Expire acq lock timeout. + // Persister mocking for recordState transition. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + + when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); + TestUtils.waitForCondition(() -> batch.batchAcquisitionLockTimeoutTask() == null, "Acq lock timeout not cancelled."); + CompletableFuture future = sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RENEW.id)))); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail("No exception thrown"); + } catch (Exception e) { + assertNotNull(e); + assertInstanceOf(InvalidRecordStateException.class, e.getCause()); + } + assertTrue(taskOrig.isCancelled()); // Original acq lock cancelled. 
+ assertNotEquals(taskOrig, batch.batchAcquisitionLockTimeoutTask()); // Lock changes. + assertEquals(0, sharePartition.timer().size()); // Timer jobs + assertEquals(RecordState.AVAILABLE, batch.batchState()); + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // No persister call. + } + @Test public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); From df0e9b76a99a5b4522bc5347a6165d1be16fbf5b Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 4 Nov 2025 17:03:18 +0530 Subject: [PATCH 7/9] inc comments --- .../kafka/server/share/SharePartitionTest.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c5f5b5b04ca68..c2fa5d0f86a39 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -9958,6 +9958,7 @@ public void testRenewAcknowledgeWithCompleteBatchAck() throws InterruptedExcepti assertNull(batch.batchAcquisitionLockTimeoutTask()); assertEquals(RecordState.AVAILABLE, batch.batchState()); // Verify batch record state + assertEquals(0, sharePartition.timer().size()); // Timer jobs Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // 1 persister call. } @@ -10055,6 +10056,7 @@ public void testRenewAcknowledgeWithPerOffsetAck() throws InterruptedException { assertNull(offset0.acquisitionLockTimeoutTask()); assertEquals(RecordState.AVAILABLE, offset0.state()); // Verify batch record state + assertEquals(0, sharePartition.timer().size()); // Timer jobs Mockito.verify(persister, Mockito.times(2)).writeState(Mockito.any()); // 1 more persister call. 
} @@ -10093,7 +10095,7 @@ public void testLsoMovementWithBatchRenewal() { } @Test - public void testLsoMovementWithPerOffsetRenewal() { + public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException { Persister persister = Mockito.mock(Persister.class); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) @@ -10155,7 +10157,18 @@ public void testLsoMovementWithPerOffsetRenewal() { assertEquals(3, sharePartition.timer().size()); // Timer jobs - 3 because the renewed offsets are non-contiguous. - Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); + // Expire timer + mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire + List expectedStates = List.of(RecordState.ARCHIVED, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE); + for (long i = 0; i <= 4; i++) { + InFlightState offset = sharePartition.cachedState().get(0L).offsetState().get(i); + assertNull(offset.acquisitionLockTimeoutTask()); + assertEquals(expectedStates.get((int) i), offset.state()); + } + + assertEquals(0, sharePartition.timer().size()); // Timer jobs + + Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any()); } @Test From bbdd9db0b83a3e9ee18a30b608474ac427c266f7 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 4 Nov 2025 17:33:03 +0530 Subject: [PATCH 8/9] update comment --- core/src/test/java/kafka/server/share/SharePartitionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c2fa5d0f86a39..49487582d0856 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -10004,7 +10004,7 @@ public void testRenewAcknowledgeOnExpiredBatch() throws InterruptedException { assertNotEquals(taskOrig, 
batch.batchAcquisitionLockTimeoutTask()); // Lock changes. assertEquals(0, sharePartition.timer().size()); // Timer jobs assertEquals(RecordState.AVAILABLE, batch.batchState()); - Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // No persister call. + Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any()); // 1 persister call to update record state. } @Test From 3aac3e378774d65cf9028a13af284d4f0ff3fd33 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 4 Nov 2025 18:19:37 +0530 Subject: [PATCH 9/9] added todo --- core/src/test/java/kafka/server/share/SharePartitionTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 49487582d0856..eb20dc9b8e3e5 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -10159,6 +10159,7 @@ public void testLsoMovementWithPerOffsetRenewal() throws InterruptedException { // Expire timer mockTimer.advanceClock(ACQUISITION_LOCK_TIMEOUT_MS + 1); // Trigger expire + // todo: index 2 in expectedStates should be RecordState.ARCHIVED - fix after ticket KAFKA-19859 is addressed. List expectedStates = List.of(RecordState.ARCHIVED, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE, RecordState.ACKNOWLEDGED, RecordState.AVAILABLE); for (long i = 0; i <= 4; i++) { InFlightState offset = sharePartition.cachedState().get(0L).offsetState().get(i);