diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index a70d1cbb..facad4e2 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -138,7 +138,7 @@ func SkipKeys(keys ...int16) VersionGuessOpt { } // TryRaftBroker changes from guessing the version for a classical ZooKeeper -// based broker to guessing for a raft based broker (v2.8.0+). +// based broker to guessing for a raft based broker (v2.8+). // // Note that with raft, there can be a TryRaftController attempt as well. func TryRaftBroker() VersionGuessOpt { @@ -147,7 +147,7 @@ func TryRaftBroker() VersionGuessOpt { // TryRaftController changes from guessing the version for a classical // ZooKeeper based broker to guessing for a raft based controller broker -// (v2.8.0+). +// (v2.8+). // // Note that with raft, there can be a TryRaftBroker attempt as well. Odds are // that if you are an end user speaking to a raft based Kafka cluster, you are @@ -164,7 +164,7 @@ type guessCfg struct { // VersionGuess attempts to guess which version of Kafka these versions belong // to. If an exact match can be determined, this returns a string in the format -// v0.#.# or v#.# (depending on whether Kafka is pre-1.0.0 or post). For +// v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For // example, v0.8.0 or v2.7. // // Patch numbers are not included in the guess as it is not possible to @@ -253,7 +253,15 @@ func (g guess) String() string { func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { cfg := guessCfg{ listener: zkBroker, - skipKeys: []int16{4, 5, 6, 7, 27}, + // Envelope was added in 2.7 for kraft and zkBroker in 3.4; we + // need to skip it for 2.7 through 3.4 otherwise the version + // detection fails. We can just skip it generally since there + // are enough differentiating factors that accurately detecting + // envelope doesn't matter. + // + // TODO: add introduced-version to differentiate some specific + // keys. + skipKeys: []int16{4, 5, 6, 7, 27, 58}, } for _, opt := range opts { opt.apply(&cfg) diff --git a/pkg/kversion/kversion_test.go b/pkg/kversion/kversion_test.go index f80deda1..fad106a2 100644 --- a/pkg/kversion/kversion_test.go +++ b/pkg/kversion/kversion_test.go @@ -119,3 +119,75 @@ func TestEqual(t *testing.T) { t.Errorf("unexpectedly not equal after backing v0.8.1 down to v0.8.0, opposite direction") } } + +func TestVersionProbeKafka3_1(t *testing.T) { + versions := map[int16]int16{ + 0: 9, // Produce + 1: 13, // Fetch + 2: 7, // ListOffsets + 3: 12, // Metadata + 4: 5, // LeaderAndISR + 5: 3, // StopReplica + 6: 7, // UpdateMetadata + 7: 3, // ControlledShutdown + 8: 8, // OffsetCommit + 9: 8, // OffsetFetch + 10: 4, // FindCoordinator + 11: 7, // JoinGroup + 12: 4, // Heartbeat + 13: 4, // LeaveGroup + 14: 5, // SyncGroup + 15: 5, // DescribeGroups + 16: 4, // ListGroups + 17: 1, // SASLHandshake + 18: 3, // ApiVersions + 19: 7, // CreateTopics + 20: 6, // DeleteTopics + 21: 2, // DeleteRecords + 22: 4, // InitProducerID + 23: 4, // OffsetForLeaderEpoch + 24: 3, // AddPartitionsToTxn + 25: 3, // AddOffsetsToTxn + 26: 3, // EndTxn + 27: 1, // WriteTxnMarkers + 28: 3, // TxnOffsetCommit + 29: 2, // DescribeACLs + 30: 2, // CreateACLs + 31: 2, // DeleteACLs + 32: 4, // DescribeConfigs + 33: 2, // AlterConfigs + 34: 2, // AlterReplicaLogDirs + 35: 2, // DescribeLogDirs + 36: 2, // SASLAuthenticate + 37: 3, // CreatePartitions + 38: 2, // CreateDelegationToken + 39: 2, // RenewDelegationToken + 40: 2, // ExpireDelegationToken + 41: 2, // DescribeDelegationToken + 42: 2, // DeleteGroups + 43: 2, // ElectLeaders + 44: 1, // IncrementalAlterConfigs + 45: 0, // AlterPartitionAssignments + 46: 0, // ListPartitionReassignments + 47: 0, // OffsetDelete + 48: 1, // DescribeClientQuotas + 49: 1, // AlterClientQuotas + 50: 0, // DescribeUserSCRAMCredentials + 51: 0, // AlterUserSCRAMCredentials + 56: 0, // AlterPartition + 57: 0, // UpdateFeatures + 60: 0, // DescribeCluster + 61: 0, // DescribeProducers + 65: 0, // DescribeTransactions + 66: 0, // ListTransactions + 67: 0, // AllocateProducerIDs + } + + var vs Versions + for k, v := range versions { + vs.SetMaxKeyVersion(k, v) + } + if guess := vs.VersionGuess(); guess != "v3.1" { + t.Errorf("unexpected version guess, got %s != exp %s", guess, "v3.1") + } +}