Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,11 @@ private ConsumerGroupMember computeNextAssignment(
Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
partitionsPendingAssignment.removeAll(assignedPartitions);
hasUnreleasedPartitions = partitionsPendingAssignment.removeIf(partitionId ->
currentPartitionEpoch.apply(topicId, partitionId) != -1
currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
// Don't consider a partition unreleased if it is owned by the current member
// because it is pending revocation. This is safe to do since only a single member
// can own a partition at a time.
!member.partitionsPendingRevocation().getOrDefault(topicId, Set.of()).contains(partitionId)
) || hasUnreleasedPartitions;

if (!assignedPartitions.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,81 @@ public void testUnrevokedPartitionsToUnreleasedPartitions() {
);
}

@Test
public void testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocation() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();

CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();

ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.setPartitionsPendingRevocation(mkAssignment(
// Partition 4 is pending revocation by the member but is back in the latest target
// assignment.
mkTopicAssignment(topicId1, 4)))
.build();

ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
.withCurrentPartitionEpoch((topicId, partitionId) -> {
if (topicId.equals(topicId1)) {
// Partitions 2 and 3 are in the member's current assignment.
// Partition 4 is pending revocation by the member.
switch (partitionId) {
case 2:
case 3:
case 4:
return 10;
}
} else if (topicId.equals(topicId2)) {
// Partitions 5 and 6 are in the member's current assignment.
switch (partitionId) {
case 5:
case 6:
return 10;
}
}
return -1;
})
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId1)
.setPartitions(Arrays.asList(2, 3)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId2)
.setPartitions(Arrays.asList(5, 6))))
.build();

assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(12)
Copy link
Member

@FrankYang0529 FrankYang0529 Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m wondering if there’s any specific reason behind using 12 as the next member epoch? The previous member epoch is 10.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just needs to be at least 2 more than the previous epoch to be realistic. Epoch 11's assignment would have moved the partition away from the member and epoch 12's assignment would have returned the partition. It doesn't actually impact the test.

.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7)))
.setPartitionsPendingRevocation(Map.of())
.build(),
updatedMember
);
}

@Test
public void testUnreleasedPartitionsToStable() {
String topic1 = "topic1";
Expand Down