From aaad9fbdbb44f69208742c2a5eba236358a9a1d4 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 7 Jul 2021 15:30:50 +0200 Subject: [PATCH 1/4] Filebeat: Ensure module pipelines compatibility with previous versions of Elasticsearch (#26737) Improve the pipeline/compatibility code so that all processors in a pipeline are scanned and acted-upon to ensure compatibility. This means: - Scan processors in on_failure section (both the pipeline's and each individual processor on-failure handler). - Scan the inner processor in a foreach processor. Add a new CI stage, module-compat-7.11, to filebeat/Jenkinsfile.yml and x-pack/filebeat/Jenkinsfile.yml, in order to run Filebeat modules tests under ES 7.11.2 to ensure that all pipelines are functional. This test uses a new flag, TESTING_FILEBEAT_SKIP_DIFF, to instruct the integration test to skip the comparison between the generated documents and the golden/expected files. The test will ensure that the pipeline loads, there are no ingest errors, the fields in the generated documents are valid and the number of returned documents matches the expected. This is intended to avoid having to maintain multiple versions of the golden files due to differences between ES versions and available processors. Also fixes the fortinet and threatintel modules pipelines so that they pass the new test, as some fields were left behind due to the uri_parts processor being removed. (cherry picked from commit 181cf692ac87aced2fd697268cf992228a1dbac2) # Conflicts: # x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml # x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml --- filebeat/Jenkinsfile.yml | 8 + filebeat/fileset/compatibility.go | 296 ++++++++++++------ filebeat/fileset/compatibility_test.go | 281 ++++++++++++++++- filebeat/tests/system/test_modules.py | 9 + testing/environments/7.11.yml | 38 +++ x-pack/filebeat/Jenkinsfile.yml | 8 + .../fortinet/firewall/ingest/pipeline.yml | 11 +- .../threatintel/abuseurl/ingest/pipeline.yml | 11 + .../threatintel/anomali/ingest/pipeline.yml | 11 + 9 files changed, 572 insertions(+), 101 deletions(-) create mode 100644 testing/environments/7.11.yml diff --git a/filebeat/Jenkinsfile.yml b/filebeat/Jenkinsfile.yml index 0cf61544a6fc..55a296e9fdb0 100644 --- a/filebeat/Jenkinsfile.yml +++ b/filebeat/Jenkinsfile.yml @@ -44,6 +44,14 @@ stages: pythonIntegTest: mage: "mage pythonIntegTest" ## run the ITs only if the changeset affects a specific module. stage: mandatory + module-compat-7.11: + mage: >- ## Run module integration tests under ES 7.11 to ensure ingest pipeline compatibility. + STACK_ENVIRONMENT=7.11 + TESTING_FILEBEAT_SKIP_DIFF=1 + PYTEST_ADDOPTS='-k test_modules' + mage pythonIntegTest + withModule: true + stage: mandatory macos: mage: "mage build unitTest" platforms: ## override default label in this specific stage. diff --git a/filebeat/fileset/compatibility.go b/filebeat/fileset/compatibility.go index 5e9c0cd91f1e..8fe4e64a4db6 100644 --- a/filebeat/fileset/compatibility.go +++ b/filebeat/fileset/compatibility.go @@ -18,6 +18,7 @@ package fileset import ( + "encoding/json" "fmt" "strings" @@ -30,31 +31,9 @@ import ( // processorCompatibility defines a processor's minimum version requirements or // a transformation to make it compatible. type processorCompatibility struct { - checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. - procType string // Elasticsearch Ingest Node processor type. 
- adaptConfig func(processor map[string]interface{}, log *logp.Logger) compatAction // Adapt the configuration to make it compatible. -} - -type compatAction func(interface{}) (interface{}, error) - -func keepProcessor(original interface{}) (interface{}, error) { - return original, nil -} - -func dropProcessor(interface{}) (interface{}, error) { - return nil, nil -} - -func replaceProcessor(newProc interface{}) compatAction { - return func(interface{}) (interface{}, error) { - return newProc, nil - } -} - -func fail(err error) compatAction { - return func(interface{}) (interface{}, error) { - return nil, err - } + checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. + procType string // Elasticsearch Ingest Node processor type. + adaptConfig func(processor Processor, log *logp.Logger) (Processor, error) // Adapt the configuration to make it compatible. } var processorCompatibilityChecks = []processorCompatibility{ @@ -92,9 +71,9 @@ var processorCompatibilityChecks = []processorCompatibility{ return esVersion.LessThan(common.MustNewVersion("7.0.0")) && !esVersion.LessThan(common.MustNewVersion("6.7.0")) }, - adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction { - config["ecs"] = true - return keepProcessor + adaptConfig: func(processor Processor, _ *logp.Logger) (Processor, error) { + processor.Set("ecs", true) + return processor, nil }, }, { @@ -102,8 +81,8 @@ var processorCompatibilityChecks = []processorCompatibility{ checkVersion: func(esVersion *common.Version) bool { return esVersion.LessThan(common.MustNewVersion("6.7.0")) }, - adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction { - return fail(errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required")) + adaptConfig: func(_ Processor, _ *logp.Logger) (Processor, error) { + return Processor{}, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") }, }, { @@ -129,85 +108,220 @@ var processorCompatibilityChecks = []processorCompatibility{ }, } +// Processor represents and Ingest Node processor definition. +type Processor struct { + name string + config map[string]interface{} +} + +// NewProcessor returns the representation of an Ingest Node processor +// for the given configuration. +func NewProcessor(raw interface{}) (p Processor, err error) { + rawAsMap, ok := raw.(map[string]interface{}) + if !ok { + return p, fmt.Errorf("processor is not an object, got %T", raw) + } + + var keys []string + for k := range rawAsMap { + keys = append(keys, k) + } + if len(keys) != 1 { + return p, fmt.Errorf("processor doesn't have exactly 1 key, got %d: %v", len(keys), keys) + } + p.name = keys[0] + if p.config, ok = rawAsMap[p.name].(map[string]interface{}); !ok { + return p, fmt.Errorf("processor config is not an object, got %T", rawAsMap[p.name]) + } + return p, nil +} + +// Name of the processor. +func (p *Processor) Name() string { + return p.name +} + +// IsNil returns a boolean indicating if the processor is the zero value. +func (p *Processor) IsNil() bool { + return p.name == "" +} + +// Config returns the processor configuration as a map. +func (p *Processor) Config() map[string]interface{} { + return p.config +} + +// GetBool returns a boolean flag from the processor's configuration. 
+func (p *Processor) GetBool(key string) (value, ok bool) { + value, ok = p.config[key].(bool) + return +} + +// GetString returns a string flag from the processor's configuration. +func (p *Processor) GetString(key string) (value string, ok bool) { + value, ok = p.config[key].(string) + return +} + +// GetList returns an array from the processor's configuration. +func (p *Processor) GetList(key string) (value []interface{}, ok bool) { + value, ok = p.config[key].([]interface{}) + return +} + +// Set a flag in the processor's configuration. +func (p *Processor) Set(key string, value interface{}) { + p.config[key] = value +} + +// Get a flag from the processor's configuration. +func (p *Processor) Get(key string) (value interface{}, ok bool) { + value, ok = p.config[key] + return +} + +// Delete a configuration flag. +func (p *Processor) Delete(key string) { + delete(p.config, key) +} + +// ToMap returns the representation for the processor as a map. +func (p *Processor) ToMap() map[string]interface{} { + return map[string]interface{}{ + p.name: p.config, + } +} + +// String returns a string representation for the processor. +func (p *Processor) String() string { + b, err := json.Marshal(p.ToMap()) + if err != nil { + return fmt.Sprintf("/* encoding error: %v */", err) + } + return string(b) +} + // adaptPipelineForCompatibility iterates over all processors in the pipeline // and adapts them for version of Elasticsearch used. Adapt can mean modifying // processor options or removing the processor. func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) { - p, ok := content["processors"] + log = log.With("pipeline_id", pipelineID) + // Adapt the main processors in the pipeline. + if err = adaptProcessorsForCompatibility(esVersion, content, "processors", false, log); err != nil { + return err + } + // Adapt any `on_failure` processors in the pipeline. + return adaptProcessorsForCompatibility(esVersion, content, "on_failure", true, log) +} + +func adaptProcessorsForCompatibility(esVersion common.Version, content map[string]interface{}, section string, ignoreMissingsection bool, log *logp.Logger) (err error) { + p, ok := content[section] if !ok { - return errors.New("'processors' is missing from the pipeline definition") + if ignoreMissingsection { + return nil + } + return fmt.Errorf("'%s' is missing from the pipeline definition", section) } processors, ok := p.([]interface{}) if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + return fmt.Errorf("'%s' expected to be a list, found %T", section, p) } var filteredProcs []interface{} + log = log.With("processors_section", section) nextProcessor: for i, obj := range processors { - for _, proc := range processorCompatibilityChecks { - processor, ok := obj.(map[string]interface{}) - if !ok { - return fmt.Errorf("processor at index %d is not an object, got %T", i, obj) + processor, err := NewProcessor(obj) + if err != nil { + return errors.Wrapf(err, "cannot parse processor in section '%s' index %d body=%+v", section, i, obj) + } + + // Adapt any on_failure processors for this processor. 
+ prevOnFailure, _ := processor.GetList("on_failure") + if err = adaptProcessorsForCompatibility(esVersion, processor.Config(), "on_failure", true, + log.With("parent_processor_type", processor.Name(), "parent_processor_index", i)); err != nil { + return errors.Wrapf(err, "cannot parse on_failure for processor in section '%s' index %d body=%+v", section, i, obj) + } + if onFailure, _ := processor.GetList("on_failure"); len(prevOnFailure) > 0 && len(onFailure) == 0 { + processor.Delete("on_failure") + } + + // Adapt inner processor in case of foreach. + if inner, found := processor.Get("processor"); found && processor.Name() == "foreach" { + processor.Set("processor", []interface{}{inner}) + if err = adaptProcessorsForCompatibility(esVersion, processor.Config(), "processor", false, + log.With("parent_processor_type", processor.Name(), "parent_processor_index", i)); err != nil { + return errors.Wrapf(err, "cannot parse inner processor for foreach in section '%s' index %d", section, i) } + newList, _ := processor.GetList("processor") + switch len(newList) { + case 0: + // compatibility has removed the inner processor of a foreach processor, + // must also remove the foreach processor itself. + continue nextProcessor + case 1: + // replace existing processor with possibly modified one. + processor.Set("processor", newList[0]) + default: + // This is actually not possible as compatibility checks + // can't inject extra processors. + return fmt.Errorf("parsing inner processor for foreach in section '%s' index %d results in more than one processor, which is unsupported by foreach", section, i) + } + } - configIfc, found := processor[proc.procType] - if !found { + // Run compatibility checks on the processor. + for _, proc := range processorCompatibilityChecks { + if processor.Name() != proc.procType { continue } - config, ok := configIfc.(map[string]interface{}) - if !ok { - return fmt.Errorf("processor config at index %d is not an object, got %T", i, obj) - } if !proc.checkVersion(&esVersion) { continue } - act := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i)) - obj, err = act(obj) + processor, err = proc.adaptConfig(processor, log.With("processor_type", proc.procType, "processor_index", i)) if err != nil { return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err) } - if obj == nil { + if processor.IsNil() { continue nextProcessor } } - filteredProcs = append(filteredProcs, obj) + filteredProcs = append(filteredProcs, processor.ToMap()) } - content["processors"] = filteredProcs + content[section] = filteredProcs return nil } // deleteProcessor returns true to indicate that the processor should be deleted // in order to adapt the pipeline for backwards compatibility to Elasticsearch. -func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) compatAction { - return dropProcessor +func deleteProcessor(_ Processor, _ *logp.Logger) (Processor, error) { + return Processor{}, nil } // replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if // statement so ES less than 7.9 will work. 
-func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) compatAction { - _, ok := config["ignore_empty_value"].(bool) +func replaceSetIgnoreEmptyValue(processor Processor, log *logp.Logger) (Processor, error) { + _, ok := processor.GetBool("ignore_empty_value") if !ok { - return keepProcessor + return processor, nil } log.Debug("Removing unsupported 'ignore_empty_value' from set processor.") - delete(config, "ignore_empty_value") + processor.Delete("ignore_empty_value") - _, ok = config["if"].(string) + _, ok = processor.GetString("if") if ok { // assume if check is sufficient - return keepProcessor + return processor, nil } - val, ok := config["value"].(string) + val, ok := processor.GetString("value") if !ok { - return keepProcessor + return processor, nil } newIf := strings.TrimLeft(val, "{ ") @@ -215,39 +329,39 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) newIf = strings.ReplaceAll(newIf, ".", "?.") newIf = "ctx?." + newIf + " != null" - log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf) - config["if"] = newIf - return keepProcessor + log.Debugf("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf) + processor.Set("if", newIf) + return processor, nil } // replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement // so ES less than 7.10 will work. -func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) compatAction { - allow, ok := config["allow_duplicates"].(bool) +func replaceAppendAllowDuplicates(processor Processor, log *logp.Logger) (Processor, error) { + allow, ok := processor.GetBool("allow_duplicates") if !ok { - return keepProcessor + return processor, nil } log.Debug("Removing unsupported 'allow_duplicates' from append processor.") - delete(config, "allow_duplicates") + processor.Delete("allow_duplicates") if allow { // It was set to true, nothing else to do after removing the option. - return keepProcessor + return processor, nil } - currIf, _ := config["if"].(string) + currIf, _ := processor.GetString("if") if strings.Contains(strings.ToLower(currIf), "contains") { // If it has a contains statement, we assume it is checking for duplicates already. - return keepProcessor + return processor, nil } - field, ok := config["field"].(string) + field, ok := processor.GetString("field") if !ok { - return keepProcessor + return processor, nil } - val, ok := config["value"].(string) + val, ok := processor.GetString("value") if !ok { - return keepProcessor + return processor, nil } field = strings.ReplaceAll(field, ".", "?.") @@ -263,36 +377,34 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) - log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf) - config["if"] = newIf + log.Debugf("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf) + processor.Set("if", newIf) - return keepProcessor + return processor, nil } // replaceConvertIP replaces convert processors with type: ip with a grok expression that uses // the IP pattern. 
-func replaceConvertIP(config map[string]interface{}, log *logp.Logger) compatAction { - wantedType, found := config["type"] - if !found || wantedType != "ip" { - return keepProcessor +func replaceConvertIP(processor Processor, log *logp.Logger) (Processor, error) { + if wantedType, _ := processor.GetString("type"); wantedType != "ip" { + return processor, nil } - log.Debug("processor input=", config) - delete(config, "type") + log.Debug("processor input=", processor.String()) + processor.Delete("type") var srcIf, dstIf interface{} - if srcIf, found = config["field"]; !found { - return fail(errors.New("field option is required for convert processor")) + var found bool + if srcIf, found = processor.Get("field"); !found { + return Processor{}, errors.New("field option is required for convert processor") } - if dstIf, found = config["target_field"]; found { - delete(config, "target_field") + if dstIf, found = processor.Get("target_field"); found { + processor.Delete("target_field") } else { dstIf = srcIf } - config["patterns"] = []string{ + processor.Set("patterns", []string{ fmt.Sprintf("^%%{IP:%s}$", dstIf), - } - grok := map[string]interface{}{ - "grok": config, - } - log.Debug("processor output=", grok) - return replaceProcessor(grok) + }) + processor.name = "grok" + log.Debug("processor output=", processor.String()) + return processor, nil } diff --git a/filebeat/fileset/compatibility_test.go b/filebeat/fileset/compatibility_test.go index e7fa8a1267ab..0d3b000d8b76 100644 --- a/filebeat/fileset/compatibility_test.go +++ b/filebeat/fileset/compatibility_test.go @@ -898,9 +898,15 @@ func TestReplaceConvertIPWithGrok(t *testing.T) { "ignore_failure": false, "tag": "myTag", "on_failure": []interface{}{ - "foo", map[string]interface{}{ - "bar": []int{1, 2, 3}, + "foo": map[string]interface{}{ + "baz": false, + }, + }, + map[string]interface{}{ + "bar": map[string]interface{}{ + "baz": true, + }, }, }, }, @@ -921,9 +927,15 @@ func TestReplaceConvertIPWithGrok(t *testing.T) { "ignore_failure": false, "tag": "myTag", "on_failure": []interface{}{ - "foo", map[string]interface{}{ - "bar": []int{1, 2, 3}, + "foo": map[string]interface{}{ + "baz": false, + }, + }, + map[string]interface{}{ + "bar": map[string]interface{}{ + "baz": true, + }, }, }, }, @@ -1068,3 +1080,264 @@ func TestRemoveRegisteredDomainProcessor(t *testing.T) { }) } } + +func TestReplaceAlternativeFlowProcessors(t *testing.T) { + logp.TestingSetup() + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "Replace in on_failure section", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || 
ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "Replace in processor's on_failure", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "Remove empty on_failure key", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + "on_failure": []interface{}{ + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "process foreach processor", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != null", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "Remove leftover foreach processor", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foreach": map[string]interface{}{ + "field": "foo", + "processor": map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}(nil), + }, + isErrExpected: false, + }, + { + name: "nested", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "foreach": 
map[string]interface{}{ + "field": "foo", + "processor": map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != null", + "on_failure": []interface{}{ + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "panic", + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "foreach": map[string]interface{}{ + "field": "foo", + "processor": map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "panic", + }, + }, + }, + }, + }, + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index b2e6c03b1c52..280693e1237e 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -214,6 +214,15 @@ def _test_expected_events(self, test_file, objects): assert len(expected) == len(objects), "expected {} events to compare but got {}".format( len(expected), len(objects)) + # Do not perform a comparison between the resulting and expected documents + # if the TESTING_FILEBEAT_SKIP_DIFF flag is set. + # + # This allows to run a basic check with older versions of ES that can lead + # to slightly different documents without maintaining multiple sets of + # golden files. 
+ if os.getenv("TESTING_FILEBEAT_SKIP_DIFF"): + return + for idx in range(len(expected)): ev = expected[idx] obj = objects[idx] diff --git a/testing/environments/7.11.yml b/testing/environments/7.11.yml new file mode 100644 index 000000000000..7f93445987c7 --- /dev/null +++ b/testing/environments/7.11.yml @@ -0,0 +1,38 @@ +# This is the latest 7.11 + +version: '2.3' +services: + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2 + healthcheck: + test: ["CMD-SHELL", "curl -s http://localhost:9200/_cat/health?h=status | grep -q green"] + retries: 300 + interval: 1s + environment: + - "ES_JAVA_OPTS=-Xms1g -Xmx1g" + - "network.host=" + - "transport.host=127.0.0.1" + - "http.host=0.0.0.0" + - "xpack.security.enabled=false" + - "script.context.template.max_compilations_rate=unlimited" + - "script.context.ingest.cache_max_size=2000" + - "script.context.processor_conditional.cache_max_size=2000" + - "script.context.template.cache_max_size=2000" + - "action.destructive_requires_name=false" + + logstash: + image: docker.elastic.co/logstash/logstash:7.11.2 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] + retries: 600 + interval: 1s + volumes: + - ./docker/logstash/pipeline:/usr/share/logstash/pipeline:ro + - ./docker/logstash/pki:/etc/pki:ro + + kibana: + image: docker.elastic.co/kibana/kibana:7.11.2 + healthcheck: + test: ["CMD-SHELL", "curl -s http://localhost:5601/api/status | grep -q 'Looking good'"] + retries: 600 + interval: 1s diff --git a/x-pack/filebeat/Jenkinsfile.yml b/x-pack/filebeat/Jenkinsfile.yml index ef338263c6b3..09c837bccf65 100644 --- a/x-pack/filebeat/Jenkinsfile.yml +++ b/x-pack/filebeat/Jenkinsfile.yml @@ -44,6 +44,14 @@ stages: pythonIntegTest: mage: "mage pythonIntegTest" ## run the ITs only if the changeset affects a specific module. stage: mandatory + module-compat-7.11: + mage: >- ## Run module integration tests under ES 7.11 to ensure ingest pipeline compatibility. + STACK_ENVIRONMENT=7.11 + TESTING_FILEBEAT_SKIP_DIFF=1 + PYTEST_ADDOPTS='-k test_xpack_modules' + mage pythonIntegTest + withModule: true + stage: mandatory macos: mage: "mage build unitTest" platforms: ## override default label in this specific stage. 
diff --git a/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml b/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml index 9b2ea7b0b24b..2955cd66b266 100644 --- a/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml +++ b/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml @@ -414,17 +414,18 @@ processors: - remove: field: - _temp + - host - syslog5424_sd - syslog5424_pri - - fortinet.firewall.tz + - fortinet.firewall.agent - fortinet.firewall.date - fortinet.firewall.devid - - fortinet.firewall.eventtime - - fortinet.firewall.time - fortinet.firewall.duration - - host + - fortinet.firewall.eventtime - fortinet.firewall.hostname - - fortinet.firewall.agent + - fortinet.firewall.time + - fortinet.firewall.tz + - fortinet.firewall.url ignore_missing: true - script: lang: painless diff --git a/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml b/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml index bf674ba2c88e..4981632ad990 100644 --- a/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml +++ b/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml @@ -96,6 +96,7 @@ processors: } } } +<<<<<<< HEAD handleMap(ctx); - remove: field: @@ -104,6 +105,16 @@ processors: - threatintel.abuseurl.host - message ignore_missing: true +======= + } + handleMap(ctx); +- remove: + field: + - threatintel.abuseurl.date_added + - threatintel.abuseurl.url + - message + ignore_missing: true +>>>>>>> 181cf692ac (Filebeat: Ensure module pipelines compatibility with previous versions of Elasticsearch (#26737)) on_failure: - set: field: error.message diff --git a/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml b/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml index eccbf5e9826e..4c80d8a22115 100644 --- a/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml +++ b/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml @@ -139,6 +139,7 @@ processors: } } } +<<<<<<< HEAD handleMap(ctx); - remove: field: @@ -146,6 +147,16 @@ processors: - message - _tmp ignore_missing: true +======= + } + handleMap(ctx); +- remove: + field: + - threatintel.anomali.created + - message + - _tmp + ignore_missing: true +>>>>>>> 181cf692ac (Filebeat: Ensure module pipelines compatibility with previous versions of Elasticsearch (#26737)) on_failure: - set: field: error.message From 80c74d342c0ff67690a3b00b1647d984236ea75f Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 9 Sep 2021 10:07:39 -0400 Subject: [PATCH 2/4] Fix conflicts --- .../module/threatintel/abuseurl/ingest/pipeline.yml | 11 ----------- .../module/threatintel/anomali/ingest/pipeline.yml | 11 ----------- 2 files changed, 22 deletions(-) diff --git a/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml b/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml index 4981632ad990..bf674ba2c88e 100644 --- a/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml +++ b/x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml @@ -96,7 +96,6 @@ processors: } } } -<<<<<<< HEAD handleMap(ctx); - remove: field: @@ -105,16 +104,6 @@ processors: - threatintel.abuseurl.host - message ignore_missing: true -======= - } - handleMap(ctx); -- remove: - field: - - threatintel.abuseurl.date_added - - threatintel.abuseurl.url - - message - ignore_missing: true ->>>>>>> 181cf692ac (Filebeat: Ensure module pipelines compatibility with previous versions of Elasticsearch (#26737)) on_failure: - set: field: error.message diff --git 
a/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml b/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml index 4c80d8a22115..eccbf5e9826e 100644 --- a/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml +++ b/x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml @@ -139,7 +139,6 @@ processors: } } } -<<<<<<< HEAD handleMap(ctx); - remove: field: @@ -147,16 +146,6 @@ processors: - message - _tmp ignore_missing: true -======= - } - handleMap(ctx); -- remove: - field: - - threatintel.anomali.created - - message - - _tmp - ignore_missing: true ->>>>>>> 181cf692ac (Filebeat: Ensure module pipelines compatibility with previous versions of Elasticsearch (#26737)) on_failure: - set: field: error.message From ddbb5dcd24b17e8c4c897c39e33aa795b3290355 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 14 Jul 2020 11:38:29 -0400 Subject: [PATCH 3/4] Remove script.cache.max_size / script.max_compilations_rate from Filebeat tests (#19896) These setting has been removed as per https://github.com/elastic/elasticsearch/pull/59262 causing the ES container to not launch. elasticsearch_1 | java.lang.IllegalArgumentException: unknown setting [script.cache.max_size] please check that any required plugins are installed, or check the breaking changes documentation for removed settings --- filebeat/docker-compose.yml | 2 -- filebeat/tests/system/test_modules.py | 8 -------- filebeat/tests/system/test_pipeline.py | 8 -------- x-pack/filebeat/docker-compose.yml | 2 -- 4 files changed, 20 deletions(-) diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index 5a447d6cd66d..19302ae1e6fa 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -40,8 +40,6 @@ services: extends: file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml service: elasticsearch - environment: - script.cache.max_size: "500" kafka: build: ${ES_BEATS}/testing/environments/docker/kafka diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index 280693e1237e..e6adfdd2c639 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -75,14 +75,6 @@ def init(self): self.index_name = "test-filebeat-modules" - body = { - "transient": { - "script.max_compilations_rate": "2000/1m" - } - } - - self.es.transport.perform_request('PUT', "/_cluster/settings", body=body) - @parameterized.expand(load_fileset_test_cases) @unittest.skipIf(not INTEGRATION_TESTS, "integration tests are disabled, run with INTEGRATION_TESTS=1 to enable them.") diff --git a/filebeat/tests/system/test_pipeline.py b/filebeat/tests/system/test_pipeline.py index da6d357f8e87..afb3219e62d1 100644 --- a/filebeat/tests/system/test_pipeline.py +++ b/filebeat/tests/system/test_pipeline.py @@ -45,14 +45,6 @@ def test_input_pipeline_config(self): pass self.wait_until(lambda: not self.es.indices.exists(index_name)) - body = { - "transient": { - "script.max_compilations_rate": "100/1m" - } - } - - self.es.transport.perform_request('PUT', "/_cluster/settings", body=body) - self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", elasticsearch=dict( diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index af81ccb13fb9..0c0b477a6114 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -26,6 +26,4 @@ services: extends: file: ${ES_BEATS}/testing/environments/${STACK_ENVIRONMENT}.yml service: elasticsearch - environment: - 
script.cache.max_size: "500" From 7601b24452a39b5add4f5a1cc59150d62c20788e Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 9 Sep 2021 11:58:29 -0400 Subject: [PATCH 4/4] Configure ingest node options in env and not in FB test_modules.py --- testing/environments/snapshot.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index 574ea08263a7..e2f452a7eed0 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -14,6 +14,10 @@ services: - "transport.host=127.0.0.1" - "http.host=0.0.0.0" - "xpack.security.enabled=false" + - "script.context.template.max_compilations_rate=unlimited" + - "script.context.ingest.cache_max_size=2000" + - "script.context.processor_conditional.cache_max_size=2000" + - "script.context.template.cache_max_size=2000" - "action.destructive_requires_name=false" # Disable geoip updates to prevent golden file test failures when the database # changes and prevent race conditions between tests and database updates.
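
As a quick local check, the module-compat-7.11 stage added in the first patch boils down to the following invocation. This is a sketch, assuming a Beats checkout with mage and Docker Compose available; it is run from the filebeat directory, and under x-pack/filebeat the pytest filter is '-k test_xpack_modules' instead:

    STACK_ENVIRONMENT=7.11 \
    TESTING_FILEBEAT_SKIP_DIFF=1 \
    PYTEST_ADDOPTS='-k test_modules' \
    mage pythonIntegTest

With TESTING_FILEBEAT_SKIP_DIFF set, the test still verifies that each pipeline loads, produces no ingest errors, emits valid fields, and returns the expected number of documents, but skips the per-field comparison against the golden files.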