diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java index 63d61a3b923cb..d5aa719c2191a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java @@ -394,7 +394,11 @@ private ConsumerGroupMember computeNextAssignment( Set partitionsPendingAssignment = new HashSet<>(target); partitionsPendingAssignment.removeAll(assignedPartitions); hasUnreleasedPartitions = partitionsPendingAssignment.removeIf(partitionId -> - currentPartitionEpoch.apply(topicId, partitionId) != -1 + currentPartitionEpoch.apply(topicId, partitionId) != -1 && + // Don't consider a partition unreleased if it is owned by the current member + // because it is pending revocation. This is safe to do since only a single member + // can own a partition at a time. + !member.partitionsPendingRevocation().getOrDefault(topicId, Set.of()).contains(partitionId) ) || hasUnreleasedPartitions; if (!assignedPartitions.isEmpty()) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java index 48441780689ea..20f8e9dfd9790 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java @@ -498,6 +498,81 @@ public void testUnrevokedPartitionsToUnreleasedPartitions() { ); } + @Test + public void testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocation() { + String topic1 = "topic1"; + String topic2 = "topic2"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, topic1, 10) + .addTopic(topicId2, topic2, 10) + .buildCoordinatorMetadataImage(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 2, 3), + mkTopicAssignment(topicId2, 5, 6))) + .setPartitionsPendingRevocation(mkAssignment( + // Partition 4 is pending revocation by the member but is back in the latest target + // assignment. + mkTopicAssignment(topicId1, 4))) + .build(); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withMetadataImage(metadataImage) + .withTargetAssignment(12, new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 2, 3, 4), + mkTopicAssignment(topicId2, 5, 6, 7)))) + .withCurrentPartitionEpoch((topicId, partitionId) -> { + if (topicId.equals(topicId1)) { + // Partitions 2 and 3 are in the member's current assignment. + // Partition 4 is pending revocation by the member. + switch (partitionId) { + case 2: + case 3: + case 4: + return 10; + } + } else if (topicId.equals(topicId2)) { + // Partitions 5 and 6 are in the member's current assignment. + switch (partitionId) { + case 5: + case 6: + return 10; + } + } + return -1; + }) + .withOwnedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId1) + .setPartitions(Arrays.asList(2, 3)), + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId2) + .setPartitions(Arrays.asList(5, 6)))) + .build(); + + assertEquals( + new ConsumerGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setMemberEpoch(12) + .setPreviousMemberEpoch(10) + .setSubscribedTopicNames(List.of(topic1, topic2)) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 2, 3, 4), + mkTopicAssignment(topicId2, 5, 6, 7))) + .setPartitionsPendingRevocation(Map.of()) + .build(), + updatedMember + ); + } + @Test public void testUnreleasedPartitionsToStable() { String topic1 = "topic1";