From 9ae1b0f01784d314d21100e7d021467626b20137 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 9 Dec 2024 08:39:22 +0000 Subject: [PATCH] KAFKA-18134; Disallow group upgrades when custom assignors are used (#18046) Disallow upgrades from classic groups to consumer groups when any member's assignment has non-empty userData. Reviewers: David Jacot --- .../ConsumerProtocolMigrationTest.scala | 120 +++++++++++++++++- .../group/GroupMetadataManager.java | 39 ++++-- .../group/modern/consumer/ConsumerGroup.java | 16 ++- .../group/GroupMetadataManagerTest.java | 115 +++++++++++++++++ 4 files changed, 272 insertions(+), 18 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 31821ceb3a35d..666557fe15c57 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -486,6 +486,121 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord ) } + /** + * The test method checks the following scenario: + * 1. Creating a classic group with member 1, whose assignment has non-empty user data. + * 2. Member 2 using consumer protocol joins. The group cannot be upgraded and the join is + * rejected. + * 3. Member 1 leaves. + * 4. Member 2 using consumer protocol joins. The group is upgraded. + */ + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional") + ) + ) + def testOnlineMigrationWithNonEmptyUserDataInAssignment(): Unit = { + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Classic member 1 joins the classic group. + val groupId = "grp" + + val memberId1 = joinDynamicConsumerGroupWithOldProtocol( + groupId = groupId, + metadata = metadata(List.empty), + assignment = assignment(List(0, 1, 2), ByteBuffer.allocate(1)) + )._1 + + // The joining request with a consumer group member 2 is rejected. + val errorMessage = consumerGroupHeartbeat( + groupId = groupId, + memberId = Uuid.randomUuid.toString, + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List("foo"), + topicPartitions = List.empty, + expectedError = Errors.GROUP_ID_NOT_FOUND + ).errorMessage + + assertEquals( + "Cannot upgrade classic group grp to consumer group because an unsupported custom assignor is in use. " + + "Please refer to the documentation or switch to a default assignor before re-attempting the upgrade.", + errorMessage + ) + + // The group is still a classic group. + assertEquals( + List( + new ListGroupsResponseData.ListedGroup() + .setGroupId(groupId) + .setProtocolType("consumer") + .setGroupState(ClassicGroupState.STABLE.toString) + .setGroupType(Group.GroupType.CLASSIC.toString) + ), + listGroups( + statesFilter = List.empty, + typesFilter = List(Group.GroupType.CLASSIC.toString) + ) + ) + + // Classic member 1 leaves the group. + leaveGroup( + groupId = groupId, + memberId = memberId1, + useNewProtocol = false, + version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled) + ) + + // Verify that the group is empty. + assertEquals( + List( + new ListGroupsResponseData.ListedGroup() + .setGroupId(groupId) + .setProtocolType("consumer") + .setGroupState(ClassicGroupState.EMPTY.toString) + .setGroupType(Group.GroupType.CLASSIC.toString) + ), + listGroups( + statesFilter = List.empty, + typesFilter = List(Group.GroupType.CLASSIC.toString) + ) + ) + + // The joining request with a consumer group member is accepted. + consumerGroupHeartbeat( + groupId = groupId, + memberId = Uuid.randomUuid.toString, + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List("foo"), + topicPartitions = List.empty, + expectedError = Errors.NONE + ) + + // The group has become a consumer group. + assertEquals( + List( + new ListGroupsResponseData.ListedGroup() + .setGroupId(groupId) + .setProtocolType("consumer") + .setGroupState(ConsumerGroupState.STABLE.toString) + .setGroupType(Group.GroupType.CONSUMER.toString) + ), + listGroups( + statesFilter = List.empty, + typesFilter = List(Group.GroupType.CONSUMER.toString) + ) + ) + } + private def testUpgradeFromEmptyClassicToConsumerGroup(): Unit = { // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. @@ -1262,10 +1377,11 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord ).array } - private def assignment(assignedPartitions: List[Int]): Array[Byte] = { + private def assignment(assignedPartitions: List[Int], userData: ByteBuffer = null): Array[Byte] = { ConsumerProtocol.serializeAssignment( new ConsumerPartitionAssignor.Assignment( - assignedPartitions.map(new TopicPartition("foo", _)).asJava + assignedPartitions.map(new TopicPartition("foo", _)).asJava, + userData ) ).array } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 90d8db4d32013..dd0c6954088b4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnreleasedInstanceIdException; import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -186,6 +187,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged; import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged; /** @@ -674,7 +676,8 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( } else { if (group.type() == CONSUMER) { return (ConsumerGroup) group; - } else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) { + } else if (createIfNotExists && group.type() == CLASSIC) { + validateOnlineUpgrade((ClassicGroup) group); return convertToConsumerGroup((ClassicGroup) group, records); } else { throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); @@ -1033,23 +1036,28 @@ private void convertToClassicGroup( * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. * * @param classicGroup A ClassicGroup. - * @return A boolean indicating whether it's valid to online upgrade the classic group. + * @throws GroupIdNotFoundException if the group cannot be upgraded. */ - private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + private void validateOnlineUpgrade(ClassicGroup classicGroup) { if (!config.consumerGroupMigrationPolicy().isUpgradeEnabled()) { - log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.", + log.info("Cannot upgrade classic group {} to consumer group because online upgrade is disabled.", classicGroup.groupId()); - return false; + throw new GroupIdNotFoundException( + String.format("Cannot upgrade classic group %s to consumer group because online upgrade is disabled.", classicGroup.groupId()) + ); } else if (!classicGroup.usesConsumerGroupProtocol()) { log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.", classicGroup.groupId()); - return false; + throw new GroupIdNotFoundException( + String.format("Cannot upgrade classic group %s to consumer group because the group does not use the consumer embedded protocol.", classicGroup.groupId()) + ); } else if (classicGroup.numMembers() > config.consumerGroupMaxSize()) { log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.", classicGroup.groupId()); - return false; + throw new GroupIdNotFoundException( + String.format("Cannot upgrade classic group %s to consumer group because the group size exceeds the consumer group maximum size.", classicGroup.groupId()) + ); } - return true; } /** @@ -1078,12 +1086,21 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource() { + return Stream.of( + Arguments.of(null, true), + Arguments.of(ByteBuffer.allocate(0), true), + Arguments.of(ByteBuffer.allocate(1), false) + ); + } + + @ParameterizedTest + @MethodSource("testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource") + public void testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer userData, boolean expectUpgrade) { + String groupId = "group-id"; + String memberId1 = "member-id-1"; + String memberId2 = "member-id-2"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of( + memberId1, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + )), + memberId2, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(barTopicId, 0) + )) + ))); + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 1) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build(); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.UPGRADE.toString()) + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1); + protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + List.of(fooTopicName, barTopicName), + null, + List.of( + new TopicPartition(fooTopicName, 0), + new TopicPartition(barTopicName, 0) + ) + )))) + ); + + Map assignments = Map.of( + memberId1, + Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of( + new TopicPartition(fooTopicName, 0), + new TopicPartition(barTopicName, 0) + ), userData))) + ); + + // Create a stable classic group with member 1. + ClassicGroup group = context.createClassicGroup(groupId); + group.setProtocolName(Optional.of("range")); + group.add( + new ClassicGroupMember( + memberId1, + Optional.empty(), + "client-id", + "client-host", + 10000, + 5000, + "consumer", + protocols, + assignments.get(memberId1) + ) + ); + + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(STABLE); + + context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion())); + context.commit(); + group = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false); + + // A new member 2 with new protocol joins the classic group, triggering the upgrade. + ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData = + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(List.of(fooTopicName, barTopicName)) + .setTopicPartitions(Collections.emptyList()); + + if (expectUpgrade) { + context.consumerGroupHeartbeat(consumerGroupHeartbeatRequestData); + } else { + Exception ex = assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(consumerGroupHeartbeatRequestData)); + assertEquals( + "Cannot upgrade classic group group-id to consumer group because an unsupported custom assignor is in use. " + + "Please refer to the documentation or switch to a default assignor before re-attempting the upgrade.", ex.getMessage()); + } + } + @Test public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() { String groupId = "group-id";