Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18039: Test all versions of quorum responses in RequestResponseTest #17873

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1182,9 +1182,9 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) {
case DESCRIBE_USER_SCRAM_CREDENTIALS: return createDescribeUserScramCredentialsResponse();
case ALTER_USER_SCRAM_CREDENTIALS: return createAlterUserScramCredentialsResponse();
case VOTE: return createVoteResponse();
case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse();
case END_QUORUM_EPOCH: return createEndQuorumEpochResponse();
case DESCRIBE_QUORUM: return createDescribeQuorumResponse();
case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse(version);
case END_QUORUM_EPOCH: return createEndQuorumEpochResponse(version);
case DESCRIBE_QUORUM: return createDescribeQuorumResponse(version);
case ALTER_PARTITION: return createAlterPartitionResponse(version);
case UPDATE_FEATURES: return createUpdateFeaturesResponse();
case ENVELOPE: return createEnvelopeResponse();
Expand Down Expand Up @@ -1646,9 +1646,38 @@ private DescribeQuorumRequest createDescribeQuorumRequest(short version) {
return new DescribeQuorumRequest.Builder(data).build(version);
}

private DescribeQuorumResponse createDescribeQuorumResponse() {
private DescribeQuorumResponse createDescribeQuorumResponse(short version) {
DescribeQuorumResponseData.ReplicaState replicaState = new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(1);
if (version >= 1) {
replicaState.setLastFetchTimestamp(123L)
.setLastCaughtUpTimestamp(123L);
}
if (version >= 2) {
replicaState.setReplicaDirectoryId(Uuid.randomUuid());
}
DescribeQuorumResponseData.PartitionData partitionData = new DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(0)
.setCurrentVoters(singletonList(replicaState))
.setObservers(singletonList(replicaState));
DescribeQuorumResponseData.TopicData topicData = new DescribeQuorumResponseData.TopicData()
.setTopicName("topic1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we just make this __cluster_metadata

Copy link
Contributor Author

@peterxcli peterxcli Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ahuang98 Thanks, fixed

.setPartitions(singletonList(partitionData));
DescribeQuorumResponseData data = new DescribeQuorumResponseData()
.setErrorCode(Errors.NONE.code());
.setErrorCode(Errors.NONE.code())
.setTopics(singletonList(topicData));
if (version >= 2) {
DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(1);
DescribeQuorumResponseData.ListenerCollection listeners = new DescribeQuorumResponseData.ListenerCollection(1);
listeners.add(new DescribeQuorumResponseData.Listener()
.setName("CONTROLLER")
.setHost("localhost")
.setPort(9012));
nodes.add(new DescribeQuorumResponseData.Node()
.setNodeId(1)
.setListeners(listeners));
data.setNodes(nodes);
}
return new DescribeQuorumResponse(data);
}

Expand All @@ -1665,14 +1694,25 @@ private EndQuorumEpochRequest createEndQuorumEpochRequest(short version) {
return new EndQuorumEpochRequest.Builder(data).build(version);
}

private EndQuorumEpochResponse createEndQuorumEpochResponse() {
private EndQuorumEpochResponse createEndQuorumEpochResponse(short version) {
EndQuorumEpochResponseData data = new EndQuorumEpochResponseData()
.setErrorCode(Errors.NONE.code())
.setTopics(singletonList(new EndQuorumEpochResponseData.TopicData()
.setPartitions(singletonList(new EndQuorumEpochResponseData.PartitionData()
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(1)))
.setTopicName("topic1")));

if (version >= 1) {
EndQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = new EndQuorumEpochResponseData.NodeEndpointCollection(1);
nodeEndpoints.add(
new EndQuorumEpochResponseData.NodeEndpoint()
.setNodeId(1)
.setHost("localhost")
.setPort(9092)
);
data.setNodeEndpoints(nodeEndpoints);
}
return new EndQuorumEpochResponse(data);
}

Expand All @@ -1687,7 +1727,7 @@ private BeginQuorumEpochRequest createBeginQuorumEpochRequest(short version) {
return new BeginQuorumEpochRequest.Builder(data).build(version);
}

private BeginQuorumEpochResponse createBeginQuorumEpochResponse() {
private BeginQuorumEpochResponse createBeginQuorumEpochResponse(short version) {
BeginQuorumEpochResponseData data = new BeginQuorumEpochResponseData()
.setErrorCode(Errors.NONE.code())
.setTopics(singletonList(new BeginQuorumEpochResponseData.TopicData()
Expand All @@ -1696,6 +1736,17 @@ private BeginQuorumEpochResponse createBeginQuorumEpochResponse() {
.setLeaderEpoch(0)
.setLeaderId(1)
.setPartitionIndex(2)))));
if (version >= 1) {
BeginQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = new BeginQuorumEpochResponseData.NodeEndpointCollection(1);
nodeEndpoints.add(
new BeginQuorumEpochResponseData.NodeEndpoint()
.setNodeId(1)
.setHost("localhost")
.setPort(9092)
);

data.setNodeEndpoints(nodeEndpoints);
}
return new BeginQuorumEpochResponse(data);
}

Expand Down