Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Auditd: Fix Top Exec Commands dashboard visualization. {pull}27638[27638]
- Store offset in `log.offset` field of events from the filestream input. {pull}27688[27688]
- Fix `httpjson` input rate limit processing and documentation. {pull}[]
- Update Filebeat compatibility function to remove processor description field on ES < 7.9.0 {pull}27774[27774]

*Heartbeat*

Expand Down
26 changes: 23 additions & 3 deletions filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ var processorCompatibilityChecks = []processorCompatibility{
},
adaptConfig: deleteProcessor,
},
{
procType: "*",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.9.0"))
},
adaptConfig: removeDescription,
},
}

// Processor represents and Ingest Node processor definition.
Expand Down Expand Up @@ -273,17 +280,17 @@ nextProcessor:

// Run compatibility checks on the processor.
for _, proc := range processorCompatibilityChecks {
if processor.Name() != proc.procType {
if processor.Name() != proc.procType && proc.procType != "*" {
continue
}

if !proc.checkVersion(&esVersion) {
continue
}

processor, err = proc.adaptConfig(processor, log.With("processor_type", proc.procType, "processor_index", i))
processor, err = proc.adaptConfig(processor, log.With("processor_type", processor.Name(), "processor_index", i))
if err != nil {
return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err)
return fmt.Errorf("failed to adapt %q processor at index %d: %w", processor.Name(), i, err)
}
if processor.IsNil() {
continue nextProcessor
Expand Down Expand Up @@ -408,3 +415,16 @@ func replaceConvertIP(processor Processor, log *logp.Logger) (Processor, error)
log.Debug("processor output=", processor.String())
return processor, nil
}

// removeDescription removes the description config option so ES less than 7.9 will work.
func removeDescription(processor Processor, log *logp.Logger) (Processor, error) {
_, ok := processor.GetString("description")
if !ok {
return processor, nil
}

log.Debug("Removing unsupported 'description' from processor.")
processor.Delete("description")

return processor, nil
}
116 changes: 115 additions & 1 deletion filebeat/fileset/compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,6 @@ func TestReplaceConvertIPWithGrok(t *testing.T) {
"^%{IP:bar}$",
},
"ignore_missing": true,
"description": "foo bar",
"if": "condition",
"ignore_failure": false,
"tag": "myTag",
Expand Down Expand Up @@ -1341,3 +1340,118 @@ func TestReplaceAlternativeFlowProcessors(t *testing.T) {
})
}
}

func TestRemoveDescription(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 7.9.0",
esVersion: common.MustNewVersion("7.8.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
map[string]interface{}{
"script": map[string]interface{}{
"source": "abcd",
"lang": "painless",
"description": "This is a description",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
},
},
map[string]interface{}{
"script": map[string]interface{}{
"source": "abcd",
"lang": "painless",
},
},
},
},
isErrExpected: false,
},
{
name: "ES == 7.9.0",
esVersion: common.MustNewVersion("7.9.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
},
},
isErrExpected: false,
},
{
name: "ES > 7.9.0",
esVersion: common.MustNewVersion("8.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
},
},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, test.expected, test.content, test.name)
}
})
}
}