diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4e351e1dfa21..0aacb43832a5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -520,6 +520,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - The disk queue is now GA. {pull}27515[27515] - Allow non-padded base64 data to be decoded by decode_base64_field {pull}27311[27311], {issue}27021[27021] - The Kafka support library Sarama has been updated to 1.29.1. {pull}27717[27717] +- Kafka is now supported up to version 2.8.0. {pull}27720[27720] *Auditbeat* diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index e134fb864045..4a567d73bd15 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -51,7 +51,7 @@ link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecos [[kafka-input-compatibility]] ==== Compatibility -This input works with all Kafka versions in between 0.11 and 2.1.0. Older versions +This input works with all Kafka versions in between 0.11 and 2.8.0. Older versions might work as well, but are not supported. [id="{beatname_lc}-input-{type}-options"] diff --git a/libbeat/common/kafka/version.go b/libbeat/common/kafka/version.go index f39f019441ee..3df44b86216a 100644 --- a/libbeat/common/kafka/version.go +++ b/libbeat/common/kafka/version.go @@ -26,78 +26,47 @@ import ( // Version is a kafka version type Version string -// TODO: remove me. -// Compat version overwrite for missing versions in sarama -// Public API is compatible between these versions. var ( - v0_10_2_1 = parseKafkaVersion("0.10.2.1") - v0_11_0_1 = parseKafkaVersion("0.11.0.1") - v0_11_0_2 = parseKafkaVersion("0.11.0.2") - v1_0_1 = parseKafkaVersion("1.0.1") - v1_0_2 = parseKafkaVersion("1.0.2") - v1_1_1 = parseKafkaVersion("1.1.1") - - kafkaVersions = map[string]sarama.KafkaVersion{ - "0.8.2.0": sarama.V0_8_2_0, - "0.8.2.1": sarama.V0_8_2_1, - "0.8.2.2": sarama.V0_8_2_2, - "0.8.2": sarama.V0_8_2_2, - "0.8": sarama.V0_8_2_2, - - "0.9.0.0": sarama.V0_9_0_0, - "0.9.0.1": sarama.V0_9_0_1, - "0.9.0": sarama.V0_9_0_1, - "0.9": sarama.V0_9_0_1, - - "0.10.0.0": sarama.V0_10_0_0, - "0.10.0.1": sarama.V0_10_0_1, - "0.10.0": sarama.V0_10_0_1, - "0.10.1.0": sarama.V0_10_1_0, - "0.10.1": sarama.V0_10_1_0, - "0.10.2.0": sarama.V0_10_2_0, - "0.10.2.1": v0_10_2_1, - "0.10.2": v0_10_2_1, - "0.10": v0_10_2_1, - - "0.11.0.0": sarama.V0_11_0_0, - "0.11.0.1": v0_11_0_1, - "0.11.0.2": v0_11_0_2, - "0.11.0": v0_11_0_2, - "0.11": v0_11_0_2, - - "1.0.0": sarama.V1_0_0_0, - "1.0.1": v1_0_1, - "1.0.2": v1_0_2, - "1.0": v1_0_2, - "1.1.0": sarama.V1_1_0_0, - "1.1.1": v1_1_1, - "1.1": v1_1_1, - "1": v1_1_1, - - "2.0.0": sarama.V2_0_0_0, - "2.0.1": sarama.V2_0_1_0, - "2.0": sarama.V2_0_1_0, - "2.1": sarama.V2_1_0_0, - "2.2": sarama.V2_2_0_0, - "2": sarama.V2_1_0_0, + // Sarama expects version strings to be fully expanded, e.g. "1.1.1". + // We also allow versions to be specified as a prefix, e.g. "1", + // understood as referencing the most recent version starting with "1". + // truncatedKafkaVersions stores a lookup of the abbreviations we accept. + truncatedKafkaVersions = map[string]sarama.KafkaVersion{ + "0.8.2": sarama.V0_8_2_2, + "0.8": sarama.V0_8_2_2, + + "0.9.0": sarama.V0_9_0_1, + "0.9": sarama.V0_9_0_1, + + "0.10.0": sarama.V0_10_0_1, + "0.10.1": sarama.V0_10_1_0, + "0.10.2": sarama.V0_10_2_1, + "0.10": sarama.V0_10_2_1, + + "0.11.0": sarama.V0_11_0_2, + "0.11": sarama.V0_11_0_2, + + "1.0": sarama.V1_0_0_0, + "1.1": sarama.V1_1_1_0, + "1": sarama.V1_1_1_0, + + "2.0": sarama.V2_0_1_0, + "2.1": sarama.V2_1_0_0, + "2.2": sarama.V2_2_0_0, + "2.3": sarama.V2_3_0_0, + "2.4": sarama.V2_4_0_0, + "2.5": sarama.V2_5_0_0, + "2.6": sarama.V2_6_0_0, + "2": sarama.V2_6_0_0, } ) -func parseKafkaVersion(s string) sarama.KafkaVersion { - v, err := sarama.ParseKafkaVersion(s) - if err != nil { - panic(err) - } - return v -} - // Validate that a kafka version is among the possible options func (v *Version) Validate() error { - if _, ok := kafkaVersions[string(*v)]; !ok { - return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v) + if _, ok := v.Get(); ok { + return nil } - - return nil + return fmt.Errorf("unknown/unsupported kafka version '%v'", *v) } // Unpack a kafka version @@ -113,6 +82,20 @@ func (v *Version) Unpack(s string) error { // Get a sarama kafka version func (v Version) Get() (sarama.KafkaVersion, bool) { - kv, ok := kafkaVersions[string(v)] - return kv, ok + // First check if it's one of the abbreviations we accept. + // If not, let sarama parse it. + s := string(v) + if version, ok := truncatedKafkaVersions[s]; ok { + return version, true + } + version, err := sarama.ParseKafkaVersion(s) + if err != nil { + return sarama.KafkaVersion{}, false + } + for _, supp := range sarama.SupportedVersions { + if version == supp { + return version, true + } + } + return sarama.KafkaVersion{}, false } diff --git a/libbeat/common/kafka/version_test.go b/libbeat/common/kafka/version_test.go new file mode 100644 index 000000000000..19d5b04ad233 --- /dev/null +++ b/libbeat/common/kafka/version_test.go @@ -0,0 +1,80 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kafka + +import ( + "testing" + + "github.com/Shopify/sarama" +) + +func TestVersionGet(t *testing.T) { + valid := map[Version]sarama.KafkaVersion{ + "0.11": sarama.V0_11_0_2, + "1": sarama.V1_1_1_0, + "2.0.0": sarama.V2_0_0_0, + "2.0.1": sarama.V2_0_1_0, + "2.0": sarama.V2_0_1_0, + "2.5": sarama.V2_5_0_0, + } + invalid := []Version{ + "1.1.2", + "1.2.3", + "1.3", + "hello", + "2.0.3", + } + for s, expect := range valid { + got, ok := s.Get() + if !ok { + t.Errorf("'%v' should parse as Kafka version %v, got nothing", + s, expect) + } else if got != expect { + t.Errorf("'%v' should parse as Kafka version %v, got %v", + s, expect, got) + } + } + for _, s := range invalid { + got, ok := s.Get() + if ok { + t.Errorf("'%v' is not a valid Kafka version but parsed as %v", + s, got) + } + } +} + +func TestSaramaUpdate(t *testing.T) { + // If any of these versions are considered valid by our parsing code, + // it means someone updated sarama without updating the parsing code + // for the new version. Gently remind them. + flagVersions := []Version{"2.8.1", "2.9.0"} + for _, v := range flagVersions { + if _, ok := v.Get(); ok { + t.Fatalf( + "Kafka version %v is now considered valid. Did you update Sarama?\n"+ + "If so, remember to:\n"+ + "- Update truncatedKafkaVersions in libbeat/common/kafka/version.go\n"+ + "- Update the documentation to list the latest version:\n"+ + " * libbeat/outputs/kafka/docs/kafka.asciidoc\n"+ + " * filebeat/docs/inputs/inputs-kafka.asciidoc\n"+ + "- Update TestSaramaUpdate in libbeat/common/kafka/version_test.go\n", + v) + + } + } +} diff --git a/libbeat/outputs/kafka/docs/kafka.asciidoc b/libbeat/outputs/kafka/docs/kafka.asciidoc index f61d4c5d9855..85b467ab3f20 100644 --- a/libbeat/outputs/kafka/docs/kafka.asciidoc +++ b/libbeat/outputs/kafka/docs/kafka.asciidoc @@ -34,7 +34,7 @@ NOTE: Events bigger than <> will be [[kafka-compatibility]] ==== Compatibility -This output works with all Kafka versions in between 0.11 and 2.2.2. Older versions +This output works with all Kafka versions in between 0.11 and 2.8.0. Older versions might work as well, but are not supported. ==== Configuration options