diff --git a/request_test.go b/request_test.go index 42d7a7d2b..6d120cb9c 100644 --- a/request_test.go +++ b/request_test.go @@ -156,6 +156,8 @@ func allocateResponseBody(req protocolBody) protocolBody { return &CreatePartitionsResponse{Version: version} case apiKeyDeleteGroups: return &DeleteGroupsResponse{Version: version} + case apiKeyElectLeaders: + return &ElectLeadersResponse{Version: version} case apiKeyIncrementalAlterConfigs: return &IncrementalAlterConfigsResponse{Version: version} case apiKeyAlterPartitionReassignments: @@ -182,6 +184,21 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { apiVersions map[int16]int16 } + saramaMaxVersions := newKafkaVersion(999, 999, 999, 999) + maxVersion := func(pb protocolBody) int16 { + var ( + i int16 + v int16 + ) + for i = 0; ; i++ { + pb.setVersion(i) + if !pb.isValidVersion() { + return v + } + v = i + } + } + tests := []test{ { V1_1_0_0, @@ -234,97 +251,107 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { { V2_0_0_0, map[int16]int16{ - apiKeyProduce: 6, - apiKeyFetch: 8, - apiKeyListOffsets: 3, - apiKeyMetadata: 6, - apiKeyLeaderAndIsr: 1, - apiKeyStopReplica: 0, - apiKeyUpdateMetadata: 4, - apiKeyControlledShutdown: 1, - apiKeyOffsetCommit: 4, - apiKeyOffsetFetch: 4, - apiKeyFindCoordinator: 2, - apiKeyJoinGroup: 3, - apiKeyHeartbeat: 2, - apiKeyLeaveGroup: 2, - apiKeySyncGroup: 2, - apiKeyDescribeGroups: 2, - apiKeyListGroups: 2, - apiKeySaslHandshake: 1, - apiKeyApiVersions: 2, - apiKeyCreateTopics: 3, - apiKeyDeleteTopics: 2, - apiKeyDeleteRecords: 1, - apiKeyInitProducerId: 1, - apiKeyOffsetForLeaderEpoch: 1, - apiKeyAddPartitionsToTxn: 1, - apiKeyAddOffsetsToTxn: 1, - apiKeyEndTxn: 1, - apiKeyWriteTxnMarkers: 0, - apiKeyTxnOffsetCommit: 1, - apiKeyDescribeAcls: 1, - apiKeyCreateAcls: 1, - apiKeyDeleteAcls: 1, - apiKeyDescribeConfigs: 2, - apiKeyAlterConfigs: 1, - apiKeyAlterReplicaLogDirs: 1, - apiKeyDescribeLogDirs: 1, - apiKeySASLAuth: 0, - apiKeyCreatePartitions: 1, - apiKeyCreateDelegationToken: 1, - apiKeyRenewDelegationToken: 1, - apiKeyExpireDelegationToken: 1, - apiKeyDescribeDelegationToken: 1, - apiKeyDeleteGroups: 1, + apiKeyProduce: 6, // up from 5 + apiKeyFetch: 8, // up from 7 + apiKeyListOffsets: 3, // up from 2 + apiKeyMetadata: 6, // up from 5 + apiKeyOffsetCommit: 4, // up from 3 + apiKeyOffsetFetch: 4, // up from 3 + apiKeyFindCoordinator: 2, // up from 1 + apiKeyJoinGroup: 3, // up from 2 + apiKeyHeartbeat: 2, // up from 1 + apiKeyLeaveGroup: 2, // up from 1 + apiKeySyncGroup: 2, // up from 1 + apiKeyDescribeGroups: 2, // up from 1 + apiKeyListGroups: 2, // up from 1 + apiKeyApiVersions: 2, // up from 1 + apiKeyCreateTopics: 3, // up from 2 + apiKeyDeleteTopics: 2, // up from 1 + apiKeyDeleteRecords: 1, // up from 0 + apiKeyInitProducerId: 1, // up from 0 + apiKeyOffsetForLeaderEpoch: 1, // up from 0 + apiKeyAddPartitionsToTxn: 1, // up from 0 + apiKeyAddOffsetsToTxn: 1, // up from 0 + apiKeyEndTxn: 1, // up from 0 + apiKeyTxnOffsetCommit: 1, // up from 0 + apiKeyDescribeAcls: 1, // up from 0 + apiKeyCreateAcls: 1, // up from 0 + apiKeyDeleteAcls: 1, // up from 0 + apiKeyDescribeConfigs: 2, // up from 1 + apiKeyAlterConfigs: 1, // up from 0 + apiKeyAlterReplicaLogDirs: 1, // up from 0 + apiKeyDescribeLogDirs: 1, // up from 0 + apiKeyCreatePartitions: 1, // up from 0 + apiKeyCreateDelegationToken: 1, // up from 0 + apiKeyRenewDelegationToken: 1, // up from 0 + apiKeyExpireDelegationToken: 1, // up from 0 + apiKeyDescribeDelegationToken: 1, // up from 0 + apiKeyDeleteGroups: 1, // up from 0 }, }, { V2_1_0_0, map[int16]int16{ - apiKeyProduce: 7, - apiKeyFetch: 10, - apiKeyListOffsets: 4, - apiKeyMetadata: 7, - apiKeyLeaderAndIsr: 1, - apiKeyStopReplica: 0, - apiKeyUpdateMetadata: 4, - apiKeyControlledShutdown: 1, - apiKeyOffsetCommit: 6, - apiKeyOffsetFetch: 5, - apiKeyFindCoordinator: 2, - apiKeyJoinGroup: 3, - apiKeyHeartbeat: 2, - apiKeyLeaveGroup: 2, - apiKeySyncGroup: 2, - apiKeyDescribeGroups: 2, - apiKeyListGroups: 2, - apiKeySaslHandshake: 1, - apiKeyApiVersions: 2, - apiKeyCreateTopics: 3, - apiKeyDeleteTopics: 3, - apiKeyDeleteRecords: 1, - apiKeyInitProducerId: 1, - apiKeyOffsetForLeaderEpoch: 2, - apiKeyAddPartitionsToTxn: 1, - apiKeyAddOffsetsToTxn: 1, - apiKeyEndTxn: 1, - apiKeyWriteTxnMarkers: 0, - apiKeyTxnOffsetCommit: 2, - apiKeyDescribeAcls: 1, - apiKeyCreateAcls: 1, - apiKeyDeleteAcls: 1, - apiKeyDescribeConfigs: 2, - apiKeyAlterConfigs: 1, - apiKeyAlterReplicaLogDirs: 1, - apiKeyDescribeLogDirs: 1, - apiKeySASLAuth: 0, - apiKeyCreatePartitions: 1, - apiKeyCreateDelegationToken: 1, - apiKeyRenewDelegationToken: 1, - apiKeyExpireDelegationToken: 1, - apiKeyDescribeDelegationToken: 1, - apiKeyDeleteGroups: 1, + apiKeyProduce: 7, // up from 6 + apiKeyFetch: 10, // up from 8 + apiKeyListOffsets: 4, // up from 3 + apiKeyMetadata: 7, // up from 6 + apiKeyOffsetCommit: 6, // up from 4 + apiKeyOffsetFetch: 5, // up from 4 + apiKeyOffsetForLeaderEpoch: 2, // up from 1 + apiKeyTxnOffsetCommit: 2, // up from 1 + apiKeyDeleteTopics: 3, // up from 2 + }, + }, + { + V2_2_0_0, + map[int16]int16{ + // apiKeyListOffsets: 5, // up from 4, TODO: not supported by Sarama yet + apiKeyLeaderAndIsr: 2, // up from 1 + apiKeyStopReplica: 1, // up from 0 + apiKeyUpdateMetadata: 5, // up from 4 + apiKeyControlledShutdown: 2, // up from 1 + apiKeyJoinGroup: 4, // up from 3 + apiKeySASLAuth: 1, // up from 0 + apiKeyElectLeaders: 0, // new in 2.2 + }, + }, + { + saramaMaxVersions, // placeholder version for current maximums implemented by Sarama + map[int16]int16{ + apiKeyProduce: maxVersion(&ProduceRequest{}), + apiKeyFetch: maxVersion(&FetchRequest{}), + apiKeyListOffsets: maxVersion(&OffsetRequest{}), + apiKeyMetadata: maxVersion(&MetadataRequest{}), + apiKeyOffsetCommit: maxVersion(&OffsetCommitRequest{}), + apiKeyOffsetFetch: maxVersion(&OffsetFetchRequest{}), + apiKeyFindCoordinator: maxVersion(&FindCoordinatorRequest{}), + apiKeyJoinGroup: maxVersion(&JoinGroupRequest{}), + apiKeyHeartbeat: maxVersion(&HeartbeatRequest{}), + apiKeyLeaveGroup: maxVersion(&LeaveGroupRequest{}), + apiKeySyncGroup: maxVersion(&SyncGroupRequest{}), + apiKeyDescribeGroups: maxVersion(&DescribeGroupsRequest{}), + apiKeyListGroups: maxVersion(&ListGroupsRequest{}), + apiKeySaslHandshake: maxVersion(&SaslHandshakeRequest{}), + apiKeyApiVersions: maxVersion(&ApiVersionsRequest{}), + apiKeyCreateTopics: maxVersion(&CreateTopicsRequest{}), + apiKeyDeleteTopics: maxVersion(&DeleteTopicsRequest{}), + apiKeyDeleteRecords: maxVersion(&DeleteRecordsRequest{}), + apiKeyInitProducerId: maxVersion(&InitProducerIDRequest{}), + apiKeyAddPartitionsToTxn: maxVersion(&AddPartitionsToTxnRequest{}), + apiKeyAddOffsetsToTxn: maxVersion(&AddOffsetsToTxnRequest{}), + apiKeyEndTxn: maxVersion(&EndTxnRequest{}), + apiKeyTxnOffsetCommit: maxVersion(&TxnOffsetCommitRequest{}), + apiKeyDescribeAcls: maxVersion(&DescribeAclsRequest{}), + apiKeyCreateAcls: maxVersion(&CreateAclsRequest{}), + apiKeyDeleteAcls: maxVersion(&DeleteAclsRequest{}), + apiKeyDescribeConfigs: maxVersion(&DescribeConfigsRequest{}), + apiKeyAlterConfigs: maxVersion(&AlterConfigsRequest{}), + apiKeyDescribeLogDirs: maxVersion(&DescribeLogDirsRequest{}), + apiKeySASLAuth: maxVersion(&SaslAuthenticateRequest{}), + apiKeyCreatePartitions: maxVersion(&CreatePartitionsRequest{}), + apiKeyDeleteGroups: maxVersion(&DeleteGroupsRequest{}), + apiKeyElectLeaders: maxVersion(&ElectLeadersRequest{}), }, }, } @@ -336,6 +363,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { if req == nil { t.Skipf("apikey %d is not implemented", key) } + t.Logf("Testing %s V%d", reflect.TypeOf(req), version) resp := allocateResponseBody(req) assert.NotNil(t, resp, fmt.Sprintf("%s has no matching response type in allocateResponseBody", reflect.TypeOf(req))) assert.Equal(t, req.isValidVersion(), resp.isValidVersion(), fmt.Sprintf("%s isValidVersion should match %s", reflect.TypeOf(req), reflect.TypeOf(resp)))