From 5978156582a0d822c919200941bf9fa937d31784 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 16 Aug 2023 21:15:12 -0600 Subject: [PATCH] kversion: fix version detection for Kafka v2.7 through 3.4 Kafka v3.4 added envelope support for the Envelope request to the zk based broker, so I added that to kversion. This made Envelope a required request for version detection ever since it was introduced -- v2.7 for raft -- and pinned version detection to "at least 2.7" for all cluster between 2.7 and 3.4. We now ignore the envelope key when version guessing because ultimately other keys are differentiating enough. This allows version detection to now again correctly guess 2.7 through 3.4. Tested against zk 3.1, 3.2, 3.5, as well as kraft 3.5 manually, and adds a unit test for the 3.1 versions. Closes #536. --- pkg/kversion/kversion.go | 16 ++++++-- pkg/kversion/kversion_test.go | 72 +++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index a70d1cbb..facad4e2 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -138,7 +138,7 @@ func SkipKeys(keys ...int16) VersionGuessOpt { } // TryRaftBroker changes from guessing the version for a classical ZooKeeper -// based broker to guessing for a raft based broker (v2.8.0+). +// based broker to guessing for a raft based broker (v2.8+). // // Note that with raft, there can be a TryRaftController attempt as well. func TryRaftBroker() VersionGuessOpt { @@ -147,7 +147,7 @@ func TryRaftBroker() VersionGuessOpt { // TryRaftController changes from guessing the version for a classical // ZooKeeper based broker to guessing for a raft based controller broker -// (v2.8.0+). +// (v2.8+). // // Note that with raft, there can be a TryRaftBroker attempt as well. Odds are // that if you are an end user speaking to a raft based Kafka cluster, you are @@ -164,7 +164,7 @@ type guessCfg struct { // VersionGuess attempts to guess which version of Kafka these versions belong // to. If an exact match can be determined, this returns a string in the format -// v0.#.# or v#.# (depending on whether Kafka is pre-1.0.0 or post). For +// v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For // example, v0.8.0 or v2.7. // // Patch numbers are not included in the guess as it is not possible to @@ -253,7 +253,15 @@ func (g guess) String() string { func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { cfg := guessCfg{ listener: zkBroker, - skipKeys: []int16{4, 5, 6, 7, 27}, + // Envelope was added in 2.7 for kraft and zkBroker in 3.4; we + // need to skip it for 2.7 through 3.4 otherwise the version + // detection fails. We can just skip it generally since there + // are enough differentiating factors that accurately detecting + // envelope doesn't matter. + // + // TODO: add introduced-version to differentiate some specific + // keys. + skipKeys: []int16{4, 5, 6, 7, 27, 58}, } for _, opt := range opts { opt.apply(&cfg) diff --git a/pkg/kversion/kversion_test.go b/pkg/kversion/kversion_test.go index f80deda1..fad106a2 100644 --- a/pkg/kversion/kversion_test.go +++ b/pkg/kversion/kversion_test.go @@ -119,3 +119,75 @@ func TestEqual(t *testing.T) { t.Errorf("unexpectedly not equal after backing v0.8.1 down to v0.8.0, opposite direction") } } + +func TestVersionProbeKafka3_1(t *testing.T) { + versions := map[int16]int16{ + 0: 9, // Produce + 1: 13, // Fetch + 2: 7, // ListOffsets + 3: 12, // Metadata + 4: 5, // LeaderAndISR + 5: 3, // StopReplica + 6: 7, // UpdateMetadata + 7: 3, // ControlledShutdown + 8: 8, // OffsetCommit + 9: 8, // OffsetFetch + 10: 4, // FindCoordinator + 11: 7, // JoinGroup + 12: 4, // Heartbeat + 13: 4, // LeaveGroup + 14: 5, // SyncGroup + 15: 5, // DescribeGroups + 16: 4, // ListGroups + 17: 1, // SASLHandshake + 18: 3, // ApiVersions + 19: 7, // CreateTopics + 20: 6, // DeleteTopics + 21: 2, // DeleteRecords + 22: 4, // InitProducerID + 23: 4, // OffsetForLeaderEpoch + 24: 3, // AddPartitionsToTxn + 25: 3, // AddOffsetsToTxn + 26: 3, // EndTxn + 27: 1, // WriteTxnMarkers + 28: 3, // TxnOffsetCommit + 29: 2, // DescribeACLs + 30: 2, // CreateACLs + 31: 2, // DeleteACLs + 32: 4, // DescribeConfigs + 33: 2, // AlterConfigs + 34: 2, // AlterReplicaLogDirs + 35: 2, // DescribeLogDirs + 36: 2, // SASLAuthenticate + 37: 3, // CreatePartitions + 38: 2, // CreateDelegationToken + 39: 2, // RenewDelegationToken + 40: 2, // ExpireDelegationToken + 41: 2, // DescribeDelegationToken + 42: 2, // DeleteGroups + 43: 2, // ElectLeaders + 44: 1, // IncrementalAlterConfigs + 45: 0, // AlterPartitionAssignments + 46: 0, // ListPartitionReassignments + 47: 0, // OffsetDelete + 48: 1, // DescribeClientQuotas + 49: 1, // AlterClientQuotas + 50: 0, // DescribeUserSCRAMCredentials + 51: 0, // AlterUserSCRAMCredentials + 56: 0, // AlterPartition + 57: 0, // UpdateFeatures + 60: 0, // DescribeCluster + 61: 0, // DescribeProducers + 65: 0, // DescribeTransactions + 66: 0, // ListTransactions + 67: 0, // AllocateProducerIDs + } + + var vs Versions + for k, v := range versions { + vs.SetMaxKeyVersion(k, v) + } + if guess := vs.VersionGuess(); guess != "v3.1" { + t.Errorf("unexpected version guess, got %s != exp %s", guess, "v3.1") + } +}