From df4c1ea810ff8139efc319d3df73c229302d4443 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 20 Nov 2024 15:07:36 +0800 Subject: [PATCH 1/2] feat: update response creation methods to accept version parameter - Modified createDescribeQuorumResponse, createEndQuorumEpochResponse, and createBeginQuorumEpochResponse methods to accept a version parameter. - Adjusted logic to set properties based on the version, ensuring compatibility with different response versions. - Added node endpoint handling for EndQuorumEpochResponse and BeginQuorumEpochResponse based on version checks. --- .../common/requests/RequestResponseTest.java | 65 +++++++++++++++++-- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cb6a2458261e..fd0177e0cadb 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1182,9 +1182,9 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case DESCRIBE_USER_SCRAM_CREDENTIALS: return createDescribeUserScramCredentialsResponse(); case ALTER_USER_SCRAM_CREDENTIALS: return createAlterUserScramCredentialsResponse(); case VOTE: return createVoteResponse(); - case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse(); - case END_QUORUM_EPOCH: return createEndQuorumEpochResponse(); - case DESCRIBE_QUORUM: return createDescribeQuorumResponse(); + case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse(version); + case END_QUORUM_EPOCH: return createEndQuorumEpochResponse(version); + case DESCRIBE_QUORUM: return createDescribeQuorumResponse(version); case ALTER_PARTITION: return createAlterPartitionResponse(version); case UPDATE_FEATURES: return createUpdateFeaturesResponse(); case ENVELOPE: return createEnvelopeResponse(); @@ -1646,9 +1646,38 @@ private DescribeQuorumRequest createDescribeQuorumRequest(short version) { return new DescribeQuorumRequest.Builder(data).build(version); } - private DescribeQuorumResponse createDescribeQuorumResponse() { + private DescribeQuorumResponse createDescribeQuorumResponse(short version) { + DescribeQuorumResponseData.ReplicaState replicaState = new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(1); + if (version >= 1) { + replicaState.setLastFetchTimestamp(123L) + .setLastCaughtUpTimestamp(123L); + } + if (version >= 2) { + replicaState.setReplicaDirectoryId(Uuid.randomUuid()); + } + DescribeQuorumResponseData.PartitionData partitionData = new DescribeQuorumResponseData.PartitionData() + .setPartitionIndex(0) + .setCurrentVoters(singletonList(replicaState)) + .setObservers(singletonList(replicaState)); + DescribeQuorumResponseData.TopicData topicData = new DescribeQuorumResponseData.TopicData() + .setTopicName("topic1") + .setPartitions(singletonList(partitionData)); DescribeQuorumResponseData data = new DescribeQuorumResponseData() - .setErrorCode(Errors.NONE.code()); + .setErrorCode(Errors.NONE.code()) + .setTopics(singletonList(topicData)); + if (version >= 2) { + DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(1); + DescribeQuorumResponseData.ListenerCollection listeners = new DescribeQuorumResponseData.ListenerCollection(1); + listeners.add(new DescribeQuorumResponseData.Listener() + .setName("CONTROLLER") + .setHost("localhost") + .setPort(9012)); + nodes.add(new DescribeQuorumResponseData.Node() + .setNodeId(1) + .setListeners(listeners)); + data.setNodes(nodes); + } return new DescribeQuorumResponse(data); } @@ -1665,7 +1694,7 @@ private EndQuorumEpochRequest createEndQuorumEpochRequest(short version) { return new EndQuorumEpochRequest.Builder(data).build(version); } - private EndQuorumEpochResponse createEndQuorumEpochResponse() { + private EndQuorumEpochResponse createEndQuorumEpochResponse(short version) { EndQuorumEpochResponseData data = new EndQuorumEpochResponseData() .setErrorCode(Errors.NONE.code()) .setTopics(singletonList(new EndQuorumEpochResponseData.TopicData() @@ -1673,6 +1702,17 @@ private EndQuorumEpochResponse createEndQuorumEpochResponse() { .setErrorCode(Errors.NONE.code()) .setLeaderEpoch(1))) .setTopicName("topic1"))); + + if (version >= 1) { + EndQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = new EndQuorumEpochResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new EndQuorumEpochResponseData.NodeEndpoint() + .setNodeId(1) + .setHost("localhost") + .setPort(9092) + ); + data.setNodeEndpoints(nodeEndpoints); + } return new EndQuorumEpochResponse(data); } @@ -1687,7 +1727,7 @@ private BeginQuorumEpochRequest createBeginQuorumEpochRequest(short version) { return new BeginQuorumEpochRequest.Builder(data).build(version); } - private BeginQuorumEpochResponse createBeginQuorumEpochResponse() { + private BeginQuorumEpochResponse createBeginQuorumEpochResponse(short version) { BeginQuorumEpochResponseData data = new BeginQuorumEpochResponseData() .setErrorCode(Errors.NONE.code()) .setTopics(singletonList(new BeginQuorumEpochResponseData.TopicData() @@ -1696,6 +1736,17 @@ private BeginQuorumEpochResponse createBeginQuorumEpochResponse() { .setLeaderEpoch(0) .setLeaderId(1) .setPartitionIndex(2))))); + if (version >= 1) { + BeginQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = new BeginQuorumEpochResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new BeginQuorumEpochResponseData.NodeEndpoint() + .setNodeId(1) + .setHost("localhost") + .setPort(9092) + ); + + data.setNodeEndpoints(nodeEndpoints); + } return new BeginQuorumEpochResponse(data); } From d84f1cc4daafb9d7780d46d3c937856e69d82cb6 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 27 Nov 2024 11:27:19 +0800 Subject: [PATCH 2/2] fix: correct topic name in DescribeQuorumResponse test to use cluster metadata topic name --- .../org/apache/kafka/common/requests/RequestResponseTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index fd0177e0cadb..01f6d8e64c1f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1661,7 +1661,7 @@ private DescribeQuorumResponse createDescribeQuorumResponse(short version) { .setCurrentVoters(singletonList(replicaState)) .setObservers(singletonList(replicaState)); DescribeQuorumResponseData.TopicData topicData = new DescribeQuorumResponseData.TopicData() - .setTopicName("topic1") + .setTopicName("__cluster_metadata") .setPartitions(singletonList(partitionData)); DescribeQuorumResponseData data = new DescribeQuorumResponseData() .setErrorCode(Errors.NONE.code())